Skip to content

Commit

Permalink
checks.py: More AdditionalChecks
Browse files Browse the repository at this point in the history
  • Loading branch information
Ed (ODSC) committed Sep 22, 2024
1 parent 991cbfe commit 604eaf9
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 47 deletions.
6 changes: 6 additions & 0 deletions libcovebods/run_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
libcovebods.tasks.checks.CheckAnnotationStatementPointerTarget,
libcovebods.tasks.checks.CheckStatementRelationshipInterests,
libcovebods.tasks.checks.CheckStatementSerialisation,
libcovebods.tasks.checks.CheckStatementPersonIdentifiersHaveCorrectScheme,
libcovebods.tasks.checks.CheckStatementEntityIdentifiersHaveKnownScheme,
libcovebods.tasks.statistics.StatisticsCountEntityStatements,
libcovebods.tasks.statistics.StatisticsCountEntityRecordStatements,
libcovebods.tasks.statistics.StatisticsCountPersonStatements,
Expand All @@ -39,6 +41,7 @@
libcovebods.tasks.statistics.StatisticAddress,
libcovebods.tasks.statistics.StatisticOwnershipOrControlInterestDirectOrIndirect,
libcovebods.tasks.statistics.StatisticOwnershipOrControlWithAtLeastOneInterestBeneficial,
libcovebods.tasks.statistics.StatisticDeclarationSubjects,
libcovebods.tasks.peps.PEPForSchema02Only,
libcovebods.tasks.peps.PEPForSchema03AndAbove,
]
Expand All @@ -62,13 +65,16 @@
libcovebods.tasks.checks.CheckStatementDuplicateStatementId,
libcovebods.tasks.checks.CheckAnnotationStatementPointerTarget,
libcovebods.tasks.checks.CheckStatementRelationshipInterests,
libcovebods.tasks.checks.CheckStatementPersonIdentifiersHaveCorrectScheme,
libcovebods.tasks.checks.CheckStatementEntityIdentifiersHaveKnownScheme,
libcovebods.tasks.statistics.StatisticsCountEntityStatements,
libcovebods.tasks.statistics.StatisticsCountEntityRecordStatements,
libcovebods.tasks.statistics.StatisticsCountPersonStatements,
libcovebods.tasks.statistics.StatisticsCountPersonRecordStatements,
libcovebods.tasks.statistics.StatisticAddress,
libcovebods.tasks.statistics.StatisticOwnershipOrControlInterestDirectOrIndirect,
libcovebods.tasks.statistics.StatisticOwnershipOrControlWithAtLeastOneInterestBeneficial,
libcovebods.tasks.statistics.StatisticDeclarationSubjects,
libcovebods.tasks.peps.PEPForSchema02Only,
libcovebods.tasks.peps.PEPForSchema03AndAbove,
]
Expand Down
148 changes: 113 additions & 35 deletions libcovebods/tasks/checks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pycountry

