Skip to content

Commit

Permalink
fixup: API name changes:
Browse files Browse the repository at this point in the history
o) rename qd_admin_connector_t --> qd_connector_config_t
o) change "admin_conn" to "ctor_config"
o) Use "ctor" as shorthand for "connector" in API and local vars
  • Loading branch information
kgiusti committed Feb 6, 2025
1 parent 6c2ec92 commit ec68f0e
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 182 deletions.
6 changes: 3 additions & 3 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1416,8 +1416,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
qd_router_connection_get_config(conn, &role, &cost, &name,
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

if (connector && !!connector->admin_conn->data_connection_count) {
memcpy(conn->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE);
if (connector && !!connector->ctor_config->data_connection_count) {
memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE);
if (connector->is_data_connector) {
// override the configured role to identify this as a data connection
assert(role == QDR_ROLE_INTER_ROUTER);
Expand Down Expand Up @@ -1602,7 +1602,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
if (!!connector) {
qd_connector_add_link(connector);
sys_mutex_lock(&connector->lock);
qd_format_string(connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE,
qd_format_string(connector->conn_msg, QD_CTOR_CONN_MSG_BUF_SIZE,
"[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s",
connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no",
authenticated ? mech : "no", (char*) user, container);
Expand Down
64 changes: 32 additions & 32 deletions src/adaptors/amqp/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
struct qd_connection_manager_t {
qd_server_t *server;
qd_listener_list_t listeners;
qd_admin_connector_list_t admin_connectors;
qd_connector_config_list_t connector_configs;
};


Expand Down Expand Up @@ -166,21 +166,21 @@ static int get_failover_info_length(qd_failover_item_list_t conn_info_list)
*/
QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl)
{
qd_admin_connector_t *admin_conn = (qd_admin_connector_t *) impl;
qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl;
qd_connector_t *connector = 0;

qd_error_clear();

// TODO(kgiusti): inter-router connections may have several qd_connector_ts active due to the router data connection
// count configuration. However we can only report 1 connector via management. It would be more accurate to report
// all connectors associated with this management entity
sys_mutex_lock(&admin_conn->lock);
connector = DEQ_HEAD(admin_conn->connectors);
sys_mutex_lock(&ctor_config->lock);
connector = DEQ_HEAD(ctor_config->connectors);
if (connector) {
// prevent I/O thread from freeing connector while it is being accessed
sys_atomic_inc(&connector->ref_count);
}
sys_mutex_unlock(&admin_conn->lock);
sys_mutex_unlock(&ctor_config->lock);

if (connector) {
int i = 1;
Expand Down Expand Up @@ -244,19 +244,19 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl

const char *state_info = 0;
switch (connector->state) {
case CXTR_STATE_CONNECTING:
case CTOR_STATE_CONNECTING:
state_info = "CONNECTING";
break;
case CXTR_STATE_OPEN:
case CTOR_STATE_OPEN:
state_info = "SUCCESS";
break;
case CXTR_STATE_FAILED:
case CTOR_STATE_FAILED:
state_info = "FAILED";
break;
case CXTR_STATE_INIT:
case CTOR_STATE_INIT:
state_info = "INITIALIZING";
break;
case CXTR_STATE_DELETED:
case CTOR_STATE_DELETED:
// deleted by management, waiting for connection to close
state_info = "CLOSING";
break;
Expand All @@ -283,17 +283,17 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl
}


QD_EXPORT qd_admin_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
QD_EXPORT qd_connector_config_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity)
{
qd_connection_manager_t *cm = qd->connection_manager;
qd_admin_connector_t *admin_conn = qd_admin_connector_create(qd, entity);
if (!admin_conn) {
qd_connector_config_t *ctor_config = qd_connector_config_create(qd, entity);
if (!ctor_config) {
return 0;
}

DEQ_INSERT_TAIL(cm->admin_connectors, admin_conn);
log_config(&admin_conn->config, "Connector", true);
return admin_conn;
DEQ_INSERT_TAIL(cm->connector_configs, ctor_config);
log_config(&ctor_config->config, "Connector", true);
return ctor_config;
}


Expand All @@ -305,7 +305,7 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd)

cm->server = qd->server;
DEQ_INIT(cm->listeners);
DEQ_INIT(cm->admin_connectors);
DEQ_INIT(cm->connector_configs);

return cm;
}
Expand Down Expand Up @@ -333,11 +333,11 @@ void qd_connection_manager_free(qd_connection_manager_t *cm)
li = DEQ_HEAD(cm->listeners);
}

qd_admin_connector_t *admin_conn = DEQ_HEAD(cm->admin_connectors);
while (admin_conn) {
DEQ_REMOVE_HEAD(cm->admin_connectors);
qd_admin_connector_delete(admin_conn);
admin_conn = DEQ_HEAD(cm->admin_connectors);
qd_connector_config_t *ctor_config = DEQ_HEAD(cm->connector_configs);
while (ctor_config) {
DEQ_REMOVE_HEAD(cm->connector_configs);
qd_connector_config_delete(ctor_config);
ctor_config = DEQ_HEAD(cm->connector_configs);
}

free(cm);
Expand All @@ -351,7 +351,7 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd)
{
static bool first_start = true;
qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners);
qd_admin_connector_t *admin_conn = DEQ_HEAD(qd->connection_manager->admin_connectors);
qd_connector_config_t *ctor_config = DEQ_HEAD(qd->connection_manager->connector_configs);

while (li) {
if (!li->pn_listener) {
Expand All @@ -366,9 +366,9 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd)
li = DEQ_NEXT(li);
}

while (admin_conn) {
qd_admin_connector_connect(admin_conn);
admin_conn = DEQ_NEXT(admin_conn);
while (ctor_config) {
qd_connector_config_connect(ctor_config);
ctor_config = DEQ_NEXT(ctor_config);
}

first_start = false;
Expand Down Expand Up @@ -399,19 +399,19 @@ QD_EXPORT void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *im
//
QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl)
{
qd_admin_connector_t *admin_conn = (qd_admin_connector_t*) impl;
assert(admin_conn);
qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl;
assert(ctor_config);

// take it off the connection manager

log_config(&admin_conn->config, "Connector", false);
DEQ_REMOVE(qd->connection_manager->admin_connectors, admin_conn);
qd_admin_connector_delete(admin_conn);
log_config(&ctor_config->config, "Connector", false);
DEQ_REMOVE(qd->connection_manager->connector_configs, ctor_config);
qd_connector_config_delete(ctor_config);
}


const char *qd_connector_name(qd_connector_t *ct)
{
return ct ? ct->admin_conn->config.name : 0;
return ct ? ct->ctor_config->config.name : 0;
}

14 changes: 7 additions & 7 deletions src/adaptors/amqp/qd_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t *
pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA);
}

if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->admin_conn->data_connection_count)) {
if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) {
pn_data_put_symbol(pn_connection_properties(conn),
pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY));
pn_data_put_string(pn_connection_properties(conn),
Expand Down Expand Up @@ -464,7 +464,7 @@ const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
if (conn->listener)
return &conn->listener->config;
if (conn->connector)
return &conn->connector->admin_conn->config;
return &conn->connector->ctor_config->config;
return NULL;
}

