Skip to content

Commit

Permalink
#694 Additional exception handling in hermes-frontend (#696)
Browse files Browse the repository at this point in the history
* #694 Handling a race condition between ack and timeout tasks. Catching a few more exceptions. Monitoring backup-storage size.
  • Loading branch information
druminski authored Jan 23, 2017
1 parent a7d08f5 commit 8d1feeb
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public enum Configs {
MESSAGES_LOCAL_STORAGE_MAX_RESEND_RETRIES("frontend.messages.local.storage.max.resend.retries", 5),
MESSAGES_LOADING_PAUSE_BETWEEN_RESENDS("frontend.messages.loading.pause.between.resend", 30),
MESSAGES_LOADING_WAIT_FOR_BROKER_TOPIC_INFO("frontend.messages.loading.wait.for.broker.topic.info", 5),
MESSAGES_LOCAL_STORAGE_SIZE_REPORTING_ENABLED("frontend.messages.local.storage.size.reporting.enabled", true),

CONSUMER_RECEIVER_POOL_TIMEOUT("consumer.receiver.pool.timeout", 100),
CONSUMER_RECEIVER_READ_QUEUE_CAPACITY("consumer.receiver.read.queue.capacity", 1000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ public class Gauges {

THREADS = "threads",
INFLIGHT_REQUESTS = "inflight-requests",
OUTPUT_RATE = "output-rate." + GROUP + "." + TOPIC + "." + SUBSCRIPTION;
OUTPUT_RATE = "output-rate." + GROUP + "." + TOPIC + "." + SUBSCRIPTION,
BACKUP_STORAGE_SIZE = "backup-storage.size";
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void registerConsumersThreadGauge(Gauge<Integer> gauge) {
metricRegistry.register(metricRegistryName(Gauges.THREADS), gauge);
}

/**
 * Registers a gauge under the {@code Gauges.BACKUP_STORAGE_SIZE} metric name.
 *
 * @param gauge gauge supplying the value to report
 */
public void registerMessageRepositorySizeGauge(Gauge<Integer> gauge) {
    final String gaugeName = metricRegistryName(Gauges.BACKUP_STORAGE_SIZE);
    metricRegistry.register(gaugeName, gauge);
}

/**
 * Registers a gauge under the {@code Gauges.OUTPUT_RATE} metric name resolved
 * for the given topic and name.
 *
 * @param topicName topic the metric name is resolved for
 * @param name      value passed as the last argument when resolving the metric name
 * @param gauge     gauge supplying the value to report
 * @param <T>       type of the value reported by the gauge
 */
public <T> void registerOutputRateGauge(TopicName topicName, String name, Gauge<T> gauge) {
    final String gaugeName = metricRegistryName(Gauges.OUTPUT_RATE, topicName, name);
    metricRegistry.register(gaugeName, gauge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.buffer.BackupMessage;
import pl.allegro.tech.hermes.frontend.buffer.MessageRepository;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
Expand All @@ -20,6 +21,11 @@ public class ChronicleMapMessageRepository implements MessageRepository {

private ChronicleMap<String, ChronicleMapEntryValue> map;

/**
 * Creates the repository for the given file via the single-argument constructor
 * and then registers a size gauge for it on {@code hermesMetrics}.
 *
 * @param file          backup file handed to the single-argument constructor
 * @param hermesMetrics metrics facade on which the size gauge is registered
 */
public ChronicleMapMessageRepository(File file, HermesMetrics hermesMetrics) {
    this(file);
    // The lambda reads the field on every gauge evaluation rather than capturing a value once.
    hermesMetrics.registerMessageRepositorySizeGauge(() -> this.map.size());
}

public ChronicleMapMessageRepository(File file) {
try {
logger.info("Creating backup storage in path: {}", file.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.hook.HooksHandler;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.buffer.BackupFilesManager;
import pl.allegro.tech.hermes.frontend.buffer.BackupMessagesLoader;
import pl.allegro.tech.hermes.frontend.buffer.BrokerListener;
Expand All @@ -19,6 +20,7 @@
import static java.util.stream.Collectors.joining;
import static pl.allegro.tech.hermes.common.config.Configs.MESSAGES_LOCAL_STORAGE_DIRECTORY;
import static pl.allegro.tech.hermes.common.config.Configs.MESSAGES_LOCAL_STORAGE_ENABLED;
import static pl.allegro.tech.hermes.common.config.Configs.MESSAGES_LOCAL_STORAGE_SIZE_REPORTING_ENABLED;

public class PersistentBufferExtension {

Expand All @@ -33,18 +35,21 @@ public class PersistentBufferExtension {
private final HooksHandler hooksHandler;

private final BackupMessagesLoader backupMessagesLoader;
private final HermesMetrics hermesMetrics;

/**
 * Creates the extension and stores its collaborators for later use.
 *
 * @param configFactory        source of configuration properties
 * @param clock                clock stored for later use
 * @param listeners            broker listeners registry
 * @param hooksHandler         hooks handler stored for later use
 * @param backupMessagesLoader loader of backed-up messages
 * @param hermesMetrics        metrics facade stored for later use
 */
@Inject
public PersistentBufferExtension(ConfigFactory configFactory,
                                 Clock clock,
                                 BrokerListeners listeners,
                                 HooksHandler hooksHandler,
                                 BackupMessagesLoader backupMessagesLoader,
                                 HermesMetrics hermesMetrics) {
    this.config = configFactory;
    this.clock = clock;
    this.listeners = listeners;
    this.hooksHandler = hooksHandler;
    this.backupMessagesLoader = backupMessagesLoader;
    this.hermesMetrics = hermesMetrics;
}

public void extend() {
Expand All @@ -66,7 +71,9 @@ public void extend() {
}

if (config.getBooleanProperty(MESSAGES_LOCAL_STORAGE_ENABLED)) {
MessageRepository repository = new ChronicleMapMessageRepository(backupFilesManager.getCurrentBackupFile());
MessageRepository repository = config.getBooleanProperty(MESSAGES_LOCAL_STORAGE_SIZE_REPORTING_ENABLED) ?
new ChronicleMapMessageRepository(backupFilesManager.getCurrentBackupFile(), hermesMetrics) :
new ChronicleMapMessageRepository(backupFilesManager.getCurrentBackupFile());
BrokerListener brokerListener = new BrokerListener(repository);

listeners.addAcknowledgeListener(brokerListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ class PublishingHandler implements HttpHandler {
public void handleRequest(HttpServerExchange exchange) throws Exception {
    // Dispatching changes the state of the exchange to dispatched; thanks to this call the
    // default response with 200 status code is not returned after handleRequest() finishes.
    // The task is dispatched exactly once.
    exchange.dispatch(() -> {
        try {
            handle(exchange);
        } catch (RuntimeException e) {
            // A runtime failure inside the dispatched task is reported through the error
            // processor instead of escaping from the task.
            messageErrorProcessor.sendAndLog(exchange, "Exception while publishing message to a broker.", e);
        }
    });
}

private void handle(HttpServerExchange exchange) {
Expand All @@ -51,7 +57,7 @@ public void onPublished(Message message, Topic topic) {
if (messageState.setSentToKafka()) {
attachment.removeTimeout();
messageEndProcessor.sent(exchange, attachment);
} else if (messageState.isDelayed()) {
} else if (messageState.setDelayedSentToKafka()) {
messageEndProcessor.delayedSent(exchange, attachment.getCachedTopic(), message);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
}

/**
 * Hands the "buffered but delayed" handling of the message over to the worker
 * of the exchange's connection. The task is submitted exactly once.
 *
 * @param exchange   exchange of the request being handled
 * @param attachment attachment associated with the request
 */
private void delayedSending(HttpServerExchange exchange, AttachmentContent attachment) {
    exchange.getConnection().getWorker().execute(() -> {
        try {
            messageEndProcessor.bufferedButDelayed(exchange, attachment);
        } catch (RuntimeException exception) {
            // A runtime failure inside the worker task is reported through the error
            // processor instead of escaping from the task.
            messageErrorProcessor.sendAndLog(exchange, "Exception while handling delayed message sending.", exception);
        }
    });
}

private void readingTimeout(HttpServerExchange exchange, AttachmentContent attachment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HttpString;
import io.undertow.util.StatusCodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
Expand All @@ -17,6 +19,7 @@

public class MessageEndProcessor {

private static final Logger logger = LoggerFactory.getLogger(MessageEndProcessor.class);
private static final HttpString messageIdHeader = new HttpString(MESSAGE_ID.getName());

private final Trackers trackers;
Expand Down Expand Up @@ -50,13 +53,41 @@ public void bufferedButDelayed(HttpServerExchange exchange, AttachmentContent at
Topic topic = attachment.getTopic();
brokerListeners.onTimeout(attachment.getMessage(), topic);
trackers.get(topic).logInflight(attachment.getMessageId(), topic.getName(), readHostAndPort(exchange));
handleRaceConditionBetweenAckAndTimeout(attachment, topic);
sendResponse(exchange, attachment, StatusCodes.ACCEPTED);
}

/**
 * Notifies the broker listeners about an acknowledge when the message state
 * reports "delayed sent to Kafka"; does nothing otherwise.
 *
 * @param attachment attachment holding the message and its state
 * @param topic      topic passed to the listeners together with the message
 */
private void handleRaceConditionBetweenAckAndTimeout(AttachmentContent attachment, Topic topic) {
    boolean delayedSent = attachment.getMessageState().isDelayedSentToKafka();
    if (!delayedSent) {
        return;
    }
    brokerListeners.onAcknowledge(attachment.getMessage(), topic);
}

/**
 * Finishes the exchange with the given status code and the message-id header.
 *
 * <p>The status code and header are written only when the response has not been
 * started yet; otherwise a warning is logged and the exchange is left as is.
 * The attachment is then marked as having its response ready and the exchange is
 * ended. A runtime exception thrown while ending the exchange is logged, not propagated.
 *
 * @param exchange   exchange to respond on
 * @param attachment attachment providing the message id and cached topic
 * @param statusCode status code expected to be sent to the client
 */
private void sendResponse(HttpServerExchange exchange, AttachmentContent attachment, int statusCode) {
    if (!exchange.isResponseStarted()) {
        exchange.setStatusCode(statusCode);
        exchange.getResponseHeaders().add(messageIdHeader, attachment.getMessageId());
    } else {
        logger.warn("The response has already been started. Status code set on exchange: {}; Expected status code: {}; " +
                        "Topic: {}; Message id: {}; Remote host {}",
                exchange.getStatusCode(),
                statusCode,
                attachment.getCachedTopic().getQualifiedName(),
                attachment.getMessageId(),
                readHostAndPort(exchange));
    }
    attachment.markResponseAsReady();
    try {
        exchange.endExchange();
    } catch (RuntimeException exception) {
        // The trailing exception argument has no placeholder, so it is logged as the throwable.
        logger.error("Exception while ending exchange. Status code set on exchange: {}; Expected status code: {}; " +
                        "Topic: {}; Message id: {}; Remote host {}",
                exchange.getStatusCode(),
                statusCode,
                attachment.getCachedTopic().getQualifiedName(),
                attachment.getMessageId(),
                readHostAndPort(exchange),
                exception);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void sendAndLog(HttpServerExchange exchange, Topic topic, String messageI
log(error.getMessage(), topic, messageId, readHostAndPort(exchange), e);
}

/**
 * Sends and logs an internal error, taking the topic and message id from the
 * {@code AttachmentContent} attached to the exchange.
 *
 * @param exchange     exchange carrying the attachment and receiving the error
 * @param errorMessage message used to build the error description
 * @param e            exception passed on to the full {@code sendAndLog} overload
 */
public void sendAndLog(HttpServerExchange exchange, String errorMessage, Exception e) {
    AttachmentContent content = exchange.getAttachment(AttachmentContent.KEY);
    Topic topic = content.getTopic();
    String messageId = content.getMessageId();
    sendAndLog(exchange, topic, messageId, error(errorMessage, INTERNAL_ERROR), e);
}

public void sendQuietly(HttpServerExchange exchange, ErrorDescription error, String messageId, String topicName) {
try {
if (exchange.getConnection().isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ enum State {
SENDING_TO_KAFKA,
SENT_TO_KAFKA,
DELAYED_SENDING,
DELAYED_PROCESSING
DELAYED_PROCESSING,
DELAYED_SENT_TO_KAFKA
}

private volatile boolean timeoutHasPassed = false;
Expand Down Expand Up @@ -48,8 +49,8 @@ public boolean setSentToKafka() {
return state.compareAndSet(SENDING_TO_KAFKA, SENT_TO_KAFKA) || state.compareAndSet(SENDING_TO_KAFKA_PRODUCER_QUEUE, SENT_TO_KAFKA);
}

public boolean isDelayed() {
return timeoutHasPassed || state.get() == DELAYED_SENDING || state.get() == DELAYED_PROCESSING;
/**
 * Tells whether the current state is {@code DELAYED_SENT_TO_KAFKA}.
 *
 * @return {@code true} when the state read at the time of the call is {@code DELAYED_SENT_TO_KAFKA}
 */
public boolean isDelayedSentToKafka() {
    return DELAYED_SENT_TO_KAFKA == state.get();
}

public boolean setDelayedSending() {
Expand All @@ -76,7 +77,11 @@ public boolean setDelayedProcessing() {
return timeoutHasPassed && state.compareAndSet(SENDING_TO_KAFKA, DELAYED_PROCESSING);
}

/**
 * Tries to move the state to {@code DELAYED_SENT_TO_KAFKA}, first from
 * {@code DELAYED_SENDING} and, if that fails, from {@code DELAYED_PROCESSING}.
 *
 * @return {@code true} when one of the two transitions succeeded
 */
public boolean setDelayedSentToKafka() {
    if (state.compareAndSet(DELAYED_SENDING, DELAYED_SENT_TO_KAFKA)) {
        return true;
    }
    return state.compareAndSet(DELAYED_PROCESSING, DELAYED_SENT_TO_KAFKA);
}

/**
 * Marks that the timeout has passed by setting the {@code timeoutHasPassed} flag,
 * which {@code setDelayedProcessing()} requires before it attempts its transition.
 */
public void setTimeoutHasPassed() {
    timeoutHasPassed = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,20 @@ class MessageStateTest extends Specification {
state.setSendingToKafkaProducerQueue()
state.setSendingToKafka()
state.setDelayedProcessing()
state.delayed
}

// Without setTimeoutHasPassed() being called first, setDelayedProcessing() must refuse the transition.
def "should not set 'delayed processing' state"() {
    expect:
    state.setSendingToKafkaProducerQueue()
    state.setSendingToKafka()
    !state.setDelayedProcessing()
}

// Drives the state to 'sending to kafka' and expects the move to 'delayed sending' to succeed.
def "should set 'delayed sending' state from 'sending to kafka' state"() {
    expect:
    state.setSendingToKafkaProducerQueue()
    state.setSendingToKafka()
    state.setDelayedSending()
}

def "should not set 'delayed sending' state from 'sent to kafka'"() {
Expand All @@ -95,6 +92,30 @@ class MessageStateTest extends Specification {
state.setSendingToKafka()
state.setSentToKafka()
!state.setDelayedSending()
!state.delayed
}

// Walks the state through 'sending to kafka' and 'delayed processing', then expects
// the transition to 'delayed sent to kafka' to succeed.
def "should set 'delayed sent' state from 'delayed processing'"() {
expect:
state.setSendingToKafkaProducerQueue()
state.setSendingToKafka()
// setDelayedProcessing() only attempts its transition once the timeout flag is set
state.setTimeoutHasPassed()
state.setDelayedProcessing()
state.setDelayedSentToKafka()
}

// Walks the state through 'sending to kafka' and 'delayed sending', then expects
// the transition to 'delayed sent to kafka' to succeed.
def "should set 'delayed sent' state from 'delayed sending'"() {
expect:
state.setSendingToKafkaProducerQueue()
state.setSendingToKafka()
state.setDelayedSending()
state.setDelayedSentToKafka()
}

// Once the state is 'sent to kafka', the transition to 'delayed sent to kafka' must be refused:
// setDelayedSentToKafka() only moves from 'delayed sending' or 'delayed processing'.
def "should not set 'delayed sent' state from 'sent to kafka'"() {
expect:
state.setSendingToKafkaProducerQueue()
state.setSendingToKafka()
state.setSentToKafka()
!state.setDelayedSentToKafka()
}
}

0 comments on commit 8d1feeb

Please sign in to comment.