Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

905 Miovision Open Data #909

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
107 changes: 107 additions & 0 deletions dags/miovision_open_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
r"""### Monthly Miovision Open Data DAG
Pipeline to run monthly Miovision aggregations for Open Data.
"""
import sys
import os

from airflow.decorators import dag, task
from datetime import timedelta
from airflow.models import Variable
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.macros import ds_format

import logging
import pendulum

try:
    # Make the directory two levels above this file importable so that the
    # `dags` package (helper functions and custom operators) can be resolved.
    repo_path = os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
    sys.path.insert(0, repo_path)
    from dags.dag_functions import task_fail_slack_alert, send_slack_msg
    from dags.custom_operators import SQLCheckOperatorWithReturnValue
except Exception as err:
    # Any failure while loading the helpers is still surfaced as an
    # ImportError, but KeyboardInterrupt/SystemExit are no longer trapped and
    # the original exception is chained explicitly for easier debugging.
    raise ImportError("Cannot import DAG helper functions.") from err

# Module-level logger; the root logger is configured at DEBUG verbosity.
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)

DAG_NAME = 'miovision_open_data'
# Owners are read from the `dag_owners` Airflow Variable (deserialized from
# JSON) and looked up by DAG name; ["Unknown"] is used when there is no entry.
DAG_OWNERS = Variable.get('dag_owners', deserialize_json=True).get(DAG_NAME, ["Unknown"])

# Default arguments passed to the DAG below.
default_args = dict(
    owner=','.join(DAG_OWNERS),
    depends_on_past=False,
    #set earlier start_date + catchup when ready?
    start_date=pendulum.datetime(2024, 1, 1, tz="America/Toronto"),
    email_on_failure=False,
    email_on_success=False,
    retries=0,
    retry_delay=timedelta(minutes=5),
    on_failure_callback=task_fail_slack_alert,
)

@dag(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule='0 14 3 * *', # 2pm, 3rd day of each month
    catchup=True,
    max_active_runs=1,
    tags=["miovision", "open_data"],
    doc_md=__doc__
)
def miovision_open_data_dag():
    # Monthly pipeline:
    #   1. check that every day of the target month has Miovision volume,
    #   2. refresh the monthly summary and the 15 minute open data tables,
    #   3. post a Slack message once both refreshes have succeeded.
    # Every task derives its target month by reformatting `ds` to the first
    # day of that month ('%Y-%m-01').

    #considered whether it should have an external task sensor
    #for the first of the month. Decided it should run later
    #to give time for anomalous_range updates if any.

    # The query builds one row per day of the month, left joins the daily
    # volumes and returns (no_missing_days, 'Missing dates: ...').
    # NOTE(review): presumably the custom operator fails the task when the
    # first column is false and reports the second column - confirm against
    # dags.custom_operators.
    check_data_availability = SQLCheckOperatorWithReturnValue(
        task_id="check_data_availability",
        sql="""WITH daily_volumes AS (
SELECT dt::date, COALESCE(SUM(daily_volume), 0) AS daily_volume
FROM generate_series('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date,
'{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date + '1 month'::interval - '1 day'::interval,
'1 day'::interval) AS dates(dt)
LEFT JOIN miovision_api.volumes_daily_unfiltered USING (dt)
GROUP BY dt
ORDER BY dt
)

SELECT NOT(COUNT(*) > 0), 'Missing dates: ' || string_agg(dt::text, ', ')
FROM daily_volumes
WHERE daily_volume = 0""",
        conn_id="miovision_api_bot"
    )

    # Rebuild the monthly summary for the target month.
    refresh_monthly_open_data = PostgresOperator(
        task_id='refresh_monthly_open_data',
        sql="SELECT gwolofs.insert_miovision_open_data_monthly_summary('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)",
        postgres_conn_id='miovision_api_bot',
        autocommit=True
    )

    # Rebuild the 15 minute open data for the target month.
    refresh_15min_open_data = PostgresOperator(
        task_id='refresh_15min_open_data',
        sql="SELECT gwolofs.insert_miovision_15min_open_data('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y-%m-01') }}'::date)",
        postgres_conn_id='miovision_api_bot',
        autocommit=True
    )

    # The message announces success, so it must only run when every upstream
    # task succeeded. With 'all_done' it was also sent after a failed check or
    # a failed refresh; failures are reported separately by the
    # `on_failure_callback` set in `default_args`.
    @task(
        retries=0,
        trigger_rule='all_success',
        doc_md="""A status message to report DAG success."""
    )
    def status_message(ds = None, **context):
        # First day of the month containing `ds`, used to label the message.
        mnth = ds_format(ds, '%Y-%m-%d', '%Y-%m-01')
        send_slack_msg(
            context=context,
            msg=f":meow_miovision: :open_data_to: DAG ran successfully for {mnth} :white_check_mark:"
        )

    (
        check_data_availability >>
        [refresh_monthly_open_data, refresh_15min_open_data] >>
        status_message()
    )

