From ec88472872942c7c3058684c78d90755e23a68c4 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 4 Apr 2024 11:16:35 +0200 Subject: [PATCH 01/11] filterx: fix whitespace issues in test_expr_condition Signed-off-by: Balazs Scheidler --- lib/filterx/tests/test_expr_condition.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/filterx/tests/test_expr_condition.c b/lib/filterx/tests/test_expr_condition.c index 0818ee999b6..68208be933c 100644 --- a/lib/filterx/tests/test_expr_condition.c +++ b/lib/filterx/tests/test_expr_condition.c @@ -42,12 +42,14 @@ #include "scratch-buffers.h" -FilterXExpr *_string_to_filterXExpr(const gchar *str) +FilterXExpr * +_string_to_filterXExpr(const gchar *str) { return filterx_literal_new(filterx_string_new(str, -1)); } -gint _assert_cmp_string_to_filterx_object(const char *str, FilterXObject *obj) +gint +_assert_cmp_string_to_filterx_object(const char *str, FilterXObject *obj) { cr_assert(filterx_object_is_type(obj, &FILTERX_TYPE_NAME(string))); gsize string_len; @@ -55,7 +57,8 @@ gint _assert_cmp_string_to_filterx_object(const char *str, FilterXObject *obj) return strcmp(string, str); } -FilterXExpr *_assert_assign_var(const char *var_name, FilterXExpr *value) +FilterXExpr * +_assert_assign_var(const char *var_name, FilterXExpr *value) { FilterXExpr *control_variable = filterx_message_ref_expr_new(log_msg_get_value_handle(var_name)); cr_assert(control_variable != NULL); @@ -63,7 +66,8 @@ FilterXExpr *_assert_assign_var(const char *var_name, FilterXExpr *value) return filterx_assign_new(control_variable, value); } -void _assert_set_test_variable(const char *var_name, FilterXExpr *expr) +void +_assert_set_test_variable(const char *var_name, FilterXExpr *expr) { FilterXExpr *assign = _assert_assign_var(var_name, expr); cr_assert(assign != NULL); @@ -76,7 +80,8 @@ void _assert_set_test_variable(const char *var_name, FilterXExpr *expr) filterx_object_unref(assign_eval_res); } -FilterXObject *_assert_get_test_variable(const char *var_name) +FilterXObject * +_assert_get_test_variable(const char *var_name) { FilterXExpr *control_variable = filterx_message_ref_expr_new(log_msg_get_value_handle(var_name)); cr_assert(control_variable != NULL); @@ -92,7 +97,8 @@ typedef struct _TestEnv FilterXEvalContext context; } TestEnv; -void init_test(TestEnv *env) +void +init_test(TestEnv *env) { cr_assert(env != NULL); @@ -110,7 +116,8 @@ void init_test(TestEnv *env) } -void deinit_test(const TestEnv *env) +void +deinit_test(const TestEnv *env) { cr_assert(env != NULL); log_msg_unref(env->msg); From 9e4246c6620bc47240e9b10991f316d73fbedd50 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 4 Apr 2024 11:29:59 +0200 Subject: [PATCH 02/11] filterx: fix compilation warning on uninitialized value Signed-off-by: Balazs Scheidler --- lib/filterx/expr-condition.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/filterx/expr-condition.c b/lib/filterx/expr-condition.c index 4382c2037f1..884618c7b05 100644 --- a/lib/filterx/expr-condition.c +++ b/lib/filterx/expr-condition.c @@ -57,7 +57,7 @@ _eval_condition(FilterXConditional *c) { return _eval_condition(c->false_branch); } - FilterXObject *result; + FilterXObject *result = NULL; for (GList *l = c->statements; l; l = l->next) { FilterXExpr *expr = l->data; From 57794ffd5fae2b01740ad8f97b6dbfb334c2b629 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Wed, 3 Apr 2024 14:45:28 +0200 Subject: [PATCH 03/11] logpipe: add comments around functions performing LogPathOptions chaining Signed-off-by: Balazs Scheidler --- lib/logpipe.h | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/lib/logpipe.h b/lib/logpipe.h index 402506ece5b..3b31dfe1029 100644 --- a/lib/logpipe.h +++ b/lib/logpipe.h @@ -220,6 +220,14 @@ struct _LogPathOptions #define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL } #define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL, NULL } +/* LogPathOptions are chained up at the start of a junction and teared down + * at the end (see log_path_options_pop_junction(). + * + * The "matched" value is kept separate on the parent level and the junction + * level. This way the junction can separately act on matching/non-matching + * messages and potentially propagate it to the parent (or not), see + * logmpx.c for details. + * */ static inline void log_path_options_push_junction(LogPathOptions *local_path_options, gboolean *matched, const LogPathOptions *parent) { @@ -228,6 +236,11 @@ log_path_options_push_junction(LogPathOptions *local_path_options, gboolean *mat local_path_options->parent = parent; } +/* Part of the junction related state needs to be "popped" once the + * conditional decision is concluded. This happens in the `if (filter)` + * form, once the filter is evaluated, or at the end of the junction. This + * basically resets the "matched" pointer to that of the parent junction. + */ static inline void log_path_options_pop_conditional(LogPathOptions *local_path_options) { @@ -236,6 +249,11 @@ log_path_options_pop_conditional(LogPathOptions *local_path_options) } /* + * Tear down the embedded junction related state from the LogPathOptions + * chain. This implies log_path_options_pop_conditional() as well, which + * will do nothing if there was a conditional midpoint (e.g. `if + * (filter)`). + * * NOTE: we need to be optional about ->parent being set, as synthetic * messages (e.g. the likes emitted by db-parser/grouping-by() may arrive * at the end of a junction without actually crossing the beginning of the From 5a90fa508f29b8f00d96376108ba2054c212cf42 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Wed, 3 Apr 2024 15:36:40 +0200 Subject: [PATCH 04/11] logpipe: rename "parent" to "lpo_parent_junction" Signed-off-by: Balazs Scheidler --- lib/logpipe.h | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/logpipe.h b/lib/logpipe.h index 3b31dfe1029..fa94ac73d97 100644 --- a/lib/logpipe.h +++ b/lib/logpipe.h @@ -214,7 +214,7 @@ struct _LogPathOptions gboolean flow_control_requested; gboolean *matched; - const LogPathOptions *parent; + const LogPathOptions *lpo_parent_junction; }; #define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL } @@ -229,11 +229,13 @@ struct _LogPathOptions * logmpx.c for details. * */ static inline void -log_path_options_push_junction(LogPathOptions *local_path_options, gboolean *matched, const LogPathOptions *parent) +log_path_options_push_junction(LogPathOptions *local_path_options, + gboolean *matched, + const LogPathOptions *lpo_parent_junction) { - *local_path_options = *parent; + *local_path_options = *lpo_parent_junction; local_path_options->matched = matched; - local_path_options->parent = parent; + local_path_options->lpo_parent_junction = lpo_parent_junction; } /* Part of the junction related state needs to be "popped" once the @@ -244,8 +246,8 @@ log_path_options_push_junction(LogPathOptions *local_path_options, gboolean *mat static inline void log_path_options_pop_conditional(LogPathOptions *local_path_options) { - if (local_path_options->parent) - local_path_options->matched = local_path_options->parent->matched; + if (local_path_options->lpo_parent_junction) + local_path_options->matched = local_path_options->lpo_parent_junction->matched; } /* @@ -266,8 +268,8 @@ log_path_options_pop_junction(LogPathOptions *local_path_options) { log_path_options_pop_conditional(local_path_options); - if (local_path_options->parent) - local_path_options->parent = local_path_options->parent->parent; + if (local_path_options->lpo_parent_junction) + local_path_options->lpo_parent_junction = local_path_options->lpo_parent_junction->lpo_parent_junction; } typedef struct _LogPipeOptions LogPipeOptions; From 9f2a2a179b6b15e6ae407c8dae2175dddd4f18cf Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Wed, 3 Apr 2024 15:38:18 +0200 Subject: [PATCH 05/11] logpipe: rename all local LogPathOptions instances to use the same name Signed-off-by: Balazs Scheidler --- lib/logmpx.c | 8 ++++---- lib/logmsg/logmsg.c | 8 ++++---- lib/logmsg/logmsg.h | 2 +- lib/logthrdest/logthrdestdrv.c | 4 ++-- lib/logwriter.c | 4 ++-- lib/metrics-pipe.c | 10 +++++----- modules/diskq/dqtool.c | 4 ++-- modules/diskq/tests/test_reliable_backlog.c | 20 ++++++++++---------- 8 files changed, 30 insertions(+), 30 deletions(-) diff --git a/lib/logmpx.c b/lib/logmpx.c index 28c5fb6079d..c73a8280c05 100644 --- a/lib/logmpx.c +++ b/lib/logmpx.c @@ -81,11 +81,11 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op LogMultiplexer *self = (LogMultiplexer *) s; gint i; gboolean matched; - LogPathOptions local_options; + LogPathOptions local_path_options; gboolean delivered = FALSE; gint fallback; - log_path_options_push_junction(&local_options, &matched, path_options); + log_path_options_push_junction(&local_path_options, &matched, path_options); if (_has_multiple_arcs(self)) { log_msg_write_protect(msg); @@ -106,8 +106,8 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op } matched = TRUE; - log_msg_add_ack(msg, &local_options); - log_pipe_queue(next_hop, log_msg_ref(msg), &local_options); + log_msg_add_ack(msg, &local_path_options); + log_pipe_queue(next_hop, log_msg_ref(msg), &local_path_options); if (matched) { diff --git a/lib/logmsg/logmsg.c b/lib/logmsg/logmsg.c index 612da1eb6a8..038d155b2c6 100644 --- a/lib/logmsg/logmsg.c +++ b/lib/logmsg/logmsg.c @@ -1806,7 +1806,7 @@ log_msg_ack(LogMessage *self, const LogPathOptions *path_options, AckType ack_ty * to send to further consuming pipes. */ const LogPathOptions * -log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOptions *local_options) +log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOptions *local_path_options) { /* NOTE: in case the user requested flow control, we can't break the * ACK chain, as that would lead to early acks, that would cause @@ -1816,10 +1816,10 @@ log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOp log_msg_ack(msg, path_options, AT_PROCESSED); - *local_options = *path_options; - local_options->ack_needed = FALSE; + *local_path_options = *path_options; + local_path_options->ack_needed = FALSE; - return local_options; + return local_path_options; } diff --git a/lib/logmsg/logmsg.h b/lib/logmsg/logmsg.h index 0861dab3f1a..8e89029df83 100644 --- a/lib/logmsg/logmsg.h +++ b/lib/logmsg/logmsg.h @@ -525,7 +525,7 @@ void log_msg_add_ack(LogMessage *msg, const LogPathOptions *path_options); void log_msg_ack(LogMessage *msg, const LogPathOptions *path_options, AckType ack_type); void log_msg_drop(LogMessage *msg, const LogPathOptions *path_options, AckType ack_type); const LogPathOptions *log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, - LogPathOptions *local_options); + LogPathOptions *local_path_options); void log_msg_refcache_start_producer(LogMessage *self); void log_msg_refcache_start_consumer(LogMessage *self, const LogPathOptions *path_options); diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 5fbc9c32cb2..8404dd84fea 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -1162,10 +1162,10 @@ log_threaded_dest_driver_queue(LogPipe *s, LogMessage *msg, { LogThreadedDestDriver *self = (LogThreadedDestDriver *)s; LogThreadedDestWorker *dw = _lookup_worker(self, msg); - LogPathOptions local_options; + LogPathOptions local_path_options; if (!path_options->flow_control_requested) - path_options = log_msg_break_ack(msg, path_options, &local_options); + path_options = log_msg_break_ack(msg, path_options, &local_path_options); log_msg_add_ack(msg, path_options); log_queue_push_tail(dw->queue, log_msg_ref(msg), path_options); diff --git a/lib/logwriter.c b/lib/logwriter.c index d79348c47cd..bd9939a63be 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -852,7 +852,7 @@ static void log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options) { LogWriter *self = (LogWriter *) s; - LogPathOptions local_options; + LogPathOptions local_path_options; gint mark_mode = self->options->mark_mode; if (!path_options->flow_control_requested && @@ -861,7 +861,7 @@ log_writer_queue(LogPipe *s, LogMessage *lm, const LogPathOptions *path_options) /* NOTE: this code ACKs the message back if there's a write error in * order not to hang the client in case of a disk full */ - path_options = log_msg_break_ack(lm, path_options, &local_options); + path_options = log_msg_break_ack(lm, path_options, &local_path_options); } if (log_writer_is_msg_suppressed(self, lm)) diff --git a/lib/metrics-pipe.c b/lib/metrics-pipe.c index d8b66922a80..2fc5efe65c4 100644 --- a/lib/metrics-pipe.c +++ b/lib/metrics-pipe.c @@ -44,16 +44,16 @@ _queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) stats_counter_inc(self->ingress_counter); gboolean matched = TRUE; - LogPathOptions local_options = *path_options; - local_options.matched = &matched; + LogPathOptions local_path_options = *path_options; + local_path_options.matched = &matched; - log_pipe_forward_msg(s, msg, &local_options); + log_pipe_forward_msg(s, msg, &local_path_options); - if (*local_options.matched) + if (*local_path_options.matched) stats_counter_inc(self->egress_counter); if (path_options->matched) - *path_options->matched = *local_options.matched; + *path_options->matched = *local_path_options.matched; } static gboolean diff --git a/modules/diskq/dqtool.c b/modules/diskq/dqtool.c index d475a67196b..ecc50fe9ca3 100644 --- a/modules/diskq/dqtool.c +++ b/modules/diskq/dqtool.c @@ -227,7 +227,7 @@ dqtool_cat(int argc, char *argv[]) msg = g_string_sized_new(128); for (i = optind; i < argc; i++) { - LogPathOptions local_options = LOG_PATH_OPTIONS_INIT; + LogPathOptions local_path_options = LOG_PATH_OPTIONS_INIT; LogQueue *lq; if (!open_queue(argv[i], &lq, &options, TRUE)) @@ -235,7 +235,7 @@ dqtool_cat(int argc, char *argv[]) log_queue_rewind_backlog_all(lq); - while ((log_msg = log_queue_pop_head(lq, &local_options)) != NULL) + while ((log_msg = log_queue_pop_head(lq, &local_path_options)) != NULL) { /* format log */ LogTemplateEvalOptions eval_options = {&configuration->template_options, LTZ_LOCAL, 0, NULL, LM_VT_STRING}; diff --git a/modules/diskq/tests/test_reliable_backlog.c b/modules/diskq/tests/test_reliable_backlog.c index 516f80b7c1e..600387b0a26 100644 --- a/modules/diskq/tests/test_reliable_backlog.c +++ b/modules/diskq/tests/test_reliable_backlog.c @@ -113,24 +113,24 @@ set_mark_message_serialized_size(void) static void _prepare_eof_test(LogQueueDiskReliable *dq, LogMessage **msg1, LogMessage **msg2) { - LogPathOptions local_options = LOG_PATH_OPTIONS_INIT; + LogPathOptions local_path_options = LOG_PATH_OPTIONS_INIT; gint64 start_pos = TEST_DISKQ_SIZE - 1; *msg1 = log_msg_new_mark(); *msg2 = log_msg_new_mark(); (*msg1)->ack_func = _dummy_ack; - log_msg_add_ack(*msg1, &local_options); + log_msg_add_ack(*msg1, &local_path_options); (*msg2)->ack_func = _dummy_ack; - log_msg_add_ack(*msg2, &local_options); + log_msg_add_ack(*msg2, &local_path_options); dq->super.qdisk->hdr->write_head = start_pos; dq->super.qdisk->hdr->read_head = QDISK_RESERVED_SPACE + mark_message_serialized_size + 1; dq->super.qdisk->hdr->backlog_head = dq->super.qdisk->hdr->read_head; - log_queue_push_tail(&dq->super.super, *msg1, &local_options); - log_queue_push_tail(&dq->super.super, *msg2, &local_options); + log_queue_push_tail(&dq->super.super, *msg1, &local_path_options); + log_queue_push_tail(&dq->super.super, *msg2, &local_path_options); cr_assert_eq(dq->flow_control_window->length, NUMBER_MESSAGES_IN_QUEUE(2), "%s", "Messages aren't in flow_control_window"); @@ -168,12 +168,12 @@ test_rewind_over_eof(LogQueueDiskReliable *dq) LogMessage *msg3 = log_msg_new_mark(); LogMessage *read_message3; - LogPathOptions local_options = LOG_PATH_OPTIONS_INIT; + LogPathOptions local_path_options = LOG_PATH_OPTIONS_INIT; msg3->ack_func = _dummy_ack; - log_queue_push_tail(&dq->super.super, msg3, &local_options); + log_queue_push_tail(&dq->super.super, msg3, &local_path_options); gint64 previous_read_head = dq->super.qdisk->hdr->read_head; - read_message3 = log_queue_pop_head(&dq->super.super, &local_options); + read_message3 = log_queue_pop_head(&dq->super.super, &local_path_options); cr_assert_not_null(read_message3, "%s", "Can't read message from queue"); cr_assert_eq(dq->super.qdisk->hdr->read_head, dq->super.qdisk->hdr->write_head, "%s", "Read head in bad position"); @@ -185,13 +185,13 @@ test_rewind_over_eof(LogQueueDiskReliable *dq) cr_assert_eq(dq->super.qdisk->hdr->read_head, previous_read_head, "%s", "Read head is corrupted"); - read_message3 = log_queue_pop_head(&dq->super.super, &local_options); + read_message3 = log_queue_pop_head(&dq->super.super, &local_path_options); cr_assert_not_null(read_message3, "%s", "Can't read message from queue"); cr_assert_eq(dq->super.qdisk->hdr->read_head, dq->super.qdisk->hdr->write_head, "%s", "Read head in bad position"); cr_assert_eq(msg3, read_message3, "%s", "Message 3 isn't read from flow_control_window"); - log_msg_drop(msg3, &local_options, AT_PROCESSED); + log_msg_drop(msg3, &local_path_options, AT_PROCESSED); } static void From 22683f23e3f7aa9e8b721f7e36979920684d2d5d Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Wed, 3 Apr 2024 14:49:30 +0200 Subject: [PATCH 06/11] logpipe: add separate log_path_options_chain() function Signed-off-by: Balazs Scheidler --- lib/logmsg/logmsg.c | 2 +- lib/logpipe.h | 13 +++++++++++-- lib/metrics-pipe.c | 6 ++++-- modules/correlation/stateful-parser.c | 4 +++- modules/diskq/logqueue-disk-reliable.c | 7 ++++--- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/lib/logmsg/logmsg.c b/lib/logmsg/logmsg.c index 038d155b2c6..d7d716a5858 100644 --- a/lib/logmsg/logmsg.c +++ b/lib/logmsg/logmsg.c @@ -1816,7 +1816,7 @@ log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOp log_msg_ack(msg, path_options, AT_PROCESSED); - *local_path_options = *path_options; + log_path_options_chain(local_path_options, path_options); local_path_options->ack_needed = FALSE; return local_path_options; diff --git a/lib/logpipe.h b/lib/logpipe.h index fa94ac73d97..51dd1e4a2ea 100644 --- a/lib/logpipe.h +++ b/lib/logpipe.h @@ -220,6 +220,16 @@ struct _LogPathOptions #define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL } #define LOG_PATH_OPTIONS_INIT_NOACK { FALSE, FALSE, NULL, NULL } +/* + * Embed a step in our LogPathOptions chain. + */ +static inline LogPathOptions * +log_path_options_chain(LogPathOptions *local_path_options, const LogPathOptions *lpo_previous_hop) +{ + *local_path_options = *lpo_previous_hop; + return local_path_options; +} + /* LogPathOptions are chained up at the start of a junction and teared down * at the end (see log_path_options_pop_junction(). * @@ -446,7 +456,7 @@ log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) if (G_UNLIKELY(s->flags & (PIF_HARD_FLOW_CONTROL | PIF_JUNCTION_END | PIF_CONDITIONAL_MIDPOINT))) { - local_path_options = *path_options; + path_options = log_path_options_chain(&local_path_options, path_options); if (s->flags & PIF_HARD_FLOW_CONTROL) { local_path_options.flow_control_requested = 1; @@ -460,7 +470,6 @@ log_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) { log_path_options_pop_conditional(&local_path_options); } - path_options = &local_path_options; } if (s->queue) diff --git a/lib/metrics-pipe.c b/lib/metrics-pipe.c index 2fc5efe65c4..f7a303c6544 100644 --- a/lib/metrics-pipe.c +++ b/lib/metrics-pipe.c @@ -44,12 +44,14 @@ _queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) stats_counter_inc(self->ingress_counter); gboolean matched = TRUE; - LogPathOptions local_path_options = *path_options; + LogPathOptions local_path_options; + log_path_options_chain(&local_path_options, path_options); local_path_options.matched = &matched; log_pipe_forward_msg(s, msg, &local_path_options); - if (*local_path_options.matched) + /* this is populated via local_path_options->matched */ + if (matched) stats_counter_inc(self->egress_counter); if (path_options->matched) diff --git a/modules/correlation/stateful-parser.c b/modules/correlation/stateful-parser.c index f152700e827..be23ed19211 100644 --- a/modules/correlation/stateful-parser.c +++ b/modules/correlation/stateful-parser.c @@ -68,9 +68,11 @@ void _queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) { StatefulParser *self = (StatefulParser *) s; - LogPathOptions local_path_options = *path_options; + LogPathOptions local_path_options; gboolean matched = TRUE; + log_path_options_chain(&local_path_options, path_options); + /* if we consume messages into our state, then let's consider these * messages matched, even though we are dropping them */ diff --git a/modules/diskq/logqueue-disk-reliable.c b/modules/diskq/logqueue-disk-reliable.c index 8ff363db648..9ae17630bac 100644 --- a/modules/diskq/logqueue-disk-reliable.c +++ b/modules/diskq/logqueue-disk-reliable.c @@ -392,9 +392,10 @@ _push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *path_options) * Keep the message in memory for fast-path. * Set its ack_needed to FALSE, because we have already acked it. */ - LogPathOptions local_options = *path_options; - local_options.ack_needed = FALSE; - _push_to_memory_queue_tail(self->front_cache, message_position, msg, &local_options); + LogPathOptions local_path_options; + log_path_options_chain(&local_path_options, path_options); + local_path_options.ack_needed = FALSE; + _push_to_memory_queue_tail(self->front_cache, message_position, msg, &local_path_options); log_queue_memory_usage_add(s, log_msg_get_size(msg)); goto exit; } From 8107f8dc56666198703c3270b0cf440800f187e6 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 4 Apr 2024 07:42:51 +0200 Subject: [PATCH 07/11] metrics-pipe: don't reset *matched to TRUE The general model for the matched value passed along in LogPathOptions is to initialize it to TRUE and set it to FALSE in case a path becomes non-matching. This should not be an actual bug, as we would not get the message in queue() unless it is still TRUE, but no need to overwrite that value. Signed-off-by: Balazs Scheidler --- lib/metrics-pipe.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/metrics-pipe.c b/lib/metrics-pipe.c index f7a303c6544..707bff99b51 100644 --- a/lib/metrics-pipe.c +++ b/lib/metrics-pipe.c @@ -54,8 +54,8 @@ _queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) if (matched) stats_counter_inc(self->egress_counter); - if (path_options->matched) - *path_options->matched = *local_path_options.matched; + if (path_options->matched && !matched) + *path_options->matched = FALSE; } static gboolean From 78142cee9906ef3f4f0aa7c7c51aba880738395b Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 4 Apr 2024 09:11:42 +0200 Subject: [PATCH 08/11] filterx: split filterx_eval_exec_statements() into two functions Signed-off-by: Balazs Scheidler --- lib/filterx/filterx-eval.c | 13 +++++++------ lib/filterx/filterx-eval.h | 4 +++- lib/filterx/filterx-pipe.c | 9 ++++++++- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/lib/filterx/filterx-eval.c b/lib/filterx/filterx-eval.c index 393266b7009..226ed2fc5c7 100644 --- a/lib/filterx/filterx-eval.c +++ b/lib/filterx/filterx-eval.c @@ -82,12 +82,11 @@ _evaluate_statement(FilterXExpr *expr) gboolean -filterx_eval_exec_statements(GList *statements, LogMessage **pmsg, const LogPathOptions *path_options) +filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage *msg) { - FilterXScope *scope = filterx_scope_new(); FilterXEvalContext local_context = { - .msgs = pmsg, + .msgs = &msg, .num_msg = 1, .template_eval_options = &DEFAULT_TEMPLATE_EVAL_OPTIONS, .scope = scope, @@ -102,13 +101,15 @@ filterx_eval_exec_statements(GList *statements, LogMessage **pmsg, const LogPath goto fail; } } - log_msg_make_writable(pmsg, path_options); /* NOTE: we only store the results into the message if the entire evaluation was successful */ - filterx_scope_sync_to_message(scope, *pmsg); success = TRUE; fail: filterx_eval_set_context(NULL); - filterx_scope_free(scope); return success; +} +void +filterx_eval_sync_scope_and_message(FilterXScope *scope, LogMessage *msg) +{ + filterx_scope_sync_to_message(scope, msg); } diff --git a/lib/filterx/filterx-eval.h b/lib/filterx/filterx-eval.h index 632014c35bb..d0a05dd4c98 100644 --- a/lib/filterx/filterx-eval.h +++ b/lib/filterx/filterx-eval.h @@ -39,6 +39,8 @@ struct _FilterXEvalContext FilterXEvalContext *filterx_eval_get_context(void); FilterXScope *filterx_eval_get_scope(void); void filterx_eval_set_context(FilterXEvalContext *context); -gboolean filterx_eval_exec_statements(GList *statements, LogMessage **msg, const LogPathOptions *options); +gboolean filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage *msg); +void filterx_eval_sync_scope_and_message(FilterXScope *scope, LogMessage *msg); + #endif diff --git a/lib/filterx/filterx-pipe.c b/lib/filterx/filterx-pipe.c index 0f698669d20..38785b5a78a 100644 --- a/lib/filterx/filterx-pipe.c +++ b/lib/filterx/filterx-pipe.c @@ -57,7 +57,14 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o evt_tag_msg_reference(msg)); NVTable *payload = nv_table_ref(msg->payload); - res = filterx_eval_exec_statements(self->stmts, &msg, path_options); + FilterXScope *scope = filterx_scope_new(); + res = filterx_eval_exec_statements(scope, self->stmts, msg); + if (res) + { + log_msg_make_writable(&msg, path_options); + filterx_eval_sync_scope_and_message(scope, msg); + } + filterx_scope_free(scope); nv_table_unref(payload); msg_trace("<<<<<< filterx rule evaluation result", From 3e6693384637d80ebd80bafc8e67ad2e151413ca Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Fri, 5 Apr 2024 11:31:37 +0200 Subject: [PATCH 09/11] filterx: make FilterXScope refcounted Signed-off-by: Balazs Scheidler --- lib/filterx/filterx-pipe.c | 2 +- lib/filterx/filterx-scope.c | 21 +++++++++++++++++++-- lib/filterx/filterx-scope.h | 3 ++- lib/filterx/tests/test_expr_condition.c | 2 +- lib/filterx/tests/test_filterx_expr.c | 6 +++--- 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/lib/filterx/filterx-pipe.c b/lib/filterx/filterx-pipe.c index 38785b5a78a..b4fe356f31a 100644 --- a/lib/filterx/filterx-pipe.c +++ b/lib/filterx/filterx-pipe.c @@ -64,7 +64,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o log_msg_make_writable(&msg, path_options); filterx_eval_sync_scope_and_message(scope, msg); } - filterx_scope_free(scope); + filterx_scope_unref(scope); nv_table_unref(payload); msg_trace("<<<<<< filterx rule evaluation result", diff --git a/lib/filterx/filterx-scope.c b/lib/filterx/filterx-scope.c index 0102ed13a27..c9a1afd0170 100644 --- a/lib/filterx/filterx-scope.c +++ b/lib/filterx/filterx-scope.c @@ -25,6 +25,7 @@ struct _FilterXScope { + GAtomicCounter ref_cnt; GHashTable *value_cache; GPtrArray *weak_refs; }; @@ -84,15 +85,31 @@ filterx_scope_new(void) { FilterXScope *self = g_new0(FilterXScope, 1); + g_atomic_counter_set(&self->ref_cnt, 1); self->value_cache = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) filterx_object_unref); self->weak_refs = g_ptr_array_new_with_free_func((GDestroyNotify) filterx_object_unref); return self; } -void -filterx_scope_free(FilterXScope *self) +static void +_free(FilterXScope *self) { g_hash_table_unref(self->value_cache); g_ptr_array_free(self->weak_refs, TRUE); g_free(self); } + +FilterXScope * +filterx_scope_ref(FilterXScope *self) +{ + if (self) + g_atomic_counter_inc(&self->ref_cnt); + return self; +} + +void +filterx_scope_unref(FilterXScope *self) +{ + if (self && (g_atomic_counter_dec_and_test(&self->ref_cnt))) + _free(self); +} diff --git a/lib/filterx/filterx-scope.h b/lib/filterx/filterx-scope.h index 6652059dfe7..4f1118d24a1 100644 --- a/lib/filterx/filterx-scope.h +++ b/lib/filterx/filterx-scope.h @@ -34,6 +34,7 @@ void filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, Fil void filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object); FilterXScope *filterx_scope_new(void); -void filterx_scope_free(FilterXScope *self); +FilterXScope *filterx_scope_ref(FilterXScope *self); +void filterx_scope_unref(FilterXScope *self); #endif \ No newline at end of file diff --git a/lib/filterx/tests/test_expr_condition.c b/lib/filterx/tests/test_expr_condition.c index 68208be933c..3ac09859e73 100644 --- a/lib/filterx/tests/test_expr_condition.c +++ b/lib/filterx/tests/test_expr_condition.c @@ -121,7 +121,7 @@ deinit_test(const TestEnv *env) { cr_assert(env != NULL); log_msg_unref(env->msg); - filterx_scope_free(env->scope); + filterx_scope_unref(env->scope); filterx_eval_set_context(NULL); } diff --git a/lib/filterx/tests/test_filterx_expr.c b/lib/filterx/tests/test_filterx_expr.c index 4682a211397..b48decdf895 100644 --- a/lib/filterx/tests/test_filterx_expr.c +++ b/lib/filterx/tests/test_filterx_expr.c @@ -88,7 +88,7 @@ Test(filterx_expr, test_filterx_template_evaluates_to_the_expanded_value) filterx_expr_unref(fexpr); log_msg_unref(msg); filterx_object_unref(fobj); - filterx_scope_free(scope); + filterx_scope_unref(scope); filterx_eval_set_context(NULL); } @@ -166,7 +166,7 @@ Test(filterx_expr, test_filterx_list_merge) filterx_expr_unref(fillable); log_msg_unref(msg); - filterx_scope_free(scope); + filterx_scope_unref(scope); filterx_eval_set_context(NULL); } @@ -292,7 +292,7 @@ Test(filterx_expr, test_filterx_dict_merge) filterx_object_unref(foo); filterx_expr_unref(fillable); log_msg_unref(msg); - filterx_scope_free(scope); + filterx_scope_unref(scope); filterx_eval_set_context(NULL); } From 2ecc30e855dba9279903d4346fc0f1749ba96744 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 4 Apr 2024 17:58:30 +0200 Subject: [PATCH 10/11] filterx: add copy-on-write feature to FilterXScope Signed-off-by: Balazs Scheidler --- lib/filterx/filterx-scope.c | 49 +++++++++++++++++++++++++++++++++++++ lib/filterx/filterx-scope.h | 4 +++ 2 files changed, 53 insertions(+) diff --git a/lib/filterx/filterx-scope.c b/lib/filterx/filterx-scope.c index c9a1afd0170..fcf451b415c 100644 --- a/lib/filterx/filterx-scope.c +++ b/lib/filterx/filterx-scope.c @@ -28,6 +28,7 @@ struct _FilterXScope GAtomicCounter ref_cnt; GHashTable *value_cache; GPtrArray *weak_refs; + gboolean write_protected; }; FilterXObject * @@ -45,6 +46,8 @@ filterx_scope_lookup_message_ref(FilterXScope *self, NVHandle handle) void filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXObject *value) { + g_assert(self->write_protected == FALSE); + value->shadow = TRUE; g_hash_table_insert(self->value_cache, GINT_TO_POINTER(handle), filterx_object_ref(value)); } @@ -52,6 +55,8 @@ filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXO void filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object) { + g_assert(self->write_protected == FALSE); + if (object) g_ptr_array_add(self->weak_refs, filterx_object_ref(object)); } @@ -91,6 +96,50 @@ filterx_scope_new(void) return self; } +static FilterXScope * +filterx_scope_clone(FilterXScope *other) +{ + FilterXScope *self = filterx_scope_new(); + + GHashTableIter iter; + gpointer _key, _value; + + g_hash_table_iter_init(&iter, self->value_cache); + while (g_hash_table_iter_next(&iter, &_key, &_value)) + { + NVHandle handle = GPOINTER_TO_INT(_key); + FilterXObject *value = (FilterXObject *) _value; + + /* NOTE: clone will not actually clone inmutable objects, in those + * cases we just take a reference */ + g_hash_table_insert(self->value_cache, GINT_TO_POINTER(handle), filterx_object_clone(value)); + } + + /* NOTE: we don't clone weak references, those only relate to mutable + * objects, which we are cloning anyway */ + return self; +} + +void +filterx_scope_write_protect(FilterXScope *self) +{ + self->write_protected = TRUE; +} + +FilterXScope * +filterx_scope_make_writable(FilterXScope **pself) +{ + if ((*pself)->write_protected) + { + FilterXScope *new; + + new = filterx_scope_clone(*pself); + filterx_scope_unref(*pself); + *pself = new; + } + return *pself; +} + static void _free(FilterXScope *self) { diff --git a/lib/filterx/filterx-scope.h b/lib/filterx/filterx-scope.h index 4f1118d24a1..cb20cabb8f7 100644 --- a/lib/filterx/filterx-scope.h +++ b/lib/filterx/filterx-scope.h @@ -33,6 +33,10 @@ FilterXObject *filterx_scope_lookup_message_ref(FilterXScope *self, NVHandle han void filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXObject *value); void filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object); +/* copy on write */ +void filterx_scope_write_protect(FilterXScope *self); +FilterXScope *filterx_scope_make_writable(FilterXScope **pself); + FilterXScope *filterx_scope_new(void); FilterXScope *filterx_scope_ref(FilterXScope *self); void filterx_scope_unref(FilterXScope *self); From e659ebdd8a68dba5df7a0e65370e78234a7fcbe0 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Thu, 4 Apr 2024 13:16:20 +0200 Subject: [PATCH 11/11] filterx: propagate FilterXScope between filterx blocks Signed-off-by: Balazs Scheidler --- lib/filterx/filterx-pipe.c | 14 +++++++++++--- lib/logmpx.c | 2 ++ lib/logpipe.h | 2 ++ 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/lib/filterx/filterx-pipe.c b/lib/filterx/filterx-pipe.c index b4fe356f31a..d31109dbb1d 100644 --- a/lib/filterx/filterx-pipe.c +++ b/lib/filterx/filterx-pipe.c @@ -49,23 +49,28 @@ static void log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) { LogFilterXPipe *self = (LogFilterXPipe *) s; + LogPathOptions local_path_options; gboolean res; + path_options = log_path_options_chain(&local_path_options, path_options); msg_trace(">>>>>> filterx rule evaluation begin", evt_tag_str("rule", self->name), log_pipe_location_tag(s), evt_tag_msg_reference(msg)); + FilterXScope *scope = filterx_scope_ref(path_options->filterx_scope); + if (!scope) + local_path_options.filterx_scope = scope = filterx_scope_new(); + + filterx_scope_make_writable(&scope); + NVTable *payload = nv_table_ref(msg->payload); - FilterXScope *scope = filterx_scope_new(); res = filterx_eval_exec_statements(scope, self->stmts, msg); if (res) { log_msg_make_writable(&msg, path_options); filterx_eval_sync_scope_and_message(scope, msg); } - filterx_scope_unref(scope); - nv_table_unref(payload); msg_trace("<<<<<< filterx rule evaluation result", evt_tag_str("result", res ? "matched" : "unmatched"), @@ -83,6 +88,9 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o (*path_options->matched) = FALSE; log_msg_drop(msg, path_options, AT_PROCESSED); } + + filterx_scope_unref(scope); + nv_table_unref(payload); } static LogPipe * diff --git a/lib/logmpx.c b/lib/logmpx.c index c73a8280c05..00919eb5dd0 100644 --- a/lib/logmpx.c +++ b/lib/logmpx.c @@ -89,6 +89,8 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op if (_has_multiple_arcs(self)) { log_msg_write_protect(msg); + if (path_options->filterx_scope) + filterx_scope_write_protect(path_options->filterx_scope); } for (fallback = 0; (fallback == 0) || (fallback == 1 && self->fallback_exists && !delivered); fallback++) { diff --git a/lib/logpipe.h b/lib/logpipe.h index 51dd1e4a2ea..bc4493dce73 100644 --- a/lib/logpipe.h +++ b/lib/logpipe.h @@ -27,6 +27,7 @@ #include "syslog-ng.h" #include "logmsg/logmsg.h" +#include "filterx/filterx-scope.h" #include "cfg.h" #include "atomic.h" #include "messages.h" @@ -215,6 +216,7 @@ struct _LogPathOptions gboolean *matched; const LogPathOptions *lpo_parent_junction; + FilterXScope *filterx_scope; }; #define LOG_PATH_OPTIONS_INIT { TRUE, FALSE, NULL, NULL }