Skip to content
This repository has been archived by the owner on May 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request syslog-ng#4882 from bazsi/filterx-separate-scope-b…
Browse files Browse the repository at this point in the history
…locks

Filterx separate scope blocks
  • Loading branch information
alltilla authored Apr 8, 2024
2 parents ffd06b9 + e659ebd commit c68e8cb
Show file tree
Hide file tree
Showing 19 changed files with 202 additions and 68 deletions.
2 changes: 1 addition & 1 deletion lib/filterx/expr-condition.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ _eval_condition(FilterXConditional *c)
{
return _eval_condition(c->false_branch);
}
FilterXObject *result;
FilterXObject *result = NULL;
for (GList *l = c->statements; l; l = l->next)
{
FilterXExpr *expr = l->data;
Expand Down
13 changes: 7 additions & 6 deletions lib/filterx/filterx-eval.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,11 @@ _evaluate_statement(FilterXExpr *expr)


gboolean
filterx_eval_exec_statements(GList *statements, LogMessage **pmsg, const LogPathOptions *path_options)
filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage *msg)
{
FilterXScope *scope = filterx_scope_new();
FilterXEvalContext local_context =
{
.msgs = pmsg,
.msgs = &msg,
.num_msg = 1,
.template_eval_options = &DEFAULT_TEMPLATE_EVAL_OPTIONS,
.scope = scope,
Expand All @@ -102,13 +101,15 @@ filterx_eval_exec_statements(GList *statements, LogMessage **pmsg, const LogPath
goto fail;
}
}
log_msg_make_writable(pmsg, path_options);
/* NOTE: we only store the results into the message if the entire evaluation was successful */
filterx_scope_sync_to_message(scope, *pmsg);
success = TRUE;
fail:
filterx_eval_set_context(NULL);
filterx_scope_free(scope);
return success;
}

void
filterx_eval_sync_scope_and_message(FilterXScope *scope, LogMessage *msg)
{
filterx_scope_sync_to_message(scope, msg);
}
4 changes: 3 additions & 1 deletion lib/filterx/filterx-eval.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ struct _FilterXEvalContext
FilterXEvalContext *filterx_eval_get_context(void);
FilterXScope *filterx_eval_get_scope(void);
void filterx_eval_set_context(FilterXEvalContext *context);
gboolean filterx_eval_exec_statements(GList *statements, LogMessage **msg, const LogPathOptions *options);
gboolean filterx_eval_exec_statements(FilterXScope *scope, GList *statements, LogMessage *msg);
void filterx_eval_sync_scope_and_message(FilterXScope *scope, LogMessage *msg);


#endif
19 changes: 17 additions & 2 deletions lib/filterx/filterx-pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,28 @@ static void
log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options)
{
LogFilterXPipe *self = (LogFilterXPipe *) s;
LogPathOptions local_path_options;
gboolean res;

path_options = log_path_options_chain(&local_path_options, path_options);
msg_trace(">>>>>> filterx rule evaluation begin",
evt_tag_str("rule", self->name),
log_pipe_location_tag(s),
evt_tag_msg_reference(msg));

FilterXScope *scope = filterx_scope_ref(path_options->filterx_scope);
if (!scope)
local_path_options.filterx_scope = scope = filterx_scope_new();

filterx_scope_make_writable(&scope);

NVTable *payload = nv_table_ref(msg->payload);
res = filterx_eval_exec_statements(self->stmts, &msg, path_options);
nv_table_unref(payload);
res = filterx_eval_exec_statements(scope, self->stmts, msg);
if (res)
{
log_msg_make_writable(&msg, path_options);
filterx_eval_sync_scope_and_message(scope, msg);
}

msg_trace("<<<<<< filterx rule evaluation result",
evt_tag_str("result", res ? "matched" : "unmatched"),
Expand All @@ -76,6 +88,9 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o
(*path_options->matched) = FALSE;
log_msg_drop(msg, path_options, AT_PROCESSED);
}

filterx_scope_unref(scope);
nv_table_unref(payload);
}

static LogPipe *
Expand Down
68 changes: 67 additions & 1 deletion lib/filterx/filterx-scope.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

struct _FilterXScope
{
GAtomicCounter ref_cnt;
GHashTable *value_cache;
GPtrArray *weak_refs;
gboolean write_protected;
};

FilterXObject *
Expand All @@ -44,13 +46,17 @@ filterx_scope_lookup_message_ref(FilterXScope *self, NVHandle handle)
void
filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXObject *value)
{
g_assert(self->write_protected == FALSE);

value->shadow = TRUE;
g_hash_table_insert(self->value_cache, GINT_TO_POINTER(handle), filterx_object_ref(value));
}

