fix Kafka Connect retries
Summary: This diff batches records per topic partition and throws retriable errors when addDoc keeps failing with Rockset internal errors (a sketch of the retry contract follows the file summary below).

Test Plan: mvn package

Reviewers: haneeshr

Differential Revision: https://rockset.phacility.com/D3497
kwadhwa18 committed Aug 30, 2019
1 parent b8a7e05 commit 48c1d44
Showing 7 changed files with 235 additions and 100 deletions.
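
Context for the summary above: Kafka Connect treats org.apache.kafka.connect.errors.RetriableException specially. When put() or flush() throws it, the framework keeps the task alive and redelivers the same unacknowledged records after a backoff; any other exception fails the task. Below is a minimal sketch of that contract, with hypothetical class and method names that are not part of this diff.

package rockset;

import java.util.Collection;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical illustration of the framework contract this change relies on:
// throwing RetriableException from put() asks Connect to redeliver the batch,
// while any other exception fails the task.
public abstract class RetriableContractSketch extends SinkTask {
  @Override
  public void put(Collection<SinkRecord> records) {
    try {
      writeBatch(records); // e.g. hand the batch to RocksetClientWrapper.addDoc
    } catch (TransientWriteFailure e) {
      // Connect backs off and calls put() again with the unacknowledged records
      throw new RetriableException("transient write failure, will be retried", e);
    } catch (RuntimeException e) {
      // permanent problems (bad JSON, unsupported format, ...) should fail the task
      throw new ConnectException("permanent write failure", e);
    }
  }

  // stand-ins for the connector's real write path and transient error type
  protected abstract void writeBatch(Collection<SinkRecord> records) throws TransientWriteFailure;

  public static class TransientWriteFailure extends Exception {
    public TransientWriteFailure(String message, Throwable cause) {
      super(message, cause);
    }
  }
}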
2 changes: 2 additions & 0 deletions .gitignore
@@ -25,3 +25,5 @@ hs_err_pid*
# intellij
.idea/
target/

*.iml
32 changes: 32 additions & 0 deletions src/main/java/rockset/RecordParser.java
@@ -0,0 +1,32 @@
package rockset;

import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.serializers.NonRecordContainer;
import org.apache.kafka.connect.sink.SinkRecord;


public interface RecordParser {
Object parse(SinkRecord record);
}

class AvroParser implements RecordParser {
@Override
public Object parse(SinkRecord record) {
AvroData avroData = new AvroData(1); // arg is cacheSize
Object val = avroData.fromConnectData(record.valueSchema(), record.value());
if (val instanceof NonRecordContainer) {
val = ((NonRecordContainer) val).getValue();
}

return val;
}
}

class JsonParser implements RecordParser {
@Override
public Object parse(SinkRecord record) {
return record.value();
}
}
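
A rough usage sketch of the new interface (not part of this diff): pick a parser from the connector's configured format and turn a SinkRecord into the document map that RocksetClientWrapper ships to Rockset. It assumes Jackson is on the classpath, as it already is for the wrapper; the helper names are illustrative.

package rockset;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;

// Illustrative helper, not part of the connector
class RecordParserExample {
  // mirrors the format-based selection RocksetSinkTask performs in getRecordParser()
  static RecordParser forFormat(String format) {
    return "avro".equals(format) ? new AvroParser() : new JsonParser();
  }

  // the same String-to-Map conversion RocksetClientWrapper applies before adding an _id
  static Map<String, Object> toDocument(RecordParser parser, SinkRecord record) {
    Object parsed = parser.parse(record);
    try {
      return new ObjectMapper().readValue(parsed.toString(),
          new TypeReference<Map<String, Object>>() {});
    } catch (IOException e) {
      throw new ConnectException("Invalid JSON encountered in stream", e);
    }
  }
}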


53 changes: 24 additions & 29 deletions src/main/java/rockset/RocksetClientWrapper.java
@@ -5,11 +5,10 @@
import com.rockset.client.ApiException;
import com.rockset.client.RocksetClient;
import com.rockset.client.model.AddDocumentsRequest;
import com.rockset.client.model.AddDocumentsResponse;
import com.rockset.client.model.ErrorModel;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -31,48 +30,44 @@ public RocksetClientWrapper(RocksetClient client) {
this.client = client;
}

private boolean isInternalError(Throwable e) {
return (e instanceof ApiException && ((ApiException) e).getCode() == 500);
}

// returns false on a Rockset internal error exception to retry adding the docs,
// returns true otherwise
public boolean addDoc(String workspace, String collection, String json, SinkRecord sr) {
public boolean addDoc(String workspace, String collection,
Collection<SinkRecord> records, RecordParser recordParser) {
LinkedList<Object> list = new LinkedList<>();
ObjectMapper mapper = new ObjectMapper();

String srId = RocksetSinkUtils.createId(sr);
try {
Map<String, Object> doc = mapper.readValue(json, new TypeReference<Map<String, Object>>(){});
doc.put("_id", srId);
list.add(doc);
} catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream");
for (SinkRecord record : records) {
String srId = RocksetSinkUtils.createId(record);
try {
Object val = recordParser.parse(record);
Map<String, Object> doc = mapper.readValue(val.toString(),
new TypeReference<Map<String, Object>>() {
});
doc.put("_id", srId);
list.add(doc);
} catch (Exception e) {
throw new ConnectException("Invalid JSON encountered in stream");
}
}

try {
AddDocumentsRequest documentsRequest = new AddDocumentsRequest().data(list);
client.addDocuments(workspace, collection, documentsRequest);
} catch (Exception e) {
if (e instanceof ApiException &&
((ApiException) e).getErrorModel().getType() == ErrorModel.TypeEnum.INTERNALERROR) {
if (isInternalError(e)) {
// return false to retry
return false;
}

throw new ConnectException(String.format("Unable to write document " +
"to collection %s, workspace %s in Rockset, cause: %s",
collection, workspace, e.getMessage()));
"to collection %s, workspace %s in Rockset, cause: %s",
collection, workspace, e.getMessage()), e);
}
return true;
}

private String createId(SinkRecord sr) {
if (sr.key() != null) {
if (sr.key() instanceof String) {
return String.valueOf(sr.key());
} else {
// only supports string keys
throw new ConnectException(String.format("Only keys of type String are supported, " +
"key is of type %s", sr.key().getClass()));
}
} else {
return sr.topic() + "+" + sr.kafkaPartition() + "+" + sr.kafkaOffset();
}
}
}
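
A rough test sketch of the new contract (not part of this diff): a Rockset internal error (HTTP 500) should make addDoc return false so the caller can retry, instead of throwing. It assumes JUnit 4 and Mockito are available on the test classpath and that RocksetClient.addDocuments declares a checked exception that allows stubbing a thrown ApiException; the test names are illustrative.

package rockset;

import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.rockset.client.ApiException;
import com.rockset.client.RocksetClient;
import java.util.Collections;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;

public class RocksetClientWrapperRetrySketchTest {
  @Test
  public void internalErrorReturnsFalseSoCallerCanRetry() throws Exception {
    // simulate a Rockset internal error without relying on ApiException constructors
    ApiException internalError = mock(ApiException.class);
    when(internalError.getCode()).thenReturn(500);

    RocksetClient client = mock(RocksetClient.class);
    doThrow(internalError).when(client).addDocuments(anyString(), anyString(), any());

    RocksetClientWrapper wrapper = new RocksetClientWrapper(client);
    SinkRecord record = new SinkRecord("topic", 0, null, "key-1", null,
        "{\"field\": \"value\"}", 0L);

    // false tells RocksetSinkTask.addWithRetries to back off and try again
    assertFalse(wrapper.addDoc("workspace", "collection",
        Collections.singletonList(record), new JsonParser()));
  }
}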
167 changes: 118 additions & 49 deletions src/main/java/rockset/RocksetSinkTask.java
@@ -1,115 +1,184 @@
package rockset;

import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.serializers.NonRecordContainer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.github.jcustenborder.kafka.connect.utils.VersionUtil;

public class RocksetSinkTask extends SinkTask {
private static Logger log = LoggerFactory.getLogger(RocksetSinkTask.class);
private RocksetClientWrapper rc;
private ExecutorService executorService;
RocksetConnectorConfig config;
private Map<TopicPartition, List<CompletableFuture>> futureMap;
private RocksetConnectorConfig config;
private RecordParser recordParser;


public static final int RETRIES_COUNT = 5;
public static final int INITIAL_DELAY = 250;

private RecordParser getRecordParser(String format) {
switch (format) {
case "json":
return new JsonParser();
case "avro":
return new AvroParser();
default:
throw new ConnectException(String.format("Format %s not supported", format));
}
}

@Override
public void start(Map<String, String> settings) {
this.config = new RocksetConnectorConfig(settings);
this.rc = new RocksetClientWrapper(
this.config.getRocksetApikey(),
this.config.getRocksetApiServerUrl());
this.executorService = Executors.newFixedThreadPool(this.config.getRocksetTaskThreads());
this.futureMap = new HashMap<>();
this.recordParser = getRecordParser(config.getFormat());

}

// used for testing
public void start(Map<String, String> settings, RocksetClientWrapper rc, ExecutorService executorService) {
public void start(Map<String, String> settings, RocksetClientWrapper rc,
ExecutorService executorService) {
this.config = new RocksetConnectorConfig(settings);
this.rc = rc;
this.executorService = executorService;
this.futureMap = new HashMap<>();
this.recordParser = getRecordParser(config.getFormat());
}

@Override
public void put(Collection<SinkRecord> records) {
String collection = this.config.getRocksetCollection();
String workspace = this.config.getRocksetWorkspace();
String format = this.config.getFormat();
handleRecords(records, format, workspace, collection);
handleRecords(records, workspace, collection);
}

private void handleRecords(Collection<SinkRecord> records, String format,
private void handleRecords(Collection<SinkRecord> records,
String workspace, String collection) {
log.debug("Adding {} documents to collection {} in workspace {}",
records.size(), collection, workspace);
if (records.size() == 0) {
return;
}
switch (format) {
case "json":
for (SinkRecord sr : records) {
executorService.execute(() -> addWithRetries(workspace, collection,
sr.value().toString(), sr));
}
break;
case "avro":
AvroData avroData = new AvroData(1000); // arg is cacheSize
for (SinkRecord sr : records) {
executorService.execute(() -> {
Object val = avroData.fromConnectData(sr.valueSchema(), sr.value());
if (val instanceof NonRecordContainer) {
val = ((NonRecordContainer) val).getValue();
}
addWithRetries(workspace, collection, val.toString(), sr);
});
}
break;
default:
throw new ConnectException(String.format("Format %s not supported", format));

submitForProcessing(workspace, collection, records);
}

private Map<TopicPartition, Collection<SinkRecord>> partitionRecordsByTopic(
Collection<SinkRecord> records) {
Map<TopicPartition, Collection<SinkRecord>> topicPartitionedRecords = new HashMap<>();
for (SinkRecord record: records) {
TopicPartition key = new TopicPartition(record.topic(), record.kafkaPartition());
topicPartitionedRecords.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
}

return topicPartitionedRecords;
}

private void addWithRetries(String workspace, String collection, String doc, SinkRecord sr) {
boolean success = this.rc.addDoc(workspace, collection, doc, sr);
int retries = 0;
int delay = INITIAL_DELAY;
while (!success && retries < RETRIES_COUNT) {
try {
Thread.sleep(delay);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
success = this.rc.addDoc(workspace, collection, doc, sr);
retries += 1;
delay *= 2;
private void submitForProcessing(String workspace, String collection,
Collection<SinkRecord> records) {

Map<TopicPartition, Collection<SinkRecord>> partitionedRecords =
partitionRecordsByTopic(records);

for (Map.Entry<TopicPartition, Collection<SinkRecord>> tpe : partitionedRecords.entrySet()) {
TopicPartition tp = tpe.getKey();
checkForFailures(tp, false);
futureMap.computeIfAbsent(tp, k -> new ArrayList<>()).add(
addWithRetries(workspace, collection, tpe.getValue()));
}
}

private boolean isRetriableException(Throwable e) {
return (e.getCause() != null && e.getCause() instanceof RetriableException);
}

private void checkForFailures(TopicPartition tp, boolean wait) {
if (futureMap.get(tp) == null) {
return;
}
if (!success) {
throw new ConnectException(String.format("Add document request timed out " +
"for document with _id %s, collection %s, and workspace %s",
RocksetSinkUtils.createId(sr), collection, workspace));

List<CompletableFuture> futures = futureMap.get(tp);
Iterator<CompletableFuture> futureIterator = futures.iterator();
while (futureIterator.hasNext()) {
CompletableFuture future = futureIterator.next();
// this is blocking only if wait is true
if (wait || future.isDone()) {
try {
future.get();
} catch (Exception e) {
if (isRetriableException(e)) {
throw new RetriableException(
String.format("Unable to write document for topic: %s, partition: %s, in Rockset,"
+ " should retry, cause: %s", tp.topic(), tp.partition(), e.getMessage()), e);
}

throw new RuntimeException(
String.format("Unable to write document for topic: %s, partition: %s, in Rockset,"
+ " cause: %s", tp.topic(), tp.partition(), e.getMessage()), e);
} finally {
futureIterator.remove();
}
}
}
}

// TODO improve this logic
private CompletableFuture addWithRetries(String workspace, String collection,
Collection<SinkRecord> records) {
return CompletableFuture.runAsync(() -> {
boolean success = this.rc.addDoc(workspace, collection, records, recordParser);
int retries = 0;
int delay = INITIAL_DELAY;
while (!success && retries < RETRIES_COUNT) {
try {
Thread.sleep(delay);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
// addDoc throws ConnectException if it's not Internal Error
success = this.rc.addDoc(workspace, collection, records, recordParser);
retries += 1;
delay *= 2;
}
if (!success) {
throw new RetriableException(String.format("Add document request timed out "
+ "for document with collection %s and workspace %s", collection, workspace));
}
}, executorService);
}

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
// Not Required
for (Map.Entry<TopicPartition, OffsetAndMetadata> toe : map.entrySet()) {
checkForFailures(toe.getKey(), true);
}
}

@Override
public void stop() {
// Not Required
executorService.shutdown();
}

@Override
