Skip to content

Commit

Permalink
Fixes skupperproject#1592 Added a test to detect the problem. This is…
Browse files Browse the repository at this point in the history
… not a 100% test, but it detects the condition some of the time.

Fixes skupperproject#1592 - Delay the reclaiming of deleted BIFLOW records to allow time to collect all co-record updates.
  • Loading branch information
ted-ross committed Aug 12, 2024
1 parent a0d6986 commit 6d04a7e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 6 deletions.
62 changes: 56 additions & 6 deletions src/vanflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#define FLUSH_SLOT_COUNT 5
#define RATE_SLOT_COUNT 5
#define IDENTITY_MAX 27
#define DEFERRED_DELETION_TICKS 25 // Five seconds

typedef struct vflow_identity_t {
uint64_t record_id;
Expand Down Expand Up @@ -84,13 +85,14 @@ struct vflow_record_t {
DEQ_LINKS_N(UNFLUSHED, vflow_record_t);
vflow_record_type_t record_type;
vflow_record_t *parent;
vflow_record_t *co_record_peer; // Only used then record and co-record are on the same source
vflow_record_t *co_record_peer; // Only used when record and co-record are on the same source
vflow_record_list_t children;
vflow_identity_t identity;
vflow_attribute_data_list_t attributes;
vflow_rate_list_t rates;
uint64_t latency_start;
uint32_t emit_ordinal;
uint32_t delete_tick;
int flush_slot;
int default_flush_slot;
bool never_logged;
Expand Down Expand Up @@ -161,7 +163,9 @@ typedef struct {
vflow_record_list_t unflushed_log_records[FLUSH_SLOT_COUNT];
vflow_record_list_t unflushed_records[FLUSH_SLOT_COUNT]; // not flow or log records
vflow_record_list_t unflushed_co_records[FLUSH_SLOT_COUNT];
vflow_record_list_t to_delete_records;
vflow_rate_list_t rate_trackers;
uint32_t current_tick;
int current_flush_slot;
char *site_id;
char *hostname;
Expand Down Expand Up @@ -385,7 +389,24 @@ static void _vflow_post_flush_record_TH(vflow_record_t *record)
}
}

if (record->flush_slot == -1) {
//
// Check for deferred deletion. This happens if the record is a BIFLOW_TPORT and is not a co-record.
// The deferral is used to improve the probability that co-record updates will arrive before the record's
// last update is emitted.
//
if (record->ended && record->record_type == VFLOW_RECORD_BIFLOW_TPORT && !record->co_record) {
if (record->flush_slot != -1) {
DEQ_REMOVE_N(UNFLUSHED, state->unflushed_flow_records[record->flush_slot], record);
record->flush_slot = -1;
}

if (record->delete_tick == 0) {
record->delete_tick = state->current_tick + DEFERRED_DELETION_TICKS;
DEQ_INSERT_TAIL_N(UNFLUSHED, state->to_delete_records, record);
}
}

else if (record->flush_slot == -1) {
if (record->default_flush_slot == -1) {
record->default_flush_slot = state->current_flush_slot;
}
Expand Down Expand Up @@ -860,6 +881,9 @@ static void _vflow_free_record_TH(vflow_record_t *record, bool recursive)
}
}
record->flush_slot = -1;
} else if (record->delete_tick > 0) {
DEQ_REMOVE_N(UNFLUSHED, state->to_delete_records, record);
record->delete_tick = 0;
}

