Source code for scorio.rank.pairwise

"""
Sequential pairwise rating methods.

This module adapts Elo, TrueSkill, and Glicko to binary response tensors by
streaming induced pairwise outcomes.

Notation
--------

Let :math:`R \\in \\{0,1\\}^{L \\times M \\times N}`.
For each event :math:`(m,n)` and each pair :math:`(i,j)`, define an observed
pair score :math:`S_{ij}^{(m,n)} \\in \\{0, 0.5, 1\\}` according to the
configured tie policy.

Each method updates latent rating states :math:`\\Theta_i` sequentially:

.. math::
    \\Theta_i^{(t+1)}
    = \\mathcal{U}_i\\left(\\Theta^{(t)}, S^{(t)}; \\psi\\right),
    \\qquad
    s_i = h(\\Theta_i^{(T)}),

where :math:`s_i` is the final score used for ranking.
"""

from typing import cast

import numpy as np
from scipy.stats import norm

from scorio.utils import rank_scores

from ._base import validate_input
from ._types import PairwiseTieHandling, RankMethod, RankResult


def _validate_tie_handling(tie_handling: object) -> PairwiseTieHandling:
    tie_policy = str(tie_handling)
    if tie_policy not in {"skip", "draw", "correct_draw_only"}:
        raise ValueError(
            'tie_handling must be one of: "skip", "draw", "correct_draw_only"'
        )
    return cast(PairwiseTieHandling, tie_policy)


