Source code for scorio.rank.irt

"""
Item Response Theory (IRT) ranking methods.

This module estimates latent model abilities and question parameters under
binary IRT families.

Notation
--------

Let :math:`R \\in \\{0,1\\}^{L \\times M \\times N}` and
:math:`k_{lm}=\\sum_{n=1}^{N} R_{lmn}`.
Model abilities are :math:`\\theta_l`; item parameters include difficulty
:math:`b_m`, discrimination :math:`a_m`, and optional pseudo-guessing
:math:`c_m`.

A general binary IRT response model is

.. math::
    P(R_{lmn}=1 \\mid \\theta_l, a_m, b_m, c_m)
    = c_m + (1-c_m)\\sigma\\left(a_m(\\theta_l-b_m)\\right).

Special cases:

- 1PL (Rasch): :math:`a_m=1`, :math:`c_m=0`.
- 2PL: :math:`c_m=0`, free :math:`a_m` and :math:`b_m`.
- 3PL: free :math:`a_m`, :math:`b_m`, and :math:`c_m`.

Rankings are induced by ability scores :math:`s_l`, typically
:math:`s_l=\\hat\\theta_l` or a posterior summary of :math:`\\theta_l`.

The module includes maximum-likelihood and joint maximum-likelihood estimators,
MAP variants with configurable priors, and MML-EAP estimators.
"""

from typing import Literal, TypeAlias

import numpy as np
from scipy.optimize import minimize

from scorio.utils import rank_scores

from ._base import sigmoid, validate_input
from ._types import RankMethod, RankResult
from .priors import GaussianPrior, Prior

DynamicIrtVariant: TypeAlias = Literal["linear", "growth", "state_space"]
DynamicScoreTargetInput: TypeAlias = Literal[
    "initial",
    "final",
    "mean",
    "gain",
    "baseline",
    "start",
    "end",
    "average",
    "delta",
    "trend",
]


def _to_binomial_counts(R: np.ndarray) -> tuple[np.ndarray, int]:
    """
    Convert (L, M, N) Bernoulli trials into per-(model,item) binomial counts.

    Returns:
        k_correct: float array of shape (L, M) with counts in [0, n_trials]
        n_trials: int number of trials per (model, item)
    """
    R = validate_input(R)
    k_correct = R.sum(axis=2, dtype=float)
    n_trials = int(R.shape[2])
    return k_correct, n_trials


def _validate_positive_int(name: str, value: int, min_value: int = 1) -> int:
    """Validate a positive integer hyperparameter."""
    if isinstance(value, bool) or not isinstance(value, (int, np.integer)):
        raise TypeError(f"{name} must be an integer, got {type(value).__name__}")
    ivalue = int(value)
    if ivalue < min_value:
        raise ValueError(f"{name} must be >= {min_value}, got {ivalue}")
    return ivalue


def _coerce_ability_prior(prior: Prior | float) -> Prior:
    """Normalize ability prior argument to a Prior instance."""
    if isinstance(prior, (int, float)):
        prior_var = float(prior)
        if not np.isfinite(prior_var) or prior_var <= 0.0:
            raise ValueError("prior variance must be a positive finite scalar.")
        return GaussianPrior(mean=0.0, var=prior_var)
    if isinstance(prior, Prior):
        return prior
    raise TypeError(
        f"prior must be a Prior object or float, got {type(prior).__name__}"
    )


def _validate_nonnegative_float(name: str, value: float) -> float:
    """Validate a finite non-negative scalar hyperparameter."""
    fvalue = float(value)
    if not np.isfinite(fvalue) or fvalue < 0.0:
        raise ValueError(f"{name} must be a finite scalar >= 0.0, got {value!r}")
    return fvalue


def _validate_guessing_upper(guessing_upper: float) -> float:
    """Validate 3PL guessing upper bound."""
    value = float(guessing_upper)
    if not np.isfinite(value) or not (0.0 < value < 1.0):
        raise ValueError("guessing_upper must be in (0, 1) and finite.")
    return value


def _validate_fix_guessing(
    fix_guessing: float | None, guessing_upper: float
) -> float | None:
    """Validate optional fixed 3PL guessing parameter."""
    if fix_guessing is None:
        return None
    value = float(fix_guessing)
    if not np.isfinite(value) or not (0.0 <= value <= guessing_upper):
        raise ValueError(
            f"fix_guessing must be in [0, guessing_upper={guessing_upper}] and finite."
        )
    return value


def _validate_time_points(
    time_points: np.ndarray | None, n_time: int
) -> tuple[np.ndarray, np.ndarray]:
    """
    Validate and normalize longitudinal measurement times.

    Returns:
        raw_time: user-facing time points (shape ``(n_time,)``)
        time_unit: normalized times in ``[0, 1]`` used for optimization
    """
    if time_points is None:
        raw_time = np.linspace(0.0, 1.0, n_time, dtype=float)
    else:
        raw_time = np.asarray(time_points, dtype=float)
        if raw_time.ndim != 1 or raw_time.shape[0] != n_time:
            raise ValueError(
                "time_points must be a 1D array with length equal to R.shape[2]."
            )
        if not np.all(np.isfinite(raw_time)):
            raise ValueError("time_points must contain only finite values.")
        if np.any(np.diff(raw_time) <= 0.0):
            raise ValueError("time_points must be strictly increasing.")

    if n_time < 2:
        return raw_time, np.zeros(n_time, dtype=float)

    span = float(raw_time[-1] - raw_time[0])
    if not np.isfinite(span) or span <= 0.0:
        raise ValueError("time_points must span a positive interval.")

    time_unit = (raw_time - raw_time[0]) / span
    return raw_time, time_unit


def _validate_dynamic_score_target(score_target: str) -> str:
    """Validate dynamic scoring target and normalize aliases."""
    target = str(score_target).strip().lower()
    aliases = {
        "baseline": "initial",
        "start": "initial",
        "end": "final",
        "average": "mean",
        "delta": "gain",
        "trend": "gain",
    }
    target = aliases.get(target, target)
    if target not in {"initial", "final", "mean", "gain"}:
        raise ValueError(
            "score_target must be one of "
            "{'initial', 'final', 'mean', 'gain'} "
            "(aliases: baseline, start, end, average, delta, trend)."
        )
    return target