if (recursive) {
Expand Down Expand Up @@ -1166,8 +1190,9 @@ static void _vflow_clean_unflushed_TH(vflow_record_list_t *unflushed_records)
vflow_record_t *record = DEQ_HEAD(*unflushed_records);
while (!!record) {
DEQ_REMOVE_HEAD_N(UNFLUSHED, *unflushed_records);
assert(record->flush_slot >= 0);
assert(record->flush_slot >= 0 || record->delete_tick > 0);
record->flush_slot = -1;
record->delete_tick = 0;

//
// If this record has been ended, emit the log line.
Expand Down Expand Up @@ -1235,7 +1260,7 @@ static void _vflow_emit_co_records_TH(qdr_core_t *core, vflow_record_list_t *unf
*
* @param core Pointer to the core module
*/
static void _vflow_flush_TH(qdr_core_t *core)
static void _vflow_flush_TH(qdr_core_t *core, bool no_defer)
{
//
// If there is at least one collector for this router, batch up the
Expand All @@ -1262,6 +1287,28 @@ static void _vflow_flush_TH(qdr_core_t *core)
_vflow_clean_unflushed_TH(&state->unflushed_flow_records[state->current_flush_slot]);
_vflow_clean_unflushed_TH(&state->unflushed_log_records[state->current_flush_slot]);
_vflow_clean_unflushed_TH(&state->unflushed_co_records[state->current_flush_slot]);

//
// Flush records on the deferred-delete list
//
vflow_record_list_t delete_list;
DEQ_INIT(delete_list);
vflow_record_t *record = DEQ_HEAD(state->to_delete_records);
while (!!record && (record->delete_tick <= state->current_tick || no_defer)) {
assert(record->delete_tick > 0);
DEQ_REMOVE_HEAD_N(UNFLUSHED, state->to_delete_records);
DEQ_INSERT_TAIL_N(UNFLUSHED, delete_list, record);
record = DEQ_HEAD(state->to_delete_records);
}

//
// Note that we only use the flow_address for event generation. Only flow records are delete-deferred.
//
if (state->my_flow_address_usable) {
_vflow_emit_unflushed_as_events_TH(core, &delete_list, state->event_address_my_flow);
}

_vflow_clean_unflushed_TH(&delete_list);
}


Expand Down Expand Up @@ -1440,7 +1487,8 @@ static void _vflow_tick_TH(vflow_work_t *work, bool discard)
{
static int tick_ordinal = 0;
if (!discard) {
_vflow_flush_TH(state->router_core);
state->current_tick++;
_vflow_flush_TH(state->router_core, false);
state->current_flush_slot = (state->current_flush_slot + 1) % FLUSH_SLOT_COUNT;

tick_ordinal = (tick_ordinal + 1) % rate_slot_flush_intervals;
Expand Down Expand Up @@ -1629,7 +1677,7 @@ static void *_vflow_thread_TH(void *context)
// Flush out all of the slots
//
for (int i = 0; i < FLUSH_SLOT_COUNT; i++) {
_vflow_flush_TH(core);
_vflow_flush_TH(core, true);
state->current_flush_slot = (state->current_flush_slot + 1) % FLUSH_SLOT_COUNT;
}

Expand Down Expand Up @@ -2361,6 +2409,8 @@ static void _vflow_init(qdr_core_t *core, void **adaptor_context)
DEQ_INIT(state->unflushed_co_records[slot]);
}

DEQ_INIT(state->to_delete_records);

sys_mutex_init(&state->lock);
sys_mutex_init(&state->id_lock);
sys_cond_init(&state->condition);
Expand Down
35 changes: 35 additions & 0 deletions tests/system_tests_vflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ def setUpClass(cls):
cls.tcp_listener_port_eb = cls.tester.get_port()
cls.tcp_connector_port = cls.tester.get_port()
cls.tcp_noproc_port = cls.tester.get_port()
cls.tcp_noproc_port_ia = cls.tester.get_port()
cls.tcp_noproc_port_ib = cls.tester.get_port()
cls.tcp_noproc_port_eb = cls.tester.get_port()
cls.connector_down_port = cls.tester.get_port()

configs = [
Expand All @@ -369,6 +372,9 @@ def setUpClass(cls):
('tcpListener', {'host': '0.0.0.0',
'port': cls.tcp_listener_port_ia,
'address': 'tcpServiceAddress'}),
('tcpListener', {'host': '0.0.0.0',
'port': cls.tcp_noproc_port_ia,
'address': 'noProcessAddress'}),
# a dummy connector which never connects (operStatus == down)
('connector', {'role': 'inter-router',
'port': cls.connector_down_port,
Expand All @@ -395,6 +401,9 @@ def setUpClass(cls):
('tcpListener', {'host': '0.0.0.0',
'port': cls.tcp_listener_port_ib,
'address': 'tcpServiceAddress'}),
('tcpListener', {'host': '0.0.0.0',
'port': cls.tcp_noproc_port_ib,
'address': 'noProcessAddress'}),
],
# Router EdgeB
[
Expand All @@ -408,6 +417,9 @@ def setUpClass(cls):
('tcpListener', {'host': '0.0.0.0',
'port': cls.tcp_listener_port_eb,
'address': 'tcpServiceAddress'}),
('tcpListener', {'host': '0.0.0.0',
'port': cls.tcp_noproc_port_eb,
'address': 'noProcessAddress'}),
# metrics listener
('listener', {'role': 'normal',
'host': '0.0.0.0',
Expand Down Expand Up @@ -512,6 +524,29 @@ def test_02_check_biflows(self):
client_ib.wait()
client_eb.wait()

def test_03_short_connections(self):
"""
Generate service traffic from multiple sources (including the router local to the connector)
and verify that BIFLOW records are generated with the expected attributes.
"""
success = retry(lambda: self.snooper_thread.match_records({'INTA': [('CONNECTOR', {'START_TIME': ANY_VALUE})]}))
self.assertTrue(success, f"Failed to find baseline connector {self.snooper_thread.get_results()}")

test_name = 'test_03_short_connections'
flow_count = 100
clients = []
expected = {'EdgeB': []}

for i in range(flow_count):
clients.append(EchoClientRunner(test_name, 2, None, None, None, i + 5, 1, port_override=self.tcp_noproc_port_eb, delay_close=False))
expected['EdgeB'].append(('BIFLOW_TPORT', {'END_TIME' : ANY_VALUE, 'SOURCE_HOST' : ANY_VALUE, 'SOURCE_PORT' : ANY_VALUE, 'CONNECTOR' : ANY_VALUE, 'ERROR_CONNECTOR_SIDE' : ANY_VALUE}))

success = retry(lambda: self.snooper_thread.match_records(expected))
self.assertTrue(success, f"Failed to match records {self.snooper_thread.get_results()}")

for i in range(flow_count):
clients[i].wait()

@classmethod
def tearDownClass(cls):
cls.echo_server.wait()
Expand Down

0 comments on commit 6d04a7e

Please sign in to comment.