"""
Rank Centrality: ranking from pairwise comparisons.
This module implements the Markov-chain estimator of Negahban, Oh, and Shah
(2017).
Notation
--------
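Let :math:`R \\in \\{0,1\\}^{L \\times M \\times N}` be the binary outcome
tensor whose first axis indexes the :math:`L` models being ranked. For each
pair :math:`(i,j)`, let :math:`W_{ij}` be the number of decisive wins of
:math:`i` over :math:`j`.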
Define the pairwise empirical probabilities :math:`\\widehat{P}_{i\\succ j}`
from decisive outcomes (optionally tie adjusted).
Rank Centrality builds a row-stochastic transition matrix :math:`P` with
off-diagonal mass proportional to :math:`\\widehat{P}_{j\\succ i}` and ranks by
the stationary distribution :math:`\\pi`:

.. math::

    \\pi^\\top P = \\pi^\\top, \\qquad \\sum_i \\pi_i = 1.
"""
import numpy as np
from scorio.utils import rank_scores
from ._base import build_pairwise_counts, build_pairwise_wins, validate_input
from ._types import RankMethod, RankResult
def _is_connected_undirected(adj: np.ndarray) -> bool:
    """Return True when every node is reachable from node 0.

    Args:
        adj: Square boolean adjacency matrix. Row ``i`` marks the neighbours
            of node ``i``; the matrix is treated as describing an undirected
            graph, so it is expected to be symmetric.

    Returns:
        ``True`` for an empty graph or when a traversal started at node 0
        visits all nodes, ``False`` otherwise.
    """
    num_nodes = adj.shape[0]
    if num_nodes == 0:
        return True

    visited = np.zeros(num_nodes, dtype=bool)
    visited[0] = True
    frontier = [0]
    while frontier:
        node = frontier.pop()
        # Neighbours of `node` that have not been reached yet.
        fresh = np.flatnonzero(adj[node] & ~visited)
        # Mark before pushing so no node is ever queued twice.
        visited[fresh] = True
        frontier.extend(fresh.tolist())
    return bool(visited.all())
def _stationary_distribution_power(
    P: np.ndarray, max_iter: int = 10_000, tol: float = 1e-12
) -> np.ndarray:
    """Approximate the stationary distribution of ``P`` by power iteration.

    Starting from the uniform vector, the iterate is repeatedly multiplied by
    ``P.T`` and renormalised to sum to one.

    Args:
        P: Square transition matrix (intended to be row-stochastic).
        max_iter: Maximum number of multiplications to perform.
        tol: Stop as soon as the L1 distance between successive iterates
            drops below this value.

    Returns:
        A length-``n`` vector. This is the first iterate that meets the
        tolerance, or the last iterate if ``max_iter`` is exhausted. An empty
        array is returned for an empty ``P``, and the uniform vector is
        returned if an iterate has non-positive total mass.
    """
    size = P.shape[0]
    if size == 0:
        return np.array([], dtype=float)

    transition_t = P.T
    current = np.full(size, 1.0 / size, dtype=float)
    remaining = int(max_iter)
    while remaining > 0:
        remaining -= 1
        candidate = np.matmul(transition_t, current)
        mass = candidate.sum()
        if mass <= 0:
            # Degenerate chain: nothing to normalise, fall back to uniform.
            return np.full(size, 1.0 / size, dtype=float)
        candidate = candidate / mass
        converged = np.linalg.norm(candidate - current, ord=1) < tol
        current = candidate
        if converged:
            break
    return current
# Public API.
def rank_centrality(
    R: np.ndarray,
    method: RankMethod = "competition",
    return_scores: bool = False,
    tie_handling: str = "half",
    smoothing: float = 0.0,
    teleport: float = 0.0,
    max_iter: int = 10_000,
    tol: float = 1e-12,
) -> RankResult:
    """
    Rank models with Rank Centrality.

    Method context:
        A row-stochastic random walk is built over the models in which a step
        from model ``i`` favours models that beat ``i``. The stationary
        distribution of that walk is used as the score vector.

    Args:
        R: Binary tensor of shape (L, M, N) (or (L, M) which is treated as
            N=1). It is passed through ``validate_input`` first.
        method: Key selecting which ranking to take from the result of
            `scorio.utils.rank_scores`.
        return_scores: If True, return ``(ranking, scores)`` where ``scores``
            is the stationary distribution.
        tie_handling:
            - "ignore": only decisive comparisons (i correct, j incorrect)
            - "half": treat ties (both same) as 0.5 win for each side
        smoothing: Nonnegative pseudocount added to every directed win count.
            Use this to avoid disconnected graphs when
            ``tie_handling="ignore"``.
        teleport: Teleportation probability in [0, 1). When > 0, the
            transition matrix is mixed with a uniform jump, which makes the
            chain ergodic even if the comparison graph is disconnected.
        max_iter: Max iterations for the power method.
        tol: Convergence tolerance (L1 difference) for the power method.

    Returns:
        Ranking array of shape ``(L,)``.
        If ``return_scores=True``, also returns stationary-distribution scores
        (shape ``(L,)``). When no pair of models has any comparison mass the
        scores are uniform.

    Raises:
        ValueError: If ``tie_handling``, ``smoothing``, ``teleport``,
            ``max_iter`` or ``tol`` is out of range, or if the comparison
            graph is disconnected while ``tie_handling="ignore"``,
            ``smoothing == 0`` and ``teleport == 0``.
        TypeError: If ``max_iter`` is not an integer.

    Formula:
        Let ``d_max`` be the maximum degree of the undirected comparison
        graph. For ``i != j``,

        .. math::
            P_{ij} = \\frac{1}{d_{\\max}}\\,\\widehat{P}_{j\\succ i},
            \\quad
            P_{ii} = 1 - \\sum_{j\\neq i} P_{ij}

        where ``P_hat`` is the (optionally smoothed) win fraction; ties
        contribute half a win to each side only when ``tie_handling="half"``.

    References:
        Negahban, S., Oh, S., & Shah, D. (2017).
        Rank Centrality: Ranking from Pairwise Comparisons.
        Operations Research.
    """
    R = validate_input(R)
    num_models = R.shape[0]

    # --- Argument validation; checks run in a fixed order, first failure wins.
    tie_handling = str(tie_handling)
    if tie_handling != "ignore" and tie_handling != "half":
        raise ValueError('tie_handling must be "ignore" or "half"')

    smoothing = float(smoothing)
    if not (np.isfinite(smoothing) and smoothing >= 0):
        raise ValueError("smoothing must be >= 0")

    teleport = float(teleport)
    if not (np.isfinite(teleport) and 0.0 <= teleport < 1.0):
        raise ValueError("teleport must be in [0, 1)")

    if not isinstance(max_iter, (int, np.integer)):
        raise TypeError(f"max_iter must be an integer, got {type(max_iter).__name__}")
    max_iter = int(max_iter)
    if max_iter < 1:
        raise ValueError(f"max_iter must be >= 1, got {max_iter}")

    tol = float(tol)
    if not (np.isfinite(tol) and tol > 0.0):
        raise ValueError(f"tol must be a positive finite scalar, got {tol}")

    def _finish(score_vector):
        # Shared tail: convert scores to the requested ranking and package.
        ranks = rank_scores(score_vector)[method]
        if return_scores:
            return ranks, score_vector
        return ranks

    # --- Directed win counts, with ties split evenly when requested.
    if tie_handling == "half":
        decisive, tied = build_pairwise_counts(R)
        wins = decisive + 0.5 * tied
    else:
        wins = build_pairwise_wins(R)

    # Optional pseudocount, then total comparison mass per unordered pair.
    smoothed = wins + smoothing
    pair_totals = smoothed + smoothed.T

    # Undirected comparison graph: an edge wherever a pair has any mass.
    off_diagonal = ~np.eye(num_models, dtype=bool)
    adj = (pair_totals > 0) & off_diagonal
    degrees = adj.sum(axis=1)
    d_max = int(degrees.max()) if degrees.size else 0

    if d_max == 0:
        # No edges at all: nothing distinguishes the models.
        return _finish(np.ones(num_models, dtype=float) / num_models)

    unregularized = (
        teleport == 0.0 and smoothing == 0.0 and tie_handling == "ignore"
    )
    if unregularized and not _is_connected_undirected(adj):
        # With decisive-only comparisons and no regularization the graph may
        # split into components, across which scores are not identifiable.
        raise ValueError(
            "Rank Centrality requires a connected comparison graph; "
            "use teleport>0, smoothing>0, or tie_handling='half'."
        )

    # Transition matrix (row-stochastic), Negahban et al. (2017):
    #   P_{ij} = (1/d_max) * p_hat_{j,i}   for i != j on edges, else 0
    #   P_{ii} = 1 - sum_{j != i} P_{ij}
    with np.errstate(divide="ignore", invalid="ignore"):
        loss_fraction = np.zeros((num_models, num_models), dtype=float)
        loss_fraction[adj] = (smoothed.T[adj] / pair_totals[adj]).astype(float)

    P = np.zeros((num_models, num_models), dtype=float)
    P[adj] = loss_fraction[adj] / float(d_max)
    # The diagonal is still zero here, so the row sums are the off-diagonal mass.
    np.fill_diagonal(P, 1.0 - P.sum(axis=1))

    if teleport > 0.0:
        uniform_jump = np.ones((num_models, num_models), dtype=float) / num_models
        P = (1.0 - teleport) * P + teleport * uniform_jump

    stationary = _stationary_distribution_power(P, max_iter=max_iter, tol=tol)
    return _finish(stationary)
# Names exported by a star-import of this module.
__all__ = ["rank_centrality"]