miovision_open_data_dag()
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
--Review decisions:
--Classification Grouping and naming
--Include/Exclude bicycles?
--Include/Exclude buses/streetcars?
--Decision to not include manual anomalous_range 'valid_caveat' notes: SELECT
--Including entry/exit information to satisfy ATR related DRs.
-->> providing exit leg and direction as extra columns rather
-->> than extra rows to reduce potential for double counting.

--DROP FUNCTION gwolofs.insert_miovision_15min_open_data;

-- Refreshes one month of 15 minute open data volumes in
-- gwolofs.miovision_15min_open_data: existing rows for the month (and the
-- target intersections) are deleted, then re-inserted from
-- miovision_api.volumes_15min_mvt.
--
-- Parameters:
--   _date          any date within the month to refresh; truncated to the
--                  first day of its month.
--   intersections  optional array of intersections to refresh, passed through
--                  miovision_api.get_intersections_uids().
--                  NOTE(review): presumably the empty default resolves to all
--                  intersections - confirm against that helper.
--
-- Returns void; the deleted and inserted row counts are reported via
-- RAISE NOTICE.
CREATE OR REPLACE FUNCTION gwolofs.insert_miovision_15min_open_data(
    _date date,
    -- The parameter must be named: the DECLARE section refers to it as
    -- `intersections`.
    intersections integer [] DEFAULT ARRAY[]::integer []
)
RETURNS void
LANGUAGE 'plpgsql'
COST 100
VOLATILE PARALLEL UNSAFE
AS $BODY$

DECLARE
    target_intersections integer [] = miovision_api.get_intersections_uids(intersections);
    n_deleted int;
    n_inserted int;
    _month date = date_trunc('month', _date);

