Skip to content

Commit

Permalink
Refactor 311 tests to share functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 24, 2024
1 parent 76364e9 commit f418237
Show file tree
Hide file tree
Showing 10 changed files with 1,094 additions and 976 deletions.
4 changes: 2 additions & 2 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ struct aws_mqtt_client_connection_vtable {

int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);

enum aws_mqtt311_impl_type (*get_impl_type)(void *impl);
enum aws_mqtt311_impl_type (*get_impl_type)(const void *impl);
};

struct aws_mqtt_client_connection {
Expand All @@ -131,7 +131,7 @@ struct aws_mqtt_client_connection {
};

AWS_MQTT_API enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
struct aws_mqtt_client_connection *connection);
const struct aws_mqtt_client_connection *connection);

AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item);

Expand Down
22 changes: 7 additions & 15 deletions include/aws/mqtt/private/mqtt311_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,17 @@ typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx)
*
* All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the
* add/remove callback set also on the event loop, everything is correctly serialized without data races.
*
* We only listen to connection-resumed because the only connection-level event we care about is a failure
* to rejoin a session (which invalidates all subscriptions that were considered valid)
*/
struct aws_mqtt311_callback_set {

/* Called from s_packet_handler_publish which is event-loop invoked */
aws_mqtt_client_publish_received_fn *publish_received_handler;

/* Called from s_mqtt_client_shutdown which is event-loop invoked */
aws_mqtt_client_on_connection_interrupted_fn *connection_interrupted_handler;

/* Called from s_packet_handler_connack which is event-loop invoked */
aws_mqtt_client_on_connection_interrupted_fn *connection_resumed_handler;

/* Also called from s_packet_handler_connack which is event-loop invoked */
aws_mqtt_client_on_connection_success_fn *connection_success_handler;
aws_mqtt_client_on_connection_resumed_fn *connection_resumed_handler;

void *user_data;
};
Expand Down Expand Up @@ -164,17 +161,12 @@ void aws_mqtt311_callback_set_manager_on_publish_received(
enum aws_mqtt_qos qos,
bool retain);

AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_interrupted(
struct aws_mqtt311_callback_set_manager *manager);

AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_resumed(
struct aws_mqtt311_callback_set_manager *manager);

AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_success(
struct aws_mqtt311_callback_set_manager *manager);
struct aws_mqtt311_callback_set_manager *manager,
enum aws_mqtt_connect_return_code return_code,
bool rejoined_session);

AWS_EXTERN_C_END

Expand Down
4 changes: 2 additions & 2 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -3223,7 +3223,7 @@ static void s_aws_mqtt_client_connection_311_release(void *impl) {
aws_ref_count_release(&connection->ref_count);
}

enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(void *impl) {
enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_3_get_impl(const void *impl) {
(void)impl;

return AWS_MQTT311_IT_311_CONNECTION_IMPL;
Expand Down Expand Up @@ -3354,7 +3354,7 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
connection->handler.vtable = aws_mqtt_get_client_channel_vtable();
connection->handler.impl = connection;

aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, connection);
aws_mqtt311_callback_set_manager_init(&connection->callback_manager, connection->allocator, &connection->base);

return &connection->base;

Expand Down
4 changes: 4 additions & 0 deletions source/client_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ static int s_packet_handler_connack(struct aws_byte_cursor message_cursor, void
(void *)connection);

MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_resumed, connack.connect_return_code, connack.session_present);

