Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK][Agreement] Add agreement package to sdk. #850

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/sdk/python/human-protocol-sdk/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ black = "*"
pylint = "*"
pytest = "*"
setuptools-pipfile = "*"
hypothesis = "*"
numpy = "*"
pyerf = "*"

[packages]
cryptography = "*"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .measures import percent_agreement, cohens_kappa, fleiss_kappa
from .bootstrap import bootstrap_ci
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import numpy as np
from typing import Sequence, Callable, Optional
from warnings import warn

from human_protocol_sdk.agreement.utils import NormalDistribution


def bootstrap_ci(
data: Sequence,
statistic_fn: Callable,
n_iterations: int = 1000,
n_sample: Optional[int] = None,
ci=0.95,
algorithm="bca",
) -> tuple:
"""Returns the confidence interval for the boostrap estimate of the given
statistic.

Args:
data: Data to estimate the statistic.
statistic_fn: Function to calculate the statistic. `f(data)` must
return the statistic.
n_iterations: Number of bootstrap samples to use for the estimate.
n_sample: If provided, determines the size of each bootstrap sample
drawn from the data. If omitted, is equal to the length of the
data.
ci: Size of the confidence interval.
algorithm: Which algorithm to use for the confidence interval
estimation. "bca" uses the "Bias Corrected Bootstrap with
Acceleration", "percentile" simply takes the appropriate
percentiles from the bootstrap distribution.
"""
data = np.asarray(data)

if n_iterations < 1:
raise ValueError(
f"n_iterations must be a positive integer, but were {n_iterations}"
)

n_data = len(data)
if n_data < 30:
warn(
"Dataset size is low, bootstrap estimate might be inaccurate. For accurate results, make sure to provide at least 30 data points."
)

if n_sample is None:
n_sample = n_data
elif n_sample < 1:
raise ValueError(f"n_sample must be a positive integer, but was {n_sample}")

if not (0.0 <= ci <= 1.0):
raise ValueError(f"ci must be a float within [0.0, 1.0], but was {ci}")

# bootstrap estimates
theta_b = np.empty(n_iterations, dtype=float)
for i in range(n_iterations):
idx = np.random.randint(n_data - 1, size=(n_sample,))
sample = data[idx]
theta_b[i] = statistic_fn(sample)

match algorithm:
case "percentile":
alpha = 1.0 - ci
alpha /= 2.0
q = np.asarray([alpha, 1.0 - alpha])
case "bca":
# acceleration: estimate a from jackknife bootstrap
theta_hat = statistic_fn(data)
jn_idxs = ~np.eye(n_data, dtype=bool)
theta_jn = np.empty(n_data, dtype=float)
for i in range(n_data):
theta_jn[i] = (n_data - 1) * (
theta_hat - statistic_fn(data[jn_idxs[i]])
)

a = (np.sum(theta_jn**3) / np.sum(theta_jn**2, axis=-1) ** 1.5) / 6

alpha = 1.0 - ci
alpha /= 2
q = np.asarray([alpha, 1.0 - alpha])

# bias correction
N = NormalDistribution()
ppf = np.vectorize(N.ppf)
cdf = np.vectorize(N.cdf)

# bias term. discrepancy between bootrap values and estimated value
z_0 = ppf(np.mean(theta_b < theta_hat))
z_u = ppf(q)
z_diff = z_0 + z_u

q = cdf(z_0 + (z_diff / (1 - a * z_diff)))
case _:
raise ValueError(f"Algorithm '{algorithm}' is not available!")

# sanity checks
if np.any(np.isnan(q)):
warn(
f"q contains NaN values. Input data is probably invalid. Interval will be (nan, nan). data: {data}"
)
ci_low = ci_high = np.nan
else:
if np.any((q < 0.0) | (q > 1.0)):
warn(
f"q ({q}) out of bounds. Input data is probably invalid. q will be clipped into interval [0.0, 1.0]. data: {data}"
)
q = np.clip(q, 0.0, 1.0)
ci_low, ci_high = np.percentile(theta_b, q * 100)

return (ci_low, ci_high), theta_b
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import numpy as np

from .validations import (
validate_incidence_matrix,
validate_confusion_matrix,
)


def percent_agreement(
    data: np.ndarray, data_format="im", invalid_return=np.nan
) -> float:
    """
    Computes the share of observed agreement in the annotation data.

    Args:
        data: Annotation data.
        data_format: Layout of `data`. 'cm' selects a confusion matrix, any
            other value is treated as an incidence matrix ('im', the
            default).
        invalid_return: Value handed back when the result is np.nan.
            Defaults to np.nan.
    """
    data = np.asarray(data)

    if data_format == "cm":
        validate_confusion_matrix(data)
        # agreeing annotations sit on the main diagonal
        agreeing = np.diag(data).sum()
        possible = data.sum()
    else:
        # everything that is not 'cm' is handled as an incidence matrix
        validate_incidence_matrix(data)

        votes_per_item = np.sum(data, 1)
        # agreeing rater pairs per item vs. all rater pairs per item
        agreeing = (np.sum(data * data, 1) - votes_per_item).sum()
        possible = (votes_per_item * (votes_per_item - 1)).sum()

    percent = agreeing / possible

    return invalid_return if np.isnan(percent) else percent


