Skip to content

Commit

Permalink
Deal with logs
Browse files Browse the repository at this point in the history
  • Loading branch information
sfodagain committed Feb 3, 2025
1 parent 4c621ca commit 41d5067
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct aws_rr_response_path_entry {
*/
typedef void(aws_mqtt_stream_operation_subscription_match_fn)(
const struct aws_linked_list *operations,
const struct aws_byte_cursor *topic_filter, // TODO Do we need this for anything other than tests?
const struct aws_byte_cursor *topic_filter,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data);

Expand Down Expand Up @@ -113,7 +113,7 @@ AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_add_request_subs
/*
* Remove a subscription for a given request operation.
*/
AWS_MQTT_API void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
AWS_MQTT_API int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter);

Expand Down
30 changes: 25 additions & 5 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,7 @@ struct aws_mqtt_request_response_client {
struct aws_priority_queue operations_by_timeout;

/*
* Structure to handle subscriptions: add/remove subscriptions, match incoming messages.
* TODO Add normal description.
* Structure to handle stream and request subscriptions.
*/
struct aws_request_response_subscriptions subscriptions;

Expand Down Expand Up @@ -786,11 +785,19 @@ static void s_apply_publish_to_streaming_operation_list(
const struct aws_byte_cursor *topic_filter,
const struct aws_protocol_adapter_incoming_publish_event *publish_event,
void *user_data) {
(void)topic_filter;
(void)user_data;

AWS_FATAL_ASSERT(operations != NULL);

struct aws_mqtt_request_response_client *rr_client = user_data;

AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request-response client incoming publish on topic '" PRInSTR
"' matches streaming subscription on topic filter '" PRInSTR "'",
(void *)rr_client,
AWS_BYTE_CURSOR_PRI(publish_event->topic),
AWS_BYTE_CURSOR_PRI(*topic_filter));

struct aws_linked_list_node *node = aws_linked_list_begin(operations);
while (node != aws_linked_list_end(operations)) {
struct aws_mqtt_rr_client_operation *operation =
Expand Down Expand Up @@ -883,6 +890,12 @@ static void s_apply_publish_to_response_path_entry(

struct aws_mqtt_request_response_client *rr_client = user_data;

AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request-response client incoming publish on topic '" PRInSTR "' matches response path",
(void *)rr_client,
AWS_BYTE_CURSOR_PRI(publish_event->topic));

struct aws_json_value *json_payload = NULL;

struct aws_byte_cursor correlation_token;
Expand Down Expand Up @@ -1698,7 +1711,14 @@ static void s_remove_operation_from_client_tables(struct aws_mqtt_rr_client_oper
for (size_t i = 0; i < path_count; ++i) {
struct aws_mqtt_request_operation_response_path path;
aws_array_list_get_at(paths, &path, i);
aws_mqtt_request_response_client_subscriptions_remove_request_subscription(&client->subscriptions, &path.topic);
if (aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
&client->subscriptions, &path.topic) == AWS_OP_ERR) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: internal state error removing reference to response path for topic " PRInSTR,
(void *)client,
AWS_BYTE_CURSOR_PRI(path.topic));
}
}
}

Expand Down
54 changes: 4 additions & 50 deletions source/request-response/request_response_subscription_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ int aws_mqtt_request_response_client_subscriptions_add_request_subscription(
return AWS_OP_SUCCESS;
}

void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
int aws_mqtt_request_response_client_subscriptions_remove_request_subscription(
struct aws_request_response_subscriptions *subscriptions,
const struct aws_byte_cursor *topic_filter) {

Expand All @@ -199,29 +199,17 @@ void aws_mqtt_request_response_client_subscriptions_remove_request_subscription(

struct aws_hash_element *element = NULL;
if (aws_hash_table_find(&subscriptions->request_response_paths, topic_filter, &element) || element == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"internal state error removing reference to response path for topic " PRInSTR,
AWS_BYTE_CURSOR_PRI(*topic_filter));
return;
return AWS_OP_ERR;
}

struct aws_rr_response_path_entry *entry = element->value;
--entry->ref_count;

if (entry->ref_count == 0) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"removing last reference to response path for topic " PRInSTR,
AWS_BYTE_CURSOR_PRI(*topic_filter));
aws_hash_table_remove(&subscriptions->request_response_paths, topic_filter, NULL, NULL);
} else {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"removing reference to response path for topic " PRInSTR ", %zu references remain",
AWS_BYTE_CURSOR_PRI(*topic_filter),
entry->ref_count);
}

