From 7f7caef66ea66d2de676ea78cf07de41c1bd9834 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Tue, 20 Aug 2024 15:29:24 -0400 Subject: [PATCH] Publishing plus immediate flush (#1211) * Publishing plus immediate flush * no casting * aroc naming --- src/main/java/io/nats/client/Connection.java | 4 +- .../io/nats/client/impl/MessageQueue.java | 29 ++-- .../io/nats/client/impl/NatsConnection.java | 27 ++-- .../client/impl/NatsConnectionWriter.java | 129 ++++++++++-------- .../io/nats/client/impl/NatsJetStream.java | 4 +- .../impl/NatsJetStreamPullSubscription.java | 3 +- .../java/io/nats/client/impl/NatsMessage.java | 5 +- .../client/impl/NatsPublishableMessage.java | 4 +- .../io/nats/client/impl/ProtocolMessage.java | 1 - .../nats/client/impl/PushMessageManager.java | 2 +- .../nats/client/impl/MessageManagerTests.java | 2 +- .../io/nats/client/impl/NatsMessageTests.java | 1 - 12 files changed, 109 insertions(+), 102 deletions(-) diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index eb70af021..e839bcb3a 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -539,8 +539,8 @@ enum Status { String createInbox(); /** - * Flushes the underlying connection buffer the next chance it gets if the connection is valid. - * @throws IOException not applicable even though it's part of the signature due to implementation change + * Immediately flushes the underlying connection buffer if the connection is valid. + * @throws IOException if the connection flush fails */ void flushBuffer() throws IOException; diff --git a/src/main/java/io/nats/client/impl/MessageQueue.java b/src/main/java/io/nats/client/impl/MessageQueue.java index 4d9c771dc..06691f960 100644 --- a/src/main/java/io/nats/client/impl/MessageQueue.java +++ b/src/main/java/io/nats/client/impl/MessageQueue.java @@ -206,7 +206,7 @@ void poisonTheQueue() { NatsMessage poll(Duration timeout) throws InterruptedException { NatsMessage msg = null; - + if (timeout == null || this.isDraining()) { // try immediately msg = this.queue.poll(); } else { @@ -248,7 +248,7 @@ NatsMessage pop(Duration timeout) throws InterruptedException { return msg; } - + // Waits up to the timeout to try to accumulate multiple messages // Use the next field to read the entire set accumulated. // maxSize and maxMessages are both checked and if either is exceeded @@ -259,8 +259,8 @@ NatsMessage pop(Duration timeout) throws InterruptedException { // Only works in single reader mode, because we want to maintain order. // accumulate reads off the concurrent queue one at a time, so if multiple // readers are present, you could get out of order message delivery. - NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout) - throws InterruptedException { + NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout) + throws InterruptedException { if (!this.singleReaderMode) { throw new IllegalStateException("Accumulate is only supported in single reader mode."); @@ -278,7 +278,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout) long size = msg.getSizeInBytes(); - if (maxMessages <= 1 || size >= maxSize) { + if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) { this.sizeInBytes.addAndGet(-size); this.length.decrementAndGet(); return msg; @@ -287,21 +287,24 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout) long count = 1; NatsMessage cursor = msg; - while (cursor != null) { + while (true) { NatsMessage next = this.queue.peek(); if (next != null && !isPoison(next)) { long s = next.getSizeInBytes(); - - if (maxSize<0 || (size + s) < maxSize) { // keep going + if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going size += s; count++; - - cursor.next = this.queue.poll(); - cursor = cursor.next; - if (count == maxMessages) { + this.queue.poll(); // we need to get the message out of the queue b/c we only peeked + cursor.next = next; + if (next.flushImmediatelyAfterPublish) { + // if we are going to flush, then don't accumulate more break; } + if (count == maxMessagesToAccumulate) { + break; + } + cursor = cursor.next; } else { // One more is too far break; } @@ -348,7 +351,7 @@ void filter(Predicate p) { cursor = this.queue.poll(); } this.queue.addAll(newQueue); - } finally { + } finally { editLock.unlock(); } } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index d2bea08cb..af8de92a1 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -935,7 +935,7 @@ void cleanUpPongQueue() { */ @Override public void publish(String subject, byte[] body) { - publishInternal(subject, null, null, body, true); + publishInternal(subject, null, null, body, true, false); } /** @@ -943,7 +943,7 @@ public void publish(String subject, byte[] body) { */ @Override public void publish(String subject, Headers headers, byte[] body) { - publishInternal(subject, null, headers, body, true); + publishInternal(subject, null, headers, body, true, false); } /** @@ -951,7 +951,7 @@ public void publish(String subject, Headers headers, byte[] body) { */ @Override public void publish(String subject, String replyTo, byte[] body) { - publishInternal(subject, replyTo, null, body, true); + publishInternal(subject, replyTo, null, body, true, false); } /** @@ -959,7 +959,7 @@ public void publish(String subject, String replyTo, byte[] body) { */ @Override public void publish(String subject, String replyTo, Headers headers, byte[] body) { - publishInternal(subject, replyTo, headers, body, true); + publishInternal(subject, replyTo, headers, body, true, false); } /** @@ -968,12 +968,12 @@ public void publish(String subject, String replyTo, Headers headers, byte[] body @Override public void publish(Message message) { validateNotNull(message, "Message"); - publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false); + publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false); } - void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo) { + void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) { checkPayloadSize(data); - NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo); + NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish); if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) { throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion()); } @@ -1108,7 +1108,7 @@ void sendSubscriptionMessage(String sid, String subject, String queueName, boole } bab.append(SP).append(sid); - NatsMessage subMsg = new ProtocolMessage(bab); + ProtocolMessage subMsg = new ProtocolMessage(bab); if (treatAsInternal) { queueInternalOutgoing(subMsg); @@ -1323,7 +1323,7 @@ CompletableFuture requestFutureInternal(String subject, Headers headers responsesAwaiting.put(sub.getSID(), future); } - publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo); + publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, true); statistics.incrementRequestsSent(); return future; @@ -2256,15 +2256,6 @@ public void flushBuffer() throws IOException { writer.flushBuffer(); } - void lenientFlushBuffer() { - try { - writer.flushBuffer(); - } - catch (Exception e) { - // ignore - } - } - /** * {@inheritDoc} */ diff --git a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java index 79e92b2ff..554fb6730 100644 --- a/src/main/java/io/nats/client/impl/NatsConnectionWriter.java +++ b/src/main/java/io/nats/client/impl/NatsConnectionWriter.java @@ -37,6 +37,7 @@ class NatsConnectionWriter implements Runnable { private final NatsConnection connection; + private final ReentrantLock writerLock; private Future stopped; private Future dataPortFuture; private DataPort dataPort; @@ -50,10 +51,10 @@ class NatsConnectionWriter implements Runnable { private final MessageQueue outgoing; private final MessageQueue reconnectOutgoing; private final long reconnectBufferSize; - private final AtomicBoolean flushBuffer; NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) { this.connection = connection; + writerLock = new ReentrantLock(); this.running = new AtomicBoolean(false); this.reconnectMode = new AtomicBoolean(sourceWriter != null); @@ -76,8 +77,6 @@ class NatsConnectionWriter implements Runnable { reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval(), sourceWriter == null ? null : sourceWriter.reconnectOutgoing); reconnectBufferSize = options.getReconnectBufferSize(); - - flushBuffer = new AtomicBoolean(false); } // Should only be called if the current thread has exited. @@ -123,86 +122,88 @@ boolean isRunning() { } void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException { - int sendPosition = 0; - int sbl = sendBufferLength.get(); - - while (msg != null) { - long size = msg.getSizeInBytes(); - - if (sendPosition + size > sbl) { - if (sendPosition > 0) { - dataPort.write(sendBuffer, sendPosition); - connection.getNatsStatistics().registerWrite(sendPosition); - sendPosition = 0; - } - if (size > sbl) { // have to resize b/c can't fit 1 message - sbl = bufferAllocSize((int) size, BUFFER_BLOCK_SIZE); - sendBufferLength.set(sbl); - sendBuffer = new byte[sbl]; + writerLock.lock(); + try { + int sendPosition = 0; + int sbl = sendBufferLength.get(); + + while (msg != null) { + long size = msg.getSizeInBytes(); + + if (sendPosition + size > sbl) { + if (sendPosition > 0) { + dataPort.write(sendBuffer, sendPosition); + connection.getNatsStatistics().registerWrite(sendPosition); + sendPosition = 0; + } + if (size > sbl) { // have to resize b/c can't fit 1 message + sbl = bufferAllocSize((int) size, BUFFER_BLOCK_SIZE); + sendBufferLength.set(sbl); + sendBuffer = new byte[sbl]; + } } - } - ByteArrayBuilder bab = msg.getProtocolBab(); - int babLen = bab.length(); - System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen); - sendPosition += babLen; + ByteArrayBuilder bab = msg.getProtocolBab(); + int babLen = bab.length(); + System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen); + sendPosition += babLen; + + sendBuffer[sendPosition++] = CR; + sendBuffer[sendPosition++] = LF; - sendBuffer[sendPosition++] = CR; - sendBuffer[sendPosition++] = LF; + if (!msg.isProtocol()) { + sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer); - if (!msg.isProtocol()) { - sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer); + byte[] bytes = msg.getData(); // guaranteed to not be null + if (bytes.length > 0) { + System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length); + sendPosition += bytes.length; + } - byte[] bytes = msg.getData(); // guaranteed to not be null - if (bytes.length > 0) { - System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length); - sendPosition += bytes.length; + sendBuffer[sendPosition++] = CR; + sendBuffer[sendPosition++] = LF; } - sendBuffer[sendPosition++] = CR; - sendBuffer[sendPosition++] = LF; - } + stats.incrementOutMsgs(); + stats.incrementOutBytes(size); - stats.incrementOutMsgs(); - stats.incrementOutBytes(size); + if (msg.flushImmediatelyAfterPublish) { + dataPort.flush(); + } + msg = msg.next; + } - msg = msg.next; + // no need to write if there are no bytes + if (sendPosition > 0) { + dataPort.write(sendBuffer, sendPosition); + connection.getNatsStatistics().registerWrite(sendPosition); + } } - - // no need to write if there are no bytes - if (sendPosition > 0) { - dataPort.write(sendBuffer, sendPosition); + finally { + writerLock.unlock(); } - - connection.getNatsStatistics().registerWrite(sendPosition); } @Override public void run() { - Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending - Duration reconnectWait = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through + Duration outgoingTimeout = Duration.ofMinutes(2); // This can be long since no one is sending + Duration reconnectTimeout = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through try { dataPort = this.dataPortFuture.get(); // Will wait for the future to complete StatisticsCollector stats = this.connection.getNatsStatistics(); - int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER; while (this.running.get()) { - NatsMessage msg = null; - + NatsMessage msg; if (this.reconnectMode.get()) { - msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), maxAccumulate, reconnectWait); - } else { - msg = this.outgoing.accumulate(sendBufferLength.get(), maxAccumulate, waitForMessage); + msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout); + } + else { + msg = this.outgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout); } - if (msg != null) { sendMessageBatch(msg, dataPort, stats); } - - if (flushBuffer.getAndSet(false)) { - dataPort.flush(); - } } } catch (IOException | BufferOverflowException io) { // if already not running, an IOE is not unreasonable in a transition state @@ -241,8 +242,18 @@ void queueInternalMessage(NatsMessage msg) { } void flushBuffer() { - if (running.get()) { - flushBuffer.set(true); + // Since there is no connection level locking, we rely on synchronization + // of the APIs here. + writerLock.lock(); + try { + if (this.running.get()) { + dataPort.flush(); + } + } catch (Exception e) { + // NOOP; + } + finally { + writerLock.unlock(); } } } diff --git a/src/main/java/io/nats/client/impl/NatsJetStream.java b/src/main/java/io/nats/client/impl/NatsJetStream.java index 988a778d7..27ed3ad22 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStream.java +++ b/src/main/java/io/nats/client/impl/NatsJetStream.java @@ -146,7 +146,7 @@ private PublishAck publishSyncInternal(String subject, Headers headers, byte[] d Headers merged = mergePublishOptions(headers, options); if (jso.isPublishNoAck()) { - conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo); + conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo, false); return null; } @@ -160,7 +160,7 @@ private CompletableFuture publishAsyncInternal(String subject, Heade Headers merged = mergePublishOptions(headers, options); if (jso.isPublishNoAck()) { - conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo); + conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo, false); return null; } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java b/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java index f122543fb..a661af61b 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java @@ -62,8 +62,7 @@ protected String _pull(PullRequestOptions pullRequestOptions, boolean raiseStatu String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName)); String pullSubject = getSubject().replace("*", Long.toString(this.pullSubjectIdHolder.incrementAndGet())); manager.startPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, pullManagerObserver); - connection.publish(publishSubject, pullSubject, pullRequestOptions.serialize()); - connection.lenientFlushBuffer(); + connection.publishInternal(publishSubject, pullSubject, null, pullRequestOptions.serialize(), true, true); return pullSubject; } diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index aea418637..e25f03434 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -53,8 +53,11 @@ public class NatsMessage implements Message { protected NatsSubscription subscription; - NatsMessage next; // for linked list + // for accumulate + protected NatsMessage next; + protected boolean flushImmediatelyAfterPublish; + // ack tracking protected AckType lastAck; // ---------------------------------------------------------------------------------------------------- diff --git a/src/main/java/io/nats/client/impl/NatsPublishableMessage.java b/src/main/java/io/nats/client/impl/NatsPublishableMessage.java index 8aefe29d3..adebe3229 100644 --- a/src/main/java/io/nats/client/impl/NatsPublishableMessage.java +++ b/src/main/java/io/nats/client/impl/NatsPublishableMessage.java @@ -21,10 +21,12 @@ class NatsPublishableMessage extends NatsMessage { public NatsPublishableMessage(boolean hasHeaders) { this.hasHeaders = hasHeaders; + flushImmediatelyAfterPublish = false; } - public NatsPublishableMessage(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo) { + public NatsPublishableMessage(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) { super(data); + this.flushImmediatelyAfterPublish = flushImmediatelyAfterPublish; if (validateSubjectAndReplyTo) { this.subject = validateSubject(subject, true); this.replyTo = validateReplyTo(replyTo, false); diff --git a/src/main/java/io/nats/client/impl/ProtocolMessage.java b/src/main/java/io/nats/client/impl/ProtocolMessage.java index b4fa91ddd..537f01a75 100644 --- a/src/main/java/io/nats/client/impl/ProtocolMessage.java +++ b/src/main/java/io/nats/client/impl/ProtocolMessage.java @@ -19,7 +19,6 @@ // Protocol message is a special version of a NatsPublishableMessage extends NatsMessage // ---------------------------------------------------------------------------------------------------- class ProtocolMessage extends NatsPublishableMessage { - private static final ByteArrayBuilder EMPTY_BAB = new ByteArrayBuilder(); ProtocolMessage(ByteArrayBuilder babProtocol) { super(false); diff --git a/src/main/java/io/nats/client/impl/PushMessageManager.java b/src/main/java/io/nats/client/impl/PushMessageManager.java index 83cde9db2..978081fa7 100644 --- a/src/main/java/io/nats/client/impl/PushMessageManager.java +++ b/src/main/java/io/nats/client/impl/PushMessageManager.java @@ -124,7 +124,7 @@ private void processFlowControl(String fcSubject, FlowControlSource source) { // we may get multiple fc/hb messages with the same reply // only need to post to that subject once if (fcSubject != null && !fcSubject.equals(lastFcSubject)) { - conn.publishInternal(fcSubject, null, null, null, false); + conn.publishInternal(fcSubject, null, null, null, false, false); lastFcSubject = fcSubject; // set after publish in case the pub fails conn.executeCallback((c, el) -> el.flowControlProcessed(c, sub, fcSubject, source)); } diff --git a/src/test/java/io/nats/client/impl/MessageManagerTests.java b/src/test/java/io/nats/client/impl/MessageManagerTests.java index 504fb033b..4a27f3ded 100644 --- a/src/test/java/io/nats/client/impl/MessageManagerTests.java +++ b/src/test/java/io/nats/client/impl/MessageManagerTests.java @@ -582,7 +582,7 @@ public MockPublishInternal(Options options) { } @Override - void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validate) { + void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validate, boolean flushImmediatelyAfterPublish) { fcSubject = subject; ++pubCount; } diff --git a/src/test/java/io/nats/client/impl/NatsMessageTests.java b/src/test/java/io/nats/client/impl/NatsMessageTests.java index 4422d379f..9a7f5b4c6 100644 --- a/src/test/java/io/nats/client/impl/NatsMessageTests.java +++ b/src/test/java/io/nats/client/impl/NatsMessageTests.java @@ -262,7 +262,6 @@ public void miscCoverage() { nmCov.calculate(); assertTrue(nmCov.toDetailString().contains("PUB sub reply 0")); - assertTrue(nmCov.toDetailString().contains("next=No")); } @Test