Skip to content

Commit

Permalink
Fixes skupperproject#1732: implement new Admin Connector type
Browse files Browse the repository at this point in the history
  • Loading branch information
kgiusti committed Feb 4, 2025
1 parent ab29b6c commit ed10a4b
Show file tree
Hide file tree
Showing 10 changed files with 473 additions and 375 deletions.
11 changes: 11 additions & 0 deletions include/qpid/dispatch/dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "qpid/dispatch/error.h"

#include <stdbool.h>
#include <stdint.h>

/**@file
* Configure and prepare a dispatch instance.
Expand All @@ -33,6 +34,7 @@
typedef struct qd_dispatch_t qd_dispatch_t;
typedef struct qd_connection_manager_t qd_connection_manager_t;
typedef struct qd_policy_t qd_policy_t;
typedef struct qd_server_t qd_server_t;

/**
* Initialize the Dispatch library and prepare it for operation.
Expand Down Expand Up @@ -83,6 +85,15 @@ qd_connection_manager_t *qd_dispatch_connection_manager(const qd_dispatch_t *qd)
*/
qd_policy_t *qd_dispatch_get_policy(const qd_dispatch_t *dispatch);

/**
* Return the configured inter-router data connection count
*/
uint32_t qd_dispatch_get_data_connection_count(const qd_dispatch_t *dispatch);

/**
* Return the routers server
*/
qd_server_t *qd_dispatch_get_server(const qd_dispatch_t *dispatch);

/**
* @}
Expand Down
34 changes: 19 additions & 15 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1402,8 +1402,10 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
const char *host = 0;
char host_local[255];
const qd_server_config_t *config;
if (qd_connection_connector(conn)) {
config = qd_connector_config(qd_connection_connector(conn));
qd_connector_t *connector = qd_connection_connector(conn);

if (connector) {
config = qd_connector_get_config(connector);
snprintf(host_local, 254, "%s", config->host_port);
host = &host_local[0];
}
Expand All @@ -1414,8 +1416,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
qd_router_connection_get_config(conn, &role, &cost, &name,
&conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity);

if (conn->connector && conn->connector->config.has_data_connectors) {
memcpy(conn->group_correlator, conn->connector->group_correlator, QD_DISCRIMINATOR_SIZE);
if (connector && !!connector->admin_conn->data_connection_count) {
memcpy(conn->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE);
if (connector->is_data_connector) {
// override the configured role to identify this as a data connection
assert(role == QDR_ROLE_INTER_ROUTER);
role = QDR_ROLE_INTER_ROUTER_DATA;
}
}

// check offered capabilities for streaming link support and connection trunking support
Expand Down Expand Up @@ -1524,8 +1531,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
} else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ACCESS_ID)
&& strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) {
if (!pn_data_next(props)) break;
if (!!conn->connector && !!conn->connector->vflow_record && pn_data_type(props) == PN_STRING) {
vflow_set_ref_from_pn(conn->connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) {
vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props);
}

} else {
Expand Down Expand Up @@ -1553,7 +1560,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
authenticated,
conn->opened,
(char*) mech,
conn->connector ? QD_OUTGOING : QD_INCOMING,
connector ? QD_OUTGOING : QD_INCOMING,
host,
proto,
cipher,
Expand Down Expand Up @@ -1592,17 +1599,14 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool
qd_listener_add_link(conn->listener);
}

if (!!conn->connector) {
qd_connector_add_link(conn->connector);
}

if (conn->connector) {
sys_mutex_lock(&conn->connector->lock);
qd_format_string(conn->connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE,
if (!!connector) {
qd_connector_add_link(connector);
sys_mutex_lock(&connector->lock);
qd_format_string(connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE,
"[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s",
connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no",
authenticated ? mech : "no", (char*) user, container);
sys_mutex_unlock(&conn->connector->lock);
sys_mutex_unlock(&connector->lock);
}

free(proto);
Expand Down
Loading

0 comments on commit ed10a4b

Please sign in to comment.