def cohens_kappa(data: np.ndarray, invalid_return=np.nan) -> float:
    """
    Computes Cohen's Kappa for the given annotations.

    Args:
        data: Annotation data as a K x K confusion matrix, where K is the
            number of labels.
        invalid_return: Value handed back when the result is np.nan.
            Defaults to np.nan.
    """
    data = np.asarray(data)

    # observed agreement is computed first so the input gets validated
    observed = percent_agreement(data, "cm")

    # chance agreement from the product of the two marginal distributions
    column_totals = data.sum(0)
    row_totals = data.sum(1)
    expected = np.matmul(column_totals, row_totals) / data.sum() ** 2

    kappa = (observed - expected) / (1 - expected)

    return invalid_return if np.isnan(kappa) else kappa


def fleiss_kappa(data: np.ndarray, invalid_return=np.nan) -> float:
    """
    Computes Fleiss' Kappa for the given annotations.

    Args:
        data: Annotation data as an I x K incidence matrix, where I is the
            number of items and K the number of labels.
        invalid_return: Value handed back when the result is np.nan.
            Defaults to np.nan.
    """
    data = np.asarray(data)

    observed = percent_agreement(data, "im")

    # chance agreement: sum of squared overall label proportions
    label_share = data.sum(0) / data.sum()
    expected = np.power(label_share, 2).sum()

    # every vote went to the same label: report full agreement directly
    # instead of dividing by zero below
    if expected == observed == 1.0:
        return 1.0

    kappa = (observed - expected) / (1 - expected)

    return invalid_return if np.isnan(kappa) else kappa
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import numpy as np
from typing import Sequence, Optional

from pyerf import erf, erfinv

from .validations import (
validate_nd,
validate_equal_shape,
validate_same_dtype,
)


def confusion_matrix_from_sequence(
    a: Sequence, b: Sequence, labels: Optional[Sequence] = None
):
    """Generate an N X N confusion matrix from the given sequence of values
    a and b, where N is the number of unique labels.

    Entry (i, j) counts the positions at which `a` holds label i and `b`
    holds label j. Positions where either sequence holds a missing value
    (NaN, or the string "nan" for string/object data) are ignored.

    Args:
        a: A sequence of labels.
        b: Another sequence of labels.
        labels: The labels contained in the records. Must contain all labels in
            the given records and may contain labels that are not found in the
            records.

    Returns:
        An integer array of shape (number of labels, number of labels).

    Raises:
        ValueError: Raised by the validation helpers if `a` and `b` differ
            in dtype kind or shape, or are not 1-dimensional.
    """
    a = np.asarray(a)
    b = np.asarray(b)

    validate_same_dtype(a, b)
    validate_nd(a, 1)
    validate_nd(b, 1)
    validate_equal_shape(a, b)

    # filter NaN values
    M = np.vstack((a, b)).T  # N x 2 matrix: one row per annotated position
    if M.dtype.kind in "UO":  # string types
        mask = M != "nan"
    else:
        mask = ~np.isnan(M)
    # keep only the rows in which both annotations are present
    a, b = M[np.all(mask, axis=1)].T

    # create list of unique labels
    if labels is None:
        labels = np.concatenate([a, b])
    # NOTE(review): the extracted source lost its indentation, so it is not
    # visible whether this line is nested under the `if` above. It is kept at
    # function level here, which also de-duplicates and sorts caller-supplied
    # labels and guarantees `labels.size` below works - confirm against the
    # original file.
    labels = np.unique(labels)

    # convert labels to indices
    label_to_id = {label: i for i, label in enumerate(labels)}
    map_fn = np.vectorize(lambda x: label_to_id[x])
    a = map_fn(a)
    b = map_fn(b)

    # get indices and counts to populate confusion matrix
    confusion_matrix = np.zeros((labels.size, labels.size), dtype=int)
    # each unique column of the stacked indices is one (row, column) cell
    ijs, counts = np.unique(np.vstack([a, b]), axis=1, return_counts=True)
    confusion_matrix[ijs[0], ijs[1]] = counts

    return confusion_matrix


