Source code for scorio.rank.bayesian

"""
Bayesian ranking methods.

These methods rank models using posterior summaries rather than point
estimates alone.

Notation
--------

Let :math:`R \\in \\{0,1\\}^{L \\times M \\times N}`.
Each method introduces latent model parameters :math:`\\theta_l`, computes a
posterior :math:`p(\\theta \\mid R)`, and ranks with posterior scores
:math:`s_l`.

The generic form is

.. math::
    s_l = \\mathbb{E}\\!\\left[g(\\theta_l) \\mid R\\right]
    \\quad\\text{or}\\quad
    s_l = \\mathbb{Q}_q\\!\\left(g(\\theta_l) \\mid R\\right),

where :math:`\\mathbb{Q}_q` denotes a posterior quantile.
"""

import numpy as np

from scorio.utils import rank_scores

from ._base import build_pairwise_wins, validate_input
from ._types import RankMethod, RankResult


[docs] def thompson( R: np.ndarray, n_samples: int = 10000, prior_alpha: float = 1.0, prior_beta: float = 1.0, seed: int = 42, method: RankMethod = "competition", return_scores: bool = False, ) -> RankResult: """ Rank models by Thompson-sampling posterior expected rank. Method context: This method assumes each model has one latent Bernoulli success probability over all ``M*N`` outcomes and uses the conjugate Beta-Binomial posterior. Ranking score is the negative Monte Carlo estimate of posterior expected rank. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). n_samples: Positive number of posterior Monte Carlo samples ``T``. prior_alpha: Positive Beta prior alpha. prior_beta: Positive Beta prior beta. seed: Random seed for reproducibility. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns ``scores = - E[r_l | R]`` approximations (shape ``(L,)``), where higher is better. Notation: ``S_l = sum_{m,n} R[l,m,n]`` and ``T_tot = M*N``. ``r_l^(t)`` is model ``l`` rank in posterior draw ``t``. Formula: .. math:: p_l \\mid R \\sim \\mathrm{Beta}(\\alpha + S_l,\\ \\beta + T_{\\mathrm{tot}} - S_l) .. math:: s_l^{\\mathrm{TS}} = -\\frac{1}{T}\\sum_{t=1}^{T} r_l^{(t)} References: Thompson, W. R. (1933). On the Likelihood that One Unknown Probability Exceeds Another in View of the Evidence of Two Samples. Biometrika. https://doi.org/10.1093/biomet/25.3-4.285 Russo, D. J., et al. (2018). A Tutorial on Thompson Sampling. Foundations and Trends in Machine Learning. https://doi.org/10.1561/2200000070 Gelman, A., et al. (2013). Bayesian Data Analysis (3rd ed.). https://doi.org/10.1201/b16018 Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> ranks, scores = rank.thompson(R, n_samples=2000, return_scores=True) >>> ranks.tolist() [1, 2] Notes: If all models have identical posterior Beta parameters, the exact expected ranks are equal and the implementation returns equal scores deterministically (instead of Monte Carlo tie-breaking noise). """ R = validate_input(R) if isinstance(n_samples, bool) or not isinstance(n_samples, (int, np.integer)): raise TypeError(f"n_samples must be an integer, got {type(n_samples).__name__}") n_samples = int(n_samples) if n_samples < 1: raise ValueError(f"n_samples must be >= 1, got {n_samples}") prior_alpha = float(prior_alpha) if not np.isfinite(prior_alpha) or prior_alpha <= 0.0: raise ValueError("prior_alpha must be > 0 and finite.") prior_beta = float(prior_beta) if not np.isfinite(prior_beta) or prior_beta <= 0.0: raise ValueError("prior_beta must be > 0 and finite.") L, M, N = R.shape rng = np.random.default_rng(seed) # Compute posterior parameters for each model. successes = R.reshape(L, -1).sum(axis=1).astype(float) total = float(M * N) post_alphas = prior_alpha + successes post_betas = prior_beta + (total - successes) # If posterior marginals are identical, expected ranks are exactly equal. if np.allclose(post_alphas, post_alphas[0]) and np.allclose( post_betas, post_betas[0] ): scores = np.full(L, -(L + 1) / 2.0, dtype=float) ranking = rank_scores(scores)[method] return (ranking, scores) if return_scores else ranking # Monte Carlo posterior expected rank. rank_sums = np.zeros(L) for _ in range(n_samples): samples = rng.beta(post_alphas, post_betas) # Rank sampled probabilities: rank 1 is best. ranks = np.argsort(np.argsort(-samples)) + 1 rank_sums += ranks avg_ranks = rank_sums / n_samples # Larger score is better: negative average rank. scores = -avg_ranks ranking = rank_scores(scores)[method] return (ranking, scores) if return_scores else ranking
[docs] def bayesian_mcmc( R: np.ndarray, n_samples: int = 5000, burnin: int = 1000, prior_var: float = 1.0, seed: int = 42, method: RankMethod = "competition", return_scores: bool = False, ) -> RankResult: """ Rank models via Bayesian Bradley-Terry posterior means from MCMC. Method context: This method uses decisive pairwise counts with Bradley-Terry likelihood and independent Gaussian priors on log-strengths. Posterior inference is approximated by random-walk Metropolis-Hastings. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). n_samples: Positive number of retained MCMC samples. burnin: Nonnegative number of warmup iterations. prior_var: Positive Gaussian prior variance on ``theta``. seed: Random seed for reproducibility. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns posterior mean ``E[theta|W]`` estimates (shape ``(L,)``). Notation: ``W_ij`` is decisive wins where model ``i`` beats ``j``. ``theta_i`` are log-strengths with ``pi_i = exp(theta_i)``. Formula: .. math:: \\Pr(i\\succ j\\mid\\theta)= \\frac{\\exp(\\theta_i)}{\\exp(\\theta_i)+\\exp(\\theta_j)},\\quad \\theta_i\\sim\\mathcal{N}(0,\\sigma^2) .. math:: s_i^{\\mathrm{MCMC}} = \\mathbb{E}[\\theta_i\\mid W] References: Bradley, R. A., & Terry, M. E. (1952). Rank Analysis of Incomplete Block Designs: I. The Method of Paired Comparisons. Biometrika. https://doi.org/10.1093/biomet/39.3-4.324 Metropolis, N., et al. (1953). Equation of State Calculations by Fast Computing Machines. The Journal of Chemical Physics. https://doi.org/10.1063/1.1699114 Hastings, W. K. (1970). Monte Carlo Sampling Methods Using Markov Chains and Their Applications. Biometrika. https://doi.org/10.1093/biomet/57.1.97 Caron, F., & Doucet, A. (2012). Efficient Bayesian inference for generalized Bradley-Terry models. https://doi.org/10.1080/10618600.2012.638220 Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> ranks, scores = rank.bayesian_mcmc( ... R, n_samples=2000, burnin=500, return_scores=True ... ) >>> ranks.tolist() [1, 2] Notes: If there are no decisive outcomes, posterior means are exactly equal under the symmetric Gaussian prior, and zeros are returned directly. """ R = validate_input(R) if isinstance(n_samples, bool) or not isinstance(n_samples, (int, np.integer)): raise TypeError(f"n_samples must be an integer, got {type(n_samples).__name__}") n_samples = int(n_samples) if n_samples < 1: raise ValueError(f"n_samples must be >= 1, got {n_samples}") if isinstance(burnin, bool) or not isinstance(burnin, (int, np.integer)): raise TypeError(f"burnin must be an integer, got {type(burnin).__name__}") burnin = int(burnin) if burnin < 0: raise ValueError(f"burnin must be >= 0, got {burnin}") prior_var = float(prior_var) if not np.isfinite(prior_var) or prior_var <= 0.0: raise ValueError("prior_var must be > 0 and finite.") L = R.shape[0] rng = np.random.default_rng(seed) wins = build_pairwise_wins(R) if float(np.sum(wins)) <= 0.0: scores = np.zeros(L, dtype=float) ranking = rank_scores(scores)[method] return (ranking, scores) if return_scores else ranking def log_likelihood(theta): ll = 0.0 for i in range(L): for j in range(L): if i == j or wins[i, j] == 0: continue diff = theta[j] - theta[i] # log P(i beats j) = -log(1 + exp(θ_j - θ_i)) if diff > 20: log_p = -diff elif diff < -20: log_p = 0.0 else: log_p = -np.log(1 + np.exp(diff)) ll += wins[i, j] * log_p return ll def log_prior(theta): return -0.5 * np.sum(theta**2) / prior_var def log_posterior(theta): return log_likelihood(theta) + log_prior(theta) # Initialize at prior mean theta_current = np.zeros(L) log_post_current = log_posterior(theta_current) samples = [] proposal_std = 0.1 accepted = 0 # MCMC sampling with adaptive proposal for iteration in range(n_samples + burnin): # Propose new theta theta_proposed = theta_current + rng.normal(0, proposal_std, L) log_post_proposed = log_posterior(theta_proposed) log_accept_prob = log_post_proposed - log_post_current if np.log(rng.random()) < min(log_accept_prob, 0.0): theta_current = theta_proposed log_post_current = log_post_proposed accepted += 1 if iteration >= burnin: samples.append(theta_current.copy()) # Adaptive proposal tuning if iteration > 0 and iteration % 500 == 0 and iteration < burnin: accept_rate = accepted / iteration if accept_rate < 0.2: proposal_std *= 0.8 elif accept_rate > 0.5: proposal_std *= 1.2 # Posterior mean estimate. scores = np.mean(samples, axis=0) ranking = rank_scores(scores)[method] return (ranking, scores) if return_scores else ranking
__all__ = ["thompson", "bayesian_mcmc"]