Skip to content

Commit

Permalink
Option actions
Browse files Browse the repository at this point in the history
This new feature let users select actions they want to receive. Actions
supported are insert, update, delete, and truncate. This actions are
similar to PUBLICATION command.

Format version 1 won't enable truncate by default (to maintain backward
compatibility). Format version 2 will enable all options by default.

A new test was added (run only on Postgres v11 or later).  That's
because test includes truncate and it will only supported by v11 or
later.
  • Loading branch information
Euler Taveira committed Dec 20, 2019
1 parent b93fb62 commit aa431a9
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 1 deletion.
8 changes: 7 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ MODULES = wal2json
REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \
delete3 delete4 savepoint specialvalue toast bytea message typmod \
filtertable selecttable include_timestamp include_lsn include_xids \
truncate
truncate actions

PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand All @@ -19,6 +19,12 @@ ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10))
REGRESS := $(filter-out truncate, $(REGRESS))
endif

# actions API is available in 11+
# this test should be executed in prior versions, however, truncate will fail.
ifneq (,$(findstring $(MAJORVERSION),9.4 9.5 9.6 10))
REGRESS := $(filter-out actions, $(REGRESS))
endif

# make installcheck
#
# It can be run but you need to add the following parameters to
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Parameters
* `filter-msg-prefixes`: exclude messages if prefix is in the list. Default is empty which means that no message will be filtered. It is a comma separated value.
* `add-msg-prefixes`: include only messages if prefix is in the list. Default is all prefixes. It is a comma separated value. `wal2json` applies `filter-msg-prefixes` before this parameter.
* `format-version`: defines which format to use. Default is _1_.
* `actions`: define which operations will be sent. Default is all actions (insert, update, delete, and truncate). However, if you are using `format-version` 1, truncate is not enabled (backward compatibility).

Examples
========
Expand Down
133 changes: 133 additions & 0 deletions expected/actions.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
\set VERBOSITY terse
-- predictability
SET synchronous_commit = on;
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');
?column?
----------
init
(1 row)

-- actions
CREATE TABLE actions (a integer primary key);
INSERT INTO actions (a) VALUES(1);
UPDATE actions SET a = 2 WHERE a = 1;
DELETE FROM actions WHERE a = 2;
TRUNCATE TABLE actions;
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'insert, foo, delete');
ERROR: could not parse value "foo" for parameter "actions"
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1');
data
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"change":[]}
{"change":[{"kind":"insert","schema":"public","table":"actions","columnnames":["a"],"columntypes":["integer"],"columnvalues":[1]}]}
{"change":[{"kind":"update","schema":"public","table":"actions","columnnames":["a"],"columntypes":["integer"],"columnvalues":[2],"oldkeys":{"keynames":["a"],"keytypes":["integer"],"keyvalues":[1]}}]}
{"change":[{"kind":"delete","schema":"public","table":"actions","oldkeys":{"keynames":["a"],"keytypes":["integer"],"keyvalues":[2]}}]}
{"change":[]}
(5 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
data
-------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"public","table":"actions","columns":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"actions","columns":[{"name":"a","type":"integer","value":2}],"identity":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"actions","identity":[{"name":"a","type":"integer","value":2}]}
{"action":"C"}
{"action":"B"}
{"action":"T","schema":"public","table":"actions"}
{"action":"C"}
(14 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'insert');
data
--------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"I","schema":"public","table":"actions","columns":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(11 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'update');
data
-------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"actions","columns":[{"name":"a","type":"integer","value":2}],"identity":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(11 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'delete');
data
---------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"D","schema":"public","table":"actions","identity":[{"name":"a","type":"integer","value":2}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
(11 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'truncate');
data
----------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"T","schema":"public","table":"actions"}
{"action":"C"}
(11 rows)

SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'update, truncate');
data
-------------------------------------------------------------------------------------------------------------------------------------------------------------
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"U","schema":"public","table":"actions","columns":[{"name":"a","type":"integer","value":2}],"identity":[{"name":"a","type":"integer","value":1}]}
{"action":"C"}
{"action":"B"}
{"action":"C"}
{"action":"B"}
{"action":"T","schema":"public","table":"actions"}
{"action":"C"}
(12 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
----------
stop
(1 row)

23 changes: 23 additions & 0 deletions sql/actions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
\set VERBOSITY terse

-- predictability
SET synchronous_commit = on;

SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json');

-- actions
CREATE TABLE actions (a integer primary key);
INSERT INTO actions (a) VALUES(1);
UPDATE actions SET a = 2 WHERE a = 1;
DELETE FROM actions WHERE a = 2;
TRUNCATE TABLE actions;
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'insert, foo, delete');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'insert');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'update');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'delete');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'truncate');
SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'actions', 'update, truncate');

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
123 changes: 123 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ PG_MODULE_MAGIC;
extern void _PG_init(void);
extern void PGDLLEXPORT _PG_output_plugin_init(OutputPluginCallbacks *cb);

