Add pre-execute hook option to do things like index the process_queue table
jfinzel committed Aug 23, 2022
1 parent 87f24ca commit 90dd92f
Showing 14 changed files with 6,705 additions and 13 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -1,7 +1,8 @@
EXTENSION = pg_fact_loader
DATA = pg_fact_loader--1.4.sql pg_fact_loader--1.4--1.5.sql \
	pg_fact_loader--1.5.sql pg_fact_loader--1.5--1.6.sql \
-	pg_fact_loader--1.6.sql
+	pg_fact_loader--1.6.sql pg_fact_loader--1.6--1.7.sql \
+	pg_fact_loader--1.7.sql
MODULES = pg_fact_loader

REGRESS := 01_create_ext 02_schema 03_audit \
19 changes: 19 additions & 0 deletions README.md
@@ -203,6 +203,7 @@ There are a number of config tables that drive pg_fact_loader loads:
- `depends_on_parent_daily_job_id`: For jobs that depend on other daily scheduled jobs only. Immediate parent which must complete before this job will run.
- `daily_scheduled_deps`: OPTIONAL for daily scheduled jobs. The only purpose of this column is to consider if we should wait to run a scheduled job because dependent tables are out of date. This is a regclass array of tables that this scheduled job depends on, which will only be considered if they are either listed in fact_loader.queue_tables or fact_loader.fact_tables. If the former, replication delay will be considered (if table is not local). If the latter, last_refresh_source_cutoff will be considered. Works in combination with daily_scheduled_dep_delay_tolerance which says how much time delay is tolerated. Job will FAIL if the time delay constraint is not met for all tables - this is intended to be configured as a rare occurrence and thus we want to raise an alarm about it.
- `daily_scheduled_dep_delay_tolerance`: OPTIONAL for daily scheduled jobs. The time interval that dependent tables are allowed to be out of date before running this job. For example, if set to 10 minutes, this job will FAIL if ANY of the dependent tables are more than 10 minutes out of date - this is intended to be configured as a rare occurrence and thus we want to raise an alarm about it.
- `pre_execute_hook_sql`: OPTIONAL - custom SQL to execute within the `fact_loader.load` function, after the `process_queue` has been loaded but prior to the actual load of the fact table using the `process_queue`. This feature was originally written due to the need to index the `process_queue` in certain unique circumstances, prior to actual execution over the `process_queue`.
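For instance, a hypothetical configuration (the fact table name and indexed column are illustrative; `key_value` is a column of the `process_queue` temp table) that indexes the `process_queue` before the load runs:

```sql
-- Illustrative only: build an index on the process_queue temp table
-- after it is populated but before the fact table load executes over it.
UPDATE fact_loader.fact_tables
SET pre_execute_hook_sql = 'CREATE INDEX ON process_queue (key_value);'
WHERE fact_table_relid = 'test_fact.customers_fact'::REGCLASS;
```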

`queue_tables`: Each queue table along with the base table to which it belongs.
- `queue_table_id`: Unique identifier for queue tables.
@@ -545,6 +546,24 @@ Once you have fixed whatever issues a job may have, you will need to re-enable it

# <a name="tech"></a>Technical Documentation

## <a name="new_releases"></a>New Releases
There are some helper scripts to assist in adding a new version of pg_fact_loader, chiefly `pg_fact_loader-sql-maker.sh`.

1. To add a new version, open `pg_fact_loader-sql-maker.sh` and change `last_version` and `new_version` to the correct new values.
2. Remove everything after `create_update_file_with_header` in the script. The lines that follow it are the custom files that
changed with the previous release, which were added to that version's SQL script. For whatever functions or views you modify,
or any schema change in the `schema/` directory, add those files using the provided function,
e.g. `add_file views/prioritized_jobs.sql $update_file` will add the SQL for `views/prioritized_jobs.sql` to the new extension
script. You only need to add files that you modify with a release.
3. When all is prepared, run the script. It will create the new files for the new extension version, including an update
script from the previous version to the new version.
4. Update the Makefile to include the new SQL files.
5. Update the first script in both the `sql/` and `expected/` directories, which refers to the most recent version as a
default; change it to the new version.
6. Update the `pg_fact_loader.control` file with the latest version.
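The first two steps can be sketched as follows. To keep the commands self-contained, they run against a stand-in copy of the maker script; the path `/tmp/maker-demo.sh`, its contents, and the version numbers 1.7/1.8 are all illustrative:

```shell
# Create a stand-in copy of pg_fact_loader-sql-maker.sh (illustrative contents).
cat > /tmp/maker-demo.sh <<'EOF'
last_version=1.6
new_version=1.7
create_update_file_with_header
add_file views/prioritized_jobs.sql $update_file
EOF

# Step 1: bump the version pair.
sed -i 's/^last_version=.*/last_version=1.7/; s/^new_version=.*/new_version=1.8/' /tmp/maker-demo.sh

# Step 2: drop everything after create_update_file_with_header; you would then
# re-add add_file lines only for the files changed in this release.
sed -i '/create_update_file_with_header/q' /tmp/maker-demo.sh

cat /tmp/maker-demo.sh
```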

To test the extension against all supported Postgres versions, including extension upgrade paths, run the script `test_all_versions.sh`.

## <a name="workflow"></a>Workflow

The function `fact_loader.worker()` drives everything in the fact table loads.
2 changes: 1 addition & 1 deletion expected/01_create_ext.out
@@ -1,5 +1,5 @@
-- Allow running regression suite with upgrade paths
-\set v `echo ${FROMVERSION:-1.6}`
+\set v `echo ${FROMVERSION:-1.7}`
SET client_min_messages TO warning;
CREATE EXTENSION pglogical;
CREATE EXTENSION pglogical_ticker;
14 changes: 13 additions & 1 deletion expected/06_basic_workers.out
@@ -149,7 +149,11 @@ ORDER BY customer_id;
10 | 0001234560 | 35 | | 0 |
(10 rows)

-UPDATE fact_loader.fact_tables SET force_worker_priority = TRUE WHERE fact_table_relid = 'test_fact.customers_fact'::REGCLASS;
+UPDATE fact_loader.fact_tables
+SET force_worker_priority = TRUE,
+-- Test 1.7 pre-hook feature
+pre_execute_hook_sql = 'CREATE TABLE cool_pre_execute_hook_sql (id int);'
+WHERE fact_table_relid = 'test_fact.customers_fact'::REGCLASS;
SELECT pglogical_ticker.tick();
tick
------
@@ -179,6 +183,14 @@ ORDER BY customer_id;
10 | 0001234560 | 35 | | 0 |
(10 rows)

SELECT * FROM cool_pre_execute_hook_sql;
id
----
(0 rows)

UPDATE fact_loader.fact_tables
SET pre_execute_hook_sql = NULL
WHERE fact_table_relid = 'test_fact.customers_fact'::REGCLASS;
--This would simulate an application's changes being out of order now
UPDATE test.customers SET age = 41 WHERE customer_id = 2;
SELECT pglogical_ticker.tick();
2 changes: 1 addition & 1 deletion expected/16_1_2_features.out
@@ -174,7 +174,7 @@ SELECT fact_table_id,
FROM fact_loader.unresolved_failures;
fact_table_id | fact_table_relid | messages
---------------+-----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-             9 | test_fact.orders_fact_chain | {"Hint": "No operator matches the given name and argument type(s). You might need to add explicit type casts.", "Context": "PL/pgSQL function test_fact.orders_fact_chain_merge(integer) line 4 at SQL statement\nSQL statement \"\nSELECT process_queue_id, test_fact.orders_fact_chain_merge(key_value::integer)\nFROM (\n/****\nMust wrap this to execute in order of ids\n***/\nSELECT *\nFROM process_queue\nWHERE process_queue_id BETWEEN 1 AND 1\n AND fact_table_id = 9\n AND proid = 'test_fact.orders_fact_chain_merge'::REGPROC\nORDER BY process_queue_id) q;\n\"\nPL/pgSQL function fact_loader.load(integer) line 46 at EXECUTE\nSQL statement \"SELECT fact_loader.load(p_fact_table_id)\"\nPL/pgSQL function fact_loader.try_load(integer) line 40 at PERFORM\nPL/pgSQL function fact_loader.worker() line 16 at IF", "Message": "operator does not exist: integer = jsonb"}
+             9 | test_fact.orders_fact_chain | {"Hint": "No operator matches the given name and argument type(s). You might need to add explicit type casts.", "Context": "PL/pgSQL function test_fact.orders_fact_chain_merge(integer) line 4 at SQL statement\nSQL statement \"\nSELECT process_queue_id, test_fact.orders_fact_chain_merge(key_value::integer)\nFROM (\n/****\nMust wrap this to execute in order of ids\n***/\nSELECT *\nFROM process_queue\nWHERE process_queue_id BETWEEN 1 AND 1\n AND fact_table_id = 9\n AND proid = 'test_fact.orders_fact_chain_merge'::REGPROC\nORDER BY process_queue_id) q;\n\"\nPL/pgSQL function fact_loader.load(integer) line 56 at EXECUTE\nSQL statement \"SELECT fact_loader.load(p_fact_table_id)\"\nPL/pgSQL function fact_loader.try_load(integer) line 40 at PERFORM\nPL/pgSQL function fact_loader.worker() line 16 at IF", "Message": "operator does not exist: integer = jsonb"}
(1 row)

--No data
10 changes: 10 additions & 0 deletions functions/load.sql
@@ -7,6 +7,7 @@ DECLARE
v_metadata_update_sql text;
v_debug_rec record;
v_debug_text text = '';
v_pre_execute_hook_sql text = '';
BEGIN
/***
There are 3 basic steps to this load:
@@ -29,6 +30,15 @@ This just creates a temp table with all changes to be processed
RAISE DEBUG 'Populating Queue for fact_table_id %: %', p_fact_table_id, v_process_queue_sql;
EXECUTE COALESCE(v_process_queue_sql, $$SELECT 'No queue data' AS result$$);

/****
Pre-execute hook
*/
SELECT pre_execute_hook_sql INTO v_pre_execute_hook_sql
FROM fact_loader.fact_tables
WHERE fact_table_id = p_fact_table_id;

EXECUTE COALESCE(v_pre_execute_hook_sql, $$SELECT 'No pre-execute hook.' AS result$$);

/****
For DEBUG purposes only to view the actual process_queue. Requires setting log_min_messages to DEBUG.
*/