From a14c259738cd44358ee595199bb46e1522884f68 Mon Sep 17 00:00:00 2001 From: tan Date: Mon, 15 Feb 2021 12:34:32 +0530 Subject: [PATCH] fix channelmax and framemax negotiations While negotiating a connection channelmax and framemax parameters have to be tuned down according to the previous connection tune message from server. Not doing this can result in server aborting the connection. --- src/protocol.jl | 23 +++++++++++++---------- test/runtests.jl | 9 ++++++++- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/protocol.jl b/src/protocol.jl index 6666e5b..b456833 100644 --- a/src/protocol.jl +++ b/src/protocol.jl @@ -354,7 +354,7 @@ function connection_processor(c, name, fn) end function connection_sender(c::Connection) - @debug("==> sending on conn", host=c.virtualhost) + @debug("==> sending on conn", host=c.host, port=c.port, virtualhost=c.virtualhost) nbytes = sendq_to_stream(sock(c), c.sendq) @debug("==> sent", nbytes) c.heartbeat_time_client = time() # update heartbeat time for client @@ -1084,6 +1084,7 @@ function on_connection_tune(chan::MessageChannel, m::TAMQPMethodFrame, ctx) conn.channelmax = m.payload.fields[1].second conn.framemax = m.payload.fields[2].second conn.heartbeat = m.payload.fields[3].second + @debug("got_connection_tune", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat) handle(chan, FrameHeartbeat, on_connection_heartbeat) send_connection_tune_ok(chan, ctx[:channelmax], ctx[:framemax], ctx[:heartbeat]) handle(chan, :Connection, :Tune) @@ -1095,17 +1096,19 @@ end function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0, heartbeat=0) conn = chan.conn - # set channelmax and framemax - (channelmax > 0) && (conn.channelmax = channelmax) - (framemax > 0) && (conn.framemax = framemax) - - # negotiate heartbeat (min of what expected by both parties) - if heartbeat > 0 && conn.heartbeat > 0 - conn.heartbeat = min(conn.heartbeat, heartbeat) - else - conn.heartbeat = max(conn.heartbeat, heartbeat) + # negotiate (min of what expected by both parties) + function opt(desired_param, limited_param) + if desired_param > 0 && limited_param > 0 + min(desired_param, limited_param) + else + max(desired_param, limited_param) + end end + conn.channelmax = opt(channelmax, conn.channelmax) + conn.framemax = opt(framemax, conn.framemax) + conn.heartbeat = opt(heartbeat, conn.heartbeat) + @debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat) send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat))) diff --git a/test/runtests.jl b/test/runtests.jl index 2f3d5af..aa79838 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -10,7 +10,14 @@ AMQPTestRPC.runtests() if length(ARGS) > 0 amqps_host = ARGS[1] - AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure()) + virtualhost = ARGS[2] + port = AMQPClient.AMQPS_DEFAULT_PORT + + login = ENV["AMQPPLAIN_LOGIN"] + password = ENV["AMQPPLAIN_PASSWORD"] + auth_params = Dict{String,Any}("MECHANISM"=>"AMQPLAIN", "LOGIN"=>login, "PASSWORD"=>password) + + AMQPTestCoverage.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, virtualhost=virtualhost, amqps=amqps_configure(), auth_params=auth_params) AMQPTestThroughput.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, tls=true) AMQPTestRPC.runtests(; host=amqps_host, port=AMQPClient.AMQPS_DEFAULT_PORT, amqps=amqps_configure()) end