"""Working correlation structures for GEE.
Implements four standard working correlation structures used in
Generalized Estimating Equations (Liang & Zeger, 1986):
- Independence: R = I
- Exchangeable (compound symmetry): R_ij = alpha for i != j
- AR(1): R_ij = alpha^|i-j|
- Unstructured: R_ij estimated freely (equal cluster sizes only)
RULE 5 EXCEPTION: These classes use mutable internal state (self._alpha,
self._corr_matrix) because the GEE algorithm iteratively updates correlation
parameters during fitting. Making them frozen dataclasses would require
reconstructing a new object at every GEE iteration, which adds complexity
without safety benefit since these objects are internal to a single fit call
and never exposed as part of the public API's frozen result.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
import numpy as np
from numpy.typing import NDArray
from pystatistics.core.exceptions import ValidationError
[docs]
class CorrStructure(ABC):
"""Base class for GEE working correlation structures.
Subclasses must implement:
- ``name`` property: human-readable structure name
- ``working_corr(cluster_size)``: build the R_i matrix
- ``estimate(pearson_resids, phi)``: update parameters from residuals
- ``params`` property: current parameter values as a dict
"""
@property
@abstractmethod
def name(self) -> str:
"""Human-readable correlation structure name."""
...
[docs]
@abstractmethod
def working_corr(self, cluster_size: int) -> NDArray:
"""Return the working correlation matrix R_i for a cluster.
Parameters
----------
cluster_size : int
Number of observations in this cluster.
Returns
-------
NDArray
Correlation matrix of shape (cluster_size, cluster_size).
"""
...
[docs]
@abstractmethod
def estimate(
self, pearson_resids: list[NDArray], phi: float, n_params: int
) -> None:
"""Estimate correlation parameters from standardized Pearson residuals.
Updates internal state in-place. Each element of ``pearson_resids``
is a vector of Pearson residuals for one cluster.
Parameters
----------
pearson_resids : list[NDArray]
Per-cluster Pearson residual vectors.
phi : float
Current dispersion estimate.
n_params : int
Number of regression parameters (p), used in denominator.
"""
...
@property
@abstractmethod
def params(self) -> dict[str, float]:
"""Current parameter values as a dict."""
...
[docs]
class IndependenceCorr(CorrStructure):
"""Independence working correlation: R = I.
Assumes all within-cluster observations are uncorrelated.
No parameters to estimate. This is the simplest structure and
recovers standard GLM-like estimates (though with sandwich SE).
"""
@property
def name(self) -> str:
return "independence"
[docs]
def working_corr(self, cluster_size: int) -> NDArray:
"""Return identity matrix.
Parameters
----------
cluster_size : int
Number of observations in this cluster.
Returns
-------
NDArray
Identity matrix of shape (cluster_size, cluster_size).
"""
return np.eye(cluster_size)
[docs]
def estimate(
self, pearson_resids: list[NDArray], phi: float, n_params: int
) -> None:
"""No-op: independence has no parameters to estimate."""
@property
def params(self) -> dict[str, float]:
"""Empty dict: no correlation parameters."""
return {}
[docs]
class ExchangeableCorr(CorrStructure):
"""Exchangeable (compound symmetry) working correlation.
R_ij = alpha for i != j, R_ii = 1.
Alpha is estimated as:
alpha = [sum_i sum_{j<k} r_ij * r_ik] / [sum_i n_i*(n_i-1)/2 - p] / phi
where r_ij are Pearson residuals, phi is the dispersion, and p is the
number of regression parameters.
References
----------
Liang, K.-Y. & Zeger, S. L. (1986). Biometrika, 73(1), 13-22.
"""
def __init__(self) -> None:
self._alpha: float = 0.0
@property
def name(self) -> str:
return "exchangeable"
[docs]
def working_corr(self, cluster_size: int) -> NDArray:
"""Build exchangeable correlation matrix.
Parameters
----------
cluster_size : int
Number of observations in this cluster.
Returns
-------
NDArray
Matrix with 1 on diagonal and alpha off-diagonal.
"""
if cluster_size == 1:
return np.ones((1, 1))
R = np.full((cluster_size, cluster_size), self._alpha)
np.fill_diagonal(R, 1.0)
return R
[docs]
def estimate(
self, pearson_resids: list[NDArray], phi: float, n_params: int
) -> None:
"""Estimate alpha from cross-products of Pearson residuals.
Parameters
----------
pearson_resids : list[NDArray]
Per-cluster Pearson residual vectors.
phi : float
Current dispersion estimate.
n_params : int
Number of regression parameters (p).
"""
numerator = 0.0
denominator = 0.0
for r in pearson_resids:
n_i = len(r)
if n_i < 2:
continue
# Sum of all cross-products r_j * r_k for j < k
total = float(np.sum(r)) ** 2 - float(np.sum(r**2))
numerator += total / 2.0
denominator += n_i * (n_i - 1) / 2.0
denominator -= n_params
if denominator <= 0:
self._alpha = 0.0
return
self._alpha = float(np.clip(numerator / (denominator * phi), -1.0, 1.0))
@property
def params(self) -> dict[str, float]:
"""Current alpha estimate."""
return {"alpha": self._alpha}
[docs]
class AR1Corr(CorrStructure):
"""AR(1) working correlation: R_ij = alpha^|i-j|.
Assumes observations within a cluster are ordered (e.g., by time)
and adjacent observations have correlation alpha.
Alpha is estimated as:
alpha = [sum_i sum_t r_{i,t} * r_{i,t+1}] / [sum_i (n_i - 1) - p] / phi
References
----------
Liang, K.-Y. & Zeger, S. L. (1986). Biometrika, 73(1), 13-22.
"""
def __init__(self) -> None:
self._alpha: float = 0.0
@property
def name(self) -> str:
return "ar1"
[docs]
def working_corr(self, cluster_size: int) -> NDArray:
"""Build AR(1) correlation matrix.
Parameters
----------
cluster_size : int
Number of observations in this cluster.
Returns
-------
NDArray
Toeplitz matrix with R_ij = alpha^|i-j|.
"""
if cluster_size == 1:
return np.ones((1, 1))
idx = np.arange(cluster_size)
return np.power(self._alpha, np.abs(idx[:, None] - idx[None, :]))
[docs]
def estimate(
self, pearson_resids: list[NDArray], phi: float, n_params: int
) -> None:
"""Estimate alpha from consecutive residual products.
Parameters
----------
pearson_resids : list[NDArray]
Per-cluster Pearson residual vectors.
phi : float
Current dispersion estimate.
n_params : int
Number of regression parameters (p).
"""
numerator = 0.0
denominator = 0.0
for r in pearson_resids:
n_i = len(r)
if n_i < 2:
continue
numerator += float(np.sum(r[:-1] * r[1:]))
denominator += n_i - 1
denominator -= n_params
if denominator <= 0:
self._alpha = 0.0
return
self._alpha = float(np.clip(numerator / (denominator * phi), -1.0, 1.0))
@property
def params(self) -> dict[str, float]:
"""Current alpha estimate."""
return {"alpha": self._alpha}
[docs]
class UnstructuredCorr(CorrStructure):
"""Unstructured working correlation.
R_jk is estimated freely for each pair (j, k). This is only feasible
when all clusters have the same size, because each R_jk requires data
from all clusters at positions j and k.
R_jk = sum_i r_ij * r_ik / (K - p) / phi
where K is the number of clusters and p is the number of parameters.
Raises
------
ValidationError
If clusters have unequal sizes during estimation.
References
----------
Liang, K.-Y. & Zeger, S. L. (1986). Biometrika, 73(1), 13-22.
"""
def __init__(self) -> None:
self._corr_matrix: NDArray | None = None
@property
def name(self) -> str:
return "unstructured"
[docs]
def working_corr(self, cluster_size: int) -> NDArray:
"""Return the estimated unstructured correlation matrix.
If not yet estimated, returns the identity matrix.
Parameters
----------
cluster_size : int
Number of observations in this cluster.
Returns
-------
NDArray
Correlation matrix of shape (cluster_size, cluster_size).
"""
if self._corr_matrix is not None and self._corr_matrix.shape[0] == cluster_size:
return self._corr_matrix.copy()
return np.eye(cluster_size)
[docs]
def estimate(
self, pearson_resids: list[NDArray], phi: float, n_params: int
) -> None:
"""Estimate all pairwise correlations from residuals.
Parameters
----------
pearson_resids : list[NDArray]
Per-cluster Pearson residual vectors. All must have the same length.
phi : float
Current dispersion estimate.
n_params : int
Number of regression parameters (p).
Raises
------
ValidationError
If cluster sizes are not all equal.
"""
sizes = {len(r) for r in pearson_resids}
if len(sizes) != 1:
raise ValidationError(
"Unstructured correlation requires all clusters to have the "
f"same size, got sizes: {sorted(sizes)}"
)
n_i = sizes.pop()
K = len(pearson_resids)
denom = K - n_params
if denom <= 0:
self._corr_matrix = np.eye(n_i)
return
# Stack residuals: (K, n_i)
R_mat = np.vstack([r[np.newaxis, :] for r in pearson_resids])
# Compute R_jk = sum_i r_ij * r_ik / (denom * phi)
corr = (R_mat.T @ R_mat) / (denom * phi)
# Force diagonal to 1 and ensure symmetry
diag_sqrt = np.sqrt(np.diag(corr))
diag_sqrt = np.maximum(diag_sqrt, 1e-10)
corr = corr / (diag_sqrt[:, None] * diag_sqrt[None, :])
np.fill_diagonal(corr, 1.0)
self._corr_matrix = corr
@property
def params(self) -> dict[str, float]:
"""Current correlation matrix entries as a flat dict."""
if self._corr_matrix is None:
return {}
result = {}
n = self._corr_matrix.shape[0]
for j in range(n):
for k in range(j + 1, n):
result[f"r_{j}_{k}"] = float(self._corr_matrix[j, k])
return result
_CORR_STRUCTURES: dict[str, type[CorrStructure]] = {
"independence": IndependenceCorr,
"exchangeable": ExchangeableCorr,
"ar1": AR1Corr,
"unstructured": UnstructuredCorr,
}
def resolve_corr(corr_structure: str) -> CorrStructure:
"""Resolve a correlation structure name to an instance.
Parameters
----------
corr_structure : str
One of 'independence', 'exchangeable', 'ar1', 'unstructured'.
Returns
-------
CorrStructure
A new instance of the requested correlation structure.
Raises
------
ValidationError
If the name is not recognized.
"""
cls = _CORR_STRUCTURES.get(corr_structure.lower())
if cls is None:
valid = ", ".join(sorted(_CORR_STRUCTURES.keys()))
raise ValidationError(
f"Unknown corr_structure: {corr_structure!r}. Valid: {valid}"
)
return cls()