typedef struct
{
bool insert;
bool update;
bool delete;
bool truncate;
} JsonAction;

typedef struct
{
MemoryContext context;
Expand All @@ -49,6 +57,8 @@ typedef struct
bool pretty_print; /* pretty-print JSON? */
bool write_in_chunks; /* write in chunks? */

JsonAction actions; /* output only these actions */

List *filter_tables; /* filter out tables */
List *add_tables; /* add only these tables */
List *filter_msg_prefixes; /* filter by message prefixes */
Expand Down Expand Up @@ -215,6 +225,22 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is

data->format_version = WAL2JSON_FORMAT_VERSION;

/* default actions */
if (WAL2JSON_FORMAT_VERSION == 1)
{
data->actions.insert = true;
data->actions.update = true;
data->actions.delete = true;
data->actions.truncate = false; /* backward compatibility */
}
else
{
data->actions.insert = true;
data->actions.update = true;
data->actions.delete = true;
data->actions.truncate = true;
}

/* pretty print */
strcpy(data->ht, "");
strcpy(data->nl, "");
Expand Down Expand Up @@ -406,6 +432,58 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
(errcode(ERRCODE_INVALID_NAME),
errmsg("parameter \"%s\" was deprecated", elem->defname)));
}
else if (strcmp(elem->defname, "actions") == 0)
{
char *rawstr;

if (elem->arg == NULL)
{
elog(DEBUG1, "actions argument is null");
/* argument null means default; nothing to do here */
}
else
{
List *selected_actions = NIL;
ListCell *lc;

rawstr = pstrdup(strVal(elem->arg));
if (!split_string_to_list(rawstr, ',', &selected_actions))
{
pfree(rawstr);
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}

data->actions.insert = false;
data->actions.update = false;
data->actions.delete = false;
data->actions.truncate = false;

foreach(lc, selected_actions)
{
char *p = lfirst(lc);

if (strcmp(p, "insert") == 0)
data->actions.insert = true;
else if (strcmp(p, "update") == 0)
data->actions.update = true;
else if (strcmp(p, "delete") == 0)
data->actions.delete = true;
else if (strcmp(p, "truncate") == 0)
data->actions.truncate = true;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
p, elem->defname)));
}

pfree(rawstr);
list_free(selected_actions);
}
}
else if (strcmp(elem->defname, "filter-tables") == 0)
{
char *rawstr;
Expand Down Expand Up @@ -1042,6 +1120,23 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
AssertVariableIsOfType(&pg_decode_change, LogicalDecodeChangeCB);

data = ctx->output_plugin_private;

if (!data->actions.insert)
{
elog(DEBUG3, "ignore INSERT");
return;
}
if (!data->actions.update)
{
elog(DEBUG3, "ignore UPDATE");
return;
}
if (!data->actions.delete)
{
elog(DEBUG3, "ignore DELETE");
return;
}

class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);

Expand Down Expand Up @@ -1668,6 +1763,22 @@ pg_decode_change_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
char *schemaname;
char *tablename;

if (change->action == REORDER_BUFFER_CHANGE_INSERT && !data->actions.insert)
{
elog(DEBUG3, "ignore INSERT");
return;
}
if (change->action == REORDER_BUFFER_CHANGE_UPDATE && !data->actions.update)
{
elog(DEBUG3, "ignore UPDATE");
return;
}
if (change->action == REORDER_BUFFER_CHANGE_DELETE && !data->actions.delete)
{
elog(DEBUG3, "ignore DELETE");
return;
}

/* avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

Expand Down Expand Up @@ -1958,6 +2069,12 @@ static void pg_decode_truncate_v1(LogicalDecodingContext *ctx,

data = ctx->output_plugin_private;

if (!data->actions.truncate)
{
elog(DEBUG3, "ignore TRUNCATE");
return;
}

/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

Expand Down Expand Up @@ -2028,6 +2145,12 @@ static void pg_decode_truncate_v2(LogicalDecodingContext *ctx,
MemoryContext old;
int i;

if (!data->actions.truncate)
{
elog(DEBUG3, "ignore TRUNCATE");
return;
}

/* avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

Expand Down

0 comments on commit aa431a9

Please sign in to comment.