aws_mqtt311_callback_set_manager_on_connection_resumed(&connection->callback_manager, connack.connect_return_code, connack.session_present);
} else {

aws_create_reconnect_task(connection);
Expand Down Expand Up @@ -291,6 +293,8 @@ static int s_packet_handler_publish(struct aws_byte_cursor message_cursor, void

MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_any_publish, &publish.topic_name, &publish.payload, dup, qos, retain);

aws_mqtt311_callback_set_manager_on_publish_received(&connection->callback_manager, &publish.topic_name, &publish.payload, dup, qos, retain);

AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: publish received with msg id=%" PRIu16 " dup=%d qos=%d retain=%d payload-size=%zu topic=" PRInSTR,
Expand Down
2 changes: 1 addition & 1 deletion source/client_impl_shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ int aws_mqtt_client_connection_get_stats(
return (*connection->vtable->get_stats_fn)(connection->impl, stats);
}

enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(struct aws_mqtt_client_connection *connection) {
enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(const struct aws_mqtt_client_connection *connection) {
return (*connection->vtable->get_impl_type)(connection->impl);
}

Expand Down
54 changes: 39 additions & 15 deletions source/mqtt311_listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@

#include <inttypes.h>

static struct aws_event_loop *s_mqtt_client_connection_get_event_loop(const struct aws_mqtt_client_connection *connection) {
AWS_FATAL_ASSERT(aws_mqtt_client_connection_get_impl_type(connection) == AWS_MQTT311_IT_311_CONNECTION_IMPL);

struct aws_mqtt_client_connection_311_impl *connection_impl = connection->impl;

return connection_impl->loop;
}

struct aws_mqtt311_listener {
struct aws_allocator *allocator;

Expand Down Expand Up @@ -81,9 +89,8 @@ static void s_mqtt311_listener_terminate_task_fn(struct aws_task *task, void *ar

static void s_aws_mqtt311_listener_on_zero_ref_count(void *context) {
struct aws_mqtt311_listener *listener = context;
struct aws_mqtt_client_connection_311_impl *connection_impl = listener->config.connection->impl;

aws_event_loop_schedule_task_now(connection_impl->loop, &listener->terminate_task);
aws_event_loop_schedule_task_now(s_mqtt_client_connection_get_event_loop(listener->config.connection), &listener->terminate_task);
}

struct aws_mqtt311_listener *aws_mqtt311_listener_new(
Expand All @@ -97,7 +104,6 @@ struct aws_mqtt311_listener *aws_mqtt311_listener_new(
return NULL;
}

struct aws_mqtt_client_connection_311_impl *connection_impl = config->connection->impl;
struct aws_mqtt311_listener *listener = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt311_listener));

listener->allocator = allocator;
Expand All @@ -110,7 +116,7 @@ struct aws_mqtt311_listener *aws_mqtt311_listener_new(
aws_task_init(&listener->terminate_task, s_mqtt311_listener_terminate_task_fn, listener, "Mqtt311ListenerTerminate");

aws_mqtt311_listener_acquire(listener);
aws_event_loop_schedule_task_now(connection_impl->loop, &listener->initialize_task);
aws_event_loop_schedule_task_now(s_mqtt_client_connection_get_event_loop(config->connection), &listener->initialize_task);

return listener;
}
Expand Down Expand Up @@ -184,11 +190,11 @@ static struct aws_mqtt311_callback_set_entry *s_new_311_callback_set_entry(
return entry;
}

uint64_t aws_mqtt5_callback_set_manager_push_front(
uint64_t aws_mqtt311_callback_set_manager_push_front(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_mqtt311_callback_set *callback_set) {

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(manager->client->loop));
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(s_mqtt_client_connection_get_event_loop(manager->connection)));

struct aws_mqtt311_callback_set_entry *entry = s_new_311_callback_set_entry(manager, callback_set);

Expand All @@ -199,7 +205,7 @@ uint64_t aws_mqtt5_callback_set_manager_push_front(

void aws_mqtt311_callback_set_manager_remove(struct aws_mqtt311_callback_set_manager *manager, uint64_t callback_set_id) {

AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(manager->client->loop));
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(s_mqtt_client_connection_get_event_loop(manager->connection)));

struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
while (node != aws_linked_list_end(&manager->callback_set_entries)) {
Expand Down Expand Up @@ -233,19 +239,37 @@ void aws_mqtt311_callback_set_manager_on_publish_received(
enum aws_mqtt_qos qos,
bool retain) {

}
struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl;
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));

void aws_mqtt311_callback_set_manager_on_connection_interrupted(
struct aws_mqtt311_callback_set_manager *manager) {
struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
while (node != aws_linked_list_end(&manager->callback_set_entries)) {
struct aws_mqtt311_callback_set_entry *entry = AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
node = aws_linked_list_next(node);

struct aws_mqtt311_callback_set *callback_set = &entry->callbacks;
if (callback_set->publish_received_handler != NULL) {
(*callback_set->publish_received_handler)(manager->connection, topic, payload, dup, qos, retain, callback_set->user_data);
}
}
}

void aws_mqtt311_callback_set_manager_on_connection_resumed(
struct aws_mqtt311_callback_set_manager *manager) {
struct aws_mqtt311_callback_set_manager *manager,
enum aws_mqtt_connect_return_code return_code,
bool rejoined_session) {

}
struct aws_mqtt_client_connection_311_impl *connection_impl = manager->connection->impl;
AWS_FATAL_ASSERT(aws_event_loop_thread_is_callers_thread(connection_impl->loop));

void aws_mqtt311_callback_set_manager_on_connection_success(
struct aws_mqtt311_callback_set_manager *manager) {
struct aws_linked_list_node *node = aws_linked_list_begin(&manager->callback_set_entries);
while (node != aws_linked_list_end(&manager->callback_set_entries)) {
struct aws_mqtt311_callback_set_entry *entry = AWS_CONTAINER_OF(node, struct aws_mqtt311_callback_set_entry, node);
node = aws_linked_list_next(node);

}
struct aws_mqtt311_callback_set *callback_set = &entry->callbacks;
if (callback_set->connection_resumed_handler != NULL) {
(*callback_set->connection_resumed_handler)(manager->connection, return_code, rejoined_session, callback_set->user_data);
}
}
}
2 changes: 1 addition & 1 deletion source/v5/mqtt5_to_mqtt3_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -2854,7 +2854,7 @@ static uint16_t s_aws_mqtt_5_resubscribe_existing_topics(
return 0;
}

enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(void *impl) {
enum aws_mqtt311_impl_type s_aws_mqtt_client_connection_5_get_impl(const void *impl) {
(void)impl;

return AWS_MQTT311_IT_5_ADAPTER_IMPL;
Expand Down
Loading

0 comments on commit f418237

Please sign in to comment.