BEGIN

    --clear the month being refreshed for the target intersections
    WITH deleted AS (
        DELETE FROM gwolofs.miovision_15min_open_data
        WHERE
            datetime_15min >= _month
            AND datetime_15min < _month + interval '1 month'
            AND intersection_uid = ANY(target_intersections)
        RETURNING *
    )

    SELECT COUNT(*) INTO n_deleted
    FROM deleted;

    RAISE NOTICE 'Deleted % rows from gwolofs.miovision_15min_open_data for month %.', n_deleted, _month;

    --The temp table would otherwise survive until the end of the session and
    --make a second call in the same session fail with "relation already
    --exists". Qualifying with pg_temp ensures only a temp table is dropped.
    DROP TABLE IF EXISTS pg_temp.miovision_movement_map_new;

    --one row per entry movement with its matching exit leg and direction
    CREATE TEMP TABLE miovision_movement_map_new ON COMMIT DROP AS (
        SELECT
            entries.movement_uid,
            entries.leg_old AS leg,
            entries.dir AS entry_dir,
            mov.movement_name AS movement,
            --assign exits for peds, bike entry only movements
            COALESCE(exits.leg_new, entries.leg_old) AS exit_leg,
            COALESCE(exits.dir, entries.dir) AS exit_dir
        FROM miovision_api.movement_map AS entries
        JOIN miovision_api.movements AS mov USING (movement_uid)
        LEFT JOIN miovision_api.movement_map AS exits ON
            exits.leg_old = entries.leg_old
            AND exits.movement_uid = entries.movement_uid
            AND exits.leg_new = substr(exits.dir, 1, 1) --eg. E leg going East is an exit
        WHERE entries.leg_new <> substr(entries.dir, 1, 1) --eg. E leg going West is an entry
    );

    WITH inserted AS (
        INSERT INTO gwolofs.miovision_15min_open_data (
            intersection_uid, intersection_long_name, datetime_15min, classification_type,
            entry_leg, entry_dir, movement, exit_leg, exit_dir, volume_15min
        )
        SELECT
            v15.intersection_uid,
            i.api_name AS intersection_long_name,
            v15.datetime_bin AS datetime_15min,
            CASE
                WHEN cl.classification = 'Light' THEN 'Light Auto'
                WHEN cl.classification IN (
                    'SingleUnitTruck', 'ArticulatedTruck', 'MotorizedVehicle', 'Bus'
                ) THEN 'Truck/Bus'
                ELSE cl.classification -- 'Bicycle', 'Pedestrian'
            END AS classification_type,
            v15.leg AS entry_leg,
            mm.entry_dir,
            mm.movement,
            mm.exit_leg,
            mm.exit_dir,
            --assign exits for peds, bike entry only movements
            SUM(v15.volume) AS volume_15min
            --exclude notes (manual text field)
            --array_agg(ar.notes ORDER BY ar.range_start, ar.uid) FILTER (WHERE ar.uid IS NOT NULL) AS anomalous_range_caveats
        FROM miovision_api.volumes_15min_mvt AS v15
        JOIN miovision_api.classifications AS cl USING (classification_uid)
        JOIN miovision_api.intersections AS i USING (intersection_uid)
        -- TMC to ATR crossover table
        JOIN miovision_movement_map_new AS mm USING (movement_uid, leg)
        --anti-join anomalous_ranges. See HAVING clause.
        LEFT JOIN miovision_api.anomalous_ranges AS ar ON
            (
                ar.intersection_uid = v15.intersection_uid
                OR ar.intersection_uid IS NULL
            ) AND (
                ar.classification_uid = v15.classification_uid
                OR ar.classification_uid IS NULL
            )
            AND v15.datetime_bin >= ar.range_start
            AND (
                v15.datetime_bin < ar.range_end
                OR ar.range_end IS NULL
            )
            AND ar.problem_level IN ('do-not-use'::text, 'questionable'::text)
        WHERE
            v15.datetime_bin >= _month
            AND v15.datetime_bin < _month + interval '1 month'
            AND v15.intersection_uid = ANY(target_intersections)
        GROUP BY
            v15.intersection_uid,
            i.api_name,
            v15.datetime_bin,
            classification_type,
            v15.leg,
            mm.entry_dir,
            mm.movement,
            mm.exit_leg,
            mm.exit_dir
        HAVING
            --drop any group that matched a 'do-not-use' or 'questionable' range
            NOT array_agg(ar.problem_level) && ARRAY['do-not-use'::text, 'questionable'::text]
            AND SUM(v15.volume) > 0 --confirm
        ORDER BY
            v15.intersection_uid,
            classification_type,
            v15.datetime_bin,
            v15.leg
        RETURNING *
    )

    SELECT COUNT(*) INTO n_inserted
    FROM inserted;

    RAISE NOTICE 'Inserted % rows into gwolofs.miovision_15min_open_data for month %.', n_inserted, _month;

END;
$BODY$;

--Ownership and permissions for the function: owned by gwolofs, executable
--by miovision_admins and miovision_api_bot, with all privileges revoked
--from public.
ALTER FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer [])
OWNER TO gwolofs;

GRANT EXECUTE ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer [])
TO miovision_admins;

GRANT EXECUTE ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer [])
TO miovision_api_bot;

REVOKE ALL ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer [])
FROM public;

--Description stored with the function in the database.
COMMENT ON FUNCTION gwolofs.insert_miovision_15min_open_data(date, integer [])
IS 'Function for first deleting then inserting monthly 15
minute open data volumes into gwolofs.miovision_15min_open_data.
Contains an optional intersection parameter in case one just one
intersection needs to be refreshed.';

--testing, around 50 minutes for 1 month (5M rows)
--NOTE(review): this statement is not commented out, so running this script
--end to end also runs the full refresh for February 2024 - confirm intended.
SELECT gwolofs.insert_miovision_15min_open_data('2024-02-01'::date);
Loading
Loading