Add ShiftAnalyzer to compute covariate and label shift between two datasets (#855)

* split SearcherLauncher into CLIPLauncher and SearcherLauncher
* refactor SearcherLauncher
* add pyemd to requirements-core.txt
* update searcher and openvino launcher
* create shift analyzer
* add changelog
* fix precommit errors
* fix whitespace
* fix tokenizer
* add normalization to the model launcher
* add tests with the pytest framework
* address reviewer comments
* resolve pylint error
* loosen the acceptance criteria
* remove macOS testing

---------

Co-authored-by: wonjuleee <[email protected]>
Showing 19 changed files with 655 additions and 139 deletions.
@@ -0,0 +1,262 @@
# Copyright (C) 2023 Intel Corporation
#
# SPDX-License-Identifier: MIT

from collections import defaultdict
from typing import Dict, List, Optional

import numpy as np
import pyemd
from scipy import linalg
from scipy.stats import anderson_ksamp

from datumaro.components.dataset import IDataset
from datumaro.plugins.shift_analyzer import ShiftAnalyzerLauncher
from datumaro.util import take_by

class RunningStats1D:
    """Tracks the running mean and running second moment of batches of 1-D feature vectors."""

    def __init__(self):
        self.running_mean = None
        self.running_sq_mean = None
        self.num: int = 0

    def add(self, arr: np.ndarray) -> None:
        assert arr.ndim == 2

        batch_size, _ = arr.shape
        mean = arr.mean(0)
        arr = np.expand_dims(arr, axis=-1)  # B x D x 1
        sq_mean = np.mean(np.matmul(arr, np.transpose(arr, axes=(0, 2, 1))), axis=0)  # D x D

        self.num += batch_size

        if self.running_mean is not None:
            self.running_mean = self.running_mean + batch_size / float(self.num) * (
                mean - self.running_mean
            )
        else:
            self.running_mean = mean

        if self.running_sq_mean is not None:
            self.running_sq_mean = self.running_sq_mean + batch_size / float(self.num) * (
                sq_mean - self.running_sq_mean
            )
        else:
            self.running_sq_mean = sq_mean

    @property
    def mean(self) -> np.ndarray:
        return self.running_mean

    @property
    def cov(self) -> np.ndarray:
        # E[x x^T] - E[x] E[x]^T, i.e. the (biased) covariance of all data seen so far.
        mean = np.expand_dims(self.running_mean, axis=-1)  # D x 1
        return self.running_sq_mean - np.matmul(mean, mean.transpose())
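
# Sketch (illustrative): the incremental updates above reproduce the batch
# statistics of the concatenated data, so feeding two chunks matches a single
# np.mean / biased np.cov over all rows:
#
#     rng = np.random.default_rng(0)
#     a, b = rng.normal(size=(4, 3)), rng.normal(size=(6, 3))
#     stats = RunningStats1D()
#     stats.add(a)
#     stats.add(b)
#     full = np.concatenate([a, b], axis=0)
#     assert np.allclose(stats.mean, full.mean(0))
#     assert np.allclose(stats.cov, np.cov(full, rowvar=False, bias=True))
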
class FeatureAccumulator:
    """Accumulates running activation statistics of a feature-extractor model over a dataset."""

    def __init__(self, model):
        self.model = model
        self._batch_size = 1

    def get_activation_stats(self, dataset: IDataset) -> RunningStats1D:
        running_stats = RunningStats1D()

        for batch in take_by(dataset, self._batch_size):
            inputs = []
            for item in batch:
                inputs.append(np.atleast_3d(item.media.data))
            inputs = np.array(inputs)
            features = self.model.launch(inputs)  # expected shape: B x D
            running_stats.add(features)

        return running_stats
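
# Sketch (illustrative): any object exposing `launch(inputs) -> B x D` can
# stand in for the OpenVINO launcher here, e.g. a hypothetical stub for tests:
#
#     class MeanPoolLauncher:
#         """Toy stand-in: global average pooling over H and W as the feature."""
#         def launch(self, inputs):
#             b = inputs.shape[0]
#             return inputs.reshape(b, -1, inputs.shape[-1]).mean(axis=1)
#
#     stats = FeatureAccumulator(model=MeanPoolLauncher()).get_activation_stats(dataset)
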
class FeatureAccumulatorByLabel(FeatureAccumulator):
    """Accumulates activation statistics separately for each annotation label."""

    def __init__(self, model):
        super().__init__(model)

    def get_activation_stats(self, dataset: IDataset) -> Dict[int, RunningStats1D]:
        running_stats: Dict[int, RunningStats1D] = {}

        for batch in take_by(dataset, self._batch_size):
            inputs, targets = [], []
            for item in batch:
                for ann in item.annotations:
                    inputs.append(np.atleast_3d(item.media.data))
                    targets.append(ann.label)

            inputs = np.array(inputs)
            # Convert to an array so that `targets == idx` below broadcasts to a
            # boolean mask; a plain Python list would not.
            targets = np.asarray(targets)
            features = self.model.launch(inputs)

            unique_indices = np.unique(targets)
            for idx in unique_indices:
                if idx not in running_stats:
                    running_stats[idx] = RunningStats1D()

                running_stats[idx].add(features[targets == idx])

        return running_stats
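
# Sketch (illustrative): the boolean mask selects the feature rows belonging
# to one label, so each per-label RunningStats1D only sees its own class:
#
#     feats = np.arange(8, dtype=float).reshape(4, 2)
#     targets = np.asarray([0, 1, 0, 1])
#     assert (feats[targets == 0] == feats[[0, 2]]).all()
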
class ShiftAnalyzer:
    def __init__(self) -> None:
        """
        Analyzer to compute covariate and label shift between two Datumaro datasets.
        Image features are extracted with a pretrained "googlenet-v4-tf" model
        through `ShiftAnalyzerLauncher`.
        """
        self._model = ShiftAnalyzerLauncher(
            model_name="googlenet-v4-tf",
            output_layers="InceptionV4/Logits/PreLogitsFlatten/flatten_1/Reshape",
        )
    def compute_covariate_shift(self, sources: List[IDataset], method: Optional[str] = "fid"):
        assert (
            len(sources) == 2
        ), "Shift analyzer should get two datasets to compute shifts between them."

        if method == "fid":
            _feat_aggregator = FeatureAccumulator(model=self._model)

            src_stats = _feat_aggregator.get_activation_stats(sources[0])
            tgt_stats = _feat_aggregator.get_activation_stats(sources[1])

            src_mu, src_sigma = src_stats.mean, src_stats.cov
            tgt_mu, tgt_sigma = tgt_stats.mean, tgt_stats.cov

            return self._frechet_distance(src_mu, src_sigma, tgt_mu, tgt_sigma, atol=1e-3)

        elif method == "emd":
            _feat_aggregator = FeatureAccumulatorByLabel(model=self._model)

            src_stats = _feat_aggregator.get_activation_stats(sources[0])
            tgt_stats = _feat_aggregator.get_activation_stats(sources[1])

            # Per-label sample counts become the histogram weights; per-label
            # mean features are the points the earth mover's distance moves between.
            w_s = np.array([stats.num for stats in src_stats.values()])
            w_t = np.array([stats.num for stats in tgt_stats.values()])

            f_s = np.stack([stats.mean for stats in src_stats.values()], axis=0)
            f_t = np.stack([stats.mean for stats in tgt_stats.values()], axis=0)

            # _earth_mover_distance returns a similarity score in (0, 1];
            # report the dissimilarity 1 - similarity instead.
            return 1.0 - self._earth_mover_distance(w_s, f_s, w_t, f_t, gamma=0.01)

        raise ValueError(f"Unknown method: {method}. Expected 'fid' or 'emd'.")
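
    # Usage sketch (illustrative; assumes two loaded Datumaro datasets and the
    # "googlenet-v4-tf" OpenVINO model available to the launcher):
    #
    #     analyzer = ShiftAnalyzer()
    #     fid_score = analyzer.compute_covariate_shift([src_dataset, tgt_dataset], method="fid")
    #     emd_score = analyzer.compute_covariate_shift([src_dataset, tgt_dataset], method="emd")
    #
    # Both scores grow with covariate shift; "fid" is unbounded above, while
    # "emd" lies in [0, 1).
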
    def compute_label_shift(self, sources: List[IDataset]):
        assert (
            len(sources) == 2
        ), "Shift analyzer should get two datasets to compute shifts between them."

        labels = defaultdict(list)
        for idx, source in enumerate(sources):
            for item in source:
                for ann in item.annotations:
                    labels[idx].append(ann.label)

        # k-sample Anderson-Darling test on the two label sequences; note that
        # SciPy caps the returned significance level to the range [0.001, 0.25].
        _, _, pv = anderson_ksamp([labels[0], labels[1]])

        return 1 - pv
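
    # Sketch (illustrative): identical label distributions yield the capped
    # p-value of 0.25, i.e. the minimum shift score of 0.75; strongly shifted
    # label sets drive the p-value toward its 0.001 floor and the score
    # toward 0.999.
    #
    #     _, _, pv = anderson_ksamp([[0] * 50 + [1] * 50, [0] * 50 + [1] * 50])
    #     # pv == 0.25 (capped), so the label shift score is 1 - pv == 0.75
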
    def _frechet_distance(
        self,
        mu1: np.ndarray,
        sigma1: np.ndarray,
        mu2: np.ndarray,
        sigma2: np.ndarray,
        eps: float = 1e-6,
        atol: float = 1e-3,
    ):
        """
        Numpy implementation of the Frechet Distance.
        The Frechet distance between two multivariate Gaussians X_1 ~ N(mu_1, C_1)
        and X_2 ~ N(mu_2, C_2) is
        d^2 = ||mu_1 - mu_2||^2 + Tr(C_1 + C_2 - 2*sqrt(C_1*C_2)).
        Stable version by Dougal J. Sutherland.
        We borrowed the implementation of [1]_ (Apache 2.0 license).
        Our implementation forces 64-bit floating-point calculations to avoid numerical instability.
        Parameters
        ----------
        mu1
            The sample mean over activations for the first (source) dataset.
        mu2
            The sample mean over activations for the second (target) dataset.
        sigma1
            The covariance matrix over activations for the first (source) dataset.
        sigma2
            The covariance matrix over activations for the second (target) dataset.
        eps
            Epsilon term added to the diagonal of the covariance matrices if their
            product is close to singular.
        atol
            Threshold value to check whether the covariance matrix is real valued.
            If any imaginary diagonal part of the covariance matrix is greater than
            `atol`, raise `ValueError`.
        Returns
        -------
        Distance
            Frechet distance
        References
        ----------
        .. [1] https://github.com/mseitzer/pytorch-fid/blob/3d604a25516746c3a4a5548c8610e99010b2c819/src/pytorch_fid/fid_score.py#L150
        """
        mu1 = np.atleast_1d(mu1).astype(np.float64)
        mu2 = np.atleast_1d(mu2).astype(np.float64)

        sigma1 = np.atleast_2d(sigma1).astype(np.float64)
        sigma2 = np.atleast_2d(sigma2).astype(np.float64)

        assert mu1.shape == mu2.shape, "Training and test mean vectors have different lengths."
        assert (
            sigma1.shape == sigma2.shape
        ), "Training and test covariances have different dimensions."

        diff = mu1 - mu2

        # Product might be almost singular
        covmean, _ = linalg.sqrtm(sigma1.dot(sigma2), disp=False)
        if not np.isfinite(covmean).all():
            msg = (
                "fid calculation produces singular product; "
                "adding %s to diagonal of cov estimates."
            ) % eps
            print(msg)
            offset = np.eye(sigma1.shape[0]) * eps
            covmean = linalg.sqrtm((sigma1 + offset).dot(sigma2 + offset))

        # Numerical error might give slight imaginary component
        if np.iscomplexobj(covmean):
            if not np.allclose(np.diagonal(covmean).imag, 0, atol=atol):
                m = np.max(np.abs(covmean.imag))
                raise ValueError("Imaginary component {}".format(m))
            covmean = covmean.real

        tr_covmean = np.trace(covmean)
        return diff.dot(diff) + np.trace(sigma1) + np.trace(sigma2) - 2 * tr_covmean
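
    # Worked check (illustrative, assuming an `analyzer` instance is at hand):
    # for diagonal covariances the square root factors, sqrt(C_1*C_2) =
    # sqrt(C_1)*sqrt(C_2), so the distance reduces to
    # ||mu_1 - mu_2||^2 + sum_i (sqrt(s1_i) - sqrt(s2_i))^2:
    #
    #     mu1, mu2 = np.zeros(2), np.ones(2)
    #     s1, s2 = np.diag([1.0, 4.0]), np.diag([4.0, 1.0])
    #     d = analyzer._frechet_distance(mu1, s1, mu2, s2)
    #     # ||mu1 - mu2||^2 = 2, Tr(s1) + Tr(s2) = 10, 2 * Tr(diag(2, 2)) = 8,
    #     # so d == 2 + 10 - 8 == 4
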
    def _earth_mover_distance(
        self,
        w_s: np.ndarray,
        f_s: np.ndarray,
        w_t: np.ndarray,
        f_t: np.ndarray,
        gamma: float,
    ) -> float:
        # Build one joint support: source mass lives on the first len(w_s) bins,
        # target mass on the remaining len(w_t) bins; both sides sum to 1.
        w_1 = np.zeros((len(w_s) + len(w_t),), np.float64)
        w_2 = np.zeros((len(w_s) + len(w_t),), np.float64)
        w_1[: len(w_s)] = w_s / np.sum(w_s)
        w_2[len(w_s) :] = w_t / np.sum(w_t)

        # Ground distance: pairwise Euclidean distances between the mean features.
        f_concat = np.concatenate([f_s, f_t], axis=0)
        distances = np.linalg.norm(f_concat[:, None] - f_concat[None, :], axis=2).astype(np.float64)

        # Map the raw EMD to a similarity score in (0, 1].
        emd = pyemd.emd(w_1, w_2, distances)
        return np.exp(-gamma * emd).item()
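
    # Worked check (illustrative): two unit masses a distance of 3 apart cost
    # exactly 3 to transport, so pyemd returns 3 and the similarity is exp(-3 * gamma):
    #
    #     w_1, w_2 = np.array([1.0, 0.0]), np.array([0.0, 1.0])
    #     dist = np.array([[0.0, 3.0], [3.0, 0.0]])
    #     assert np.isclose(pyemd.emd(w_1, w_2, dist), 3.0)
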