void
filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object)
{
g_assert(self->write_protected == FALSE);

if (object)
g_ptr_array_add(self->weak_refs, filterx_object_ref(object));
}
Expand Down Expand Up @@ -84,15 +90,75 @@ filterx_scope_new(void)
{
FilterXScope *self = g_new0(FilterXScope, 1);

g_atomic_counter_set(&self->ref_cnt, 1);
self->value_cache = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, (GDestroyNotify) filterx_object_unref);
self->weak_refs = g_ptr_array_new_with_free_func((GDestroyNotify) filterx_object_unref);
return self;
}

static FilterXScope *
filterx_scope_clone(FilterXScope *other)
{
FilterXScope *self = filterx_scope_new();

GHashTableIter iter;
gpointer _key, _value;

g_hash_table_iter_init(&iter, self->value_cache);
while (g_hash_table_iter_next(&iter, &_key, &_value))
{
NVHandle handle = GPOINTER_TO_INT(_key);
FilterXObject *value = (FilterXObject *) _value;

/* NOTE: clone will not actually clone inmutable objects, in those
* cases we just take a reference */
g_hash_table_insert(self->value_cache, GINT_TO_POINTER(handle), filterx_object_clone(value));
}

/* NOTE: we don't clone weak references, those only relate to mutable
* objects, which we are cloning anyway */
return self;
}

void
filterx_scope_free(FilterXScope *self)
filterx_scope_write_protect(FilterXScope *self)
{
self->write_protected = TRUE;
}

FilterXScope *
filterx_scope_make_writable(FilterXScope **pself)
{
if ((*pself)->write_protected)
{
FilterXScope *new;

new = filterx_scope_clone(*pself);
filterx_scope_unref(*pself);
*pself = new;
}
return *pself;
}

static void
_free(FilterXScope *self)
{
g_hash_table_unref(self->value_cache);
g_ptr_array_free(self->weak_refs, TRUE);
g_free(self);
}

FilterXScope *
filterx_scope_ref(FilterXScope *self)
{
if (self)
g_atomic_counter_inc(&self->ref_cnt);
return self;
}

void
filterx_scope_unref(FilterXScope *self)
{
if (self && (g_atomic_counter_dec_and_test(&self->ref_cnt)))
_free(self);
}
7 changes: 6 additions & 1 deletion lib/filterx/filterx-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ FilterXObject *filterx_scope_lookup_message_ref(FilterXScope *self, NVHandle han
void filterx_scope_register_message_ref(FilterXScope *self, NVHandle handle, FilterXObject *value);
void filterx_scope_store_weak_ref(FilterXScope *self, FilterXObject *object);

/* copy on write */
void filterx_scope_write_protect(FilterXScope *self);
FilterXScope *filterx_scope_make_writable(FilterXScope **pself);

FilterXScope *filterx_scope_new(void);
void filterx_scope_free(FilterXScope *self);
FilterXScope *filterx_scope_ref(FilterXScope *self);
void filterx_scope_unref(FilterXScope *self);

#endif
23 changes: 15 additions & 8 deletions lib/filterx/tests/test_expr_condition.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,32 @@
#include "scratch-buffers.h"


FilterXExpr *_string_to_filterXExpr(const gchar *str)
FilterXExpr *
_string_to_filterXExpr(const gchar *str)
{
return filterx_literal_new(filterx_string_new(str, -1));
}

gint _assert_cmp_string_to_filterx_object(const char *str, FilterXObject *obj)
gint
_assert_cmp_string_to_filterx_object(const char *str, FilterXObject *obj)
{
cr_assert(filterx_object_is_type(obj, &FILTERX_TYPE_NAME(string)));
gsize string_len;
const gchar *string = filterx_string_get_value(obj, &string_len);
return strcmp(string, str);
}

FilterXExpr *_assert_assign_var(const char *var_name, FilterXExpr *value)
FilterXExpr *
_assert_assign_var(const char *var_name, FilterXExpr *value)
{
FilterXExpr *control_variable = filterx_message_ref_expr_new(log_msg_get_value_handle(var_name));
cr_assert(control_variable != NULL);

return filterx_assign_new(control_variable, value);
}

void _assert_set_test_variable(const char *var_name, FilterXExpr *expr)
void
_assert_set_test_variable(const char *var_name, FilterXExpr *expr)
{
FilterXExpr *assign = _assert_assign_var(var_name, expr);
cr_assert(assign != NULL);
Expand All @@ -76,7 +80,8 @@ void _assert_set_test_variable(const char *var_name, FilterXExpr *expr)
filterx_object_unref(assign_eval_res);
}

FilterXObject *_assert_get_test_variable(const char *var_name)
FilterXObject *
_assert_get_test_variable(const char *var_name)
{
FilterXExpr *control_variable = filterx_message_ref_expr_new(log_msg_get_value_handle(var_name));
cr_assert(control_variable != NULL);
Expand All @@ -92,7 +97,8 @@ typedef struct _TestEnv
FilterXEvalContext context;
} TestEnv;

void init_test(TestEnv *env)
void
init_test(TestEnv *env)
{

cr_assert(env != NULL);
Expand All @@ -110,11 +116,12 @@ void init_test(TestEnv *env)

}

void deinit_test(const TestEnv *env)
void
deinit_test(const TestEnv *env)
{
cr_assert(env != NULL);
log_msg_unref(env->msg);
filterx_scope_free(env->scope);
filterx_scope_unref(env->scope);
filterx_eval_set_context(NULL);
}

Expand Down
6 changes: 3 additions & 3 deletions lib/filterx/tests/test_filterx_expr.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Test(filterx_expr, test_filterx_template_evaluates_to_the_expanded_value)
filterx_expr_unref(fexpr);
log_msg_unref(msg);
filterx_object_unref(fobj);
filterx_scope_free(scope);
filterx_scope_unref(scope);
filterx_eval_set_context(NULL);
}

