Skip to content

Commit

Permalink
Fixes skupperproject#1700: refactor the AMQP link lifecycle (skupperp…
Browse files Browse the repository at this point in the history
…roject#1701)

This change removes some of the old link attach routing logic and
attempts to clean up the link API.

The logic that used to track the exchange of Attach/Detach
performatives has been simplified. The various counters and booleans
maintained by the qdr_link_t structure for tracking this exchange has
been reduced to a mask/flag implementation similar to Protons
implementation of endpoint state.

This patch refactors the link detach adaptor API to be more like the
existing AMQP connection API: there is now an explict API call to
release the link instance at the end of its lifecycle.

The adaptor API is modified by separating the AMQP detach handling
from the release of the link instance. The old qdr_link_detach()
adaptor function has been refactored into two functions:
qdr_link_detach_received() and qdr_link_close().

The qdr_link_detach_received() call is made by the AMQP adaptor when a
Detach Peformative has been received by the peer. It is only used by
the AMQP adaptor.

The new qdr_link_closed() API call is made by all adaptors when the
link instance is destroyed. This is similar to the existing
qdr_connection_closed() call but for links. It is used by all adaptors
to indicate to the core that the link is no longer in use and can be
cleaned up. In the case of the AMQP adaptor this call will be made
after the link detach handshake has completed.

Test coverage by the test-sender AMQP client has been increased by
adding a clean connection close function.
  • Loading branch information
kgiusti authored Jan 20, 2025
1 parent 1fb1a72 commit 2a0aaba
Show file tree
Hide file tree
Showing 17 changed files with 338 additions and 261 deletions.
44 changes: 29 additions & 15 deletions include/qpid/dispatch/protocol_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,15 +386,18 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core,
void *bind_token);

/**
* qdr_connection_closed
* qdr_connection_notify_closed
*
* This function must be called when a connection is closed, either cleanly by protocol
* or uncleanly by lost connectivity. Once this function is called, the caller must never
* again refer to or use the connection pointer.
* This function is invoked by the adaptor to notify the core that the given connection has been closed. This must be
* called when a connection is closed, either cleanly by protocol or uncleanly by lost connectivity.
*
* This must be the last core API call made by the adaptor for this connection. The core thread will free the
* qdr_connection_t as a result of this call therefore the adaptor MUST NOT reference the qdr_connection_t on return
* from this call.
*
* @param conn The pointer returned by qdr_connection_opened
*/
void qdr_connection_closed(qdr_connection_t *conn);
void qdr_connection_notify_closed(qdr_connection_t *conn);

/**
* qdr_connection_set_tracing
Expand Down Expand Up @@ -643,12 +646,6 @@ qd_iterator_t *qdr_terminus_dnp_address(qdr_terminus_t *term);
******************************************************************************
*/

typedef enum {
QD_DETACHED, // Protocol detach
QD_CLOSED, // Protocol close
QD_LOST // Connection or session closed
} qd_detach_type_t;

/**
* qdr_link_set_context
*
Expand Down Expand Up @@ -800,15 +797,32 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

/**
* qdr_link_detach
* qdr_link_detach_received
*
* This function is invoked when a link detach arrives.
* This function is invoked when a link detach performative arrives from the remote peer. This may the first detach
* (peer-initiated link detach) or in response to a detach sent by the router (second detach).
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param dt The type of detach that occurred.
* @param error The link error from the detach frame or 0 if none.
*/
void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
void qdr_link_detach_received(qdr_link_t *link, qdr_error_t *error);


/**
* qdr_link_notify_closed
*
* This function is invoked by the adaptor to notify the core that the given link has been closed. This must be called
* when the link is closed, either cleanly by protocol or uncleanly by lost connectivity (e.g. parent connection
* drop). This will also be called during adaptor shutdown on any outstanding links.
*
* This must be the last core API call made by the adaptor for this link. The core thread will free the qdr_link_t as a
* result of this call therefore the adaptor MUST NOT reference the qdr_link_t on return from this call.
*
* @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event.
* @param forced True if the link was closed due to failure or shutdown. False if closed by clean detach handshake.
*/
void qdr_link_notify_closed(qdr_link_t *link, bool forced);


/**
* qdr_link_deliver
Expand Down
102 changes: 51 additions & 51 deletions src/adaptors/amqp/amqp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,6 @@ static qdr_delivery_t *qdr_node_delivery_qdr_from_pn(pn_delivery_t *dlv)
return ref ? (qdr_delivery_t*) ref->ref : 0;
}

// clean up all qdr_delivery/pn_delivery bindings for the link
//
void qd_link_abandoned_deliveries_handler(qd_router_t *router, qd_link_t *link)
{
qd_link_ref_list_t *list = qd_link_get_ref_list(link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// this will remove and release the ref
qdr_node_disconnect_deliveries(router->router_core, link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}
}


// read the delivery-state set by the remote endpoint
//
Expand Down Expand Up @@ -1223,10 +1205,9 @@ static int AMQP_link_flow_handler(qd_router_t *router, qd_link_t *link)
/**
* Link Detached Handler
*/
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_detach_type_t dt)
static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link)
{
if (!link)
return 0;
assert(link);

pn_link_t *pn_link = qd_link_pn(link);
if (!pn_link)
Expand Down Expand Up @@ -1257,29 +1238,59 @@ static int AMQP_link_detach_handler(qd_router_t *router, qd_link_t *link, qd_det
}
}

qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0;
// Notify the core that a detach has been received.

qdr_link_t *rlink = (qdr_link_t *) qd_link_get_context(link);
if (rlink) {
//
// If this is the second (response) detach or the link hasn't really detached but is being dropped due to parent
// connection/session loss then this is the last proton event that will be generated for this link. The qd_link
// will be freed on return from this call so remove the cross linkage between it and the qdr_link peer.

if (dt == QD_LOST || qdr_link_get_context(rlink) == 0) {
// note qdr_link context will be zeroed when the core sends the first detach, so if it is zero then this is
// the second detach!
qd_link_set_context(link, 0);
qdr_link_set_context(rlink, 0);
}

qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach(rlink, dt, error);
pn_condition_t *cond = pn_link_remote_condition(pn_link);
qdr_error_t *error = qdr_error_from_pn(cond);
qdr_link_detach_received(rlink, error);
} else if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
// Normally the core would be responsible for sending the response detach to close the link (via
// CORE_link_detach) but since there is no core link that will not happen.
pn_link_close(pn_link);
}

return 0;
}


/**
* Link closed handler
*
* This is the last callback for the given link - the link will be freed on return from this call! Forced is true if the
* link has not properly closed (detach handshake completed).
*/
static void AMQP_link_closed_handler(qd_router_t *router, qd_link_t *qd_link, bool forced)
{
assert(qd_link);

// Clean up all qdr_delivery/pn_delivery bindings for the link.

qd_link_ref_list_t *list = qd_link_get_ref_list(qd_link);
qd_link_ref_t *ref = DEQ_HEAD(*list);

while (ref) {
qdr_delivery_t *dlv = (qdr_delivery_t*) ref->ref;
pn_delivery_t *pdlv = qdr_delivery_get_context(dlv);
assert(pdlv && ref == (qd_link_ref_t*) pn_delivery_get_context(pdlv));

// This will decrement the qdr_delivery_t reference count - do not access the dlv pointer after this call!
qdr_node_disconnect_deliveries(router->router_core, qd_link, dlv, pdlv);
ref = DEQ_HEAD(*list);
}

qdr_link_t *qdr_link = (qdr_link_t *) qd_link_get_context(qd_link);
if (qdr_link) {
// Notify core that this link no longer exists
qdr_link_set_context(qdr_link, 0);
qd_link_set_context(qd_link, 0);
qdr_link_notify_closed(qdr_link, forced);
// This will cause the core to free qdr_link at some point so:
qdr_link = 0;
}
}

