From ec68f0e5f97ebc1e525a77cd11d155a28d0bfbc2 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Thu, 6 Feb 2025 15:59:42 -0500 Subject: [PATCH] fixup: API name changes: o) rename qd_admin_connector_t --> qd_connector_config_t o) change "admin_conn" to "ctor_config" o) Use "ctor" as shorthand for "connector" in API and local vars --- src/adaptors/amqp/amqp_adaptor.c | 6 +- src/adaptors/amqp/connection_manager.c | 64 ++++----- src/adaptors/amqp/qd_connection.c | 14 +- src/adaptors/amqp/qd_connector.c | 186 ++++++++++++------------- src/adaptors/amqp/qd_connector.h | 95 ++++++------- 5 files changed, 183 insertions(+), 182 deletions(-) diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 452310df3..9cfe1d517 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1416,8 +1416,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool qd_router_connection_get_config(conn, &role, &cost, &name, &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity); - if (connector && !!connector->admin_conn->data_connection_count) { - memcpy(conn->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE); + if (connector && !!connector->ctor_config->data_connection_count) { + memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE); if (connector->is_data_connector) { // override the configured role to identify this as a data connection assert(role == QDR_ROLE_INTER_ROUTER); @@ -1602,7 +1602,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool if (!!connector) { qd_connector_add_link(connector); sys_mutex_lock(&connector->lock); - qd_format_string(connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE, + qd_format_string(connector->conn_msg, QD_CTOR_CONN_MSG_BUF_SIZE, "[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s", connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no", authenticated ? mech : "no", (char*) user, container); diff --git a/src/adaptors/amqp/connection_manager.c b/src/adaptors/amqp/connection_manager.c index 0ccde98a1..6f56cfeda 100644 --- a/src/adaptors/amqp/connection_manager.c +++ b/src/adaptors/amqp/connection_manager.c @@ -44,7 +44,7 @@ struct qd_connection_manager_t { qd_server_t *server; qd_listener_list_t listeners; - qd_admin_connector_list_t admin_connectors; + qd_connector_config_list_t connector_configs; }; @@ -166,7 +166,7 @@ static int get_failover_info_length(qd_failover_item_list_t conn_info_list) */ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl) { - qd_admin_connector_t *admin_conn = (qd_admin_connector_t *) impl; + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; qd_connector_t *connector = 0; qd_error_clear(); @@ -174,13 +174,13 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl // TODO(kgiusti): inter-router connections may have several qd_connector_ts active due to the router data connection // count configuration. However we can only report 1 connector via management. It would be more accurate to report // all connectors associated with this management entity - sys_mutex_lock(&admin_conn->lock); - connector = DEQ_HEAD(admin_conn->connectors); + sys_mutex_lock(&ctor_config->lock); + connector = DEQ_HEAD(ctor_config->connectors); if (connector) { // prevent I/O thread from freeing connector while it is being accessed sys_atomic_inc(&connector->ref_count); } - sys_mutex_unlock(&admin_conn->lock); + sys_mutex_unlock(&ctor_config->lock); if (connector) { int i = 1; @@ -244,19 +244,19 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl const char *state_info = 0; switch (connector->state) { - case CXTR_STATE_CONNECTING: + case CTOR_STATE_CONNECTING: state_info = "CONNECTING"; break; - case CXTR_STATE_OPEN: + case CTOR_STATE_OPEN: state_info = "SUCCESS"; break; - case CXTR_STATE_FAILED: + case CTOR_STATE_FAILED: state_info = "FAILED"; break; - case CXTR_STATE_INIT: + case CTOR_STATE_INIT: state_info = "INITIALIZING"; break; - case CXTR_STATE_DELETED: + case CTOR_STATE_DELETED: // deleted by management, waiting for connection to close state_info = "CLOSING"; break; @@ -283,17 +283,17 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl } -QD_EXPORT qd_admin_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) +QD_EXPORT qd_connector_config_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) { qd_connection_manager_t *cm = qd->connection_manager; - qd_admin_connector_t *admin_conn = qd_admin_connector_create(qd, entity); - if (!admin_conn) { + qd_connector_config_t *ctor_config = qd_connector_config_create(qd, entity); + if (!ctor_config) { return 0; } - DEQ_INSERT_TAIL(cm->admin_connectors, admin_conn); - log_config(&admin_conn->config, "Connector", true); - return admin_conn; + DEQ_INSERT_TAIL(cm->connector_configs, ctor_config); + log_config(&ctor_config->config, "Connector", true); + return ctor_config; } @@ -305,7 +305,7 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd) cm->server = qd->server; DEQ_INIT(cm->listeners); - DEQ_INIT(cm->admin_connectors); + DEQ_INIT(cm->connector_configs); return cm; } @@ -333,11 +333,11 @@ void qd_connection_manager_free(qd_connection_manager_t *cm) li = DEQ_HEAD(cm->listeners); } - qd_admin_connector_t *admin_conn = DEQ_HEAD(cm->admin_connectors); - while (admin_conn) { - DEQ_REMOVE_HEAD(cm->admin_connectors); - qd_admin_connector_delete(admin_conn); - admin_conn = DEQ_HEAD(cm->admin_connectors); + qd_connector_config_t *ctor_config = DEQ_HEAD(cm->connector_configs); + while (ctor_config) { + DEQ_REMOVE_HEAD(cm->connector_configs); + qd_connector_config_delete(ctor_config); + ctor_config = DEQ_HEAD(cm->connector_configs); } free(cm); @@ -351,7 +351,7 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) { static bool first_start = true; qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners); - qd_admin_connector_t *admin_conn = DEQ_HEAD(qd->connection_manager->admin_connectors); + qd_connector_config_t *ctor_config = DEQ_HEAD(qd->connection_manager->connector_configs); while (li) { if (!li->pn_listener) { @@ -366,9 +366,9 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) li = DEQ_NEXT(li); } - while (admin_conn) { - qd_admin_connector_connect(admin_conn); - admin_conn = DEQ_NEXT(admin_conn); + while (ctor_config) { + qd_connector_config_connect(ctor_config); + ctor_config = DEQ_NEXT(ctor_config); } first_start = false; @@ -399,19 +399,19 @@ QD_EXPORT void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *im // QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl) { - qd_admin_connector_t *admin_conn = (qd_admin_connector_t*) impl; - assert(admin_conn); + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; + assert(ctor_config); // take it off the connection manager - log_config(&admin_conn->config, "Connector", false); - DEQ_REMOVE(qd->connection_manager->admin_connectors, admin_conn); - qd_admin_connector_delete(admin_conn); + log_config(&ctor_config->config, "Connector", false); + DEQ_REMOVE(qd->connection_manager->connector_configs, ctor_config); + qd_connector_config_delete(ctor_config); } const char *qd_connector_name(qd_connector_t *ct) { - return ct ? ct->admin_conn->config.name : 0; + return ct ? ct->ctor_config->config.name : 0; } diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index e0a0f4ccf..fe56a6aec 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -150,7 +150,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA); } - if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->admin_conn->data_connection_count)) { + if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) { pn_data_put_symbol(pn_connection_properties(conn), pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY)); pn_data_put_string(pn_connection_properties(conn), @@ -464,7 +464,7 @@ const qd_server_config_t *qd_connection_config(const qd_connection_t *conn) if (conn->listener) return &conn->listener->config; if (conn->connector) - return &conn->connector->admin_conn->config; + return &conn->connector->ctor_config->config; return NULL; } @@ -533,7 +533,7 @@ void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard) const char* qd_connection_name(const qd_connection_t *c) { if (c->connector) { - return c->connector->admin_conn->config.host_port; + return c->connector->ctor_config->config.host_port; } else { return c->rhost_port; } @@ -595,14 +595,14 @@ static void set_rhost_port(qd_connection_t *ctx) { static bool setup_ssl_sasl_and_open(qd_connection_t *ctx) { qd_connector_t *ct = ctx->connector; - const qd_server_config_t *config = &ct->admin_conn->config; + const qd_server_config_t *config = &ct->ctor_config->config; pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); // // Create an SSL session if required // - if (ct->admin_conn->tls_config) { - ctx->ssl = qd_tls_session_amqp(ct->admin_conn->tls_config, tport, false); + if (ct->ctor_config->tls_config) { + ctx->ssl = qd_tls_session_amqp(ct->ctor_config->tls_config, tport, false); if (!ctx->ssl) { qd_log(LOG_SERVER, QD_LOG_ERROR, "Failed to create TLS session for connection [C%" PRIu64 "] to %s:%s (%s)", @@ -682,7 +682,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { qd_log(LOG_SERVER, QD_LOG_INFO, "[C%" PRIu64 "] Accepted connection to %s from %s", ctx->connection_id, name, ctx->rhost_port); } else if (ctx->connector) { /* Establishing an outgoing connection */ - config = &ctx->connector->admin_conn->config; + config = &ctx->connector->ctor_config->config; if (!setup_ssl_sasl_and_open(ctx)) { qd_log(LOG_SERVER, QD_LOG_ERROR, "[C%" PRIu64 "] Connection aborted due to internal setup error", ctx->connection_id); diff --git a/src/adaptors/amqp/qd_connector.c b/src/adaptors/amqp/qd_connector.c index cbb1e2909..be39cdb78 100644 --- a/src/adaptors/amqp/qd_connector.c +++ b/src/adaptors/amqp/qd_connector.c @@ -34,7 +34,7 @@ ALLOC_DEFINE(qd_connector_t); -ALLOC_DEFINE(qd_admin_connector_t); +ALLOC_DEFINE(qd_connector_config_t); static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_REQ(ct->lock) @@ -53,12 +53,12 @@ static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_ /* Timer callback to try/retry connection open, connector->lock held */ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_REQ(connector->lock) { - assert(connector->state != CXTR_STATE_DELETED); + assert(connector->state != CTOR_STATE_DELETED); - const qd_admin_connector_t *admin_conn = connector->admin_conn; - qd_connection_init(qd_conn, admin_conn->server, &admin_conn->config, connector, 0); + const qd_connector_config_t *ctor_config = connector->ctor_config; + qd_connection_init(qd_conn, ctor_config->server, &ctor_config->config, connector, 0); - connector->state = CXTR_STATE_OPEN; + connector->state = CTOR_STATE_OPEN; connector->delay = 5000; // @@ -75,7 +75,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ // Set the sasl user name and password on the proton connection object. This has to be // done before pn_proactor_connect which will bind a transport to the connection - const qd_server_config_t *config = &connector->admin_conn->config; + const qd_server_config_t *config = &connector->ctor_config->config; if(config->sasl_username) pn_connection_set_user(qd_conn->pn_conn, config->sasl_username); if (config->sasl_password) @@ -83,7 +83,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ qd_log(LOG_SERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] Connecting to %s", qd_conn->connection_id, host_port); /* Note: the transport is configured in the PN_CONNECTION_BOUND event */ - pn_proactor_connect(qd_server_proactor(connector->admin_conn->server), qd_conn->pn_conn, host_port); + pn_proactor_connect(qd_server_proactor(connector->ctor_config->server), qd_conn->pn_conn, host_port); // at this point the qd_conn may now be scheduled on another thread } @@ -101,9 +101,9 @@ static void try_open_cb(void *context) sys_mutex_lock(&ct->lock); - if (ct->state == CXTR_STATE_CONNECTING || ct->state == CXTR_STATE_INIT) { + if (ct->state == CTOR_STATE_CONNECTING || ct->state == CTOR_STATE_INIT) { // else deleted or failed - on failed wait until after connection is freed - // and state is set to CXTR_STATE_CONNECTING (timer is rescheduled then) + // and state is set to CTOR_STATE_CONNECTING (timer is rescheduled then) try_open_lh(ct, ctx); ctx = 0; // owned by ct } @@ -128,11 +128,11 @@ static void deferred_close(void *context, bool discard) const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c) { - return &c->admin_conn->config; + return &c->ctor_config->config; } -qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_connector) +qd_connector_t *qd_connector(qd_connector_config_t *ctor_config, bool is_data_connector) { qd_connector_t *connector = new_qd_connector_t(); if (!connector) return 0; @@ -147,20 +147,20 @@ qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_conn connector->reconnect_enabled = true; connector->is_data_connector = is_data_connector; - connector->admin_conn = admin_conn; - sys_atomic_inc(&admin_conn->ref_count); + connector->ctor_config = ctor_config; + sys_atomic_inc(&ctor_config->ref_count); connector->conn_index = 1; - connector->state = CXTR_STATE_INIT; + connector->state = CTOR_STATE_INIT; qd_failover_item_t *item = NEW(qd_failover_item_t); ZERO(item); - if (admin_conn->config.ssl_required) + if (ctor_config->config.ssl_required) item->scheme = strdup("amqps"); else item->scheme = strdup("amqp"); - item->host = strdup(admin_conn->config.host); - item->port = strdup(admin_conn->config.port); + item->host = strdup(ctor_config->config.host); + item->port = strdup(ctor_config->config.port); int hplen = strlen(item->host) + strlen(item->port) + 2; item->host_port = malloc(hplen); snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); @@ -170,13 +170,13 @@ qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_conn // Set up the vanflow record for this connector (LINK) // Do this only for router-to-router connectors since the record represents an inter-router link // - if ((strcmp(admin_conn->config.role, "inter-router") == 0 && !is_data_connector) || - strcmp(admin_conn->config.role, "edge") == 0 || - strcmp(admin_conn->config.role, "inter-edge") == 0) { + if ((strcmp(ctor_config->config.role, "inter-router") == 0 && !is_data_connector) || + strcmp(ctor_config->config.role, "edge") == 0 || + strcmp(ctor_config->config.role, "inter-edge") == 0) { connector->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); - vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, admin_conn->config.name); - vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, admin_conn->config.role); - vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, admin_conn->config.inter_router_cost); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost); vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0); vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); @@ -191,19 +191,19 @@ qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_conn const char *qd_connector_policy_vhost(const qd_connector_t* ct) { - return ct->admin_conn->policy_vhost; + return ct->ctor_config->policy_vhost; } bool qd_connector_connect(qd_connector_t *ct) { sys_mutex_lock(&ct->lock); - if (ct->state != CXTR_STATE_DELETED) { + if (ct->state != CTOR_STATE_DELETED) { // expect: do not attempt to connect an already connected qd_connection assert(ct->qd_conn == 0); ct->qd_conn = 0; ct->delay = 0; - ct->state = CXTR_STATE_CONNECTING; + ct->state = CTOR_STATE_CONNECTING; qd_timer_schedule(ct->timer, ct->delay); sys_mutex_unlock(&ct->lock); return true; @@ -226,7 +226,7 @@ void qd_connector_close(qd_connector_t *ct) sys_mutex_lock(&ct->lock); timer = ct->timer; ct->timer = 0; - ct->state = CXTR_STATE_DELETED; + ct->state = CTOR_STATE_DELETED; qd_connection_t *conn = ct->qd_conn; if (conn && conn->pn_conn) { qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); @@ -245,10 +245,10 @@ void qd_connector_decref(qd_connector_t* connector) if (sys_atomic_dec(&connector->ref_count) == 1) { // expect both mgmt and qd_connection no longer reference this - assert(connector->state == CXTR_STATE_DELETED); + assert(connector->state == CTOR_STATE_DELETED); assert(connector->qd_conn == 0); - qd_admin_connector_decref(connector->admin_conn); + qd_connector_config_decref(connector->ctor_config); vflow_end_record(connector->vflow_record); connector->vflow_record = 0; qd_timer_free(connector->timer); @@ -300,16 +300,16 @@ static void increment_conn_index_lh(qd_connector_t *connector) TA_REQ(connector- */ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition) { - const qd_server_config_t *config = &connector->admin_conn->config; - char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging - char conn_msg_1[QD_CXTR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id + const qd_server_config_t *config = &connector->ctor_config->config; + char conn_msg[QD_CTOR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging + char conn_msg_1[QD_CTOR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id bool log_error_message = false; sys_mutex_lock(&connector->lock); - if (connector->state != CXTR_STATE_DELETED) { + if (connector->state != CTOR_STATE_DELETED) { increment_conn_index_lh(connector); // note: will transition back to STATE_CONNECTING when associated connection is freed (pn_connection_free) - connector->state = CXTR_STATE_FAILED; + connector->state = CTOR_STATE_FAILED; if (condition && pn_condition_is_set(condition)) { qd_format_string(conn_msg, sizeof(conn_msg), "[C%"PRIu64"] Connection to %s failed: %s %s", connection_id, config->host_port, pn_condition_get_name(condition), @@ -333,7 +333,7 @@ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t con // if (strcmp(connector->conn_msg, conn_msg_1) != 0) { - strncpy(connector->conn_msg, conn_msg_1, QD_CXTR_CONN_MSG_BUF_SIZE); + strncpy(connector->conn_msg, conn_msg_1, QD_CTOR_CONN_MSG_BUF_SIZE); log_error_message = true; } } @@ -370,7 +370,7 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx ctx->connector = connector; connector->qd_conn = ctx; - strncpy(ctx->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE); + strncpy(ctx->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE); } @@ -404,7 +404,7 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const connector->qd_conn = 0; ctx->connector = 0; - if (connector->state != CXTR_STATE_DELETED) { + if (connector->state != CTOR_STATE_DELETED) { // Increment the connection index by so that we can try connecting to the failover url (if any). bool has_failover = qd_connector_has_failover_info(connector); long delay = connector->delay; @@ -415,7 +415,7 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const // We want to quickly keep cycling thru the failover urls every second. delay = 1000; } - connector->state = CXTR_STATE_CONNECTING; + connector->state = CTOR_STATE_CONNECTING; qd_timer_schedule(connector->timer, delay); } sys_mutex_unlock(&connector->lock); @@ -426,12 +426,12 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const /** - * Create a new qd_admin_connector_t instance + * Create a new qd_connector_config_t instance */ -qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t *entity) +qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity) { - qd_admin_connector_t *admin_conn = new_qd_admin_connector_t(); - if (!admin_conn) { + qd_connector_config_t *ctor_config = new_qd_connector_config_t(); + if (!ctor_config) { char *name = qd_entity_opt_string(entity, "name", "UNKNOWN"); qd_error(QD_ERROR_CONFIG, "Failed to create Connector %s: resource allocation failed", name); free(name); @@ -440,36 +440,36 @@ qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t * qd_error_clear(); - ZERO(admin_conn); - DEQ_ITEM_INIT(admin_conn); - sys_atomic_init(&admin_conn->ref_count, 1); // for caller - sys_mutex_init(&admin_conn->lock); - admin_conn->server = qd_dispatch_get_server(qd); - DEQ_INIT(admin_conn->connectors); + ZERO(ctor_config); + DEQ_ITEM_INIT(ctor_config); + sys_atomic_init(&ctor_config->ref_count, 1); // for caller + sys_mutex_init(&ctor_config->lock); + ctor_config->server = qd_dispatch_get_server(qd); + DEQ_INIT(ctor_config->connectors); - if (qd_server_config_load(&admin_conn->config, entity, false) != QD_ERROR_NONE) { + if (qd_server_config_load(&ctor_config->config, entity, false) != QD_ERROR_NONE) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); return 0; } - admin_conn->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); + ctor_config->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); if (qd_error_code()) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); return 0; } // // If an sslProfile is configured allocate a TLS config to be used by all child connector's connections // - if (admin_conn->config.ssl_profile_name) { - admin_conn->tls_config = qd_tls_config(admin_conn->config.ssl_profile_name, + if (ctor_config->config.ssl_profile_name) { + ctor_config->tls_config = qd_tls_config(ctor_config->config.ssl_profile_name, QD_TLS_TYPE_PROTON_AMQP, QD_TLS_CONFIG_CLIENT_MODE, - admin_conn->config.verify_host_name, - admin_conn->config.ssl_require_peer_authentication); - if (!admin_conn->tls_config) { + ctor_config->config.verify_host_name, + ctor_config->config.ssl_require_peer_authentication); + if (!ctor_config->tls_config) { // qd_tls2_config() has set the qd_error_message(), which is logged below goto error; } @@ -477,21 +477,21 @@ qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t * // For inter-router connectors create associated inter-router data connectors if configured - if (strcmp(admin_conn->config.role, "inter-router") == 0) { - admin_conn->data_connection_count = qd_dispatch_get_data_connection_count(qd); - if (!!admin_conn->data_connection_count) { - qd_generate_discriminator(admin_conn->group_correlator); + if (strcmp(ctor_config->config.role, "inter-router") == 0) { + ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd); + if (!!ctor_config->data_connection_count) { + qd_generate_discriminator(ctor_config->group_correlator); - // Add any data connectors to the head of the connectors list in the admin_connector first. This allows the + // Add any data connectors to the head of the connectors list first. This allows the // router control connector to be located at the head of the list. - for (int i = 0; i < admin_conn->data_connection_count; i++) { - qd_connector_t *dc = qd_connector(admin_conn, true); + for (int i = 0; i < ctor_config->data_connection_count; i++) { + qd_connector_t *dc = qd_connector(ctor_config, true); if (!dc) { - qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", admin_conn->config.name); + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name); goto error; } - DEQ_INSERT_HEAD(admin_conn->connectors, dc); + DEQ_INSERT_HEAD(ctor_config->connectors, dc); } } } @@ -499,71 +499,71 @@ qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t * // Create the primary connector associated with this configuration. It will be located // at the head of the connectors list - qd_connector_t *ct = qd_connector(admin_conn, false); + qd_connector_t *ct = qd_connector(ctor_config, false); if (!ct) { - qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", admin_conn->config.name); + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name); goto error; } - DEQ_INSERT_HEAD(admin_conn->connectors, ct); + DEQ_INSERT_HEAD(ctor_config->connectors, ct); - return admin_conn; + return ctor_config; error: if (qd_error_code()) qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - for (qd_connector_t *dc = DEQ_HEAD(admin_conn->connectors); dc; dc = DEQ_HEAD(admin_conn->connectors)) { - DEQ_REMOVE_HEAD(admin_conn->connectors); - dc->state = CXTR_STATE_DELETED; + for (qd_connector_t *dc = DEQ_HEAD(ctor_config->connectors); dc; dc = DEQ_HEAD(ctor_config->connectors)) { + DEQ_REMOVE_HEAD(ctor_config->connectors); + dc->state = CTOR_STATE_DELETED; qd_connector_decref(dc); } - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); return 0; } -void qd_admin_connector_delete(qd_admin_connector_t *admin_conn) +void qd_connector_config_delete(qd_connector_config_t *ctor_config) { - qd_connector_t *ct = DEQ_HEAD(admin_conn->connectors); + qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); while (ct) { - DEQ_REMOVE_HEAD(admin_conn->connectors); + DEQ_REMOVE_HEAD(ctor_config->connectors); qd_connector_close(ct); qd_connector_decref(ct); - ct = DEQ_HEAD(admin_conn->connectors); + ct = DEQ_HEAD(ctor_config->connectors); } // drop ref held by the caller - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); } -void qd_admin_connector_decref(qd_admin_connector_t *admin_conn) +void qd_connector_config_decref(qd_connector_config_t *ctor_config) { - if (!admin_conn) + if (!ctor_config) return; - uint32_t rc = sys_atomic_dec(&admin_conn->ref_count); + uint32_t rc = sys_atomic_dec(&ctor_config->ref_count); (void) rc; assert(rc > 0); // else underflow if (rc == 1) { // Expect: all connectors hold the ref_count so this must be empty - assert(DEQ_IS_EMPTY(admin_conn->connectors)); - sys_mutex_free(&admin_conn->lock); - sys_atomic_destroy(&admin_conn->ref_count); - free(admin_conn->policy_vhost); - qd_tls_config_decref(admin_conn->tls_config); - qd_server_config_free(&admin_conn->config); - free_qd_admin_connector_t(admin_conn); + assert(DEQ_IS_EMPTY(ctor_config->connectors)); + sys_mutex_free(&ctor_config->lock); + sys_atomic_destroy(&ctor_config->ref_count); + free(ctor_config->policy_vhost); + qd_tls_config_decref(ctor_config->tls_config); + qd_server_config_free(&ctor_config->config); + free_qd_connector_config_t(ctor_config); } } // Initiate connections on all child connectors -void qd_admin_connector_connect(qd_admin_connector_t *admin_conn) +void qd_connector_config_connect(qd_connector_config_t *ctor_config) { - if (!admin_conn->activated) { - admin_conn->activated = true; - for (qd_connector_t *ct = DEQ_HEAD(admin_conn->connectors); !!ct; ct = DEQ_NEXT(ct)) { + if (!ctor_config->activated) { + ctor_config->activated = true; + for (qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); !!ct; ct = DEQ_NEXT(ct)) { qd_connector_connect(ct); } } diff --git a/src/adaptors/amqp/qd_connector.h b/src/adaptors/amqp/qd_connector.h index a3971c9d1..667fc743b 100644 --- a/src/adaptors/amqp/qd_connector.h +++ b/src/adaptors/amqp/qd_connector.h @@ -34,34 +34,34 @@ typedef struct qd_server_t qd_server_t; typedef struct qd_connection_t qd_connection_t; typedef struct vflow_record_t vflow_record_t; typedef struct qd_tls_config_t qd_tls_config_t; -typedef struct qd_admin_connector_t qd_admin_connector_t; +typedef struct qd_connector_config_t qd_connector_config_t; typedef enum { - CXTR_STATE_INIT = 0, - CXTR_STATE_CONNECTING, - CXTR_STATE_OPEN, - CXTR_STATE_FAILED, - CXTR_STATE_DELETED // by management -} cxtr_state_t; + CTOR_STATE_INIT = 0, + CTOR_STATE_CONNECTING, + CTOR_STATE_OPEN, + CTOR_STATE_FAILED, + CTOR_STATE_DELETED // by management +} connector_state_t; /** - * A qd_connector_t manages a single outgoing transport connection. It is responsible for re-establishing the connection - * should it fail. It is the child of a qd_admin_connector_t instance. + * A qd_connector_t manages a single outgoing AMQP network connection connection (represented by a qd_connection_t + * instance). It is responsible for re-establishing the network connection should it fail. */ typedef struct qd_connector_t { - // Sibling connectors belonging to the same parent qd_admin_connector_t + // Sibling connectors sharing the same qd_connector_config_t DEQ_LINKS(struct qd_connector_t); - qd_admin_connector_t *admin_conn; + qd_connector_config_t *ctor_config; - /* Referenced by parent qd_admin_connector_t and child qd_connection_t */ + /* Referenced by parent qd_connector_config_t and child qd_connection_t */ sys_atomic_t ref_count; qd_timer_t *timer; long delay; - /* Connector state and ctx can be modified by I/O or management threads. */ + /* Connector state and qd_conn can be modified by I/O or management threads. */ sys_mutex_t lock; - cxtr_state_t state; + connector_state_t state; qd_connection_t *qd_conn; vflow_record_t *vflow_record; bool oper_status_down; // set when oper-status transitions to 'down' to avoid repeated error indications. @@ -73,22 +73,22 @@ typedef struct qd_connector_t { int conn_index; // Which connection in the connection list to connect to next. /* holds proton transport error condition text on connection failure */ -#define QD_CXTR_CONN_MSG_BUF_SIZE 300 - char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; +#define QD_CTOR_CONN_MSG_BUF_SIZE 300 + char conn_msg[QD_CTOR_CONN_MSG_BUF_SIZE]; } qd_connector_t; DEQ_DECLARE(qd_connector_t, qd_connector_list_t); /** - * An qd_admin_connector_t instance is created for each "connector" configuration object provisioned on the router. A - * connector may have one or more outgoing connections associated with it depending on the connectors role. The purpose - * of the qd_admin_connector_t is to manage a set of outgoing connections associated with the connector - * configuration. An qd_admin_connector_t will instantiate a qd_connector_t for each outgoing connection required by the - * connector configuration. + * An qd_connector_config_t instance is created for each "connector" configuration object provisioned on the router. It + * holds the configuration information that is used for outgoing AMQP connections. The qd_connector_config_t instance + * will be used to construct one or more qd_connection_t instances that share that configuration data. + * + * qd_connector_config_t instances are managed by the Connection Manager. */ -struct qd_admin_connector_t { - DEQ_LINKS(struct qd_admin_connector_t); // connection_manager list +struct qd_connector_config_t { + DEQ_LINKS(struct qd_connector_config_t); // connection_manager list /* Referenced by connection_manager and children qd_connector_t */ sys_atomic_t ref_count; @@ -106,26 +106,27 @@ struct qd_admin_connector_t { qd_connector_list_t connectors; }; -DEQ_DECLARE(qd_admin_connector_t, qd_admin_connector_list_t); +DEQ_DECLARE(qd_connector_config_t, qd_connector_config_list_t); -/** Management call to create an Admin Connector +/** Management call to instantiate a qd_connector_config_t from a configuration entity */ -qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t *entity); +qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity); -/** Management call to delete the Admin Connector +/** Management call to delete a qd_connector_config_t * - * This will close and release all child connector and connections then - * decrement the callers reference count to the admin connector. + * This will close and release all child connector and connections, then decrement the callers reference count to the + * qd_connector_config_t instance. */ -void qd_admin_connector_delete(qd_admin_connector_t *admin_conn); +void qd_connector_config_delete(qd_connector_config_t *ctor_config); -/** Management call to start all child connector connections +/** Management call start all child connections for the given configuration instance */ -void qd_admin_connector_connect(qd_admin_connector_t *admin_conn); +void qd_connector_config_connect(qd_connector_config_t *ctor_config); -/** Drop a reference to the Admin Connector +/** Drop a reference to the configuration instance. + * This may free the given instance. */ -void qd_admin_connector_decref(qd_admin_connector_t *admin_conn); +void qd_connector_config_decref(qd_connector_config_t *ctor_config); /** * Connector API @@ -135,33 +136,33 @@ void qd_admin_connector_decref(qd_admin_connector_t *admin_conn); * Create a new connector. * Call qd_connector_connect() to initiate the outgoing connection */ -qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_connector); +qd_connector_t *qd_connector(qd_connector_config_t *ctor_config, bool is_data_connector); /** * Initiate an outgoing connection. Returns true if successful. */ -bool qd_connector_connect(qd_connector_t *ct); +bool qd_connector_connect(qd_connector_t *ctor); /** * Close the associated connection and deactivate the connector */ -void qd_connector_close(qd_connector_t *ct); +void qd_connector_close(qd_connector_t *ctor); -void qd_connector_decref(qd_connector_t *ct); +void qd_connector_decref(qd_connector_t *ctor); -const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ct); -const char *qd_connector_get_group_correlator(const qd_connector_t *ct); -bool qd_connector_has_failover_info(const qd_connector_t* ct); -const char *qd_connector_policy_vhost(const qd_connector_t* ct); -void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition); -void qd_connector_remote_opened(qd_connector_t *connector); +const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ctor); +const char *qd_connector_get_group_correlator(const qd_connector_t *ctor); +bool qd_connector_has_failover_info(const qd_connector_t* ctor); +const char *qd_connector_policy_vhost(const qd_connector_t* ctor); +void qd_connector_handle_transport_error(qd_connector_t *ctor, uint64_t connection_id, pn_condition_t *condition); +void qd_connector_remote_opened(qd_connector_t *ctor); // add a new connection to the parent connector -void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx); -void qd_connector_add_link(qd_connector_t *connector); +void qd_connector_add_connection(qd_connector_t *ctor, qd_connection_t *qd_conn); +void qd_connector_add_link(qd_connector_t *ctor); // remove the child connection // NOTE WELL: this may free the connector if the connection is holding the last // reference to it -void qd_connector_remove_connection(qd_connector_t *connector, bool final, const char *condition_name, const char *condition_description); +void qd_connector_remove_connection(qd_connector_t *ctor, bool final, const char *condition_name, const char *condition_description); #endif