Expand Down Expand Up @@ -166,7 +166,7 @@ Test(filterx_expr, test_filterx_list_merge)

filterx_expr_unref(fillable);
log_msg_unref(msg);
filterx_scope_free(scope);
filterx_scope_unref(scope);
filterx_eval_set_context(NULL);
}

Expand Down Expand Up @@ -292,7 +292,7 @@ Test(filterx_expr, test_filterx_dict_merge)
filterx_object_unref(foo);
filterx_expr_unref(fillable);
log_msg_unref(msg);
filterx_scope_free(scope);
filterx_scope_unref(scope);
filterx_eval_set_context(NULL);
}

Expand Down
10 changes: 6 additions & 4 deletions lib/logmpx.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op
LogMultiplexer *self = (LogMultiplexer *) s;
gint i;
gboolean matched;
LogPathOptions local_options;
LogPathOptions local_path_options;
gboolean delivered = FALSE;
gint fallback;

log_path_options_push_junction(&local_options, &matched, path_options);
log_path_options_push_junction(&local_path_options, &matched, path_options);
if (_has_multiple_arcs(self))
{
log_msg_write_protect(msg);
if (path_options->filterx_scope)
filterx_scope_write_protect(path_options->filterx_scope);
}
for (fallback = 0; (fallback == 0) || (fallback == 1 && self->fallback_exists && !delivered); fallback++)
{
Expand All @@ -106,8 +108,8 @@ log_multiplexer_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op
}

matched = TRUE;
log_msg_add_ack(msg, &local_options);
log_pipe_queue(next_hop, log_msg_ref(msg), &local_options);
log_msg_add_ack(msg, &local_path_options);
log_pipe_queue(next_hop, log_msg_ref(msg), &local_path_options);

if (matched)
{
Expand Down
8 changes: 4 additions & 4 deletions lib/logmsg/logmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ log_msg_ack(LogMessage *self, const LogPathOptions *path_options, AckType ack_ty
* to send to further consuming pipes.
*/
const LogPathOptions *
log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOptions *local_options)
log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOptions *local_path_options)
{
/* NOTE: in case the user requested flow control, we can't break the
* ACK chain, as that would lead to early acks, that would cause
Expand All @@ -1816,10 +1816,10 @@ log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options, LogPathOp

log_msg_ack(msg, path_options, AT_PROCESSED);

*local_options = *path_options;
local_options->ack_needed = FALSE;
log_path_options_chain(local_path_options, path_options);
local_path_options->ack_needed = FALSE;

return local_options;
return local_path_options;
}


Expand Down
2 changes: 1 addition & 1 deletion lib/logmsg/logmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ void log_msg_add_ack(LogMessage *msg, const LogPathOptions *path_options);
void log_msg_ack(LogMessage *msg, const LogPathOptions *path_options, AckType ack_type);
void log_msg_drop(LogMessage *msg, const LogPathOptions *path_options, AckType ack_type);
const LogPathOptions *log_msg_break_ack(LogMessage *msg, const LogPathOptions *path_options,
LogPathOptions *local_options);
LogPathOptions *local_path_options);

void log_msg_refcache_start_producer(LogMessage *self);
void log_msg_refcache_start_consumer(LogMessage *self, const LogPathOptions *path_options);
Expand Down
Loading

0 comments on commit c68e8cb

Please sign in to comment.