Expand Down Expand Up @@ -533,7 +533,7 @@ void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard)
const char* qd_connection_name(const qd_connection_t *c)
{
if (c->connector) {
return c->connector->admin_conn->config.host_port;
return c->connector->ctor_config->config.host_port;
} else {
return c->rhost_port;
}
Expand Down Expand Up @@ -595,14 +595,14 @@ static void set_rhost_port(qd_connection_t *ctx) {
static bool setup_ssl_sasl_and_open(qd_connection_t *ctx)
{
qd_connector_t *ct = ctx->connector;
const qd_server_config_t *config = &ct->admin_conn->config;
const qd_server_config_t *config = &ct->ctor_config->config;
pn_transport_t *tport = pn_connection_transport(ctx->pn_conn);

//
// Create an SSL session if required
//
if (ct->admin_conn->tls_config) {
ctx->ssl = qd_tls_session_amqp(ct->admin_conn->tls_config, tport, false);
if (ct->ctor_config->tls_config) {
ctx->ssl = qd_tls_session_amqp(ct->ctor_config->tls_config, tport, false);
if (!ctx->ssl) {
qd_log(LOG_SERVER, QD_LOG_ERROR,
"Failed to create TLS session for connection [C%" PRIu64 "] to %s:%s (%s)",
Expand Down Expand Up @@ -682,7 +682,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) {
qd_log(LOG_SERVER, QD_LOG_INFO, "[C%" PRIu64 "] Accepted connection to %s from %s",
ctx->connection_id, name, ctx->rhost_port);
} else if (ctx->connector) { /* Establishing an outgoing connection */
config = &ctx->connector->admin_conn->config;
config = &ctx->connector->ctor_config->config;
if (!setup_ssl_sasl_and_open(ctx)) {
qd_log(LOG_SERVER, QD_LOG_ERROR, "[C%" PRIu64 "] Connection aborted due to internal setup error",
ctx->connection_id);
Expand Down
Loading

0 comments on commit ec68f0e

Please sign in to comment.