def _score_dynamic_path(theta_path: np.ndarray, score_target: str) -> np.ndarray:
    """Convert a per-model ability trajectory into ranking scores."""
    target = _validate_dynamic_score_target(score_target)
    if target == "initial":
        return theta_path[:, 0]
    if target == "final":
        return theta_path[:, -1]
    if target == "mean":
        return theta_path.mean(axis=1)
    return theta_path[:, -1] - theta_path[:, 0]


[docs] def rasch( R: np.ndarray, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, return_item_params: bool = False, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with Rasch (1PL) IRT via joint MLE. Method context: Each model ``l`` has latent ability ``theta_l`` and each question ``m`` has difficulty ``b_m``. We estimate both by maximizing the binomial likelihood over per-question correct counts. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. return_item_params: If True, also returns estimated item parameters (difficulty). Implies returning scores. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns ability scores ``theta`` (shape ``(L,)``). If ``return_item_params=True``, also returns ``{"difficulty": b}`` (shape ``(M,)``). Notation: ``k_{lm} = sum_n R_{lmn}`` is the correct-count for model ``l`` and question ``m``. Formula: .. math:: k_{lm} \\sim \\mathrm{Binomial}\\left(N,\\sigma(\\theta_l-b_m)\\right) .. math:: b \\leftarrow b - \\frac{1}{M}\\sum_m b_m References: Rasch, G. (1960). Probabilistic Models for Some Intelligence and Attainment Tests. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> ranks, scores = rank.rasch(R, return_scores=True) >>> ranks.tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) k_correct, n_trials = _to_binomial_counts(R) theta, beta = _estimate_rasch_abilities(k_correct, n_trials, max_iter=max_iter) scores = theta ranking = rank_scores(scores)[method] if return_item_params: return ranking, scores, {"difficulty": beta} return (ranking, scores) if return_scores else ranking
[docs] def rasch_map( R: np.ndarray, prior: Prior | float = 1.0, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, return_item_params: bool = False, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with Rasch (1PL) IRT via MAP estimation. Method context: Same likelihood as :func:`rasch`, with an additional prior penalty on abilities ``theta`` for shrinkage and numerical stability. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). prior: Ability prior. A ``float`` is interpreted as Gaussian prior variance; otherwise must be a ``Prior`` instance. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. return_item_params: If True, also returns estimated item parameters. Implies returning scores. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns MAP ability scores ``theta``. If ``return_item_params=True``, also returns ``{"difficulty": b}``. Formula: .. math:: \\hat\\theta,\\hat b = \\arg\\min_{\\theta,b} \\left[ -\\sum_{l,m}\\log p(k_{lm}\\mid\\theta_l,b_m) + \\mathrm{penalty}(\\theta) \\right] References: Mislevy, R. J. (1986). Bayes modal estimation in item response models. Psychometrika. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> rank.rasch_map(R, prior=1.0).tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) k_correct, n_trials = _to_binomial_counts(R) prior = _coerce_ability_prior(prior) theta, beta = _estimate_rasch_abilities_map( k_correct, n_trials, prior, max_iter=max_iter ) scores = theta ranking = rank_scores(scores)[method] if return_item_params: return ranking, scores, {"difficulty": beta} return (ranking, scores) if return_scores else ranking
[docs] def rasch_2pl( R: np.ndarray, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, return_item_params: bool = False, reg_discrimination: float = 0.01, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with 2PL IRT via joint (optionally regularized) JMLE. Method context: Extends Rasch with item discrimination ``a_m > 0``, so items can differ in how strongly they separate abilities. By default, a small L2 penalty is applied on ``log(a)`` for numerical stability. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. return_item_params: If True, also returns estimated item parameters (difficulty and discrimination). Implies returning scores. reg_discrimination: Non-negative L2 penalty weight on ``log(a)``. Set to ``0.0`` for pure (unpenalized) JMLE. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns ability scores ``theta``. If ``return_item_params=True``, also returns ``{"difficulty": b, "discrimination": a}``. Formula: .. math:: k_{lm} \\sim \\mathrm{Binomial} \\left(N,\\sigma\\left(a_m(\\theta_l-b_m)\\right)\\right) References: Birnbaum, A. (1968). Some Latent Trait Models and Their Use in Inferring an Examinee's Ability. In Statistical Theories of Mental Test Scores. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> rank.rasch_2pl(R).tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) reg_discrimination = _validate_nonnegative_float( "reg_discrimination", reg_discrimination ) k_correct, n_trials = _to_binomial_counts(R) theta, beta, a = _estimate_2pl_abilities( k_correct, n_trials, max_iter=max_iter, reg_discrimination=reg_discrimination, ) scores = theta ranking = rank_scores(scores)[method] if return_item_params: return ranking, scores, {"difficulty": beta, "discrimination": a} return (ranking, scores) if return_scores else ranking
[docs] def rasch_2pl_map( R: np.ndarray, prior: Prior | float = 1.0, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, return_item_params: bool = False, reg_discrimination: float = 0.01, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with 2PL IRT via MAP estimation. Method context: Same 2PL likelihood as :func:`rasch_2pl`, with a prior penalty on model abilities ``theta`` and an optional L2 penalty on ``log(a)``. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). prior: Ability prior. A ``float`` is interpreted as Gaussian prior variance; otherwise must be a ``Prior`` instance. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. return_item_params: If True, also returns estimated item parameters. Implies returning scores. reg_discrimination: Non-negative L2 penalty weight on ``log(a)``. Set to ``0.0`` to remove item-discrimination regularization. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns MAP ability scores ``theta``. If ``return_item_params=True``, also returns ``{"difficulty": b, "discrimination": a}``. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> rank.rasch_2pl_map(R, prior=1.0).tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) reg_discrimination = _validate_nonnegative_float( "reg_discrimination", reg_discrimination ) k_correct, n_trials = _to_binomial_counts(R) prior = _coerce_ability_prior(prior) theta, beta, a = _estimate_2pl_abilities_map( k_correct, n_trials, prior, max_iter=max_iter, reg_discrimination=reg_discrimination, ) scores = theta ranking = rank_scores(scores)[method] if return_item_params: return ranking, scores, {"difficulty": beta, "discrimination": a} return (ranking, scores) if return_scores else ranking
[docs] def dynamic_irt( R: np.ndarray, variant: DynamicIrtVariant = "linear", method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, return_item_params: bool = False, time_points: np.ndarray | None = None, score_target: DynamicScoreTargetInput = "final", slope_reg: float = 0.01, state_reg: float = 1.0, assume_time_axis: bool = False, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with dynamic (longitudinal) IRT variants. Method context: ``variant="linear"`` is a static Rasch baseline over aggregated counts. ``variant="growth"`` fits a longitudinal logistic growth model with per-model baseline ``theta0_l`` and slope ``theta1_l``: .. math:: \\theta_{ln}=\\theta_{0,l}+\\theta_{1,l}t_n ``variant="state_space"`` fits a dynamic Rasch trajectory :math:`\\theta_{ln}` with random-walk smoothness regularization: .. math:: P(R_{lmn}=1)=\\sigma\\left(\\theta_{ln}-b_m\\right) .. math:: \\mathrm{penalty}=\\lambda\\sum_{l,n>0} \\frac{\\left(\\theta_{ln}-\\theta_{l,n-1}\\right)^2}{t_n-t_{n-1}} Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). variant: ``"linear"``, ``"growth"``, or ``"state_space"``. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. return_item_params: If True, also returns estimated item parameters. Implies returning scores. time_points: Optional ordered measurement times of length ``N``. If ``None``, uses equally spaced times in ``[0, 1]``. Used only for longitudinal variants. score_target: Longitudinal score extracted from ability paths for ranking in growth and state-space variants. One of ``{"initial", "final", "mean", "gain"}``. slope_reg: Non-negative L2 regularization weight on growth slopes. Used only for ``variant="growth"``. state_reg: Non-negative random-walk smoothness penalty in ``variant="state_space"``. assume_time_axis: Safety switch for longitudinal variants. Set ``True`` to acknowledge that axis-2 of ``R`` is ordered time, not i.i.d. sampling trials. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns scores: ``theta`` for ``linear`` and dynamic target scores for ``growth``/``state_space``. If ``return_item_params=True``, also returns ``{"difficulty": b}`` (linear), or for longitudinal variants: ``{"difficulty": b, "ability_path": theta_path, ...}``. Formula: .. math:: P(R_{lmn}=1) = \\sigma\\left(\\theta_{0,l} + \\theta_{1,l} t_n - b_m\\right) References: Verhelst, N. D., & Glas, C. A. (1993). A dynamic generalization of the Rasch model. Psychometrika. Wang, C., & Nydick, S. W. (2020). On Longitudinal Item Response Theory Models: A Didactic. Journal of Educational and Behavioral Statistics. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1, 1], [1, 1, 1]], ... [[0, 0, 0], [0, 0, 0]], ... ]) >>> rank.dynamic_irt(R, variant="linear").tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) variant_name = str(variant).strip().lower() R = validate_input(R) k_correct = R.sum(axis=2, dtype=float) n_trials = int(R.shape[2]) score_target_name = _validate_dynamic_score_target(score_target) slope_reg = _validate_nonnegative_float("slope_reg", slope_reg) state_reg = _validate_nonnegative_float("state_reg", state_reg) if variant_name == "linear": if score_target_name != "final": raise ValueError( "score_target is only used for longitudinal variants " "('growth' and 'state_space')." ) theta, beta = _estimate_rasch_abilities(k_correct, n_trials, max_iter=max_iter) scores = theta elif variant_name == "growth": if not assume_time_axis: raise ValueError( "variant='growth' interprets axis-2 as ordered longitudinal time. " "Set assume_time_axis=True to proceed." ) raw_time, time_unit = _validate_time_points(time_points, n_trials) theta0, theta1, beta = _estimate_growth_model_abilities( R, time_unit, max_iter=max_iter, slope_reg=slope_reg, ) theta_path = theta0[:, None] + theta1[:, None] * time_unit[None, :] scores = _score_dynamic_path(theta_path, score_target_name) elif variant_name == "state_space": if not assume_time_axis: raise ValueError( "variant='state_space' interprets axis-2 as ordered longitudinal " "time. Set assume_time_axis=True to proceed." ) raw_time, time_unit = _validate_time_points(time_points, n_trials) theta_path, beta = _estimate_state_space_abilities( R, time_unit, max_iter=max_iter, state_reg=state_reg, ) scores = _score_dynamic_path(theta_path, score_target_name) else: raise ValueError( f"Unknown variant: {variant_name}. " "Use 'linear', 'growth', or 'state_space'." ) ranking = rank_scores(scores)[method] if return_item_params: if variant_name == "linear": return ranking, scores, {"difficulty": beta} if variant_name == "growth": return ( ranking, scores, { "difficulty": beta, "baseline": theta0, "slope": theta1, "ability_path": theta_path, "time_points": raw_time, }, ) return ( ranking, scores, { "difficulty": beta, "ability_path": theta_path, "time_points": raw_time, "gain": theta_path[:, -1] - theta_path[:, 0], }, ) return (ranking, scores) if return_scores else ranking
def _estimate_rasch_abilities( k_correct: np.ndarray, n_trials: int, max_iter: int = 500 ) -> tuple[np.ndarray, np.ndarray]: """ Estimate Rasch abilities via JMLE. Args: k_correct: Shape (L, M) with counts in [0, n_trials]. n_trials: Number of trials per (model, item). """ L, M = k_correct.shape def negative_log_likelihood(params): theta = params[:L] beta = params[L:] beta = beta - beta.mean() # Identifiability constraint # P(correct) = sigmoid(theta - beta) diff = theta[:, None] - beta[None, :] # (L, M) prob = sigmoid(diff) prob = np.clip(prob, 1e-10, 1 - 1e-10) nll = -np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob) ) return nll # Initialize from observed proportions p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) model_scores = p_lm.mean(axis=1) question_difficulty = p_lm.mean(axis=0) theta_init = np.log(model_scores / (1 - model_scores)) beta_init = -np.log(question_difficulty / (1 - question_difficulty)) params_init = np.concatenate([theta_init, beta_init]) result = minimize( negative_log_likelihood, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta = result.x[:L] beta = result.x[L:] beta = beta - beta.mean() return theta, beta def _estimate_rasch_abilities_map( k_correct: np.ndarray, n_trials: int, prior: Prior, max_iter: int = 500 ) -> tuple[np.ndarray, np.ndarray]: """ Estimate Rasch abilities via MAP with configurable prior on abilities. Args: k_correct: Shape (L, M) with counts in [0, n_trials]. n_trials: Number of trials per (model, item). prior: Prior distribution on ability parameters. max_iter: Maximum optimization iterations. """ L, M = k_correct.shape def negative_log_posterior(params): theta = params[:L] beta = params[L:] beta = beta - beta.mean() # Identifiability constraint # P(correct) = sigmoid(theta - beta) diff = theta[:, None] - beta[None, :] # (L, M) prob = sigmoid(diff) prob = np.clip(prob, 1e-10, 1 - 1e-10) # Negative log-likelihood nll = -np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob) ) # Prior penalty on abilities (negative log-prior) prior_penalty = prior.penalty(theta) return nll + prior_penalty # Initialize from observed proportions p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) model_scores = p_lm.mean(axis=1) question_difficulty = p_lm.mean(axis=0) theta_init = np.log(model_scores / (1 - model_scores)) beta_init = -np.log(question_difficulty / (1 - question_difficulty)) params_init = np.concatenate([theta_init, beta_init]) result = minimize( negative_log_posterior, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta = result.x[:L] beta = result.x[L:] beta = beta - beta.mean() return theta, beta def _estimate_2pl_abilities( k_correct: np.ndarray, n_trials: int, max_iter: int = 500, reg_discrimination: float = 0.01, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Estimate 2PL abilities via JMLE. """ L, M = k_correct.shape def negative_log_likelihood(params): theta = params[:L] beta = params[L : L + M] log_a = params[L + M :] beta = beta - beta.mean() a = np.exp(np.clip(log_a, -3, 3)) # Discrimination in reasonable range # P(correct) = sigmoid(a * (theta - beta)) diff = theta[:, None] - beta[None, :] logit = a[None, :] * diff prob = sigmoid(logit) prob = np.clip(prob, 1e-10, 1 - 1e-10) nll = -np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob) ) # Optional regularization on discrimination. nll += reg_discrimination * np.sum(log_a**2) return nll # Initialize p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) model_scores = p_lm.mean(axis=1) question_difficulty = p_lm.mean(axis=0) theta_init = np.log(model_scores / (1 - model_scores)) beta_init = -np.log(question_difficulty / (1 - question_difficulty)) log_a_init = np.zeros(M) # Start with discrimination = 1 params_init = np.concatenate([theta_init, beta_init, log_a_init]) result = minimize( negative_log_likelihood, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta = result.x[:L] beta = result.x[L : L + M] beta = beta - beta.mean() a = np.exp(np.clip(result.x[L + M :], -3, 3)) return theta, beta, a def _estimate_2pl_abilities_map( k_correct: np.ndarray, n_trials: int, prior: Prior, max_iter: int = 500, reg_discrimination: float = 0.01, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Estimate 2PL abilities via MAP with configurable prior on abilities. Args: k_correct: Shape (L, M) with counts in [0, n_trials]. n_trials: Number of trials per (model, item). prior: Prior distribution on ability parameters. max_iter: Maximum optimization iterations. """ L, M = k_correct.shape def negative_log_posterior(params): theta = params[:L] beta = params[L : L + M] log_a = params[L + M :] beta = beta - beta.mean() a = np.exp(np.clip(log_a, -3, 3)) # Discrimination in reasonable range # P(correct) = sigmoid(a * (theta - beta)) diff = theta[:, None] - beta[None, :] logit = a[None, :] * diff prob = sigmoid(logit) prob = np.clip(prob, 1e-10, 1 - 1e-10) # Negative log-likelihood nll = -np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob) ) # Optional regularization on discrimination. nll += reg_discrimination * np.sum(log_a**2) # Prior penalty on abilities (negative log-prior) prior_penalty = prior.penalty(theta) return nll + prior_penalty # Initialize p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) model_scores = p_lm.mean(axis=1) question_difficulty = p_lm.mean(axis=0) theta_init = np.log(model_scores / (1 - model_scores)) beta_init = -np.log(question_difficulty / (1 - question_difficulty)) log_a_init = np.zeros(M) # Start with discrimination = 1 params_init = np.concatenate([theta_init, beta_init, log_a_init]) result = minimize( negative_log_posterior, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta = result.x[:L] beta = result.x[L : L + M] beta = beta - beta.mean() a = np.exp(np.clip(result.x[L + M :], -3, 3)) return theta, beta, a def _estimate_growth_model_abilities( R: np.ndarray, time_unit: np.ndarray, max_iter: int = 500, slope_reg: float = 0.01, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Estimate a longitudinal Rasch (1PL) model with per-model growth. We fit the logistic growth model: P(R[l,m,n]=1) = σ(θ0_l + θ1_l * t_n - b_m) where: - θ0_l is the baseline ability (trial n=0), - θ1_l is a per-model trend over trials, - b_m is item difficulty. This is a more faithful longitudinal IRT formulation than regressing mean accuracy over trials, because it: - respects the Bernoulli likelihood, - retains item difficulties, - keeps probabilities in (0, 1) via the logistic link. Args: R: Binary tensor of shape (L, M, N). time_unit: Normalized time points in ``[0, 1]`` with shape ``(N,)``. max_iter: Maximum iterations for optimization. slope_reg: Non-negative L2 penalty on growth slopes. Returns: Tuple of: - theta0: (L,) baseline abilities - theta1: (L,) per-model slopes over trials - beta: (M,) item difficulties (mean-centered) """ R = validate_input(R) L, M, N = R.shape time_unit = np.asarray(time_unit, dtype=float) if time_unit.shape != (N,): raise ValueError("time_unit must have shape (N,) where N = R.shape[2].") if N < 2: k_correct = R.sum(axis=2, dtype=float) theta0, beta = _estimate_rasch_abilities( k_correct, n_trials=int(N), max_iter=max_iter ) theta1 = np.zeros(L, dtype=float) return theta0, theta1, beta # Init: baseline from trial 0, difficulty from global solve rates. p0 = np.clip(R[:, :, 0].mean(axis=1), 1e-6, 1 - 1e-6) theta0_init = np.log(p0 / (1 - p0)) theta1_init = np.zeros(L, dtype=float) p_m = np.clip(R.mean(axis=(0, 2)), 1e-6, 1 - 1e-6) beta_init = -np.log(p_m / (1 - p_m)) params_init = np.concatenate([theta0_init, theta1_init, beta_init]) R_float = R.astype(float, copy=False) def negative_log_likelihood(params: np.ndarray) -> float: theta0 = params[:L] theta1 = params[L : 2 * L] beta = params[2 * L :] beta = beta - beta.mean() # Identifiability constraint diff = ( theta0[:, None, None] + theta1[:, None, None] * time_unit[None, None, :] - beta[None, :, None] ) prob = sigmoid(diff) prob = np.clip(prob, 1e-10, 1 - 1e-10) nll = -np.sum(R_float * np.log(prob) + (1 - R_float) * np.log(1 - prob)) # Weak Gaussian prior on slopes for stable longitudinal estimation. nll += slope_reg * np.sum(theta1**2) return float(nll) result = minimize( negative_log_likelihood, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta0 = result.x[:L] theta1 = result.x[L : 2 * L] beta = result.x[2 * L :] beta = beta - beta.mean() return theta0, theta1, beta def _estimate_state_space_abilities( R: np.ndarray, time_unit: np.ndarray, max_iter: int = 500, state_reg: float = 1.0, ) -> tuple[np.ndarray, np.ndarray]: """ Estimate dynamic Rasch abilities with per-model random-walk trajectories. We fit: P(R[l,m,n]=1) = σ(θ[l,n] - b[m]) with a quadratic smoothness penalty on first differences of θ over time. On an irregular time grid, we scale by the time step so the penalty is comparable across different spacings. Args: R: Binary tensor of shape (L, M, N). time_unit: Normalized time points in ``[0, 1]`` with shape ``(N,)``. max_iter: Maximum iterations for optimization. state_reg: Non-negative smoothness penalty on temporal differences. """ R = validate_input(R) L, M, N = R.shape time_unit = np.asarray(time_unit, dtype=float) if time_unit.shape != (N,): raise ValueError("time_unit must have shape (N,) where N = R.shape[2].") if N < 2: k_correct = R.sum(axis=2, dtype=float) theta, beta = _estimate_rasch_abilities( k_correct, n_trials=int(N), max_iter=max_iter ) return theta[:, None], beta # Initialize theta path from per-time observed solve rates. p_ln = np.clip(R.mean(axis=1), 1e-6, 1 - 1e-6) # (L, N) theta_init = np.log(p_ln / (1 - p_ln)) p_m = np.clip(R.mean(axis=(0, 2)), 1e-6, 1 - 1e-6) beta_init = -np.log(p_m / (1 - p_m)) params_init = np.concatenate([theta_init.ravel(), beta_init]) R_float = R.astype(float, copy=False) dt = np.diff(time_unit) def negative_log_posterior(params: np.ndarray) -> float: theta = params[: L * N].reshape(L, N) beta = params[L * N :] beta = beta - beta.mean() diff = theta[:, None, :] - beta[None, :, None] prob = sigmoid(diff) prob = np.clip(prob, 1e-10, 1 - 1e-10) nll = -np.sum(R_float * np.log(prob) + (1 - R_float) * np.log(1 - prob)) # Random-walk (Brownian-motion) smoothness over irregular or regular grids: # penalize squared increments scaled by the time step. step = (theta[:, 1:] - theta[:, :-1]) / np.sqrt(dt)[None, :] nll += state_reg * np.sum(step**2) # Weak anchoring for identifiability and numerical stability. nll += 1e-3 * np.sum(theta[:, 0] ** 2) return float(nll) result = minimize( negative_log_posterior, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta_path = result.x[: L * N].reshape(L, N) beta = result.x[L * N :] beta = beta - beta.mean() return theta_path, beta
[docs] def rasch_3pl( R: np.ndarray, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, fix_guessing: float | None = None, return_item_params: bool = False, reg_discrimination: float = 0.01, reg_guessing: float = 0.1, guessing_upper: float = 0.5, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with 3PL IRT via joint (optionally regularized) JMLE. Method context: Extends 2PL with item-specific pseudo-guessing ``c_m``. Estimated guessing is constrained to ``[0, guessing_upper]``; optionally a fixed value can be used. By default, small L2 penalties are applied on ``log(a)`` and guessing logits for numerical stability. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. fix_guessing: If provided, fixes the guessing parameter to this value for all questions; must lie in ``[0, guessing_upper]``. return_item_params: If True, also returns estimated item parameters. Implies returning scores. reg_discrimination: Non-negative L2 penalty weight on ``log(a)``. Set to ``0.0`` for pure (unpenalized) JMLE. reg_guessing: Non-negative L2 penalty weight on guessing logits. Set to ``0.0`` for pure (unpenalized) JMLE. guessing_upper: Upper bound for item guessing ``c_m``. Must be in ``(0, 1)``. Default ``0.5`` is suitable for binary outcomes. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns ability scores ``theta``. If ``return_item_params=True``, also returns ``{"difficulty": b, "discrimination": a, "guessing": c}``. Formula: .. math:: p_{lm} = c_m + (1-c_m)\\sigma\\left(a_m(\\theta_l-b_m)\\right) References: Lord, F. M. (1980). Applications of Item Response Theory to Practical Testing Problems. Routledge. Birnbaum, A. (1968). Some Latent Trait Models and Their Use in Inferring an Examinee's Ability. In Statistical Theories of Mental Test Scores. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> rank.rasch_3pl(R, fix_guessing=0.25).tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) reg_discrimination = _validate_nonnegative_float( "reg_discrimination", reg_discrimination ) reg_guessing = _validate_nonnegative_float("reg_guessing", reg_guessing) guessing_upper = _validate_guessing_upper(guessing_upper) fix_guessing = _validate_fix_guessing(fix_guessing, guessing_upper) k_correct, n_trials = _to_binomial_counts(R) theta, beta, a, c = _estimate_3pl_abilities( k_correct, n_trials, max_iter=max_iter, fix_guessing=fix_guessing, reg_discrimination=reg_discrimination, reg_guessing=reg_guessing, guessing_upper=guessing_upper, ) scores = theta ranking = rank_scores(scores)[method] if return_item_params: return ( ranking, scores, {"difficulty": beta, "discrimination": a, "guessing": c}, ) return (ranking, scores) if return_scores else ranking
[docs] def rasch_3pl_map( R: np.ndarray, prior: Prior | float = 1.0, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 500, fix_guessing: float | None = None, return_item_params: bool = False, reg_discrimination: float = 0.01, reg_guessing: float = 0.1, guessing_upper: float = 0.5, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with 3PL IRT via MAP estimation. Method context: Same 3PL likelihood as :func:`rasch_3pl`, with prior regularization on model abilities ``theta`` and optional L2 regularization on item parameters. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). prior: Ability prior. A ``float`` is interpreted as Gaussian prior variance; otherwise must be a ``Prior`` instance. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive maximum number of L-BFGS iterations. fix_guessing: Optional fixed guessing parameter in ``[0, guessing_upper]``. return_item_params: If ``True``, also return item parameters. reg_discrimination: Non-negative L2 penalty weight on ``log(a)``. reg_guessing: Non-negative L2 penalty weight on guessing logits. guessing_upper: Upper bound for item guessing ``c_m`` in ``(0, 1)``. Default is ``0.5`` for binary outcomes. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns MAP ability scores ``theta``. If ``return_item_params=True``, also returns ``{"difficulty": b, "discrimination": a, "guessing": c}``. """ max_iter = _validate_positive_int("max_iter", max_iter) reg_discrimination = _validate_nonnegative_float( "reg_discrimination", reg_discrimination ) reg_guessing = _validate_nonnegative_float("reg_guessing", reg_guessing) guessing_upper = _validate_guessing_upper(guessing_upper) fix_guessing = _validate_fix_guessing(fix_guessing, guessing_upper) k_correct, n_trials = _to_binomial_counts(R) prior = _coerce_ability_prior(prior) theta, beta, a, c = _estimate_3pl_abilities_map( k_correct, n_trials, prior, max_iter=max_iter, fix_guessing=fix_guessing, reg_discrimination=reg_discrimination, reg_guessing=reg_guessing, guessing_upper=guessing_upper, ) scores = theta ranking = rank_scores(scores)[method] if return_item_params: return ( ranking, scores, {"difficulty": beta, "discrimination": a, "guessing": c}, ) return (ranking, scores) if return_scores else ranking
def _estimate_3pl_abilities( k_correct: np.ndarray, n_trials: int, max_iter: int = 500, fix_guessing: float | None = None, reg_discrimination: float = 0.01, reg_guessing: float = 0.1, guessing_upper: float = 0.5, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Estimate 3PL abilities via JMLE. Args: k_correct: Shape (L, M) with counts in [0, n_trials]. n_trials: Number of trials per (model, item). max_iter: Maximum iterations for optimization. fix_guessing: If provided, use fixed guessing parameter for all items. reg_discrimination: L2 penalty weight on discrimination logits. reg_guessing: L2 penalty weight on guessing logits. guessing_upper: Upper bound for estimated guessing parameters. """ L, M = k_correct.shape def negative_log_likelihood(params): theta = params[:L] beta = params[L : L + M] log_a = params[L + M : L + 2 * M] if fix_guessing is None: # Estimate guessing parameters using a bounded logit transform. logit_c = params[L + 2 * M :] c = guessing_upper * sigmoid(logit_c) else: c = np.full(M, fix_guessing) beta = beta - beta.mean() # Identifiability a = np.exp(np.clip(log_a, -3, 3)) # P(correct) = c + (1-c) * sigmoid(a * (theta - beta)) diff = theta[:, None] - beta[None, :] # (L, M) logit = a[None, :] * diff base_prob = sigmoid(logit) prob = c[None, :] + (1 - c[None, :]) * base_prob prob = np.clip(prob, 1e-10, 1 - 1e-10) nll = -np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob) ) # Optional regularization nll += reg_discrimination * np.sum(log_a**2) if fix_guessing is None: nll += reg_guessing * np.sum(logit_c**2) return nll # Initialize p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) model_scores = p_lm.mean(axis=1) question_difficulty = p_lm.mean(axis=0) theta_init = np.log(model_scores / (1 - model_scores)) beta_init = -np.log(question_difficulty / (1 - question_difficulty)) log_a_init = np.zeros(M) if fix_guessing is None: # Initialize guessing at midpoint of [0, guessing_upper]. logit_c_init = np.zeros(M) # sigmoid(0) * guessing_upper params_init = np.concatenate([theta_init, beta_init, log_a_init, logit_c_init]) else: params_init = np.concatenate([theta_init, beta_init, log_a_init]) result = minimize( negative_log_likelihood, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta = result.x[:L] beta = result.x[L : L + M] beta = beta - beta.mean() log_a = result.x[L + M : L + 2 * M] a = np.exp(np.clip(log_a, -3, 3)) if fix_guessing is None: logit_c = result.x[L + 2 * M :] c = guessing_upper * sigmoid(logit_c) else: c = np.full(M, float(fix_guessing)) return theta, beta, a, c def _estimate_3pl_abilities_map( k_correct: np.ndarray, n_trials: int, prior: Prior, max_iter: int = 500, fix_guessing: float | None = None, reg_discrimination: float = 0.01, reg_guessing: float = 0.1, guessing_upper: float = 0.5, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Estimate 3PL abilities via MAP with configurable prior on abilities. Notes: - The 3PL model is often weakly identified without priors; we regularize: (i) θ via `prior.penalty(theta)` (ii) log a and (optionally) logit c via small quadratic penalties (interpretable as weak Gaussian priors) """ L, M = k_correct.shape def negative_log_posterior(params): theta = params[:L] beta = params[L : L + M] log_a = params[L + M : L + 2 * M] if fix_guessing is None: logit_c = params[L + 2 * M :] c = guessing_upper * sigmoid(logit_c) else: c = np.full(M, float(fix_guessing)) beta = beta - beta.mean() a = np.exp(np.clip(log_a, -3, 3)) diff = theta[:, None] - beta[None, :] logit = a[None, :] * diff base_prob = sigmoid(logit) prob = c[None, :] + (1 - c[None, :]) * base_prob prob = np.clip(prob, 1e-10, 1 - 1e-10) nll = -np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob) ) # Priors and regularization. nll += prior.penalty(theta) nll += reg_discrimination * np.sum(log_a**2) if fix_guessing is None: nll += reg_guessing * np.sum(logit_c**2) return float(nll) # Initialize p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) model_scores = p_lm.mean(axis=1) question_difficulty = p_lm.mean(axis=0) theta_init = np.log(model_scores / (1 - model_scores)) beta_init = -np.log(question_difficulty / (1 - question_difficulty)) log_a_init = np.zeros(M) if fix_guessing is None: logit_c_init = np.zeros(M) # => c ≈ guessing_upper / 2 params_init = np.concatenate([theta_init, beta_init, log_a_init, logit_c_init]) else: params_init = np.concatenate([theta_init, beta_init, log_a_init]) result = minimize( negative_log_posterior, params_init, method="L-BFGS-B", options={"maxiter": max_iter}, ) theta = result.x[:L] beta = result.x[L : L + M] beta = beta - beta.mean() log_a = result.x[L + M : L + 2 * M] a = np.exp(np.clip(log_a, -3, 3)) if fix_guessing is None: logit_c = result.x[L + 2 * M :] c = guessing_upper * sigmoid(logit_c) else: c = np.full(M, float(fix_guessing)) return theta, beta, a, c
[docs] def rasch_mml( R: np.ndarray, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 100, em_iter: int = 20, n_quadrature: int = 21, return_item_params: bool = False, ) -> ( np.ndarray | tuple[np.ndarray, np.ndarray] | tuple[np.ndarray, np.ndarray, dict[str, np.ndarray]] ): """ Rank models with Rasch MML (EM + quadrature) and EAP scoring. Method context: Integrates out abilities under a population prior (standard normal), estimates item difficulties by EM, then computes expected-a-posteriori (EAP) model abilities. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive max optimizer iterations in each M-step item update. em_iter: Positive number of EM iterations. n_quadrature: Number of Gauss-Hermite nodes (integer ``>=2``). return_item_params: If True, also returns estimated item parameters. Implies returning scores. Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns EAP ability scores. If ``return_item_params=True``, also returns ``{"difficulty": b, "ability_sd": sd(theta|R)}``. Formula: .. math:: \\hat\\theta_l^{\\mathrm{EAP}} = \\sum_q w_{lq}\\,\\theta_q, \\quad w_{lq} \\propto p(k_l\\mid\\theta_q,b)\\,w_q References: Bock, R. D., & Aitkin, M. (1981). Marginal maximum likelihood estimation of item parameters: Application of an EM algorithm. Psychometrika, 46(4), 443-459. Mislevy, R. J. (1986). Bayes modal estimation in item response models. Psychometrika, 51(2), 177-195. Examples: >>> import numpy as np >>> from scorio import rank >>> R = np.array([ ... [[1, 1], [1, 1]], ... [[0, 0], [0, 0]], ... ]) >>> rank.rasch_mml(R).tolist() [1, 2] """ max_iter = _validate_positive_int("max_iter", max_iter) em_iter = _validate_positive_int("em_iter", em_iter) n_quadrature = _validate_positive_int("n_quadrature", n_quadrature, min_value=2) k_correct, n_trials = _to_binomial_counts(R) theta, beta, posterior, theta_q = _estimate_rasch_mml( k_correct, n_trials, max_iter=max_iter, em_iter=em_iter, n_quadrature=n_quadrature, ) scores = theta ranking = rank_scores(scores)[method] if return_item_params: theta_sd = _posterior_sd(posterior, theta_q) return ranking, scores, {"difficulty": beta, "ability_sd": theta_sd} return (ranking, scores) if return_scores else ranking
[docs] def rasch_mml_credible( R: np.ndarray, quantile: float = 0.05, method: RankMethod = "competition", return_scores: bool = False, max_iter: int = 100, em_iter: int = 20, n_quadrature: int = 21, ) -> RankResult: """ Rank models by a posterior quantile under Rasch MML. Method context: Uses the discrete posterior from :func:`rasch_mml` and ranks by posterior quantile ``Q_q(theta_l | R)``. Lower quantiles provide conservative, uncertainty-aware ordering. Args: R: Binary outcome tensor with shape ``(L, M, N)`` or matrix ``(L, M)`` (treated as ``N=1``). quantile: Posterior quantile ``q`` in ``(0, 1)``. method: Tie-handling rule passed to :func:`scorio.utils.rank_scores`. return_scores: If ``True``, return ``(ranking, scores)``. max_iter: Positive max optimizer iterations in each M-step item update. em_iter: Positive number of EM iterations. n_quadrature: Number of Gauss-Hermite nodes (integer ``>=2``). Returns: Ranking array of shape ``(L,)``. If ``return_scores=True``, also returns posterior-quantile scores. Formula: .. math:: s_l = Q_q(\\theta_l\\mid R) """ if not (0.0 < quantile < 1.0): raise ValueError("quantile must be in (0, 1)") max_iter = _validate_positive_int("max_iter", max_iter) em_iter = _validate_positive_int("em_iter", em_iter) n_quadrature = _validate_positive_int("n_quadrature", n_quadrature, min_value=2) k_correct, n_trials = _to_binomial_counts(R) _, beta, posterior, theta_q = _estimate_rasch_mml( k_correct, n_trials, max_iter=max_iter, em_iter=em_iter, n_quadrature=n_quadrature, ) scores = _posterior_quantile(posterior, theta_q, quantile) ranking = rank_scores(scores)[method] return (ranking, scores) if return_scores else ranking
def _posterior_sd(posterior: np.ndarray, theta_q: np.ndarray) -> np.ndarray: """ Posterior SD for each row of a discrete posterior over theta_q. """ posterior = np.asarray(posterior, dtype=float) theta_q = np.asarray(theta_q, dtype=float) mean = posterior @ theta_q second = posterior @ (theta_q**2) var = np.maximum(second - mean**2, 0.0) return np.sqrt(var) def _posterior_quantile( posterior: np.ndarray, theta_q: np.ndarray, q: float ) -> np.ndarray: """ Posterior quantile for each row of a discrete posterior over theta_q. """ if not (0.0 < q < 1.0): raise ValueError("q must be in (0, 1)") posterior = np.asarray(posterior, dtype=float) theta_q = np.asarray(theta_q, dtype=float) order = np.argsort(theta_q) theta_sorted = theta_q[order] post_sorted = posterior[:, order] cdf = np.cumsum(post_sorted, axis=1) out = np.empty(posterior.shape[0], dtype=float) for i in range(out.size): j = int(np.searchsorted(cdf[i], q, side="left")) if j >= theta_sorted.size: j = theta_sorted.size - 1 out[i] = theta_sorted[j] return out def _estimate_rasch_mml( k_correct: np.ndarray, n_trials: int, max_iter: int = 100, em_iter: int = 20, n_quadrature: int = 21, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Estimate Rasch model via Marginal Maximum Likelihood with EM. Args: k_correct: Shape (L, M) with counts in [0, n_trials]. n_trials: Number of trials per (model, item). max_iter: Max Newton-Raphson iterations per M-step. em_iter: Number of EM iterations. n_quadrature: Number of quadrature points for integration. Returns: EAP ability estimates for each model. """ L, M = k_correct.shape # Gauss-Hermite quadrature points and weights # Transform to standard normal: θ = √2 * x x_gh, w_gh = np.polynomial.hermite.hermgauss(n_quadrature) theta_q = np.sqrt(2) * x_gh # Quadrature points w_q = w_gh / np.sqrt(np.pi) # Normalized weights # Initialize difficulties from observed proportions p_lm = np.clip((k_correct + 0.5) / (n_trials + 1.0), 1e-6, 1 - 1e-6) question_difficulty = p_lm.mean(axis=0) beta = -np.log((question_difficulty + 0.01) / (1 - question_difficulty + 0.01)) beta = beta - beta.mean() def _make_item_nll(k_m, posterior): def item_nll(b): nll = 0.0 for q in range(n_quadrature): prob = sigmoid(theta_q[q] - b) prob = np.clip(prob, 1e-10, 1 - 1e-10) log_p = k_m * np.log(prob) + (n_trials - k_m) * np.log(1 - prob) nll -= np.sum(posterior[:, q] * log_p) return nll return item_nll # EM algorithm for _ in range(em_iter): # E-step: Compute posterior weights for each model at each quadrature point # P(θ_q | data) ∝ P(data | θ_q) * P(θ_q) log_lik = np.zeros((L, n_quadrature)) for q in range(n_quadrature): diff = theta_q[q] - beta # (M,) prob = sigmoid(diff) prob = np.clip(prob, 1e-10, 1 - 1e-10) # Log likelihood for each model at this quadrature point log_lik[:, q] = np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob), axis=1, ) # Posterior weights (softmax over quadrature points) log_lik_max = log_lik.max(axis=1, keepdims=True) lik = np.exp(log_lik - log_lik_max) * w_q[None, :] posterior = lik / lik.sum(axis=1, keepdims=True) # (L, n_quadrature) # M-step: Update item difficulties for m in range(M): k_m = k_correct[:, m] item_nll = _make_item_nll(k_m, posterior) result = minimize( item_nll, beta[m], method="L-BFGS-B", options={"maxiter": max_iter}, ) beta[m] = result.x[0] # Re-center difficulties beta = beta - beta.mean() # Final E-step: Compute EAP ability estimates log_lik = np.zeros((L, n_quadrature)) for q in range(n_quadrature): diff = theta_q[q] - beta prob = sigmoid(diff) prob = np.clip(prob, 1e-10, 1 - 1e-10) log_lik[:, q] = np.sum( k_correct * np.log(prob) + (n_trials - k_correct) * np.log(1 - prob), axis=1, ) log_lik_max = log_lik.max(axis=1, keepdims=True) lik = np.exp(log_lik - log_lik_max) * w_q[None, :] posterior = lik / lik.sum(axis=1, keepdims=True) # EAP = E[θ | data] = Σ θ_q * P(θ_q | data) abilities = np.sum(posterior * theta_q[None, :], axis=1) return abilities, beta, posterior, theta_q __all__ = [ "rasch", "rasch_map", "rasch_mml", "rasch_mml_credible", "rasch_2pl", "rasch_2pl_map", "rasch_3pl", "rasch_3pl_map", "dynamic_irt", ]