static void bind_connection_context(qdr_connection_t *qdrc, void* token)
{
qd_connection_t *conn = (qd_connection_t*) token;
Expand Down Expand Up @@ -1761,7 +1772,7 @@ static int AMQP_closed_handler(qd_router_t *router, qd_connection_t *conn, void
if (!!conn->listener && qdrc->role != QDR_ROLE_INTER_ROUTER_DATA) {
qd_listener_remove_link(conn->listener);
}
qdr_connection_closed(qdrc);
qdr_connection_notify_closed(qdrc);
qd_connection_set_context(conn, 0);
}

Expand All @@ -1776,8 +1787,8 @@ static const qd_node_type_t router_node = {"router", 0,
AMQP_outgoing_link_handler,
AMQP_conn_wake_handler,
AMQP_link_detach_handler,
AMQP_link_closed_handler,
AMQP_link_attach_handler,
qd_link_abandoned_deliveries_handler,
AMQP_link_flow_handler,
0, // node_created_handler
0, // node_destroyed_handler
Expand Down Expand Up @@ -1920,7 +1931,7 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
return;

pn_link_t *pn_link = qd_link_pn(qlink);
if (!pn_link)
if (!pn_link || !!(pn_link_state(pn_link) & PN_LOCAL_CLOSED)) // already detached
return;

if (error) {
Expand All @@ -1945,17 +1956,6 @@ static void CORE_link_detach(void *context, qdr_link_t *link, qdr_error_t *error
}
}

//
// This is the last event for this link that the core is going to send into Proton so remove the core => adaptor
// linkage. If this is the response attach then there will be no further proton link events to send to the core so
// remove the adaptor => core linkage. If this is the first (request) detach preserve the adaptor => core linkage so
// we can notify the core when the second (response) detach arrives
//
qdr_link_set_context(link, 0);
if (!first) {
qd_link_set_context(qlink, 0);
}

qd_link_close(qlink);
}

Expand Down
60 changes: 34 additions & 26 deletions src/adaptors/amqp/container.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ struct qd_link_t {
ALLOC_DEFINE_SAFE(qd_link_t);
ALLOC_DEFINE(qd_link_ref_t);

static void qd_link_free(qd_link_t *);


/** Encapsulates a proton session */
struct qd_session_t {
DEQ_LINKS(qd_session_t);
Expand Down Expand Up @@ -177,7 +180,7 @@ static qd_link_t *setup_outgoing_link(qd_container_t *container, pn_link_t *pn_l
qd_session_incref(link->qd_session);

pn_link_set_context(pn_link, link);
container->ntype->outgoing_handler(container->qd_router, link);
container->ntype->outgoing_link_handler(container->qd_router, link);
return link;
}

Expand Down Expand Up @@ -209,7 +212,7 @@ static qd_link_t *setup_incoming_link(qd_container_t *container, pn_link_t *pn_l
pn_link_set_max_message_size(pn_link, max_size);
}
pn_link_set_context(pn_link, link);
container->ntype->incoming_handler(container->qd_router, link);
container->ntype->incoming_link_handler(container->qd_router, link);
return link;
}

Expand Down Expand Up @@ -277,7 +280,8 @@ static void notify_closed(qd_container_t *container, qd_connection_t *conn, void


// The given connection has dropped. There will be no further link events for this connection so manually clean up all
// links
// links. Note that we do not free the pn_link_t - proton will free all links when the parent connection is freed.
//
static void close_links(qd_container_t *container, pn_connection_t *conn, bool print_log)
{
pn_link_t *pn_link = pn_link_head(conn, 0);
Expand All @@ -289,7 +293,7 @@ static void close_links(qd_container_t *container, pn_connection_t *conn, bool p
if (print_log)
qd_log(LOG_CONTAINER, QD_LOG_DEBUG, "Aborting link '%s' due to parent connection end",
pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true); // true == forced
qd_link_free(qd_link);
}

Expand Down Expand Up @@ -318,6 +322,7 @@ static void cleanup_link(qd_link_t *link)
// cleanup any inbound message that has not been forwarded
qd_message_t *msg = qd_alloc_deref_safe_ptr(&link->incoming_msg);
if (msg) {
qd_nullify_safe_ptr(&link->incoming_msg);
qd_message_free(msg);
}
}
Expand All @@ -326,8 +331,7 @@ static void cleanup_link(qd_link_t *link)
static int close_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn)
{
//
// Close all links, passing QD_LOST as the reason. These links are not
// being properly 'detached'. They are being orphaned.
// Close all links. These links are not being properly 'detached'. They are being orphaned.
//
if (qd_conn)
qd_conn->closed = true;
Expand Down Expand Up @@ -508,9 +512,9 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
// Remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
// Remote has closed the session. Check for any child links and forcibly close them since there will be
// no detach performatives arriving for these links. Note that we do not free the pn_link_t since proton
// will free all child pn_link_t when it frees the session.
pn_link = pn_link_head(conn, 0);
while (pn_link) {
pn_link_t *next_link = pn_link_next(pn_link, 0);
Expand All @@ -529,7 +533,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
qd_log(LOG_CONTAINER, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end", pn_link_name(pn_link));
container->ntype->link_detach_handler(container->qd_router, qd_link, QD_LOST);
container->ntype->link_closed_handler(container->qd_router, qd_link, true);
qd_link_free(qd_link);
}
}
Expand Down Expand Up @@ -590,10 +594,6 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_link->policy_counted) {
qd_link->policy_counted = false;
if (pn_link_is_sender(pn_link)) {
Expand All @@ -609,25 +609,35 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
}
}

container->ntype->link_detach_handler(container->qd_router, qd_link, dt);
// notify arrival of inbound detach
container->ntype->link_detach_handler(container->qd_router, qd_link);

if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
// link fully closed
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
} else { // no qd_link, manually detach or free
if ((pn_link_state(pn_link) & PN_LOCAL_CLOSED) == 0) {
pn_link_close(pn_link);
} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}

} else {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
}
}
break;

case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_list, pn_link);
qd_link_free((qd_link_t *) pn_link_get_context(pn_link));
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
// Link now fully detached
container->ntype->link_closed_handler(container->qd_router, qd_link, false);
qd_link_free(qd_link);
}
add_link_to_free_list(&qd_conn->free_link_list, pn_link); // why???
}
break;

Expand Down Expand Up @@ -775,16 +785,14 @@ qd_link_t *qd_link(qd_connection_t *conn, qd_direction_t dir, const char* name,
}


void qd_link_free(qd_link_t *link)
static void qd_link_free(qd_link_t *link)
{
if (!link) return;

sys_mutex_lock(&amqp_adaptor.container->lock);
DEQ_REMOVE(amqp_adaptor.container->links, link);
sys_mutex_unlock(&amqp_adaptor.container->lock);

amqp_adaptor.container->ntype->link_abandoned_deliveries_handler(amqp_adaptor.container->qd_router, link);

cleanup_link(link);
free_qd_link_t(link);
}
Expand Down
Loading

0 comments on commit 2a0aaba

Please sign in to comment.