Skip to content

Commit

Permalink
#694 Additional exception handling in hermes-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
druminski committed Jan 20, 2017
1 parent fc48d77 commit d391e3a
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public enum Configs {
MESSAGES_LOCAL_STORAGE_MAX_RESEND_RETRIES("frontend.messages.local.storage.max.resend.retries", 5),
MESSAGES_LOADING_PAUSE_BETWEEN_RESENDS("frontend.messages.loading.pause.between.resend", 30),
MESSAGES_LOADING_WAIT_FOR_BROKER_TOPIC_INFO("frontend.messages.loading.wait.for.broker.topic.info", 5),
MESSAGES_LOCAL_STORAGE_SIZE_REPORTING_ENABLED("frontend.messages.local.storage.size.reporting.enabled", true),

CONSUMER_RECEIVER_POOL_TIMEOUT("consumer.receiver.pool.timeout", 100),
CONSUMER_RECEIVER_READ_QUEUE_CAPACITY("consumer.receiver.read.queue.capacity", 1000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ public class Gauges {

THREADS = "threads",
INFLIGHT_REQUESTS = "inflight-requests",
OUTPUT_RATE = "output-rate." + GROUP + "." + TOPIC + "." + SUBSCRIPTION;
OUTPUT_RATE = "output-rate." + GROUP + "." + TOPIC + "." + SUBSCRIPTION,
BACKUP_STORAGE_MESSAGES = "backup-storage-messages-number";
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void registerConsumersThreadGauge(Gauge<Integer> gauge) {
metricRegistry.register(metricRegistryName(Gauges.THREADS), gauge);
}

public void registerMessageRepositoryGauge(Gauge<Integer> gauge) {
metricRegistry.register(metricRegistryName(Gauges.BACKUP_STORAGE_MESSAGES), gauge);
}

public <T> void registerOutputRateGauge(TopicName topicName, String name, Gauge<T> gauge) {
metricRegistry.register(metricRegistryName(Gauges.OUTPUT_RATE, topicName, name), gauge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.buffer.BackupMessage;
import pl.allegro.tech.hermes.frontend.buffer.MessageRepository;
import pl.allegro.tech.hermes.frontend.publishing.message.Message;
Expand All @@ -20,6 +21,11 @@ public class ChronicleMapMessageRepository implements MessageRepository {

private ChronicleMap<String, ChronicleMapEntryValue> map;

public ChronicleMapMessageRepository(File file, HermesMetrics hermesMetrics) {
this(file);
hermesMetrics.registerMessageRepositoryGauge(() -> map.size());
}

public ChronicleMapMessageRepository(File file) {
try {
logger.info("Creating backup storage in path: {}", file.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.common.config.ConfigFactory;
import pl.allegro.tech.hermes.common.hook.HooksHandler;
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.frontend.buffer.BackupFilesManager;
import pl.allegro.tech.hermes.frontend.buffer.BackupMessagesLoader;
import pl.allegro.tech.hermes.frontend.buffer.BrokerListener;
Expand All @@ -19,6 +20,7 @@
import static java.util.stream.Collectors.joining;
import static pl.allegro.tech.hermes.common.config.Configs.MESSAGES_LOCAL_STORAGE_DIRECTORY;
import static pl.allegro.tech.hermes.common.config.Configs.MESSAGES_LOCAL_STORAGE_ENABLED;
import static pl.allegro.tech.hermes.common.config.Configs.MESSAGES_LOCAL_STORAGE_SIZE_REPORTING_ENABLED;

public class PersistentBufferExtension {

Expand All @@ -33,18 +35,21 @@ public class PersistentBufferExtension {
private final HooksHandler hooksHandler;

private final BackupMessagesLoader backupMessagesLoader;
private final HermesMetrics hermesMetrics;

@Inject
public PersistentBufferExtension(ConfigFactory configFactory,
Clock clock,
BrokerListeners listeners,
HooksHandler hooksHandler,
BackupMessagesLoader backupMessagesLoader) {
BackupMessagesLoader backupMessagesLoader,
HermesMetrics hermesMetrics) {
this.config = configFactory;
this.clock = clock;
this.listeners = listeners;
this.hooksHandler = hooksHandler;
this.backupMessagesLoader = backupMessagesLoader;
this.hermesMetrics = hermesMetrics;
}

public void extend() {
Expand All @@ -66,7 +71,9 @@ public void extend() {
}

if (config.getBooleanProperty(MESSAGES_LOCAL_STORAGE_ENABLED)) {
MessageRepository repository = new ChronicleMapMessageRepository(backupFilesManager.getCurrentBackupFile());
MessageRepository repository = config.getBooleanProperty(MESSAGES_LOCAL_STORAGE_SIZE_REPORTING_ENABLED) ?
new ChronicleMapMessageRepository(backupFilesManager.getCurrentBackupFile(), hermesMetrics) :
new ChronicleMapMessageRepository(backupFilesManager.getCurrentBackupFile());
BrokerListener brokerListener = new BrokerListener(repository);

listeners.addAcknowledgeListener(brokerListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ class PublishingHandler implements HttpHandler {
public void handleRequest(HttpServerExchange exchange) throws Exception {
// change state of exchange to dispatched,
// thanks to this call, default response with 200 status code is not returned after handlerRequest() finishes its execution
exchange.dispatch(() -> handle(exchange));
exchange.dispatch(() -> {
try {
handle(exchange);
} catch (RuntimeException e) {
messageErrorProcessor.sendAndLog(exchange, "Exception while publishing message to a broker.", e);
}
});
}

private void handle(HttpServerExchange exchange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
}

private void delayedSending(HttpServerExchange exchange, AttachmentContent attachment) {
exchange.getConnection().getWorker().execute(() ->
messageEndProcessor.bufferedButDelayed(exchange, attachment));
exchange.getConnection().getWorker().execute(() -> {
try {
messageEndProcessor.bufferedButDelayed(exchange, attachment);
} catch (RuntimeException exception) {
messageErrorProcessor.sendAndLog(exchange, "Exception while handling delayed message sending.", exception);
}
});
}

private void readingTimeout(HttpServerExchange exchange, AttachmentContent attachment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HttpString;
import io.undertow.util.StatusCodes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.frontend.listeners.BrokerListeners;
import pl.allegro.tech.hermes.frontend.metric.CachedTopic;
Expand All @@ -17,6 +19,7 @@

public class MessageEndProcessor {

private static final Logger logger = LoggerFactory.getLogger(MessageEndProcessor.class);
private static final HttpString messageIdHeader = new HttpString(MESSAGE_ID.getName());

private final Trackers trackers;
Expand Down Expand Up @@ -54,9 +57,30 @@ public void bufferedButDelayed(HttpServerExchange exchange, AttachmentContent at
}

private void sendResponse(HttpServerExchange exchange, AttachmentContent attachment, int statusCode) {
exchange.setStatusCode(statusCode);
exchange.getResponseHeaders().add(messageIdHeader, attachment.getMessageId());
if (!exchange.isResponseStarted()) {
exchange.setStatusCode(statusCode);
exchange.getResponseHeaders().add(messageIdHeader, attachment.getMessageId());
} else {
logger.warn("The response has already been started. Status code set on exchange: {}; Expected status code: {};" +
"Topic: {}; Message id: {}; Remote host {}",
exchange.getStatusCode(),
statusCode,
attachment.getCachedTopic().getQualifiedName(),
attachment.getMessageId(),
readHostAndPort(exchange));
}
attachment.markResponseAsReady();
exchange.endExchange();
try {
exchange.endExchange();
} catch (RuntimeException exception) {
logger.error("Exception while ending exchange. Status code set on exchange: {}; Expected status code: {};" +
"Topic: {}; Message id: {}; Remote host {}",
exchange.getStatusCode(),
statusCode,
attachment.getCachedTopic().getQualifiedName(),
attachment.getMessageId(),
readHostAndPort(exchange),
exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void sendAndLog(HttpServerExchange exchange, Topic topic, String messageI
log(error.getMessage(), topic, messageId, readHostAndPort(exchange), e);
}

public void sendAndLog(HttpServerExchange exchange, String errorMessage, Exception e) {
AttachmentContent attachment = exchange.getAttachment(AttachmentContent.KEY);
sendAndLog(exchange, attachment.getTopic(), attachment.getMessageId(), error(errorMessage, INTERNAL_ERROR), e);
}

public void sendQuietly(HttpServerExchange exchange, ErrorDescription error, String messageId, String topicName) {
try {
if (exchange.getConnection().isOpen()) {
Expand Down

0 comments on commit d391e3a

Please sign in to comment.