Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Corrector transformation #1006

Merged
merged 12 commits into from
May 22, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
(<https://github.com/openvinotoolkit/datumaro/pull/996>)
- Add VocInstanceSegmentationImporter and VocInstanceSegmentationExporter
(<https://github.com/openvinotoolkit/datumaro/pull/997>)
- Add Corrector transformation
(<https://github.com/openvinotoolkit/datumaro/pull/1006>)

### Enhancements
- Use autosummary for fully-automatic Python module docs generation
Expand Down
140 changes: 139 additions & 1 deletion datumaro/plugins/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
from __future__ import annotations

import argparse
import json
import logging as log
import os.path as osp
import random
import re
from collections import Counter
from collections import Counter, defaultdict
from copy import deepcopy
from enum import Enum, auto
from itertools import chain
Expand Down Expand Up @@ -1229,3 +1230,140 @@ def transform_item(self, item: DatasetItem):
attributes=self._filter_attrs(item.attributes), annotations=filtered_annotations
)
return item


class Correct(Transform, CliPlugin):
    """
    Corrects a dataset according to a validation report.

    The report is expected to hold a list of anomaly entries under the
    "validation_reports" key. Each entry is handled by its "anomaly_type":

    - MissingLabelCategories: label categories are rebuilt from the labels
      and attribute names found in the dataset annotations.
    - UndefinedLabel: the label named in the description is added
      to the label categories.
    - UndefinedAttribute: the attribute named in the description is added
      to the corresponding label category.
    - MissingAnnotation, MultiLabelAnnotations: the reported item is dropped.
    - NegativeLength, InvalidValue, FarFromLabelMean, FarFromAttrMean:
      the reported annotation is removed from its item.
    - MissingAttribute: the attribute is added, with an empty string value,
      to the annotations of the reported item that have the reported label.

    Other anomaly types are ignored.
    """

    @classmethod
    def build_cmdline_parser(cls, **kwargs):
        parser = super().build_cmdline_parser(**kwargs)
        parser.add_argument(
            "-r",
            "--reports",
            type=str,
            default="validation_reports.json",
            help="A validation report from a 'validate' CLI",
        )
        return parser

    def __init__(
        self,
        extractor: IDataset,
        reports: Union[str, Dict],
    ):
        """
        Parameters:
            extractor: the dataset to be corrected.
            reports: either a path to a JSON file or an already loaded
                dictionary; in both cases it must contain
                the "validation_reports" key.

        Raises:
            Exception: if `reports` is a path and the file does not exist.
            KeyError: if the report has no "validation_reports" key.
        """
        super().__init__(extractor)

        if isinstance(reports, str):
            try:
                # Try to load the argument as a JSON file
                with open(reports, encoding="utf-8") as file:
                    reports = json.load(file)
            except FileNotFoundError as e:
                # Keep the original error as the cause for easier debugging
                raise Exception("Invalid validation reports with json format") from e

        self._reports = reports["validation_reports"]

        # (item_id, subset) of the items to be dropped
        self._remove_items = []
        # (item_id, subset, ann_id) of the annotations to be removed
        self._remove_anns = []
        # (item_id, subset, label_id, attr_name) of the attributes to be added
        self._add_attrs = []

        self._analyze_reports(report=self._reports)

    def _parse_ann_ids(self, desc: str):
        """
        Returns the first all-digit token found between single quotes
        in the description as an int, or None if there is no such token.
        """
        return next((int(s) for s in desc.split("'") if s.isdigit()), None)

    def _analyze_reports(self, report):
        """
        Walks over the report entries, fixes the label categories in place
        and collects the per-item corrections to be applied on iteration.
        """
        for rep in report:
            anomaly_type = rep["anomaly_type"]

            if anomaly_type == "MissingLabelCategories":
                label_categories = LabelCategories()
                for item in self._extractor:
                    for ann in item.annotations:
                        attrs = set(ann.attributes)
                        label_name = str(ann.label)
                        label_id = label_categories.find(label_name)[0]
                        if label_id is None:
                            label_categories.add(name=label_name, attributes=attrs)
                        else:
                            # Merge the names one by one: 'attrs' is a set,
                            # so it cannot be add()-ed as a single element
                            label_categories[label_id].attributes.update(attrs)
                self._extractor.categories()[AnnotationType.label] = label_categories

            elif anomaly_type == "UndefinedLabel":
                label_categories = self._extractor.categories().get(AnnotationType.label)
                # The label name is the first single-quoted token
                add_label_name = rep["description"].split("'")[1]
                label_id = label_categories.find(add_label_name)[0]
                if label_id is None:
                    label_categories.add(name=add_label_name)

            elif anomaly_type == "UndefinedAttribute":
                label_categories = self._extractor.categories().get(AnnotationType.label)
                # The 1st and the 2nd single-quoted tokens
                desc = rep["description"].split("'")
                attr_name, label_name = desc[1], desc[3]
                label_id = label_categories.find(label_name)[0]
                if label_id is not None:
                    label_categories[label_id].attributes.add(attr_name)

            # [TODO] Correct LabelDefinedButNotFound:
            # removing a label, reindexing, remapping others

            elif anomaly_type in ("MissingAnnotation", "MultiLabelAnnotations"):
                self._remove_items.append((rep["item_id"], rep["subset"]))

            elif anomaly_type in (
                "NegativeLength",
                "InvalidValue",
                "FarFromLabelMean",
                "FarFromAttrMean",
            ):
                ann_id = self._parse_ann_ids(rep["description"])
                if ann_id is None:
                    # Nothing to remove if the annotation can't be identified
                    log.warning(
                        "Can't find an annotation id in the description "
                        "of the '%s' report for the item '%s', skipping it",
                        anomaly_type,
                        rep["item_id"],
                    )
                    continue
                self._remove_anns.append((rep["item_id"], rep["subset"], ann_id))

            elif anomaly_type == "MissingAttribute":
                desc = rep["description"].split("'")
                attr_name, label_name = desc[1], desc[3]
                label_id = self._extractor.categories()[AnnotationType.label].find(label_name)[0]
                self._add_attrs.append((rep["item_id"], rep["subset"], label_id, attr_name))

    def _find_removing_anns_in_item(self, target: tuple[str, str]):
        """Returns ids of the annotations to be removed from the (id, subset) item."""
        return [tup[2] for tup in self._remove_anns if tup[:2] == target]

    def _find_adding_attrs_in_item(self, target: tuple[str, str]):
        """Returns (label_id, attr_name) pairs to be added to the (id, subset) item."""
        return [tup[2:] for tup in self._add_attrs if tup[:2] == target]

    def __iter__(self):
        # Build the lookup once instead of scanning the list for every item
        remove_items = set(self._remove_items)

        for item in self._extractor:
            target = (item.id, item.subset)
            if target in remove_items:
                continue

            ann_ids = self._find_removing_anns_in_item(target=target)
            updated_anns = [ann for ann in item.annotations if ann.id not in ann_ids]

            # Removal and attribute addition are independent corrections,
            # so both are applied when an item is reported for both
            updated_attrs = defaultdict(list)
            for label_id, attr_name in self._find_adding_attrs_in_item(target=target):
                updated_attrs[label_id].append(attr_name)

            if updated_attrs:
                for ann in updated_anns:
                    if ann.label in updated_attrs:
                        # NOTE: the annotation is modified in place.
                        # Existing attribute values are kept untouched.
                        for attr_name in updated_attrs[ann.label]:
                            ann.attributes.setdefault(attr_name, "")

            yield item.wrap(annotations=updated_anns)
Loading