return AWS_OP_SUCCESS;
}

static void s_match_stream_subscriptions(
Expand All @@ -232,11 +220,6 @@ static void s_match_stream_subscriptions(
struct aws_hash_element *subscription_filter_element = NULL;
if (aws_hash_table_find(subscriptions, &publish_event->topic, &subscription_filter_element) == AWS_OP_SUCCESS &&
subscription_filter_element != NULL) {
// TODO Deal with logs without client pointer.
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response client incoming publish on topic '" PRInSTR "' matches streaming topic",
AWS_BYTE_CURSOR_PRI(publish_event->topic));

struct aws_rr_operation_list_topic_filter_entry *entry = subscription_filter_element->value;
on_stream_operation_subscription_match(
Expand All @@ -250,18 +233,9 @@ static void s_match_wildcard_stream_subscriptions(
aws_mqtt_stream_operation_subscription_match_fn *on_stream_operation_subscription_match,
void *user_data) {

AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"= Looking for subscription for topic '" PRInSTR "'",
AWS_BYTE_CURSOR_PRI(publish_event->topic));

for (struct aws_hash_iter iter = aws_hash_iter_begin(subscriptions); !aws_hash_iter_done(&iter);
aws_hash_iter_next(&iter)) {
struct aws_rr_operation_list_topic_filter_entry *entry = iter.element.value;
AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"= Checking subscription with topic filter " PRInSTR,
AWS_BYTE_CURSOR_PRI(entry->topic_filter_cursor));

struct aws_byte_cursor subscription_topic_filter_segment;
AWS_ZERO_STRUCT(subscription_topic_filter_segment);
Expand All @@ -273,22 +247,11 @@ static void s_match_wildcard_stream_subscriptions(
bool multi_level_wildcard = false;

while (aws_byte_cursor_next_split(&entry->topic_filter_cursor, '/', &subscription_topic_filter_segment)) {
AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"=== subscription topic filter segment is '" PRInSTR "'",
AWS_BYTE_CURSOR_PRI(subscription_topic_filter_segment));

if (!aws_byte_cursor_next_split(&publish_event->topic, '/', &topic_segment)) {
AWS_LOGF_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "=== topic segment is NULL");
match = false;
break;
}

AWS_LOGF_INFO(
AWS_LS_MQTT_REQUEST_RESPONSE,
"======= topic segment is '" PRInSTR "'",
AWS_BYTE_CURSOR_PRI(topic_segment));

if (aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "#")) {
multi_level_wildcard = true;
match = true;
Expand All @@ -297,7 +260,6 @@ static void s_match_wildcard_stream_subscriptions(

if (!aws_byte_cursor_eq_c_str(&subscription_topic_filter_segment, "+") &&
!aws_byte_cursor_eq_ignore_case(&topic_segment, &subscription_topic_filter_segment)) {
AWS_LOGF_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "======= topic segment differs");
match = false;
break;
}
Expand All @@ -308,11 +270,8 @@ static void s_match_wildcard_stream_subscriptions(
}

if (match) {
AWS_LOGF_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "=== found subscription match");
on_stream_operation_subscription_match(
&entry->operations, &entry->topic_filter_cursor, publish_event, user_data);
} else {
AWS_LOGF_INFO(AWS_LS_MQTT_REQUEST_RESPONSE, "=== this is not the right subscription");
}
}
}
Expand All @@ -326,10 +285,6 @@ void s_match_request_response_subscriptions(
struct aws_hash_element *response_path_element = NULL;
if (aws_hash_table_find(request_response_paths, &publish_event->topic, &response_path_element) == AWS_OP_SUCCESS &&
response_path_element != NULL) {
AWS_LOGF_DEBUG(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response client incoming publish on topic '" PRInSTR "' matches response path",
AWS_BYTE_CURSOR_PRI(publish_event->topic));

on_request_operation_subscription_match(response_path_element->value, publish_event, user_data);
}
Expand All @@ -344,7 +299,6 @@ void aws_mqtt_request_response_client_subscriptions_match(

AWS_FATAL_PRECONDITION(subscriptions);
AWS_FATAL_PRECONDITION(publish_event);
// TODO ? Allow NULLs?
AWS_FATAL_PRECONDITION(on_stream_operation_subscription_match);
AWS_FATAL_PRECONDITION(on_request_operation_subscription_match);

Expand Down

0 comments on commit 41d5067

Please sign in to comment.