[docs] def elo( R: np.ndarray, K: float = 32.0, initial_rating: float = 1500.0, tie_handling: PairwiseTieHandling = "correct_draw_only", method: RankMethod = "competition", return_scores: bool = False, ) -> RankResult: """ Rank models with sequential Elo updates on induced pairwise matches. Method context: Each question-trial pair ``(m, n)`` induces head-to-head outcomes for every model pair. Ratings are updated online, so final ratings depend on stream order. References: Elo, A. E. (1978). The Rating of Chessplayers, Past and Present. Arco Publishing. https://archive.org/details/ratingofchesspla0000eloa Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). K: Positive Elo step size. initial_rating: Initial rating assigned to every model. tie_handling: Policy for ``(1,1)`` and ``(0,0)`` outcomes: - ``"skip"``: ignore ties - ``"draw"``: treat both tie types as draws - ``"correct_draw_only"``: draw only for ``(1,1)``, skip ``(0,0)`` method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, ratings)``. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns final Elo ratings of shape ``(L,)``. Notation: ``r_i`` is model ``i`` rating, and ``S_{ij}`` is the observed score for model ``i`` against ``j`` in one induced match. Formula: .. math:: E_{ij} = \\frac{1}{1 + 10^{(r_j-r_i)/400}} .. math:: r_i \\leftarrow r_i + K(S_{ij} - E_{ij}), \\quad r_j \\leftarrow r_j + K((1-S_{ij}) - (1-E_{ij})) Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> ranks, ratings = rank.elo(R, return_scores=True) >>> ranks.tolist() [1, 2] >>> float(ratings[0] > ratings[1]) 1.0 >>> # tie handling can change updates >>> rank.elo(np.array([[[1]], [[1]]]), tie_handling="skip").tolist() [1, 1] Notes: The implementation processes pairs in index order for each ``(trial, question)`` event. """ R = validate_input(R) L, M, N = R.shape K = float(K) if not np.isfinite(K) or K <= 0.0: raise ValueError(f"K must be a positive finite scalar; got {K}") initial_rating = float(initial_rating) if not np.isfinite(initial_rating): raise ValueError("initial_rating must be finite.") tie_policy = _validate_tie_handling(tie_handling) ratings = np.full(L, initial_rating, dtype=float) for t in range(N): for q in range(M): outcomes = R[:, q, t] for i in range(L): for j in range(i + 1, L): r_i, r_j = int(outcomes[i]), int(outcomes[j]) # Determine actual scores S_i, S_j (or skip) if r_i == r_j: if tie_policy == "skip": continue if tie_policy == "draw": S_i, S_j = 0.5, 0.5 elif tie_policy == "correct_draw_only": if r_i == 1: # (1,1) S_i, S_j = 0.5, 0.5 else: # (0,0) continue else: S_i, S_j = (1.0, 0.0) if r_i > r_j else (0.0, 1.0) # Expected scores R_i, R_j = ratings[i], ratings[j] E_i = 1.0 / (1.0 + 10.0 ** ((R_j - R_i) / 400.0)) E_j = 1.0 - E_i # Update ratings[i] = R_i + K * (S_i - E_i) ratings[j] = R_j + K * (S_j - E_j) ranking = rank_scores(ratings)[method] return (ranking, ratings) if return_scores else ranking
[docs] def trueskill( R: np.ndarray, mu_initial: float = 25.0, sigma_initial: float = 25.0 / 3, beta: float = 25.0 / 6, tau: float = 25.0 / 300, method: RankMethod = "competition", return_scores: bool = False, tie_handling: PairwiseTieHandling = "skip", draw_margin: float = 0.0, ) -> RankResult: """ Rank models with a two-player TrueSkill update stream. Method context: Each model has latent skill belief :math:`\\mathcal{N}(\\mu_i, \\sigma_i^2)`. Every decisive pairwise result from ``R`` applies a two-player TrueSkill update. Ties can be skipped or treated as draw events with an optional draw margin. Ranking uses final ``mu``. References: Herbrich, R., Minka, T., & Graepel, T. (2006). TrueSkill(TM): A Bayesian Skill Rating System. NeurIPS 19. https://proceedings.neurips.cc/paper_files/paper/2006/file/f44ee263952e65b3610b8ba51229d1f9-Paper.pdf Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). mu_initial: Initial mean skill. sigma_initial: Initial skill standard deviation. Must be positive. beta: Performance-noise scale. Must be positive. tau: Per-round dynamics scale for sigma inflation. Must be nonnegative. method: Ranking variant passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, mu)``. tie_handling: Policy for tied outcomes ``(1,1)`` and ``(0,0)``: - ``"skip"``: ignore ties - ``"draw"``: treat both tie types as draws - ``"correct_draw_only"``: draw only for ``(1,1)``, skip ``(0,0)`` draw_margin: Nonnegative draw margin :math:`\\epsilon` in the performance-difference space. ``0`` recovers no-margin updates. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns final ``mu`` values (shape ``(L,)``). Notation: For one match between models ``i`` and ``j``: ``c = sqrt(2 beta^2 + sigma_i^2 + sigma_j^2)``, ``t = (mu_i - mu_j) / c``, ``epsilon = draw_margin / c``. Formula: .. math:: v_{win}(t,\\epsilon) = \\frac{\\phi(t-\\epsilon)}{\\Phi(t-\\epsilon)},\\quad w_{win}(t,\\epsilon)=v_{win}(t,\\epsilon)\\left(v_{win}(t,\\epsilon)+t-\\epsilon\\right) .. math:: \\mu_i' = \\mu_i + \\frac{\\sigma_i^2}{c} v_{win}(t,\\epsilon),\\quad \\mu_j' = \\mu_j - \\frac{\\sigma_j^2}{c} v_{win}(t,\\epsilon) .. math:: \\sigma_i'^2 = \\sigma_i^2\\left(1 - \\frac{\\sigma_i^2}{c^2} w_{win}(t,\\epsilon)\\right),\\quad \\sigma_j'^2 = \\sigma_j^2\\left(1 - \\frac{\\sigma_j^2}{c^2} w_{win}(t,\\epsilon)\\right) .. math:: v_{draw}(t,\\epsilon) = \\frac{\\phi(-\\epsilon-t)-\\phi(\\epsilon-t)}{\\Phi(\\epsilon-t)-\\Phi(-\\epsilon-t)} .. math:: w_{draw}(t,\\epsilon) = v_{draw}(t,\\epsilon)^2 + \\frac{(\\epsilon-t)\\phi(\\epsilon-t)-(-\\epsilon-t)\\phi(-\\epsilon-t)}{\\Phi(\\epsilon-t)-\\Phi(-\\epsilon-t)} .. math:: \\mu_i' = \\mu_i + \\frac{\\sigma_i^2}{c} v_{draw}(t,\\epsilon),\\quad \\mu_j' = \\mu_j - \\frac{\\sigma_j^2}{c} v_{draw}(t,\\epsilon) .. math:: \\sigma_i'^2 = \\sigma_i^2\\left(1 - \\frac{\\sigma_i^2}{c^2} w_{draw}(t,\\epsilon)\\right),\\quad \\sigma_j'^2 = \\sigma_j^2\\left(1 - \\frac{\\sigma_j^2}{c^2} w_{draw}(t,\\epsilon)\\right) Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> ranks, mu = rank.trueskill(R, return_scores=True) >>> ranks.tolist() [1, 2] >>> float(mu[0] > mu[1]) 1.0 Notes: This is a simplified two-player update stream, not full multiplayer factor-graph inference from the original TrueSkill paper. """ R = validate_input(R) L, M, N = R.shape mu_initial = float(mu_initial) sigma_initial = float(sigma_initial) beta = float(beta) tau = float(tau) draw_margin = float(draw_margin) if not np.isfinite(mu_initial): raise ValueError("mu_initial must be finite.") if not np.isfinite(sigma_initial) or sigma_initial <= 0.0: raise ValueError("sigma_initial must be a positive finite scalar.") if not np.isfinite(beta) or beta <= 0.0: raise ValueError("beta must be a positive finite scalar.") if not np.isfinite(tau) or tau < 0.0: raise ValueError("tau must be a nonnegative finite scalar.") if not np.isfinite(draw_margin) or draw_margin < 0.0: raise ValueError("draw_margin must be a nonnegative finite scalar.") tie_policy = _validate_tie_handling(tie_handling) mu = np.full(L, mu_initial, dtype=float) sigma = np.full(L, sigma_initial, dtype=float) def v_win(t: float, epsilon: float) -> float: """Correction term for decisive outcomes.""" x = t - epsilon denom = float(norm.cdf(x)) if denom < 1e-12: # Asymptotic approximation for extreme tails. return float(-x) return float(norm.pdf(x) / denom) def w_win(t: float, epsilon: float) -> float: v = v_win(t, epsilon) return float(v * (v + t - epsilon)) def v_draw(t: float, epsilon: float) -> float: """Correction term for draw outcomes.""" a = -epsilon - t b = epsilon - t cdf_a = float(norm.cdf(a)) cdf_b = float(norm.cdf(b)) denom = cdf_b - cdf_a if denom < 1e-12: return 0.0 return float((norm.pdf(a) - norm.pdf(b)) / denom) def w_draw(t: float, epsilon: float) -> float: a = -epsilon - t b = epsilon - t cdf_a = float(norm.cdf(a)) cdf_b = float(norm.cdf(b)) denom = cdf_b - cdf_a if denom < 1e-12: return 1.0 v = v_draw(t, epsilon) term = ((b * norm.pdf(b)) - (a * norm.pdf(a))) / denom return float(v * v + term) def update_decisive(mu1, sigma1, mu2, sigma2, player1_wins): """Update ratings for a decisive 2-player match.""" c = np.sqrt(2 * beta**2 + sigma1**2 + sigma2**2) epsilon = draw_margin / c if player1_wins: t = (mu1 - mu2) / c v = v_win(t, epsilon) w = w_win(t, epsilon) mu1_new = mu1 + (sigma1**2 / c) * v mu2_new = mu2 - (sigma2**2 / c) * v else: t = (mu2 - mu1) / c v = v_win(t, epsilon) w = w_win(t, epsilon) mu2_new = mu2 + (sigma2**2 / c) * v mu1_new = mu1 - (sigma1**2 / c) * v sigma1_new = sigma1 * np.sqrt(max(1 - (sigma1**2 / c**2) * w, 1e-12)) sigma2_new = sigma2 * np.sqrt(max(1 - (sigma2**2 / c**2) * w, 1e-12)) return mu1_new, sigma1_new, mu2_new, sigma2_new def update_draw(mu1, sigma1, mu2, sigma2): """Update ratings for a 2-player draw.""" c = np.sqrt(2 * beta**2 + sigma1**2 + sigma2**2) epsilon = draw_margin / c t = (mu1 - mu2) / c v = v_draw(t, epsilon) w = w_draw(t, epsilon) mu1_new = mu1 + (sigma1**2 / c) * v mu2_new = mu2 - (sigma2**2 / c) * v sigma1_new = sigma1 * np.sqrt(max(1 - (sigma1**2 / c**2) * w, 1e-12)) sigma2_new = sigma2 * np.sqrt(max(1 - (sigma2**2 / c**2) * w, 1e-12)) return mu1_new, sigma1_new, mu2_new, sigma2_new # Process trials sequentially for t in range(N): for q in range(M): outcomes = R[:, q, t] # Process all pairs for i in range(L): for j in range(i + 1, L): r_i, r_j = outcomes[i], outcomes[j] if r_i == r_j: if tie_policy == "skip": continue if tie_policy == "correct_draw_only" and int(r_i) == 0: continue mu[i], sigma[i], mu[j], sigma[j] = update_draw( mu[i], sigma[i], mu[j], sigma[j] ) continue mu[i], sigma[i], mu[j], sigma[j] = update_decisive( mu[i], sigma[i], mu[j], sigma[j], player1_wins=(r_i > r_j) ) # Apply dynamics (skill drift) sigma = np.sqrt(sigma**2 + tau**2) ranking = rank_scores(mu)[method] return (ranking, mu) if return_scores else ranking
[docs] def glicko( R: np.ndarray, initial_rating: float = 1500.0, initial_rd: float = 350.0, c: float = 0.0, rd_max: float = 350.0, tie_handling: PairwiseTieHandling = "correct_draw_only", return_deviation: bool = False, method: RankMethod = "competition", return_scores: bool = False, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, np.ndarray] ): """ Rank models with Glicko rating and rating-deviation updates. Method context: Glicko extends Elo by tracking both rating ``r`` and uncertainty ``RD``. Each ``(question, trial)`` is treated as one rating period containing all pairwise induced matches for that slice. References: Glickman, M. E. (1999). Parameter Estimation in Large Dynamic Paired Comparison Experiments. Journal of the Royal Statistical Society: Series C, 48(3), 377-394. https://doi.org/10.1111/1467-9876.00159 Glickman, M. E. (n.d.). The Glicko System. https://www.glicko.net/glicko/glicko.pdf Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). initial_rating: Initial rating ``r`` for every model. initial_rd: Initial rating deviation ``RD`` for every model. c: Nonnegative RD inflation parameter between rating periods. rd_max: Positive hard cap for ``RD``. tie_handling: Policy for tied outcomes: - ``"skip"``: ignore ``(1,1)`` and ``(0,0)`` - ``"draw"``: treat both as draws - ``"correct_draw_only"``: draw for ``(1,1)``, skip ``(0,0)`` return_deviation: If ``True``, return ``RD`` in addition to ratings. method: Tie-handling rule for :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ratings with ranking. Returns: If ``return_deviation=False`` and ``return_scores=False``: ranking array of shape ``(L,)``. If ``return_deviation=False`` and ``return_scores=True``: ``(ranking, rating)`` where ``rating`` has shape ``(L,)``. If ``return_deviation=True``: ``(ranking, rating, rd)`` with both vectors shape ``(L,)``. Notation: ``q = ln(10)/400``, ``g(RD) = 1 / sqrt(1 + 3 q^2 RD^2 / pi^2)``. For model ``i`` in one period with opponents ``j``: ``S_ij`` is observed score and ``E_ij = 1 / (1 + 10^{-g(RD_j)(r_i-r_j)/400})``. Formula: .. math:: d_i^2 = \\left(q^2 \\sum_j g(RD_j)^2 E_{ij}(1-E_{ij})\\right)^{-1} .. math:: RD_i' = \\left(\\frac{1}{RD_i^2} + \\frac{1}{d_i^2}\\right)^{-1/2}, \\quad r_i' = r_i + \\frac{q}{\\frac{1}{RD_i^2}+\\frac{1}{d_i^2}} \\sum_j g(RD_j)(S_{ij}-E_{ij}) Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> ranks, ratings, rds = rank.glicko(R, return_deviation=True) >>> ranks.tolist() [1, 2] >>> float(ratings[0] > ratings[1]) 1.0 Notes: Models with no games in a rating period keep rating unchanged but can still receive RD inflation when ``c > 0``. """ R = validate_input(R) L, M, N = R.shape initial_rating = float(initial_rating) initial_rd = float(initial_rd) if not np.isfinite(initial_rating): raise ValueError("initial_rating must be finite.") if not np.isfinite(initial_rd) or initial_rd <= 0: raise ValueError("initial_rd must be > 0 and finite.") rd_max = float(rd_max) if rd_max <= 0: raise ValueError("rd_max must be > 0") c = float(c) if c < 0: raise ValueError("c must be >= 0") tie_policy = _validate_tie_handling(tie_handling) rating = np.full(L, initial_rating, dtype=float) rd = np.full(L, min(initial_rd, rd_max), dtype=float) # Glicko constants. q = float(np.log(10.0) / 400.0) def g(rd_opponent: np.ndarray) -> np.ndarray: rd_opponent = np.asarray(rd_opponent, dtype=float) return 1.0 / np.sqrt(1.0 + (3.0 * (q**2) * (rd_opponent**2)) / (np.pi**2)) def expected_score( rating_i: float, rating_j: np.ndarray, g_j: np.ndarray ) -> np.ndarray: # E = 1 / (1 + 10^(-g(RD_j) * (r_i - r_j)/400)) return 1.0 / (1.0 + 10.0 ** (-(g_j * (rating_i - rating_j) / 400.0))) for t in range(N): for m in range(M): # RD inflation at the start of each rating period. if c > 0: rd = np.minimum(np.sqrt(rd**2 + c**2), rd_max) outcomes = R[:, m, t] opponents: list[list[int]] = [[] for _ in range(L)] results: list[list[float]] = [[] for _ in range(L)] for i in range(L): for j in range(i + 1, L): r_i = int(outcomes[i]) r_j = int(outcomes[j]) if r_i == r_j: if tie_policy == "skip": continue if tie_policy == "draw": s_i = s_j = 0.5 else: # correct_draw_only if r_i == 1: s_i = s_j = 0.5 else: continue else: if r_i > r_j: s_i, s_j = 1.0, 0.0 else: s_i, s_j = 0.0, 1.0 opponents[i].append(j) results[i].append(s_i) opponents[j].append(i) results[j].append(s_j) new_rating = rating.copy() new_rd = rd.copy() for i in range(L): if not opponents[i]: continue opp = np.asarray(opponents[i], dtype=int) s = np.asarray(results[i], dtype=float) rd_opp = rd[opp] rating_opp = rating[opp] g_opp = g(rd_opp) E = expected_score(rating[i], rating_opp, g_opp) # d^2 = 1 / (q^2 Σ g^2 E(1-E)) denom = float(np.sum((g_opp**2) * E * (1.0 - E))) if denom <= 0 or not np.isfinite(denom): continue d2 = 1.0 / ((q**2) * denom) inv_var = (1.0 / (rd[i] ** 2)) + (1.0 / d2) if inv_var <= 0 or not np.isfinite(inv_var): continue rd_new = float(np.sqrt(1.0 / inv_var)) rd_new = float(np.clip(rd_new, 1e-12, rd_max)) delta = float(np.sum(g_opp * (s - E))) rating_new = float(rating[i] + (q / inv_var) * delta) new_rating[i] = rating_new new_rd[i] = rd_new rating = new_rating rd = new_rd ranking = rank_scores(rating)[method] if return_deviation: return ranking, rating, rd return (ranking, rating) if return_scores else ranking
__all__ = ["elo", "trueskill", "glicko"]