Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Résolution du problème des alertes qui remontent malgré la présence d'une suspension d'alerte #3971

Merged
merged 9 commits into from
Dec 18, 2024
3 changes: 3 additions & 0 deletions datascience/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@
RISK_FACTOR_VERIFICATION_THRESHOLD = 2.3
FLAG_STATES_WITHOUT_SYSTEMATIC_VERIFICATION = ["FRA"]

# Missing DEP alerts configuration
MISSING_DEP_TRACK_ANALYSIS_HOURS = 48

# App URL
MONITORFISH_URL = os.getenv("MONITORFISH_URL") # http://monitor.fish/
BACKOFFICE_REGULATION_URL = MONITORFISH_URL + "backoffice/regulation"
Expand Down
16 changes: 11 additions & 5 deletions datascience/src/pipeline/flows/missing_dep_alerts.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path

from prefect import Flow, case, task
from prefect import Flow, Parameter, case, task
from prefect.executors import LocalDaskExecutor

from config import MISSING_DEP_TRACK_ANALYSIS_HOURS
from src.pipeline.entities.alerts import AlertType
from src.pipeline.generic_tasks import extract
from src.pipeline.shared_tasks.alerts import (
Expand All @@ -16,23 +17,28 @@


@task(checkpoint=False)
def extract_missing_deps():
def extract_missing_deps(hours_from_now: int):
return extract(
db_name="monitorfish_remote", query_filepath="monitorfish/missing_deps.sql"
db_name="monitorfish_remote",
query_filepath="monitorfish/missing_deps.sql",
params={"hours_from_now": hours_from_now},
)


with Flow("Missing DEP alerts", executor=LocalDaskExecutor()) as flow:
flow_not_running = check_flow_not_running()
with case(flow_not_running, True):
vessels_with_missing_deps = extract_missing_deps()
hours_from_now = Parameter("hours_from_now", MISSING_DEP_TRACK_ANALYSIS_HOURS)
vessels_with_missing_deps = extract_missing_deps(hours_from_now)

alerts = make_alerts(
vessels_with_missing_deps,
AlertType.MISSING_DEP_ALERT.value,
AlertType.MISSING_DEP_ALERT.value,
)
silenced_alerts = extract_silenced_alerts(AlertType.MISSING_DEP_ALERT.value)
silenced_alerts = extract_silenced_alerts(
AlertType.MISSING_DEP_ALERT.value, number_of_hours=hours_from_now
)
active_reportings = extract_active_reportings(AlertType.MISSING_DEP_ALERT.value)
filtered_alerts = filter_alerts(alerts, silenced_alerts, active_reportings)

Expand Down
26 changes: 21 additions & 5 deletions datascience/src/pipeline/flows/missing_far_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@


@task(checkpoint=False)
def get_dates(days_without_far: int) -> Tuple[datetime, datetime, datetime, datetime]:
def get_dates(
days_without_far: int,
) -> Tuple[datetime, datetime, datetime, datetime, float]:
"""
Returns the dates used in the flow as a 4-tuple :
Returns the dates used in the flow as a 5-tuple :

- `days_without_far` days ago at 00:00 (beginning of the day) in UTC
- `days_without_far` days ago at 00:00 (beginning of the day) in UTC (1)
- Yesterday at 8pm in UTC
- Today at 00:00 (beginning of the day) in UTC
- Current datetime in UTC
- Current datetime in UTC (2)
- The number of hours that separate 1 and 2

Returns:
        Tuple[datetime, datetime, datetime, datetime, float]
Expand All @@ -43,12 +46,16 @@ def get_dates(days_without_far: int) -> Tuple[datetime, datetime, datetime, date
today_at_zero_hours = utcnow.replace(hour=0, minute=0, second=0, microsecond=0)
period_start_at_zero_hours = today_at_zero_hours - timedelta(days=days_without_far)
yesterday_at_eight_pm = today_at_zero_hours - timedelta(hours=4)
period_start_hours_from_now = (
utcnow - period_start_at_zero_hours
).total_seconds() / 3600

return (
period_start_at_zero_hours,
yesterday_at_eight_pm,
today_at_zero_hours,
utcnow,
period_start_hours_from_now,
)


Expand Down Expand Up @@ -301,10 +308,16 @@ def get_vessels_at_sea(positions_at_sea: pd.DataFrame, min_days: int) -> pd.Data
"vessel_name",
"flag_state",
"facade",
"date_time",
"latitude",
"longitude",
]
]
.rename(
columns={
"date_time": "triggering_behaviour_datetime_utc",
}
)
.reset_index(drop=True)
)
return vessels_at_sea
Expand Down Expand Up @@ -426,6 +439,7 @@ def merge_risk_factor(
yesterday_at_eight_pm,
today_at_zero_hours,
utcnow,
period_start_hours_from_now,
) = get_dates(days_without_far)

positions_at_sea_yesterday_everywhere_query = make_positions_at_sea_query(
Expand Down Expand Up @@ -497,7 +511,9 @@ def merge_risk_factor(
districts_columns_to_add=["dml"],
)
alerts = make_alerts(vessels_with_missing_fars, alert_type, alert_config_name)
silenced_alerts = extract_silenced_alerts(alert_type)
silenced_alerts = extract_silenced_alerts(
alert_type, number_of_hours=period_start_hours_from_now
)
alert_without_silenced = filter_alerts(alerts, silenced_alerts)

# Load
Expand Down
6 changes: 4 additions & 2 deletions datascience/src/pipeline/flows/position_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame:
]
.rename(
columns={
"date_time": "creation_date",
"date_time": "triggering_behaviour_datetime_utc",
}
)
.reset_index(drop=True)
Expand Down Expand Up @@ -526,7 +526,9 @@ def get_vessels_in_alert(positions_in_alert: pd.DataFrame) -> pd.DataFrame:
districts_columns_to_add=["dml"],
)
alerts = make_alerts(vessels_in_alert, alert_type, alert_config_name)
silenced_alerts = extract_silenced_alerts(alert_type)
silenced_alerts = extract_silenced_alerts(
alert_type, number_of_hours=hours_from_now
)
alert_without_silenced = filter_alerts(alerts, silenced_alerts)
load_alerts(alert_without_silenced, alert_config_name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ def extract_suspicions_of_under_declaration():
AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value,
)
silenced_alerts = extract_silenced_alerts(
AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value
AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value,
# 8 days, to cover the date range analyzed in
# `extract_suspicions_of_under_declaration`
number_of_hours=192,
)
active_reportings = extract_active_reportings(
AlertType.SUSPICION_OF_UNDER_DECLARATION_ALERT.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ WITH detected_recent_deps AS (
LEFT JOIN districts d
ON d.district_code = v.district_code
WHERE
p.date_time >= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '48 hours'
p.date_time >= CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':hours_from_now hours'
AND p.date_time < CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '2 hours'
AND p.is_at_port = false
AND time_emitting_at_sea = INTERVAL '0'
Expand All @@ -37,6 +37,7 @@ SELECT
d.dml,
d.flag_state,
lp.risk_factor,
d.date_time AS triggering_behaviour_datetime_utc,
d.latitude,
d.longitude
FROM detected_recent_deps d
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
SELECT DISTINCT
SELECT
internal_reference_number,
external_reference_number,
ircs
ircs,
MAX(silenced_before_date) AT TIME ZONE 'UTC' AS silenced_before_date
FROM silenced_alerts
WHERE
NOW() < silenced_before_date
silenced_before_date > CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL ':number_of_hours HOURS'
AND value->>'type' = :alert_type
GROUP BY 1, 2, 3
ORDER BY 1, 2, 3
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ SELECT
fe.dml,
fe.flag_state,
lp.risk_factor,
DATE_TRUNC('day', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') - INTERVAL '7 days' AS triggering_behaviour_datetime_utc,
lp.latitude,
lp.longitude
FROM fishing_efforts fe
Expand Down
55 changes: 35 additions & 20 deletions datascience/src/pipeline/shared_tasks/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,31 @@
from src.pipeline.generic_tasks import extract, load
from src.pipeline.processing import (
df_to_dict_series,
join_on_multiple_keys,
left_isin_right_by_decreasing_priority,
)
from src.pipeline.utils import delete_rows, get_table


@task(checkpoint=False)
def extract_silenced_alerts(alert_type: str) -> pd.DataFrame:
def extract_silenced_alerts(alert_type: str, number_of_hours: int = 0) -> pd.DataFrame:
"""
Return DataFrame of vessels with active silenced alerts of the given type.

Args:
alert_type (str): Type of alert for which to extract silenced alerts
        number_of_hours (int, optional): Number of hours before the current time
            within which expired silences are still extracted (silences whose
            `silenced_before_date` falls after now minus this many hours).
            Defaults to 0, i.e. only currently active silences.

Returns:
        pd.DataFrame: Silenced alerts with columns `internal_reference_number`,
            `external_reference_number`, `ircs` and `silenced_before_date`
"""

alert_type = AlertType(alert_type)
return extract(
db_name="monitorfish_remote",
query_filepath="monitorfish/silenced_alerts.sql",
params={"alert_type": alert_type.value},
params={"alert_type": alert_type.value, "number_of_hours": number_of_hours},
)


Expand Down Expand Up @@ -119,10 +128,8 @@ def make_alerts(
- `dml`
- `flag_state`
- `risk_factor`
- and optionally, `creation_date`, `latitude` and `longitude`

If `creation_date` is not one of the columns, it will be added and filled with
`datetime.utcnow`.
- `triggering_behaviour_datetime_utc`
- and optionally, `latitude` and `longitude`

If `latitude` and `longitude` are not columns of the input, they are added and
filled with null values in the result.
Expand All @@ -132,7 +139,6 @@ def make_alerts(
create an alert.
alert_type (str): `type` to specify in the built alerts.
alert_config_name (str): `alert_config_name` to specify in the built alerts.
creation_date (datetime): `creation_date` to specify in the built alerts.

Returns:
pd.DataFrame: `DataFrame` of alerts.
Expand All @@ -145,8 +151,7 @@ def make_alerts(
}
)

if "creation_date" not in alerts:
alerts["creation_date"] = datetime.utcnow()
alerts["creation_date"] = datetime.utcnow()

if "latitude" not in alerts:
alerts["latitude"] = None
Expand Down Expand Up @@ -175,6 +180,7 @@ def make_alerts(
"flag_state",
"vessel_id",
"vessel_identifier",
"triggering_behaviour_datetime_utc",
"creation_date",
"latitude",
"longitude",
Expand Down Expand Up @@ -209,12 +215,16 @@ def filter_alerts(
- vessel_identifier
- flag_state
- facade
- triggering_behaviour_datetime_utc
- creation_date
- latitude
- longitude
- value
- alert_config_name

and the `silenced_alerts` DataFrame must have a `silenced_before_date`
column.

Args:
alerts (pd.DataFrame): positions alerts.
vessels_with_silenced_alerts (pd.DataFrame): vessels with silenced alerts.
Expand All @@ -224,18 +234,14 @@ def filter_alerts(
"""
vessel_id_cols = ["internal_reference_number", "external_reference_number", "ircs"]

if isinstance(vessels_with_active_reportings, pd.DataFrame):
vessels_to_remove = (
pd.concat([vessels_with_silenced_alerts, vessels_with_active_reportings])
.drop_duplicates()
.reset_index(drop=True)
)
else:
vessels_to_remove = vessels_with_silenced_alerts
alerts = join_on_multiple_keys(
alerts, vessels_with_silenced_alerts, or_join_keys=vessel_id_cols, how="left"
)

alerts = alerts.loc[
~left_isin_right_by_decreasing_priority(
alerts[vessel_id_cols], vessels_to_remove[vessel_id_cols]
(
(alerts.silenced_before_date.isna())
| (alerts.triggering_behaviour_datetime_utc > alerts.silenced_before_date)
),
[
"vessel_name",
Expand All @@ -251,7 +257,16 @@ def filter_alerts(
"value",
"alert_config_name",
],
].reset_index(drop=True)
]

if isinstance(vessels_with_active_reportings, pd.DataFrame):
alerts = alerts.loc[
~left_isin_right_by_decreasing_priority(
alerts[vessel_id_cols], vessels_with_active_reportings[vessel_id_cols]
)
]

alerts = alerts.sort_values("internal_reference_number").reset_index(drop=True)

return alerts

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,14 @@ INSERT INTO silenced_alerts (
'DEVINER FIGURE CONSCIENCE', 'ABC000542519', 'RO237719', 'FQ7058', 'INTERNAL_REFERENCE_NUMBER',
NOW() + ('15 DAYS')::interval, 'FR',
'{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}'
),
(
'DEVINER FIGURE CONSCIENCE', 'ABC000542519', 'RO237719', 'FQ7058', 'INTERNAL_REFERENCE_NUMBER',
NOW() + ('7 DAYS')::interval, 'FR',
'{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}'
),
(
'AUTRE NAVIRE', 'ABC000123456', NULL, NULL, 'INTERNAL_REFERENCE_NUMBER',
NOW() - ('5 HOURS')::interval, 'FR',
'{"type": "MISSING_FAR_ALERT", "seaFront": "NAMO"}'
);
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone

import pandas as pd
import pytest
Expand Down Expand Up @@ -26,6 +26,9 @@ def expected_missing_deps() -> pd.DataFrame:
"dml": ["DML 29"],
"flag_state": ["FR"],
"risk_factor": [2.58],
"triggering_behaviour_datetime_utc": [
datetime.utcnow() - timedelta(hours=2)
],
"latitude": [49.606],
"longitude": [-0.736],
}
Expand Down Expand Up @@ -83,8 +86,23 @@ def reset_test_data_missing_dep_alerts(reset_test_data):
def test_extract_missing_deps(
reset_test_data_missing_dep_alerts, expected_missing_deps
):
res = extract_missing_deps.run()
pd.testing.assert_frame_equal(res, expected_missing_deps)
res = extract_missing_deps.run(hours_from_now=48)
pd.testing.assert_frame_equal(
res.drop(columns=["triggering_behaviour_datetime_utc"]),
expected_missing_deps.drop(columns=["triggering_behaviour_datetime_utc"]),
)

assert (
(
(
res.triggering_behaviour_datetime_utc
- expected_missing_deps.triggering_behaviour_datetime_utc
)
.map(lambda td: td.total_seconds())
.abs()
)
< 10
).all()


def test_flow(reset_test_data_missing_dep_alerts):
Expand Down
Loading
Loading