class NormalDistribution:
    """Continuous Normal Distribution.

    See: https://en.wikipedia.org/wiki/Normal_distribution
    """

    def __init__(self, location: float = 0.0, scale: float = 1.0):
        """Creates a NormalDistribution from the given parameters.

        Args:
            location: Location of the distribution.
            scale: Scale of the distribution. Must be positive.

        Raises:
            ValueError: If `scale` is not strictly positive.
        """
        # a scale of zero would cause a division by zero in `cdf` and `pdf`,
        # so it is rejected together with negative values
        if scale <= 0.0:
            raise ValueError(f"scale parameter needs to be positive, but was {scale}")

        self.location = location
        self.scale = scale

    def cdf(self, x: float) -> float:
        """Cumulative Distribution Function of the Normal Distribution. Returns
        the probability that a random sample will be less than the given
        point.

        Args:
            x: Point within the distribution's domain.
        """
        return (1 + erf((x - self.location) / (self.scale * 2**0.5))) / 2

    def pdf(self, x: float) -> float:
        """Probability Density Function of the Normal Distribution. Returns the
        density of the distribution at the given point.

        Args:
            x: Point within the distribution's domain.
        """
        # standardise first: the whole difference (x - location) is divided
        # by the scale, not just the location
        z = (x - self.location) / self.scale
        return np.exp(-0.5 * z**2) / (self.scale * (2 * np.pi) ** 0.5)

    def ppf(self, p: float) -> float:
        """Probability Point function of the Normal Distribution. Returns
        the maximum point to which cumulated probabilities equal the given
        probability. Also called quantile. Inverse of the cdf.

        Args:
            p: Percentile of the distribution to be covered by the ppf.

        Raises:
            ValueError: If `p` lies outside [0.0, 1.0].
        """
        if not (0.0 <= p <= 1.0):
            raise ValueError(f"p must be a float within [0.0, 1.0], but was {p}")

        return self.location + self.scale * 2**0.5 * erfinv(2 * p - 1.0)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import numpy as np


def validate_nd(M: np.ndarray, n=2):
    """Checks that M has exactly n dimensions.

    Raises:
        ValueError: If the number of dimensions of M differs from n.
    """
    if M.ndim == n:
        return

    raise ValueError(f"Input must be a {n}-dimensional array-like.")


def validate_dtype_is_subtype_of(M: np.ndarray, supertype: np.dtype):
    """Checks that the scalar type of M's dtype is a subclass of supertype.

    Raises:
        ValueError: If the scalar type is not a subclass of supertype.
    """
    scalar_type = M.dtype.type
    if issubclass(scalar_type, supertype):
        return

    raise ValueError(f"Input must have a data type that is a subtype of {supertype}")


def validate_is_numeric(M: np.ndarray):
    """Validates that the data type of M is a number type.

    Signed and unsigned integers, floats and booleans are accepted.

    Raises:
        ValueError: If the dtype of M is of any other kind.
    """
    # `dtype.kind` is a one-letter *kind* code ('b' bool, 'i' signed int,
    # 'u' unsigned int, 'f' float), so it is compared against kind codes
    # directly. Comparing it against the type characters in `np.typecodes`
    # wrongly rejected unsigned integers. Booleans stay accepted for
    # backward compatibility.
    if M.dtype.kind not in "biuf":
        raise ValueError("Input data type must be a numeric type.")


def validate_all_positive(M: np.ndarray):
    """
    Checks that M holds no negative entries (zeros are allowed).

    Raises:
        ValueError: If at least one entry of M is below zero.
    """
    has_negative_entry = np.any(M < 0)
    if not has_negative_entry:
        return

    raise ValueError("Inputs must all be positive")


def validate_sufficient_annotations(M: np.ndarray, n=1):
    """Validates that M contains enough annotations.

    Args:
        M: Array of annotation counts.
        n: Number of annotations that must be exceeded by the sum of M.

    Raises:
        ValueError: If the sum of all entries in M is not greater than n.
    """
    if M.sum() <= n:
        # report the threshold that was actually applied, not a fixed 1
        plural = "" if n == 1 else "s"
        raise ValueError(f"Input must have more than {n} annotation{plural}.")


def validate_incidence_matrix(M: np.ndarray):
    """Checks that M can be used as an incidence matrix.

    The checks run in a fixed order: 2-dimensional shape, numeric dtype,
    no negative entries and more than one annotation in total. The first
    failing check raises.
    """
    validate_nd(M, 2)
    validate_is_numeric(M)
    validate_all_positive(M)
    validate_sufficient_annotations(M, 1)


def validate_confusion_matrix(M):
    """Checks that M can be used as a confusion matrix.

    M has to pass every incidence matrix check and additionally has to be
    square.
    """
    validate_incidence_matrix(M)

    # M is known to be 2-dimensional once the incidence checks have passed
    n_rows, n_cols = M.shape
    if n_rows == n_cols:
        return

    raise ValueError("Input must be a square matrix.")


def validate_equal_shape(a: np.ndarray, b: np.ndarray):
    """Checks that the shapes of a and b are identical.

    Raises:
        ValueError: If the shapes differ.
    """
    if a.shape == b.shape:
        return

    raise ValueError("All inputs must have the same shape.")


def validate_same_dtype(a: np.ndarray, b: np.ndarray):
    """Checks that the dtypes of a and b are of the same kind.

    Only the dtype kind is compared, not the exact dtype.

    Raises:
        ValueError: If the dtype kinds differ.
    """
    kind_a = a.dtype.kind
    kind_b = b.dtype.kind
    if kind_a == kind_b:
        return

    raise ValueError("All inputs must have the same kind of dtype.")
1 change: 1 addition & 0 deletions packages/sdk/python/human-protocol-sdk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
packages=setuptools.find_packages() + ["artifacts"],
setup_requires="setuptools-pipfile",
use_pipfile=True,
extras_require={"agreement": ["numpy", "pyerf"]},
)
Loading