from collections import defaultdict
from datetime import datetime, timedelta
from jsonpointer import resolve_pointer
Expand Down Expand Up @@ -793,7 +795,7 @@ def check_statement_first_pass(self, statement):
if ("source" in statement and isinstance(statement["source"], dict) and
"retrievedAt" in statement["source"] and statement["source"]["retrievedAt"]):
retrieved_at = parse_date_field(statement["source"]["retrievedAt"])
if retrieved_at and retrieved_at > datetime.now():
if retrieved_at and retrieved_at > datetime.now().date():
self._additional_check_results.append(
{
"type": "statement_source_retrieved_at_future_date",
Expand All @@ -812,7 +814,7 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
def check_statement_first_pass(self, statement):
if ("statementDate" in statement and statement["statementDate"]):
statement_date = parse_date_field(statement["statementDate"])
if statement_date and statement_date > datetime.now():
if statement_date and statement_date > datetime.now().date():
self._additional_check_results.append(
{
"type": "statement_date_is_future_date",
Expand All @@ -836,7 +838,7 @@ def check_statement_first_pass(self, statement):
annotation["creationDate"]):
creation_date = parse_date_field(annotation["creationDate"])
print(creation_date)
if creation_date and creation_date > datetime.now():
if creation_date and creation_date > datetime.now().date():
self._additional_check_results.append(
{
"type": "statement_annotation_creation_date_is_future_date",
Expand All @@ -858,7 +860,7 @@ def check_statement_first_pass(self, statement):
and "publicationDate" in statement["publicationDetails"] and
statement["publicationDetails"]["publicationDate"]):
publication_date = parse_date_field(statement["publicationDetails"]["publicationDate"])
if publication_date and publication_date > datetime.now():
if publication_date and publication_date > datetime.now().date():
self._additional_check_results.append(
{
"type": "statement_publication_date_is_future_date",
Expand All @@ -876,12 +878,15 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

def check_person_statement_first_pass(self, statement):
print(statement)
if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict)
and "deathDate" in statement["recordDetails"] and
statement["recordDetails"]["deathDate"]):
print("Death date:", statement["recordDetails"]["deathDate"])
death_date = parse_date_field(statement["recordDetails"]["deathDate"])
if death_date:
if (death_date > datetime.now() or death_date < datetime.strptime("1800-01-01", "%Y-%m-%d")):
if (death_date > datetime.now().date() or
death_date < datetime.strptime("1800-01-01", "%Y-%m-%d").date()):
self._additional_check_results.append(
{
"type": "statement_person_death_date_not_sensible_value",
Expand Down Expand Up @@ -937,15 +942,16 @@ def check_person_statement_first_pass(self, statement):
and "birthDate" in statement["recordDetails"] and
statement["recordDetails"]["birthDate"]):
birth_date = parse_date_field(statement["recordDetails"]["birthDate"])
if birth_date > datetime.now():
self._additional_check_results.append(
if birth_date:
if birth_date > datetime.now().date():
self._additional_check_results.append(
{
"type": "statement_person_birth_date_in_future",
"statement_type": None,
"statement": statement.get("statementId"),
})
elif birth_date < datetime.strptime("1800-01-01", "%Y-%m-%d"):
self._additional_check_results.append(
elif birth_date < datetime.strptime("1800-01-01", "%Y-%m-%d").date():
self._additional_check_results.append(
{
"type": "statement_person_birth_date_too_far_in_past",
"statement_type": None,
Expand Down Expand Up @@ -1010,13 +1016,13 @@ def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

def check_ownership_or_control_statement_first_pass(self, statement):
#print("Here!")
print("Checking Share Values:")
if ("recordDetails" in statement and isinstance(statement["recordDetails"], dict)
and "interests" in statement["recordDetails"] and
isinstance(statement["recordDetails"]["interests"], list)):
#print("Here!")
print("Here!")
for interest in statement["recordDetails"]["interests"]:
#print(interest)
print(interest)
if "share" in interest and isinstance(interest["share"], dict):
share = interest["share"]
if "exclusiveMinimum" in share and "minimum" in share:
Expand All @@ -1043,25 +1049,25 @@ def check_ownership_or_control_statement_first_pass(self, statement):
"statement_type": None,
"statement": statement.get("statementId"),
})
else:
if ((("exclusiveMinimum" in share and numeric_value(share["exclusiveMinimum"])) or
("minimum" in share and numeric_value(share["minimum"]))) and
(("exclusiveMaximum" in share and numeric_value(share["exclusiveMaximum"])) or
("maximum" in share and numeric_value(share["maximum"])))):
min_val = (float(share["minimum"]) if "minimum" in share else
float(share["exclusiveMinimum"]))
max_val = (float(share["maximum"]) if "maximum" in share else
float(share["exclusiveMaximum"]))
print("Min:", min_val, "Max:", max_val)
if max_val >= min_val:
self._additional_check_results.append(
return
if ((("exclusiveMinimum" in share and numeric_value(share["exclusiveMinimum"])) or
("minimum" in share and numeric_value(share["minimum"]))) and
(("exclusiveMaximum" in share and numeric_value(share["exclusiveMaximum"])) or
("maximum" in share and numeric_value(share["maximum"])))):
min_val = (float(share["minimum"]) if "minimum" in share else
float(share["exclusiveMinimum"]))
max_val = (float(share["maximum"]) if "maximum" in share else
float(share["exclusiveMaximum"]))
print("Min:", min_val, "Max:", max_val, "Comp:", max_val >= min_val)
if not max_val >= min_val:
self._additional_check_results.append(
{
"type": "statement_relationship_interests_exact_max_greater_than_min",
"type": "statement_relationship_interests_not_exact_max_greater_than_min",
"statement_type": None,
"statement": statement.get("statementId"),
})
elif max_val == min_val:
self._additional_check_results.append(
elif max_val == min_val:
self._additional_check_results.append(
{
"type": "statement_relationship_interests_exact_max_equals_min",
"statement_type": None,
Expand All @@ -1087,18 +1093,18 @@ def check_statement_first_pass(self, statement):


def check_statement_second_pass(self, statement):
print(self._statements)
if not statement["declarationSubject"] in self._statements:
self._additional_check_results.append(
if "declarationSubject" in statement:
if not statement["declarationSubject"] in self._statements:
self._additional_check_results.append(
{
"type": "statement_declaration_subject_not_exist",
"statement_type": None,
"statement": statement.get("statementId"),
})
else:
for record_type in self._statements[statement["declarationSubject"]]:
if not record_type in ('entity', 'person'):
self._additional_check_results.append(
else:
for record_type in self._statements[statement["declarationSubject"]]:
if not record_type in ('entity', 'person'):
self._additional_check_results.append(
{
"type": "statement_declaration_subject_not_entity_person",
"statement_type": None,
Expand Down Expand Up @@ -1129,18 +1135,20 @@ def check_statement_first_pass(self, statement):
self._components[component_id] = statement["recordId"]

def check_statement_second_pass(self, statement):
#print("Boo!")
print("Boo!")
if ("recordId" in statement and "recordDetails" in statement and
isinstance(statement["recordDetails"], dict) and "isComponent" in statement["recordDetails"]):
print(statement["recordId"], statement["recordDetails"]["isComponent"], self._statements,
self._components)
if statement["recordDetails"]["isComponent"] is True:
#print(statement["recordId"] in self._statements, self._statements[statement["recordId"]],
# self._statements[self._components[statement["recordId"]]])
print("iscomponent:", statement["recordId"], self._statements)
if not statement["recordId"] in self._components or not (self._statements[statement["recordId"]]
< self._statements[self._components[statement["recordId"]]]):
#print("Failed:", statement["recordId"], self._components,
# self._statements[statement["recordId"]], self._statements[self._components[statement["recordId"]]])
print("Failed:", statement["recordId"], self._components)
self._additional_check_results.append(
{
"type": "statement_entity_is_component_not_in_component_details",
Expand Down Expand Up @@ -1482,9 +1490,79 @@ def check_ownership_or_control_statement_second_pass(self, statement):
if statement["recordDetails"]["interestedParty"] in self._records:
if (self._records[statement["recordDetails"]["interestedParty"]]
> self._records[statement["recordId"]]):
print("Ordering:", self._records[statement["recordDetails"]["interestedParty"]],
self._records[statement["recordId"]], self._records)
self._additional_check_results.append(
{
"type": "relationship_interested_party_not_before_relationship_in_dataset",
"statement_type": None,
"statement": statement.get("statementId"),
})

class CheckStatementPersonIdentifiersHaveCorrectScheme(AdditionalCheck):
    """Check the ``scheme`` of every identifier attached to a person statement.

    A scheme is accepted when it has the form ``<PREFIX>-<KIND>`` with exactly
    one hyphen, where ``<PREFIX>`` is an ISO 3166-1 alpha-3 country code (or one
    of the extra codes in ``_OTHER_CODES``) and ``<KIND>`` is one of
    ``PASSPORT``, ``TAXID`` or ``IDCARD``.  At most one result is recorded per
    identifier, for the first rule it breaks.
    """

    # Prefixes accepted in addition to ISO 3166-1 alpha-3 country codes.
    # NOTE(review): these look like ICAO Doc 9303 travel-document codes --
    # confirm the intended source of this list.
    _OTHER_CODES = frozenset((
        "BAH", "D", "EUE", "GBD", "GBN", "GBO", "GBP", "GBS", "UNA", "UNK",
        "UNO", "XBA", "XIM", "XCC", "XCO", "XEC", "XPO", "XOM", "XXA", "XXB",
        "XXC", "XXX", "ZIM",
    ))

    # Identifier kinds accepted after the hyphen.
    _IDENTIFIER_KINDS = frozenset(("PASSPORT", "TAXID", "IDCARD"))

    def __init__(self, lib_cove_bods_config, schema_object):
        super().__init__(lib_cove_bods_config, schema_object)
        # Not read anywhere in this class; kept so instances carry the same
        # attributes as before.
        self._count = 0
        self._records = {}
        self._components = {}

    @staticmethod
    def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
        """Return True when the schema version is 0.4 or later."""
        return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

    @classmethod
    def _scheme_problem(cls, scheme):
        """Return the result type for the first rule ``scheme`` breaks, or None."""
        # A missing or non-string scheme is reported as badly composed rather
        # than crashing on ``.count`` / ``.split``.
        if not isinstance(scheme, str) or scheme.count("-") != 1:
            return "person_identifiers_invalid_composition"
        prefix, kind = scheme.split("-")
        if not prefix or not kind:
            return "person_identifiers_invalid_composition"
        if (prefix not in cls._OTHER_CODES and
                not pycountry.countries.get(alpha_3=prefix)):
            return "person_identifiers_no_valid_iso_3166_1_alpha_3_code"
        if kind not in cls._IDENTIFIER_KINDS:
            return "person_identifiers_not_passport_taxid_idcard"
        return None

    def check_person_statement_first_pass(self, statement):
        """Record a result for each identifier whose scheme is not acceptable.

        Statements without a dict ``recordDetails`` holding a list
        ``identifiers`` are ignored.
        """
        record_details = statement.get("recordDetails")
        if not isinstance(record_details, dict):
            return
        identifiers = record_details.get("identifiers")
        if not isinstance(identifiers, list):
            return
        for identifier in identifiers:
            # Non-dict entries have no usable scheme; treat them like a
            # missing scheme instead of raising.
            scheme = identifier.get("scheme") if isinstance(identifier, dict) else None
            problem = self._scheme_problem(scheme)
            if problem is not None:
                self._additional_check_results.append(
                    {
                        "type": problem,
                        "statement_type": None,
                        "statement": statement.get("statementId"),
                    })

class CheckStatementEntityIdentifiersHaveKnownScheme(AdditionalCheck):
    """Check that every identifier on an entity statement uses a known scheme.

    A scheme is known when it is a member of the collection returned by
    ``get_orgids_prefixes()``.
    """

    def __init__(self, lib_cove_bods_config, schema_object):
        super().__init__(lib_cove_bods_config, schema_object)
        # Loaded once per check instance.  How get_orgids_prefixes() obtains
        # the prefixes is not visible from this class.
        self.orgids_prefixes = get_orgids_prefixes()

    @staticmethod
    def does_apply_to_schema(lib_cove_bods_config, schema_object) -> bool:
        """Return True when the schema version is 0.4 or later."""
        return schema_object.is_schema_version_equal_to_or_greater_than("0.4")

    def check_entity_statement_first_pass(self, statement):
        """Record a result for each identifier whose scheme is missing or unknown.

        Statements without a dict ``recordDetails`` holding a list
        ``identifiers`` are ignored.
        """
        record_details = statement.get("recordDetails")
        if not isinstance(record_details, dict):
            return
        identifiers = record_details.get("identifiers")
        if not isinstance(identifiers, list):
            return
        for identifier in identifiers:
            # Non-dict entries and non-string schemes cannot match a prefix;
            # report them instead of raising on the lookup.
            scheme = identifier.get("scheme") if isinstance(identifier, dict) else None
            if isinstance(scheme, str) and scheme in self.orgids_prefixes:
                continue
            self._additional_check_results.append(
                {
                    "type": "entity_identifiers_not_known_scheme",
                    "statement_type": None,
                    "statement": statement.get("statementId"),
                })
23 changes: 11 additions & 12 deletions libcovebods/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import datetime
import httpx
import json
import re

from dateutil import parser
Expand Down Expand Up @@ -40,19 +42,16 @@ def get_statement_type(statement, schema_object):

def parse_date_field(date_str):
    """Parse a date-like string into a ``datetime.date``.

    Accepted forms are ``YYYY``, ``YYYY-M`` / ``YYYY-MM`` (both resolved to the
    first day of the period) and ISO 8601 dates or date-times such as
    ``YYYY-MM-DD`` or ``YYYY-MM-DDTHH:MM:SSZ`` (time and offset are discarded).

    Returns ``None`` when ``date_str`` is not a string, is not in one of the
    accepted forms, or names an impossible date (for example month 13), so
    callers can treat "no usable date" uniformly with a truthiness test.
    """
    if not isinstance(date_str, str):
        return None
    if "-" not in date_str and len(date_str) != 4:
        return None
    try:
        if date_str.count("-") < 2:
            if re.match(r"^[0-9]{4}$", date_str):
                return datetime.datetime.strptime(date_str, "%Y").date()
            if re.match(r"^[0-9]{4}-[0-9]{1,2}$", date_str):
                return datetime.datetime.strptime(date_str, "%Y-%m").date()
            return None
        iso_str = date_str
        if iso_str.endswith("Z"):
            # fromisoformat() only accepts the "Z" suffix from Python 3.11;
            # the explicit offset form works on earlier versions too.
            iso_str = iso_str[:-1] + "+00:00"
        return datetime.datetime.fromisoformat(iso_str).date()
    except ValueError:
        # Well-shaped but invalid input (e.g. "2020-13", "2020-02-30") is
        # reported as unparseable rather than aborting the caller.
        return None

Expand Down

0 comments on commit 604eaf9

Please sign in to comment.