Skip to content

Commit

Permalink
Publishing plus immediate flush (#1211)
Browse files Browse the repository at this point in the history
* Publishing plus immediate flush

* no casting

* aroc naming
  • Loading branch information
scottf authored Aug 20, 2024
1 parent 8806fbf commit 7f7caef
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 102 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,8 @@ enum Status {
String createInbox();

/**
* Flushes the underlying connection buffer the next chance it gets if the connection is valid.
* @throws IOException not applicable even though it's part of the signature due to implementation change
* Immediately flushes the underlying connection buffer if the connection is valid.
* @throws IOException if the connection flush fails
*/
void flushBuffer() throws IOException;

Expand Down
29 changes: 16 additions & 13 deletions src/main/java/io/nats/client/impl/MessageQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ void poisonTheQueue() {

NatsMessage poll(Duration timeout) throws InterruptedException {
NatsMessage msg = null;

if (timeout == null || this.isDraining()) { // try immediately
msg = this.queue.poll();
} else {
Expand Down Expand Up @@ -248,7 +248,7 @@ NatsMessage pop(Duration timeout) throws InterruptedException {

return msg;
}

// Waits up to the timeout to try to accumulate multiple messages
// Use the next field to read the entire set accumulated.
// maxSize and maxMessages are both checked and if either is exceeded
Expand All @@ -259,8 +259,8 @@ NatsMessage pop(Duration timeout) throws InterruptedException {
// Only works in single reader mode, because we want to maintain order.
// accumulate reads off the concurrent queue one at a time, so if multiple
// readers are present, you could get out of order message delivery.
NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
throws InterruptedException {
NatsMessage accumulate(long maxBytesToAccumulate, long maxMessagesToAccumulate, Duration timeout)
throws InterruptedException {

if (!this.singleReaderMode) {
throw new IllegalStateException("Accumulate is only supported in single reader mode.");
Expand All @@ -278,7 +278,7 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)

long size = msg.getSizeInBytes();

if (maxMessages <= 1 || size >= maxSize) {
if (maxMessagesToAccumulate <= 1 || size >= maxBytesToAccumulate) {
this.sizeInBytes.addAndGet(-size);
this.length.decrementAndGet();
return msg;
Expand All @@ -287,21 +287,24 @@ NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout)
long count = 1;
NatsMessage cursor = msg;

while (cursor != null) {
while (true) {
NatsMessage next = this.queue.peek();
if (next != null && !isPoison(next)) {
long s = next.getSizeInBytes();

if (maxSize<0 || (size + s) < maxSize) { // keep going
if (maxBytesToAccumulate < 0 || (size + s) < maxBytesToAccumulate) { // keep going
size += s;
count++;

cursor.next = this.queue.poll();
cursor = cursor.next;

if (count == maxMessages) {
this.queue.poll(); // we need to get the message out of the queue b/c we only peeked
cursor.next = next;
if (next.flushImmediatelyAfterPublish) {
// if we are going to flush, then don't accumulate more
break;
}
if (count == maxMessagesToAccumulate) {
break;
}
cursor = cursor.next;
} else { // One more is too far
break;
}
Expand Down Expand Up @@ -348,7 +351,7 @@ void filter(Predicate<NatsMessage> p) {
cursor = this.queue.poll();
}
this.queue.addAll(newQueue);
} finally {
} finally {
editLock.unlock();
}
}
Expand Down
27 changes: 9 additions & 18 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -935,31 +935,31 @@ void cleanUpPongQueue() {
*/
@Override
public void publish(String subject, byte[] body) {
publishInternal(subject, null, null, body, true);
publishInternal(subject, null, null, body, true, false);
}

/**
* {@inheritDoc}
*/
@Override
public void publish(String subject, Headers headers, byte[] body) {
publishInternal(subject, null, headers, body, true);
publishInternal(subject, null, headers, body, true, false);
}

/**
* {@inheritDoc}
*/
@Override
public void publish(String subject, String replyTo, byte[] body) {
publishInternal(subject, replyTo, null, body, true);
publishInternal(subject, replyTo, null, body, true, false);
}

/**
* {@inheritDoc}
*/
@Override
public void publish(String subject, String replyTo, Headers headers, byte[] body) {
publishInternal(subject, replyTo, headers, body, true);
publishInternal(subject, replyTo, headers, body, true, false);
}

/**
Expand All @@ -968,12 +968,12 @@ public void publish(String subject, String replyTo, Headers headers, byte[] body
@Override
public void publish(Message message) {
validateNotNull(message, "Message");
publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false);
publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false, false);
}

void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo) {
void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
checkPayloadSize(data);
NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo);
NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
}
Expand Down Expand Up @@ -1108,7 +1108,7 @@ void sendSubscriptionMessage(String sid, String subject, String queueName, boole
}
bab.append(SP).append(sid);

NatsMessage subMsg = new ProtocolMessage(bab);
ProtocolMessage subMsg = new ProtocolMessage(bab);

if (treatAsInternal) {
queueInternalOutgoing(subMsg);
Expand Down Expand Up @@ -1323,7 +1323,7 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
responsesAwaiting.put(sub.getSID(), future);
}

publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo);
publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, true);
statistics.incrementRequestsSent();

return future;
Expand Down Expand Up @@ -2256,15 +2256,6 @@ public void flushBuffer() throws IOException {
writer.flushBuffer();
}

void lenientFlushBuffer() {
try {
writer.flushBuffer();
}
catch (Exception e) {
// ignore
}
}

/**
* {@inheritDoc}
*/
Expand Down
129 changes: 70 additions & 59 deletions src/main/java/io/nats/client/impl/NatsConnectionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class NatsConnectionWriter implements Runnable {

private final NatsConnection connection;

private final ReentrantLock writerLock;
private Future<Boolean> stopped;
private Future<DataPort> dataPortFuture;
private DataPort dataPort;
Expand All @@ -50,10 +51,10 @@ class NatsConnectionWriter implements Runnable {
private final MessageQueue outgoing;
private final MessageQueue reconnectOutgoing;
private final long reconnectBufferSize;
private final AtomicBoolean flushBuffer;

NatsConnectionWriter(NatsConnection connection, NatsConnectionWriter sourceWriter) {
this.connection = connection;
writerLock = new ReentrantLock();

this.running = new AtomicBoolean(false);
this.reconnectMode = new AtomicBoolean(sourceWriter != null);
Expand All @@ -76,8 +77,6 @@ class NatsConnectionWriter implements Runnable {
reconnectOutgoing = new MessageQueue(true, options.getRequestCleanupInterval(),
sourceWriter == null ? null : sourceWriter.reconnectOutgoing);
reconnectBufferSize = options.getReconnectBufferSize();

flushBuffer = new AtomicBoolean(false);
}

// Should only be called if the current thread has exited.
Expand Down Expand Up @@ -123,86 +122,88 @@ boolean isRunning() {
}

void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector stats) throws IOException {
int sendPosition = 0;
int sbl = sendBufferLength.get();

while (msg != null) {
long size = msg.getSizeInBytes();

if (sendPosition + size > sbl) {
if (sendPosition > 0) {
dataPort.write(sendBuffer, sendPosition);
connection.getNatsStatistics().registerWrite(sendPosition);
sendPosition = 0;
}
if (size > sbl) { // have to resize b/c can't fit 1 message
sbl = bufferAllocSize((int) size, BUFFER_BLOCK_SIZE);
sendBufferLength.set(sbl);
sendBuffer = new byte[sbl];
writerLock.lock();
try {
int sendPosition = 0;
int sbl = sendBufferLength.get();

while (msg != null) {
long size = msg.getSizeInBytes();

if (sendPosition + size > sbl) {
if (sendPosition > 0) {
dataPort.write(sendBuffer, sendPosition);
connection.getNatsStatistics().registerWrite(sendPosition);
sendPosition = 0;
}
if (size > sbl) { // have to resize b/c can't fit 1 message
sbl = bufferAllocSize((int) size, BUFFER_BLOCK_SIZE);
sendBufferLength.set(sbl);
sendBuffer = new byte[sbl];
}
}
}

ByteArrayBuilder bab = msg.getProtocolBab();
int babLen = bab.length();
System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen);
sendPosition += babLen;
ByteArrayBuilder bab = msg.getProtocolBab();
int babLen = bab.length();
System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen);
sendPosition += babLen;

sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;

sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;
if (!msg.isProtocol()) {
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);

if (!msg.isProtocol()) {
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);
byte[] bytes = msg.getData(); // guaranteed to not be null
if (bytes.length > 0) {
System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length);
sendPosition += bytes.length;
}

byte[] bytes = msg.getData(); // guaranteed to not be null
if (bytes.length > 0) {
System.arraycopy(bytes, 0, sendBuffer, sendPosition, bytes.length);
sendPosition += bytes.length;
sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;
}

sendBuffer[sendPosition++] = CR;
sendBuffer[sendPosition++] = LF;
}
stats.incrementOutMsgs();
stats.incrementOutBytes(size);

stats.incrementOutMsgs();
stats.incrementOutBytes(size);
if (msg.flushImmediatelyAfterPublish) {
dataPort.flush();
}
msg = msg.next;
}

msg = msg.next;
// no need to write if there are no bytes
if (sendPosition > 0) {
dataPort.write(sendBuffer, sendPosition);
connection.getNatsStatistics().registerWrite(sendPosition);
}
}

// no need to write if there are no bytes
if (sendPosition > 0) {
dataPort.write(sendBuffer, sendPosition);
finally {
writerLock.unlock();
}

connection.getNatsStatistics().registerWrite(sendPosition);
}

@Override
public void run() {
Duration waitForMessage = Duration.ofMinutes(2); // This can be long since no one is sending
Duration reconnectWait = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through
Duration outgoingTimeout = Duration.ofMinutes(2); // This can be long since no one is sending
Duration reconnectTimeout = Duration.ofMillis(1); // This should be short, since we are trying to get the reconnect through

try {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
StatisticsCollector stats = this.connection.getNatsStatistics();
int maxAccumulate = Options.MAX_MESSAGES_IN_NETWORK_BUFFER;

while (this.running.get()) {
NatsMessage msg = null;

NatsMessage msg;
if (this.reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), maxAccumulate, reconnectWait);
} else {
msg = this.outgoing.accumulate(sendBufferLength.get(), maxAccumulate, waitForMessage);
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
}
else {
msg = this.outgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, outgoingTimeout);
}

if (msg != null) {
sendMessageBatch(msg, dataPort, stats);
}

if (flushBuffer.getAndSet(false)) {
dataPort.flush();
}
}
} catch (IOException | BufferOverflowException io) {
// if already not running, an IOE is not unreasonable in a transition state
Expand Down Expand Up @@ -241,8 +242,18 @@ void queueInternalMessage(NatsMessage msg) {
}

void flushBuffer() {
if (running.get()) {
flushBuffer.set(true);
// Since there is no connection level locking, we rely on synchronization
// of the APIs here.
writerLock.lock();
try {
if (this.running.get()) {
dataPort.flush();
}
} catch (Exception e) {
// NOOP;
}
finally {
writerLock.unlock();
}
}
}
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private PublishAck publishSyncInternal(String subject, Headers headers, byte[] d
Headers merged = mergePublishOptions(headers, options);

if (jso.isPublishNoAck()) {
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo);
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo, false);
return null;
}

Expand All @@ -160,7 +160,7 @@ private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Heade
Headers merged = mergePublishOptions(headers, options);

if (jso.isPublishNoAck()) {
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo);
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo, false);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ protected String _pull(PullRequestOptions pullRequestOptions, boolean raiseStatu
String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName));
String pullSubject = getSubject().replace("*", Long.toString(this.pullSubjectIdHolder.incrementAndGet()));
manager.startPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, pullManagerObserver);
connection.publish(publishSubject, pullSubject, pullRequestOptions.serialize());
connection.lenientFlushBuffer();
connection.publishInternal(publishSubject, pullSubject, null, pullRequestOptions.serialize(), true, true);
return pullSubject;
}

Expand Down
Loading

0 comments on commit 7f7caef

Please sign in to comment.