Reconnection not reattempted if the connection is broken whilst recovery of Channels, etc is in progress? #28
I discovered that the InterruptableWaiter was interrupting the await() in callWithRetries, though I don't yet understand that part of the program.
This was because the channel.close() in recoverExchangesAndQueues was throwing an AlreadyClosedException, but the catch was on (IOException ignored). I changed it to catch Exception. Best, Dan.
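For context, com.rabbitmq.client.AlreadyClosedException is unchecked (it extends ShutdownSignalException, which extends RuntimeException), so a catch clause scoped to IOException lets it escape. A minimal sketch of the change Dan describes, assuming channel is a com.rabbitmq.client.Channel (this is not Lyra's actual source):
try {
  channel.close();
} catch (Exception ignored) {
  // Previously: catch (IOException ignored), which missed the unchecked AlreadyClosedException.
  // Swallow failures while tearing down a channel that may already be closed.
}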
The same thing can, of course, happen with the next channelHandler.recoverChannel(true). Does that make sense? I also added a
Alternatively, this could be fixed, potentially more elegantly, by handling the InterruptedException, but I wasn't sure how to distinguish valid interrupts from invalid ones. If you get a chance, you can see my final code of the day at https://github.com/fakeh/lyra
Hi Dan - Was traveling yesterday so I didn't get to look at this. Looking now...
I need to rethink some of the queue/exchange recovery logic a bit. Working on that along with getting the tests together. This stuff definitely isn't quite ready for release yet :)
Hi there,
I just pushed a commit that re-works the way exchange/queue/binding recovery is done, along with some tests for these areas. Let me know how this looks against the tests you wrote.
I merged this commit, which returns callWithRetries back to its original state and reworks the exchange, queue & binding recovery into separate methods in RetryableResource, which catch and selectively rethrow their exceptions. Exceptions are being thrown which cancel the next connection recovery:
After which no more connection recovery attempts are made. Presumably because
returns true.
You can access the merge at https://github.com/fakeh/lyra/tree/MergeWithJHalterman
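For illustration, the "catch and selectively rethrow" shape described above looks roughly like this; it is only a sketch, not the actual RetryableResource code, and recoverQueue/isFatal are hypothetical helpers standing in for the real recovery and classification logic:
void recoverQueues(java.util.List<String> queueNames) throws Exception {
  for (String queue : queueNames) {
    try {
      recoverQueue(queue);  // re-declare the queue and its bindings on the new channel
    } catch (Exception e) {
      if (isFatal(e))
        throw e;  // let fatal errors cancel the wider recovery
      // otherwise log and move on to the next queue
    }
  }
}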
I tried reproducing this with your merges, but I haven't had any luck yet. Do you have a test you can share or describe what you're doing? Also, I pushed another commit that is somewhat related to this work which you'll want to merge.
I run the following script, which opens up a port proxying to an AMQP instance. You can run it on the same machine as the client, but you'll need to change the port. Customise the top to set the AMQP host's IP/port and the local proxy port. The port is open for a random period of between 1 and 20 seconds, then closed for another 1 to 20 seconds. Ctrl-C quits.
I've not had to wait more than a few minutes for this to demonstrate the scenario above. The client itself is configured:
and makes ten or so consumers on one connection/channel; all use the same exchange but have their own queue and ~five bindings each. Sorry it's not a unit test; whilst I wanted it to be, I couldn't quite figure out TestNG/your abstractions in a way I thought was productive. Cheers, Dan.
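Dan's exact configuration is not shown above; as a rough illustration only, a Lyra client of that era set up to recover and retry indefinitely might look like the sketch below (the policies, host and port are assumptions, not Dan's settings):
import com.rabbitmq.client.Connection;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.Connections;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.RecoveryPolicies;
import net.jodah.lyra.config.RetryPolicies;

public class LyraSetupSketch {
  public static void main(String[] args) throws Exception {
    Config config = new Config()
        .withRecoveryPolicy(RecoveryPolicies.recoverAlways())  // keep recovering connections/channels forever
        .withRetryPolicy(RetryPolicies.retryAlways());         // keep retrying failed invocations forever

    ConnectionOptions options = new ConnectionOptions()
        .withHost("localhost")  // the local proxy from the script described above
        .withPort(5673);        // illustrative proxy port

    Connection connection = Connections.create(options, config);
    System.out.println("Connected to " + connection.getAddress());
  }
}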
So I run the shell script and run a client on my box (OS X) - nothing doing. How is the shell script intended to work?
Saving it to a file, making it executable with chmod +x, and executing it gives no output? You should see "Killing in x seconds", "Opening in x seconds" repeatedly. This means that the port configured at the top of the script is proxying to the IP and remote port configured in the script for up to twenty seconds before the proxy is forcibly closed. If you're running it on the same machine as the RabbitMQ instance, you'll need to change the local port. You'll definitely need to change the IP address.
I ran it from the same machine as my Rabbit client, which is connected to a remote Rabbit server. The bash script printed output, but didn't affect the Rabbit client at all.
The Rabbit client would need to be configured to connect to localhost, as though it were the RabbitMQ server, and the IP address in the script set to the IP of the remote RabbitMQ instance.
I like the proxy script, and I was able to reproduce the failure with your fork just as you described (after a few minutes), but I couldn't reproduce it with the current Lyra master after running for about 20 minutes. Perhaps it was a bad merge or some other changes you made? The current Lyra master looks good though.
That's certainly a possibility; I'm new to git, though I was pretty confident I'd got it right. May I ask what part of the master code should handle the case above?
From the stacktrace it looks like connection recovery fails, in which case the next ShutdownListener invocation should be picked up and a new recovery should start.
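A minimal sketch of that mechanism (not Lyra's implementation; scheduleRecovery is a hypothetical stand-in for kicking off another recovery attempt):
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

class RecoveryTriggerSketch {
  void install(Connection connection) {
    connection.addShutdownListener(new ShutdownListener() {
      public void shutdownCompleted(ShutdownSignalException cause) {
        if (!cause.isInitiatedByApplication())
          scheduleRecovery();  // unexpected shutdown: start another recovery attempt
      }
    });
  }

  void scheduleRecovery() {
    // hand off to a recovery/retry thread governed by the configured policies
  }
}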
Closing for now unless this pops up in Lyra master.
It took a while, but the script was able to generate an exception thrown out of callWithRetries with the 0.4.0 release:
The SSE (ShutdownSignalException) had:
Thanks for sharing. Was it the same script you provided previously or were there any changes? Also, how long did it run till the failure occurred?
Yes, the same script.
Could
be a catch-all solution if added to
?
Yes, I think that would be a catch-all. Ideally we don't want to retry on certain exceptions that are fatal though, such as authorization failures, NoRouteToHost, etc. In theory, for anything that cannot be resolved by some number of retries, we're better off just throwing than retrying, so I try to differentiate.
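As an illustration of that kind of differentiation (a sketch only, not Lyra's actual logic):
import java.io.IOException;
import java.net.NoRouteToHostException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.PossibleAuthenticationFailureException;

class RetryClassificationSketch {
  static boolean isRetryable(Exception e) {
    if (e instanceof NoRouteToHostException)
      return false;  // unreachable host: more retries won't help
    if (e instanceof PossibleAuthenticationFailureException)
      return false;  // bad credentials: better to throw than to retry
    return e instanceof IOException || e instanceof TimeoutException;  // likely transient network trouble
  }
}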
Hi Jonathan, why not consider java.net.SocketException as a potentially retryable exception?
@frascuchon I would certainly consider it. What scenario are you seeing this with? Can I reproduce it?
Hi @jhalterman, I see there is no reconnection when I run a producer during a RabbitMQ server restart.
I think this is very similar to the problem we were discussing in email regarding questionable proxy behavior, and trying to think of ways to reproduce it, @jhalterman. I managed to write a unit test this morning that reproduces it reliably. Here's a self-contained Maven project containing a TestNG test... just "mvn test" should be enough to run it. https://dl.dropboxusercontent.com/u/108601349/badproxy.tgz The test contains a simple proxy server (that I borrowed/stole from java2s.com and modified a bit) that initially proxies requests to a real AMQP server, then it is switched to a mode of answering connections, waiting a bit, and then hanging up, then switched back to normal. You'll need to configure the correct hostname, port, username and password. After the test forces a disconnect, Lyra will begin a recovery that will never get anywhere, and it won't retry after the initial connection back to the proxy server completes, even though it will also disconnect.
@spatula75 I tried your test case, and I think the reason the connection never recovers is that the proxy fails to send the server's response to the client's AMQPConnection, which is left waiting forever. This occurs when the proxy is in its non-proxying mode (the !doProxy branch below):
public void runServer(String host, int remoteport, int localport) throws IOException {
// Create a ServerSocket to listen for connections with
ServerSocket ss = new ServerSocket(localport);
final byte[] request = new byte[1024];
final byte[] reply = new byte[4096];
while (true) {
Socket server = null;
try {
// Wait for a connection on the local port
client = ss.accept();
LOGGER.info("Got a client connection!");
streamFromClient = client.getInputStream();
streamToClient = client.getOutputStream();
// Make a connection to the real server.
// If we cannot connect to the server, send an error to the
// client, disconnect, and continue waiting for connections.
try {
LOGGER.info("Connecting to real server...");
server = new Socket(host, remoteport);
} catch (IOException e) {
PrintWriter out = new PrintWriter(streamToClient);
out.print("Proxy server cannot connect to " + host + ":" + remoteport + ":\n" + e + "\n");
out.flush();
client.close();
continue;
}
// Get server streams.
final InputStream streamFromServer = server.getInputStream();
final OutputStream streamToServer = server.getOutputStream();
// a thread to read the client's requests and pass them
// to the server. A separate thread is used so the two directions are pumped asynchronously.
runAsDaemon(new Runnable() {
public void run() {
int bytesRead;
try {
while ((bytesRead = streamFromClient.read(request)) != -1) {
LOGGER.info("Read {} bytes from client", bytesRead);
streamToServer.write(request, 0, bytesRead);
streamToServer.flush();
}
} catch (IOException e) {
LOGGER.error("Exception reading from client", e);
}
// the client closed the connection to us, so close our
// connection to the server.
try {
streamToServer.close();
} catch (IOException e) {
}
}
});
// Read the server's responses
// and pass them back to the client.
Thread t = runAsDaemon(new Runnable() {
public void run() {
int bytesRead;
try {
while ((bytesRead = streamFromServer.read(reply)) != -1) {
LOGGER.info("Read {} bytes from server", bytesRead);
streamToClient.write(reply, 0, bytesRead);
streamToClient.flush();
}
} catch (IOException e) {
LOGGER.error("Exception reading from client", e);
}
// the client closed the connection to us, so close our
// connection to the server.
try {
streamToServer.close();
} catch (IOException e) {
}
}
});
if (!doProxy) {
LOGGER.info("I'm no longer proxying. Waiting and then hanging up.");
Thread.sleep(20000);
LOGGER.info("Goodbye, client!");
client.close();
continue;
}
t.join();
} catch (IOException e) {
LOGGER.error("IOException", e);
} catch (InterruptedException e) {
LOGGER.error("Interrupted", e);
} finally {
try {
if (server != null)
server.close();
if (client != null)
client.close();
} catch (IOException e) {
}
}
}
}
private Thread runAsDaemon(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.start();
return t;
}
I understand that reproducing a failure like this in a test can be hard - but if you think there's still an outstanding issue please do see if you can hit it.
Yeah, that's the point of the test case: when the proxy is no longer proxying (and we disconnect and reconnect), even after it starts proxying again, we never attempt a reconnect again, despite the first reconnect failing. It's a situation we see in the Amazon EC2 world from time to time when it thinks it has nowhere to send an AMQP connection, so the ELB answers, does nothing, and hangs up. Then, rather than reconnecting according to the policy, no further reconnect attempts are made, even if the ELB starts working again.
This makes sense - I just had to get my head back into what this was all about :) I'm wondering if there's much we can do about this right now. See the stacktrace from a paused recovery thread:
Lyra calls the amqp-client library to create a new connection, which is successfully created internally, but the subsequent attempt to read hangs forever since the proxy is disabled. The connection timeout (which you set to 10 seconds) doesn't come into play here since it only affects the initial TCP connection, not the subsequent read attempt. Aside from this test - any idea why ELB might accept connections but not send any data? That seems to be the killer thing here. If the ELB stops accepting connections - fine, we should be able to recover from that. If it takes connections but doesn't communicate, as this test does, that's a harder problem.
Yeah, unfortunately what happens is the ELB always accepts a connection, regardless of whether it believes it has any healthy AMQP servers to which it can proxy. Then, if it doesn't believe it has any healthy servers right now, it will just drop the connection again, so you get this connect/disconnect behavior, instead of connecting to AMQP like you normally would, or having a connection-refused situation like you would if AMQP were unavailable. We ran into this when we were doing some failover testing, because ELB will very quickly decide that an AMQP instance is "unhealthy", but it takes comparatively longer for it to decide that it's "healthy" again. So when we were failing from one server to the next, to the next, it was marking them each unhealthy, so by the time we got to the last one in the list, even though the first several were actually back online, the ELB wouldn't send traffic back to them and would just drop the connection instead.
Ok - Your test seems to capture that behavior well then. I'm guessing this issue could affect any amqp-client connection though, not just recovery attempts. It's probably just that it pops up during recovery since that's when the ELB scenario occurs. Perhaps @michaelklishin could weigh in on this. Michael - basically the problem is that when rabbitmq connections are proxied through ELB, ELB might accept a TCP connection but not respond to the initial handshake which leaves the client hanging forever. See the call stack 2 posts up. The first idea that comes to my mind is that everything that happens inside
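One way to sketch that idea is to bound the whole blocking connection attempt (including the post-TCP handshake read) with a timed future, so a silent peer cannot hang recovery forever. This is not Lyra's code, and the 10-second bound is illustrative:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

class BoundedConnectSketch {
  Connection connectWithTimeout(final ConnectionFactory factory) throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Connection> attempt = executor.submit(new Callable<Connection>() {
      public Connection call() throws Exception {
        return factory.newConnection();  // blocks on the AMQP handshake, not just the TCP connect
      }
    });
    try {
      return attempt.get(10, TimeUnit.SECONDS);  // bound the whole attempt
    } catch (TimeoutException e) {
      attempt.cancel(true);  // give up on this attempt; the retry policy can schedule another
      throw e;
    } finally {
      executor.shutdown();
    }
  }
}
Note that a thread blocked in a socket read may not respond to interruption, so a more robust fix belongs at the socket/handshake level inside amqp-client itself, which is where the change discussed below eventually landed.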
I won't swear to this, but I think when I was playing with this, I did try it with just a bare AMQP connection and saw it time out when the heartbeat interval had passed because the handshake didn't complete, but it has been a little while since I was toying with it. I think in that case, I just started up netcat on 5671 and tried to connect to a connection that didn't do anything, and I believe it did throw an exception as expected. It would be a useful thing to verify though, because my memory is like a sieve.
@spatula75 Do you know if ELB eventually closes the client connection or how long it takes?
The behavior varies. The first time, it seems to take about 20 seconds before the connection gets closed. After that, it's more of an immediate connect/close.
Folks, line numbers in the stack trace don't match master or 3.5.2, AFAICT. If you have a way to reproduce this reasonably quickly, can you please try with 3.5.2 (the issue is likely there, I just want to see an up-to-date trace). Thank you!
@michaelklishin Sure, here's a stacktrace taken with 3.5.2:
Should be fixed in rabbitmq/rabbitmq-java-client#65; will be in 3.5.3.
Awesome!
@spatula75 Here's some log output running your test against Michael's changes along with a few tweaks in Lyra:
This looks pretty good to me. I'll make a new release of Lyra sometime soon which will work with the changes to come in amqp-client 3.5.3. You should be able to build and use either in the meantime if you wish.
Agreed, this looks like exactly the behavior we would expect: more than one attempt to recover, and once the proxy switches back into "actually proxying" mode, recovery succeeds and things return to normal.
Hi again!
Reducing contention on the Channel reduced the occurrence.
Happy to report that using amqp-client 3.5.3 (released last week), RudeProxyTest passes.
I just released Lyra
Awesome! Any idea how long it normally takes things to propagate to Maven Central?
I think it's usually within an hour...
👍 |
Lyra
Sigh... I suck. Let's try this again - 0.5.2 - should be in Maven Central shortly.
Hi,
I made a test (not a unit test, yet) where the connection is broken during recovery.
It resulted in:
No more connection recovery attempts take place. Can you suggest a fix for this case?
More information:
My application has around 10 Consumers on one Channel. Each one declares an exchange and a queue with 5-10 bindings (for various routing keys) each. It runs in a rather old, embedded environment, where it can take a non-trivial amount of time to remake the connection, channel and all its bindings. In my test, the connection goes down again whilst the exchanges, queues and bindings are being remade. This results in some of those bindings, etc. failing with e.g. AlreadyClosedExceptions. Then the reconnection fails with the above stack trace, and no more attempts at reconnection are made.
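For reference, a representative sketch of the topology described (names, flags and binding keys are illustrative, not Dan's actual code); on recovery, all of these declarations and bindings have to be replayed, which is why it can take a non-trivial amount of time on slow hardware:
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;

class TopologySketch {
  void declareConsumers(Channel channel) throws IOException {
    channel.exchangeDeclare("app.topic", "topic", true);                // shared exchange
    for (int i = 0; i < 10; i++) {
      String queue = "consumer-" + i + ".queue";                        // one queue per consumer
      channel.queueDeclare(queue, true, false, false, null);
      for (int b = 0; b < 5; b++)
        channel.queueBind(queue, "app.topic", "key." + i + "." + b);    // ~5 bindings each
      channel.basicConsume(queue, true, new DefaultConsumer(channel));  // no-op consumer for the sketch
    }
  }
}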
A fuller log is:
Best, Dan.