eventTimeExtractor)
+
+ // Defines the segment name generation via the predefined SimpleSegmentNameGenerator
+ // Example segment name: tableName_minTimestamp_maxTimestamp_segmentNamePostfix_0
+ .withSimpleSegmentNameGenerator(String segmentNamePostfix)
+
+ // Use a custom segment name generator if the SimpleSegmentNameGenerator does not work for your use case
+ .withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator)
+
+ // Use a custom filesystem adapter.
+ // CAUTION: Make sure all nodes your Flink app runs on can access the shared filesystem via the provided FileSystemAdapter
+ .withFileSystemAdapter(FileSystemAdapter fsAdapter)
+
+ // Defines the size of the Pinot segments
+ .withMaxRowsPerSegment(int maxRowsPerSegment)
+
+ // Prefix within the local filesystem's temp directory used for storing intermediate files
+ .withTempDirectoryPrefix(String tempDirPrefix)
+
+ // Number of threads used in the `PinotSinkGlobalCommitter` to commit a batch of segments
+ // Optional - Default is 4
+ .withNumCommitThreads(int numCommitThreads)
+
+ // Builds the PinotSink
+ .build();
+ dataStream.addSink(pinotSink);
+```
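+
+ The `EventTimeExtractor` passed via `withEventTimeExtractor` could, for example, be implemented as sketched below. `PinotRow` and its `getTimestamp()` accessor are placeholders for your own element type; the three overridden methods are the ones defined by the `EventTimeExtractor` interface.
+
+ ```java
+ import org.apache.flink.api.connector.sink.SinkWriter;
+ import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+
+ import java.util.concurrent.TimeUnit;
+
+ public class PinotRowEventTimeExtractor implements EventTimeExtractor<PinotRow> {
+
+     @Override
+     public long getEventTime(PinotRow element, SinkWriter.Context context) {
+         // Derive the event time from the element itself
+         return element.getTimestamp();
+     }
+
+     @Override
+     public String getTimeColumn() {
+         // Name of the time column in the target Pinot table
+         return "timestamp";
+     }
+
+     @Override
+     public TimeUnit getSegmentTimeUnit() {
+         // Unit of the values stored in the time column
+         return TimeUnit.MILLISECONDS;
+     }
+ }
+ ```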
diff --git a/flink-connector-pinot/docs/images/PinotSinkGlobalCommitter_combine.png b/flink-connector-pinot/docs/images/PinotSinkGlobalCommitter_combine.png
new file mode 100644
index 00000000..e9ea878b
Binary files /dev/null and b/flink-connector-pinot/docs/images/PinotSinkGlobalCommitter_combine.png differ
diff --git a/flink-connector-pinot/docs/images/PinotSinkWriter.png b/flink-connector-pinot/docs/images/PinotSinkWriter.png
new file mode 100644
index 00000000..389988bf
Binary files /dev/null and b/flink-connector-pinot/docs/images/PinotSinkWriter.png differ
diff --git a/flink-connector-pinot/docs/images/PinotSinkWriter_prepareCommit.png b/flink-connector-pinot/docs/images/PinotSinkWriter_prepareCommit.png
new file mode 100644
index 00000000..ed4a095d
Binary files /dev/null and b/flink-connector-pinot/docs/images/PinotSinkWriter_prepareCommit.png differ
diff --git a/flink-connector-pinot/pom.xml b/flink-connector-pinot/pom.xml
new file mode 100644
index 00000000..74eacd52
--- /dev/null
+++ b/flink-connector-pinot/pom.xml
@@ -0,0 +1,196 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-pinot_2.11</artifactId>
+    <name>flink-connector-pinot</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <pinot.version>0.6.0</pinot.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-tools</artifactId>
+            <version>${pinot.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+            <version>4.5.13</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-java-client</artifactId>
+            <version>${pinot.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.15.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>1.15.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-enforcer-plugin</artifactId>
+                <version>1.4.1</version>
+                <executions>
+                    <execution>
+                        <id>enforce-versions</id>
+                        <goals>
+                            <goal>enforce</goal>
+                        </goals>
+                        <configuration>
+                            <rules>
+                                <requireJavaVersion>
+                                    <version>${java.version}</version>
+                                </requireJavaVersion>
+                            </rules>
+                            <skip>true</skip>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.1.1</version>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.1.0</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java
new file mode 100644
index 00000000..43aafab4
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerClient.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerClient implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PinotControllerClient.class);
+ private final PinotControllerHttpClient httpClient;
+
+ /**
+ * @param controllerHost Pinot controller's host
+ * @param controllerPort Pinot controller's port
+ */
+ public PinotControllerClient(String controllerHost, String controllerPort) {
+ this.httpClient = new PinotControllerHttpClient(controllerHost, controllerPort);
+ }
+
+ /**
+ * Checks whether the provided segment name is registered with the given table.
+ *
+ * @param tableName Target table's name
+ * @param segmentName Segment name to check
+ * @return True if segment with the provided name exists
+ * @throws IOException
+ */
+ public boolean tableHasSegment(String tableName, String segmentName) throws IOException {
+ PinotControllerHttpClient.ApiResponse res = httpClient.get(String.format("/tables/%s/%s/metadata", tableName, segmentName));
+
+ if (res.statusLine.getStatusCode() == 200) {
+ // A segment named `segmentName` exists within the table named `tableName`
+ return true;
+ }
+ if (res.statusLine.getStatusCode() == 404) {
+ // There is no such segment named `segmentName` within the table named `tableName`
+ // (or the table named `tableName` does not exist)
+ return false;
+ }
+
+ // Received an unexpected status code
+ throw new PinotControllerApiException(res.responseBody);
+ }
+
+ /**
+ * Deletes a segment from a table.
+ *
+ * @param tableName Target table's name
+ * @param segmentName Identifies the segment to delete
+ * @throws IOException
+ */
+ public void deleteSegment(String tableName, String segmentName) throws IOException {
+ PinotControllerHttpClient.ApiResponse res = httpClient.delete(String.format("/tables/%s/%s", tableName, segmentName));
+
+ if (res.statusLine.getStatusCode() != 200) {
+ LOG.error("Could not delete segment {} from table {}. Pinot controller returned: {}", tableName, segmentName, res.responseBody);
+ throw new PinotControllerApiException(res.responseBody);
+ }
+ }
+
+ /**
+ * Fetches a Pinot table's schema via the Pinot controller API.
+ *
+ * @param tableName Target table's name
+ * @return Pinot table schema
+ * @throws IOException
+ */
+ public Schema getSchema(String tableName) throws IOException {
+ Schema schema;
+ PinotControllerHttpClient.ApiResponse res = httpClient.get(String.format("/tables/%s/schema", tableName));
+ LOG.debug("Get schema request for table {} returned {}", tableName, res.responseBody);
+
+ if (res.statusLine.getStatusCode() != 200) {
+ throw new PinotControllerApiException(res.responseBody);
+ }
+
+ try {
+ schema = JsonUtils.stringToObject(res.responseBody, Schema.class);
+ } catch (IOException e) {
+ throw new PinotControllerApiException("Caught exception while reading schema from Pinot Controller's response: " + res.responseBody);
+ }
+ LOG.debug("Retrieved schema: {}", schema.toSingleLineJsonString());
+ return schema;
+ }
+
+ /**
+ * Fetches a Pinot table's configuration via the Pinot controller API.
+ *
+ * @param tableName Target table's name
+ * @return Pinot table configuration
+ * @throws IOException
+ */
+ public TableConfig getTableConfig(String tableName) throws IOException {
+ TableConfig tableConfig;
+ PinotControllerHttpClient.ApiResponse res = httpClient.get(String.format("/tables/%s", tableName));
+ LOG.debug("Get table config request for table {} returned {}", tableName, res.responseBody);
+
+ try {
+ String tableConfigAsJson = JsonUtils.stringToJsonNode(res.responseBody).get("OFFLINE").toString();
+ tableConfig = JsonUtils.stringToObject(tableConfigAsJson, TableConfig.class);
+ } catch (IOException e) {
+ throw new PinotControllerApiException("Caught exception while reading table config from Pinot Controller's response: " + res.responseBody);
+ }
+ LOG.debug("Retrieved table config: {}", tableConfig.toJsonString());
+ return tableConfig;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ httpClient.close();
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
new file mode 100644
index 00000000..6ac05d62
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotControllerHttpClient.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Helpers to interact with the Pinot controller via its public API.
+ */
+@Internal
+public class PinotControllerHttpClient implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PinotControllerHttpClient.class);
+ private final String controllerHostPort;
+ private final CloseableHttpClient httpClient;
+
+ /**
+ * @param controllerHost Pinot controller's host
+ * @param controllerPort Pinot controller's port
+ */
+ public PinotControllerHttpClient(String controllerHost, String controllerPort) {
+ checkNotNull(controllerHost);
+ checkNotNull(controllerPort);
+ this.controllerHostPort = String.format("http://%s:%s", controllerHost, controllerPort);
+ this.httpClient = HttpClients.createDefault();
+ }
+
+ /**
+ * Issues a request to the Pinot controller API.
+ *
+ * @param request Request to issue
+ * @return Api response
+ * @throws IOException
+ */
+ private ApiResponse execute(HttpRequestBase request) throws IOException {
+ ApiResponse result;
+
+ try (CloseableHttpResponse response = httpClient.execute(request)) {
+ String body = EntityUtils.toString(response.getEntity());
+ result = new ApiResponse(response.getStatusLine(), body);
+ }
+
+ return result;
+ }
+
+ /**
+ * Issues a POST request to the Pinot controller API.
+ *
+ * @param path Path to POST to
+ * @param body Request's body
+ * @return API response
+ * @throws IOException
+ */
+ ApiResponse post(String path, String body) throws IOException {
+ HttpPost httppost = new HttpPost(this.controllerHostPort + path);
+ httppost.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON));
+ LOG.debug("Posting string entity {} to {}", body, path);
+ return this.execute(httppost);
+ }
+
+ /**
+ * Issues a GET request to the Pinot controller API.
+ *
+ * @param path Path to GET from
+ * @return API response
+ * @throws IOException
+ */
+ ApiResponse get(String path) throws IOException {
+ HttpGet httpget = new HttpGet(this.controllerHostPort + path);
+ LOG.debug("Sending GET request to {}", path);
+ return this.execute(httpget);
+ }
+
+ /**
+ * Issues a DELETE request to the Pinot controller API.
+ *
+ * @param path Path to issue DELETE request to
+ * @return API response
+ * @throws IOException
+ */
+ ApiResponse delete(String path) throws IOException {
+ HttpDelete httpdelete = new HttpDelete(this.controllerHostPort + path);
+ LOG.debug("Sending DELETE request to {}", path);
+ return this.execute(httpdelete);
+ }
+
+ @Override
+ public void close() throws IOException {
+ httpClient.close();
+ }
+
+ /**
+ * Helper class for wrapping Pinot controller API responses.
+ */
+ static class ApiResponse {
+ public final StatusLine statusLine;
+ public final String responseBody;
+
+ ApiResponse(StatusLine statusLine, String responseBody) {
+ this.statusLine = statusLine;
+ this.responseBody = responseBody;
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
new file mode 100644
index 00000000..7d2fe94d
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/PinotSink.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkGlobalCommittableSerializer;
+import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkWriterStateSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Apache Pinot sink that stores objects from upstream Flink tasks in an Apache Pinot table. The sink
+ * can be operated in {@code RuntimeExecutionMode.STREAMING} or {@code RuntimeExecutionMode.BATCH}
+ * mode, but make sure to enable checkpointing when using it in streaming mode.
+ *
+ * We advise you to use the provided {@link PinotSink.Builder} to build and configure the
+ * PinotSink. All communication with the Pinot cluster's table is managed via the Pinot
+ * controller. Thus you need to provide its host and port as well as the target Pinot table's name.
+ * The {@link TableConfig} and {@link Schema} are automatically retrieved via the Pinot controller API
+ * and therefore do not need to be provided.
+ *
+ *
+ * <p>Whenever an element is received by the sink it gets stored in a {@link PinotWriterSegment}. A
+ * {@link PinotWriterSegment} represents exactly one segment that will be pushed to the Pinot
+ * cluster later on. Its size is determined by the customizable {@code maxRowsPerSegment} parameter.
+ * Please note that the maximum segment size that can be handled by this sink is limited by the
+ * lower bound of memory available at each subTask.
+ * Each subTask holds a list of {@link PinotWriterSegment}s of which at most one is active. An
+ * active {@link PinotWriterSegment} is capable of accepting at least one more element. If a
+ * {@link PinotWriterSegment} switches from active to inactive it flushes its
+ * {@code maxRowsPerSegment} elements to disk. The data file is stored in the local filesystem's
+ * temporary directory and contains serialized elements. We use the {@link JsonSerializer} to
+ * serialize elements to JSON.
+ *
+ *
+ * <p>On checkpointing, all {@link PinotWriterSegment}s that are not in progress are transformed into
+ * committables. As the data files need to be shared across nodes, the sink requires access to a
+ * shared filesystem. We use the {@link FileSystemAdapter} for that purpose.
+ * A {@link FileSystemAdapter} is capable of copying a file from the local to the shared filesystem
+ * and vice-versa. A {@link PinotSinkCommittable} contains a reference to a data file on the shared
+ * filesystem as well as the minimum and maximum timestamp contained in the data file. A timestamp -
+ * usually the event time - is extracted from each received element via {@link EventTimeExtractor}.
+ * The timestamps are later required in order to follow the guideline for naming Pinot segments.
+ * The state of an in-progress {@link PinotWriterSegment}, if one exists, is saved in the snapshot
+ * taken when checkpointing. This ensures that the at-most-once delivery guarantee can be fulfilled
+ * when recovering from failures.
+ *
+ *
+ * <p>We use the {@link PinotSinkGlobalCommitter} to collect all created
+ * {@link PinotSinkCommittable}s, create segments from the referenced data files and finally push them
+ * to the Pinot table. To this end, the minimum and maximum timestamps over all
+ * {@link PinotSinkCommittable}s are determined. The segment names are then generated using the
+ * {@link PinotSinkSegmentNameGenerator} which gets the minimum and maximum timestamp as input.
+ * The segment generation starts with downloading the referenced data file from the shared file system
+ * using the provided {@link FileSystemAdapter}. Once this is completed, we use Pinot's
+ * {@link SegmentIndexCreationDriver} to generate the final segment. Each segment is thereby stored
+ * in a temporary directory on the local filesystem. Next, the segment is uploaded to the Pinot
+ * controller using Pinot's {@link UploadSegmentCommand}.
+ *
+ *
+ * <p>To ensure that possible failures are handled accordingly, each segment name is checked for
+ * existence within the Pinot cluster before uploading a segment. In case a segment name already
+ * exists, i.e. if the last commit failed partially with some segments already uploaded, the
+ * existing segment is deleted first. When the elements since the last checkpoint are replayed the
+ * minimum and maximum timestamp of all received elements will be the same. Thus the same set of
+ * segment names is generated and we can delete previous segments by checking for segment name
+ * presence. Note: The {@link PinotSinkSegmentNameGenerator} must be deterministic. We also provide
+ * a {@link SimpleSegmentNameGenerator}, a simple segment name generator that is suitable for most
+ * use cases.
+ *
+ *
+ * <p>Please note that we use the {@link GlobalCommitter} to ensure consistent segment naming. This
+ * comes with performance limitations as a {@link GlobalCommitter} always runs at a parallelism of 1
+ * which results in a clear bottleneck at the {@link PinotSinkGlobalCommitter} that does all the
+ * computationally intensive work (i.e. generating and uploading segments). In order to overcome this
+ * issue we introduce a custom multithreading approach within the {@link PinotSinkGlobalCommitter}
+ * to parallelize the segment creation and upload process.
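+ *
+ * <p>A minimal usage sketch ({@code PinotRow} stands for the stream's element type; {@code MyJsonSerializer},
+ * {@code MyEventTimeExtractor} and {@code MyFileSystemAdapter} stand for user-provided implementations of
+ * {@link JsonSerializer}, {@link EventTimeExtractor} and {@link FileSystemAdapter}):
+ * <pre>{@code
+ * PinotSink<PinotRow> pinotSink = new PinotSink.Builder<PinotRow>("localhost", "9000", "myTable")
+ *         .withJsonSerializer(new MyJsonSerializer())
+ *         .withEventTimeExtractor(new MyEventTimeExtractor())
+ *         .withSimpleSegmentNameGenerator("flink")
+ *         .withFileSystemAdapter(new MyFileSystemAdapter())
+ *         .withMaxRowsPerSegment(10000)
+ *         .build();
+ * dataStream.addSink(pinotSink);
+ * }</pre>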
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotSink<IN> implements Sink<IN, PinotSinkCommittable, PinotSinkWriterState, PinotSinkGlobalCommittable> {
+
+ private final String pinotControllerHost;
+ private final String pinotControllerPort;
+ private final String tableName;
+ private final int maxRowsPerSegment;
+ private final String tempDirPrefix;
+ private final JsonSerializer<IN> jsonSerializer;
+ private final SegmentNameGenerator segmentNameGenerator;
+ private final FileSystemAdapter fsAdapter;
+ private final EventTimeExtractor<IN> eventTimeExtractor;
+ private final int numCommitThreads;
+
+ /**
+ * @param pinotControllerHost Host of the Pinot controller
+ * @param pinotControllerPort Port of the Pinot controller
+ * @param tableName Target table's name
+ * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+ * @param tempDirPrefix Prefix for temp directories used
+ * @param jsonSerializer Serializer used to convert elements to JSON
+ * @param eventTimeExtractor Defines the way event times are extracted from received objects
+ * @param segmentNameGenerator Pinot segment name generator
+ * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+ * @param numCommitThreads Number of threads used in the {@link PinotSinkGlobalCommitter} for committing segments
+ */
+ private PinotSink(String pinotControllerHost, String pinotControllerPort, String tableName,
+ int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer,
+ EventTimeExtractor<IN> eventTimeExtractor,
+ SegmentNameGenerator segmentNameGenerator, FileSystemAdapter fsAdapter,
+ int numCommitThreads) {
+ this.pinotControllerHost = checkNotNull(pinotControllerHost);
+ this.pinotControllerPort = checkNotNull(pinotControllerPort);
+ this.tableName = checkNotNull(tableName);
+
+ checkArgument(maxRowsPerSegment > 0);
+ this.maxRowsPerSegment = maxRowsPerSegment;
+ this.tempDirPrefix = checkNotNull(tempDirPrefix);
+ this.jsonSerializer = checkNotNull(jsonSerializer);
+ this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+ this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+ this.fsAdapter = checkNotNull(fsAdapter);
+ checkArgument(numCommitThreads > 0);
+ this.numCommitThreads = numCommitThreads;
+ }
+
+ /**
+ * Creates a Pinot sink writer.
+ *
+ * @param context InitContext
+ * @param states State extracted from snapshot. This list must not have a size larger than 1
+ */
+ @Override
+ public PinotSinkWriter<IN> createWriter(InitContext context, List<PinotSinkWriterState> states) {
+ PinotSinkWriter<IN> writer = new PinotSinkWriter<>(
+ context.getSubtaskId(), maxRowsPerSegment, eventTimeExtractor,
+ jsonSerializer, fsAdapter
+ );
+
+ if (states.size() == 1) {
+ writer.initializeState(states.get(0));
+ } else if (states.size() > 1) {
+ throw new IllegalStateException("Did not expect more than one element in states.");
+ }
+ return writer;
+ }
+
+ /**
+ * The PinotSink does not use a committer. Instead, a global committer is used.
+ *
+ * @return Empty Optional
+ */
+ @Override
+ public Optional<Committer<PinotSinkCommittable>> createCommitter() {
+ return Optional.empty();
+ }
+
+ /**
+ * Creates the global committer.
+ */
+ @Override
+ public Optional<GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable>> createGlobalCommitter() throws IOException {
+ String timeColumnName = eventTimeExtractor.getTimeColumn();
+ TimeUnit segmentTimeUnit = eventTimeExtractor.getSegmentTimeUnit();
+ PinotSinkGlobalCommitter committer = new PinotSinkGlobalCommitter(
+ pinotControllerHost, pinotControllerPort, tableName, segmentNameGenerator,
+ tempDirPrefix, fsAdapter, timeColumnName, segmentTimeUnit, numCommitThreads
+ );
+ return Optional.of(committer);
+ }
+
+ /**
+ * Creates the committables' serializer.
+ */
+ @Override
+ public Optional<SimpleVersionedSerializer<PinotSinkCommittable>> getCommittableSerializer() {
+ return Optional.of(new PinotSinkCommittableSerializer());
+ }
+
+ /**
+ * Creates the global committables' serializer.
+ */
+ @Override
+ public Optional<SimpleVersionedSerializer<PinotSinkGlobalCommittable>> getGlobalCommittableSerializer() {
+ return Optional.of(new PinotSinkGlobalCommittableSerializer());
+ }
+
+ /**
+ * Creates the writer state's serializer.
+ */
+ @Override
+ public Optional<SimpleVersionedSerializer<PinotSinkWriterState>> getWriterStateSerializer() {
+ return Optional.of(new PinotSinkWriterStateSerializer());
+ }
+
+ /**
+ * Builder for configuring a {@link PinotSink}. This is the recommended public API.
+ *
+ * @param <IN> Type of incoming elements
+ */
+ public static class Builder<IN> {
+
+ static final int DEFAULT_COMMIT_THREADS = 4;
+
+ String pinotControllerHost;
+ String pinotControllerPort;
+ String tableName;
+ int maxRowsPerSegment;
+ String tempDirPrefix = "flink-connector-pinot";
+ JsonSerializer<IN> jsonSerializer;
+ EventTimeExtractor<IN> eventTimeExtractor;
+ SegmentNameGenerator segmentNameGenerator;
+ FileSystemAdapter fsAdapter;
+ int numCommitThreads = DEFAULT_COMMIT_THREADS;
+
+ /**
+ * Defines the basic connection parameters.
+ *
+ * @param pinotControllerHost Host of the Pinot controller
+ * @param pinotControllerPort Port of the Pinot controller
+ * @param tableName Target table's name
+ */
+ public Builder(String pinotControllerHost, String pinotControllerPort, String tableName) {
+ this.pinotControllerHost = pinotControllerHost;
+ this.pinotControllerPort = pinotControllerPort;
+ this.tableName = tableName;
+ }
+
+ /**
+ * Defines the serializer used to serialize elements to JSON format.
+ *
+ * @param jsonSerializer JsonSerializer
+ * @return Builder
+ */
+ public Builder<IN> withJsonSerializer(JsonSerializer<IN> jsonSerializer) {
+ this.jsonSerializer = jsonSerializer;
+ return this;
+ }
+
+ /**
+ * Defines the EventTimeExtractor used to extract event times from received objects.
+ *
+ * @param eventTimeExtractor EventTimeExtractor
+ * @return Builder
+ */
+ public Builder<IN> withEventTimeExtractor(EventTimeExtractor<IN> eventTimeExtractor) {
+ this.eventTimeExtractor = eventTimeExtractor;
+ return this;
+ }
+
+ /**
+ * Defines the SegmentNameGenerator used to generate names for the segments pushed to Pinot.
+ *
+ * @param segmentNameGenerator SegmentNameGenerator
+ * @return Builder
+ */
+ public Builder<IN> withSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
+ this.segmentNameGenerator = segmentNameGenerator;
+ return this;
+ }
+
+ /**
+ * Defines a basic segment name generator which will be used to generate names for the
+ * segments pushed to Pinot.
+ *
+ * @param segmentNamePostfix Postfix which will be appended to the segment name to identify
+ * segments coming from this Flink sink
+ * @return Builder
+ */
+ public Builder<IN> withSimpleSegmentNameGenerator(String segmentNamePostfix) {
+ return withSegmentNameGenerator(new SimpleSegmentNameGenerator(tableName, segmentNamePostfix));
+ }
+
+ /**
+ * Defines the FileSystemAdapter used to share data files between the {@link PinotSinkWriter} and
+ * the {@link PinotSinkGlobalCommitter}.
+ *
+ * @param fsAdapter Adapter for interacting with the shared file system
+ * @return Builder
+ */
+ public Builder<IN> withFileSystemAdapter(FileSystemAdapter fsAdapter) {
+ this.fsAdapter = fsAdapter;
+ return this;
+ }
+
+ /**
+ * Defines the segment size via the maximum number of elements per segment.
+ *
+ * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+ * @return Builder
+ */
+ public Builder<IN> withMaxRowsPerSegment(int maxRowsPerSegment) {
+ this.maxRowsPerSegment = maxRowsPerSegment;
+ return this;
+ }
+
+ /**
+ * Defines the path prefix for the files created in a node's local filesystem.
+ *
+ * @param tempDirPrefix Prefix for temp directories used
+ * @return Builder
+ */
+ public Builder<IN> withTempDirectoryPrefix(String tempDirPrefix) {
+ this.tempDirPrefix = tempDirPrefix;
+ return this;
+ }
+
+ /**
+ * Defines the number of threads that shall be used to commit segments in the {@link PinotSinkGlobalCommitter}.
+ *
+ * @param numCommitThreads Number of threads
+ * @return Builder
+ */
+ public Builder<IN> withNumCommitThreads(int numCommitThreads) {
+ this.numCommitThreads = numCommitThreads;
+ return this;
+ }
+
+ /**
+ * Finally builds the {@link PinotSink} according to the configuration.
+ *
+ * @return PinotSink
+ */
+ public PinotSink<IN> build() {
+ return new PinotSink<>(
+ pinotControllerHost,
+ pinotControllerPort,
+ tableName,
+ maxRowsPerSegment,
+ tempDirPrefix,
+ jsonSerializer,
+ eventTimeExtractor,
+ segmentNameGenerator,
+ fsAdapter,
+ numCommitThreads
+ );
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
new file mode 100644
index 00000000..5a8c6553
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkCommittable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The PinotSinkCommittable is required for sharing committables with the
+ * {@link PinotSinkGlobalCommitter} instance
+ */
+@Internal
+public class PinotSinkCommittable implements Serializable {
+ private final String dataFilePath;
+ private final long minTimestamp;
+ private final long maxTimestamp;
+
+ /**
+ * @param dataFilePath Path referencing a file on the shared filesystem defined via {@link org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter}
+ * @param minTimestamp The minimum timestamp of all the elements contained in {@link #dataFilePath}
+ * @param maxTimestamp The maximum timestamp of all the elements contained in {@link #dataFilePath}
+ */
+ public PinotSinkCommittable(String dataFilePath, long minTimestamp, long maxTimestamp) {
+ this.dataFilePath = checkNotNull(dataFilePath);
+ this.minTimestamp = minTimestamp;
+ this.maxTimestamp = maxTimestamp;
+ }
+
+ public String getDataFilePath() {
+ return dataFilePath;
+ }
+
+ public long getMinTimestamp() {
+ return minTimestamp;
+ }
+
+ public long getMaxTimestamp() {
+ return maxTimestamp;
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommittable.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommittable.java
new file mode 100644
index 00000000..766b831d
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommittable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committable references all data files that will be committed during checkpointing.
+ */
+@Internal
+public class PinotSinkGlobalCommittable implements Serializable {
+ private final List<String> dataFilePaths;
+ private final long minTimestamp;
+ private final long maxTimestamp;
+
+ /**
+ * @param dataFilePaths List of paths to data files on shared file system
+ * @param minTimestamp Minimum timestamp of all objects in all data files
+ * @param maxTimestamp Maximum timestamp of all objects in all data files
+ */
+ public PinotSinkGlobalCommittable(List<String> dataFilePaths, long minTimestamp, long maxTimestamp) {
+ this.dataFilePaths = checkNotNull(dataFilePaths);
+ this.minTimestamp = minTimestamp;
+ this.maxTimestamp = maxTimestamp;
+ }
+
+ public List<String> getDataFilePaths() {
+ return dataFilePaths;
+ }
+
+ public long getMinTimestamp() {
+ return minTimestamp;
+ }
+
+ public long getMaxTimestamp() {
+ return maxTimestamp;
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
new file mode 100644
index 00000000..46e03e43
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerClient;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushes them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment creation and upload to
+ * overcome the performance limitations resulting from using a {@link GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+@Internal
+public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+ private final String pinotControllerHost;
+ private final String pinotControllerPort;
+ private final String tableName;
+ private final SegmentNameGenerator segmentNameGenerator;
+ private final FileSystemAdapter fsAdapter;
+ private final String timeColumnName;
+ private final TimeUnit segmentTimeUnit;
+ private final PinotControllerClient pinotControllerClient;
+ private final File tempDirectory;
+ private final Schema tableSchema;
+ private final TableConfig tableConfig;
+ private final ExecutorService pool;
+
+ /**
+ * @param pinotControllerHost Host of the Pinot controller
+ * @param pinotControllerPort Port of the Pinot controller
+ * @param tableName Target table's name
+ * @param segmentNameGenerator Pinot segment name generator
+ * @param tempDirPrefix Prefix for directory to store temporary files in
+ * @param fsAdapter Adapter for interacting with the shared file system
+ * @param timeColumnName Name of the column containing the timestamp
+ * @param segmentTimeUnit Unit of the time column
+ * @param numCommitThreads Number of threads used to commit the committables
+ */
+ public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort,
+ String tableName, SegmentNameGenerator segmentNameGenerator,
+ String tempDirPrefix, FileSystemAdapter fsAdapter,
+ String timeColumnName, TimeUnit segmentTimeUnit,
+ int numCommitThreads) throws IOException {
+ this.pinotControllerHost = checkNotNull(pinotControllerHost);
+ this.pinotControllerPort = checkNotNull(pinotControllerPort);
+ this.tableName = checkNotNull(tableName);
+ this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+ this.fsAdapter = checkNotNull(fsAdapter);
+ this.timeColumnName = checkNotNull(timeColumnName);
+ this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+ this.pinotControllerClient = new PinotControllerClient(pinotControllerHost, pinotControllerPort);
+
+ // Create directory that temporary files will be stored in
+ this.tempDirectory = Files.createTempDirectory(tempDirPrefix).toFile();
+
+ // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller
+ this.tableSchema = pinotControllerClient.getSchema(tableName);
+ this.tableConfig = pinotControllerClient.getTableConfig(tableName);
+
+ // We use a thread pool in order to parallelize the segment creation and segment upload
+ checkArgument(numCommitThreads > 0);
+ this.pool = Executors.newFixedThreadPool(numCommitThreads);
+ }
+
+ /**
+ * Identifies global committables that need to be re-committed from a list of recovered committables.
+ *
+ * @param globalCommittables List of global committables that are checked for required re-commit
+ * @return List of global committable that need to be re-committed
+ * @throws IOException
+ */
+ @Override
+ public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+ // Holds identified global committables whose commit needs to be retried
+ List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>();
+
+ for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+ CommitStatus commitStatus = getCommitStatus(globalCommittable);
+
+ if (commitStatus.getMissingSegmentNames().isEmpty()) {
+ // All segments were already committed. Thus, we do not need to retry the commit.
+ continue;
+ }
+
+ for (String existingSegment : commitStatus.getExistingSegmentNames()) {
+ // Some but not all segments were already committed. As we cannot ensure that the data
+ // files contain the same data as they originally did when recovering from a failure,
+ // we delete the already committed segments in order to recommit them later on.
+ pinotControllerClient.deleteSegment(tableName, existingSegment);
+ }
+ committablesToRetry.add(globalCommittable);
+ }
+ return committablesToRetry;
+ }
+
+ /**
+ * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable}
+ * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s.
+ *
+ * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+ * @return Global committer committable
+ */
+ @Override
+ public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) {
+ List<String> dataFilePaths = new ArrayList<>();
+ long minTimestamp = Long.MAX_VALUE;
+ long maxTimestamp = Long.MIN_VALUE;
+
+ // Extract all data file paths and the overall minimum and maximum timestamps
+ // from all committables
+ for (PinotSinkCommittable committable : committables) {
+ dataFilePaths.add(committable.getDataFilePath());
+ minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp());
+ maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp());
+ }
+
+ LOG.debug("Combined {} committables into one global committable", committables.size());
+ return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+ }
+
+ /**
+ * Copies data files from shared filesystem to the local filesystem, generates segments with names
+ * according to the segment naming schema and finally pushes the segments to the Pinot cluster.
+ * Before pushing a segment it is checked whether there already exists a segment with that name
+ * in the Pinot cluster by calling the Pinot controller. In case there is one, it gets deleted.
+ *
+ * @param globalCommittables List of global committables
+ * @return Global committables whose commit failed
+ * @throws IOException
+ */
+ @Override
+ public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+ // List of failed global committables that can be retried later on
+ List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+ for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) {
+ Set<Future<Boolean>> resultFutures = new HashSet<>();
+ // Commit all segments in globalCommittable
+ for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+ String dataFilePath = globalCommittable.getDataFilePaths().get(sequenceId);
+ // Get segment names with increasing sequenceIds
+ String segmentName = getSegmentName(globalCommittable, sequenceId);
+ // Segment committer handling the whole commit process for a single segment
+ Callable<Boolean> segmentCommitter = new SegmentCommitter(
+ pinotControllerHost, pinotControllerPort, tempDirectory, fsAdapter,
+ dataFilePath, segmentName, tableSchema, tableConfig, timeColumnName,
+ segmentTimeUnit
+ );
+ // Submits the segment committer to the thread pool
+ resultFutures.add(pool.submit(segmentCommitter));
+ }
+
+ boolean commitSucceeded = true;
+ try {
+ for (Future<Boolean> wasSuccessful : resultFutures) {
+ // In case any of the segment commits wasn't successful we mark the whole
+ // globalCommittable as failed
+ if (!wasSuccessful.get()) {
+ commitSucceeded = false;
+ failedCommits.add(globalCommittable);
+ // Once any of the commits failed, we do not need to check the remaining
+ // ones, as we try to commit the globalCommittable next time
+ break;
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ // In case of an exception thrown while accessing commit status, mark the whole
+ // globalCommittable as failed
+ failedCommits.add(globalCommittable);
+ LOG.error("Accessing a SegmentCommitter thread errored with {}", e.getMessage(), e);
+ }
+
+ if (commitSucceeded) {
+ // If commit succeeded, cleanup the data files stored on the shared file system. In
+ // case the commit of at least one of the segments failed, nothing will be cleaned
+ // up here to enable retrying failed commits (data files must therefore stay
+ // available on the shared filesystem).
+ for (String path : globalCommittable.getDataFilePaths()) {
+ fsAdapter.deleteFromSharedFileSystem(path);
+ }
+ }
+ }
+
+ // Return failed commits so that they can be retried later on
+ return failedCommits;
+ }
+
+ /**
+ * Empty method.
+ */
+ @Override
+ public void endOfInput() {
+ }
+
+ /**
+ * Closes the Pinot controller http client, clears the created temporary directory and
+ * shuts the thread pool down.
+ */
+ @Override
+ public void close() throws IOException {
+ pinotControllerClient.close();
+ tempDirectory.delete();
+ pool.shutdown();
+ }
+
+ /**
+ * Helper method for generating segment names using the segment name generator.
+ *
+ * @param globalCommittable Global committable the segment name shall be generated from
+ * @param sequenceId Incrementing counter
+ * @return generated segment name
+ */
+ private String getSegmentName(PinotSinkGlobalCommittable globalCommittable, int sequenceId) {
+ return segmentNameGenerator.generateSegmentName(sequenceId,
+ globalCommittable.getMinTimestamp(), globalCommittable.getMaxTimestamp());
+ }
+
+ /**
+ * Evaluates the status of already uploaded segments by requesting segment metadata from the
+ * Pinot controller.
+ *
+ * @param globalCommittable Global committable whose commit status gets evaluated
+ * @return Commit status
+ * @throws IOException
+ */
+ private CommitStatus getCommitStatus(PinotSinkGlobalCommittable globalCommittable) throws IOException {
+ List<String> existingSegmentNames = new ArrayList<>();
+ List<String> missingSegmentNames = new ArrayList<>();
+
+ // For all segment names that will be used to submit new segments, check whether the segment
+ // name already exists for the target table
+ for (int sequenceId = 0; sequenceId < globalCommittable.getDataFilePaths().size(); sequenceId++) {
+ String segmentName = getSegmentName(globalCommittable, sequenceId);
+ if (pinotControllerClient.tableHasSegment(tableName, segmentName)) {
+ // Segment name already exists
+ existingSegmentNames.add(segmentName);
+ } else {
+ // Segment name does not exist yet
+ missingSegmentNames.add(segmentName);
+ }
+ }
+ return new CommitStatus(existingSegmentNames, missingSegmentNames);
+ }
+
+ /**
+ * Wrapper for existing and missing segments in the Pinot cluster.
+ */
+ private static class CommitStatus {
+ private final List<String> existingSegmentNames;
+ private final List<String> missingSegmentNames;
+
+ CommitStatus(List<String> existingSegmentNames, List<String> missingSegmentNames) {
+ this.existingSegmentNames = existingSegmentNames;
+ this.missingSegmentNames = missingSegmentNames;
+ }
+
+ public List<String> getExistingSegmentNames() {
+ return existingSegmentNames;
+ }
+
+ public List<String> getMissingSegmentNames() {
+ return missingSegmentNames;
+ }
+ }
+
+ /**
+ * Helper class for committing a single segment. Downloads a data file from the shared filesystem,
+ * generates a segment from the data file and uploads the segment to the Pinot controller.
+ */
+ private static class SegmentCommitter implements Callable<Boolean> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SegmentCommitter.class);
+
+ private final String pinotControllerHost;
+ private final String pinotControllerPort;
+ private final File tempDirectory;
+ private final FileSystemAdapter fsAdapter;
+ private final String dataFilePath;
+ private final String segmentName;
+ private final Schema tableSchema;
+ private final TableConfig tableConfig;
+ private final String timeColumnName;
+ private final TimeUnit segmentTimeUnit;
+
+ /**
+ * @param pinotControllerHost Host of the Pinot controller
+ * @param pinotControllerPort Port of the Pinot controller
+ * @param tempDirectory Directory to store temporary files in
+ * @param fsAdapter Filesystem adapter used to load data files from the shared file system
+ * @param dataFilePath Data file to load from the shared file system
+ * @param segmentName Name of the segment to create and commit
+ * @param tableSchema Pinot table schema
+ * @param tableConfig Pinot table config
+ * @param timeColumnName Name of the column containing the timestamp
+ * @param segmentTimeUnit Unit of the time column
+ */
+ SegmentCommitter(String pinotControllerHost, String pinotControllerPort,
+ File tempDirectory, FileSystemAdapter fsAdapter,
+ String dataFilePath, String segmentName, Schema tableSchema,
+ TableConfig tableConfig, String timeColumnName,
+ TimeUnit segmentTimeUnit) {
+ this.pinotControllerHost = pinotControllerHost;
+ this.pinotControllerPort = pinotControllerPort;
+ this.tempDirectory = tempDirectory;
+ this.fsAdapter = fsAdapter;
+ this.dataFilePath = dataFilePath;
+ this.segmentName = segmentName;
+ this.tableSchema = tableSchema;
+ this.tableConfig = tableConfig;
+ this.timeColumnName = timeColumnName;
+ this.segmentTimeUnit = segmentTimeUnit;
+ }
+
+ /**
+ * Downloads a data file from the shared file system via {@code fsAdapter}, generates a segment
+ * and finally uploads the segment to the Pinot controller
+ *
+ * @return True if the commit succeeded
+ */
+ @Override
+ public Boolean call() {
+ // Local copy of data file stored on the shared filesystem
+ File segmentData = null;
+ // File containing the final Pinot segment
+ File segmentFile = null;
+ try {
+ // Download data file from the shared filesystem
+ LOG.debug("Downloading data file {} from shared file system...", dataFilePath);
+ List<String> serializedElements = fsAdapter.readFromSharedFileSystem(dataFilePath);
+ segmentData = FileSystemUtils.writeToLocalFile(serializedElements, tempDirectory);
+ LOG.debug("Successfully downloaded data file {} from shared file system", dataFilePath);
+
+ segmentFile = FileSystemUtils.createFileInDir(tempDirectory);
+ LOG.debug("Creating segment in " + segmentFile.getAbsolutePath());
+
+ // Creates a segment with name `segmentName` in `segmentFile`
+ generateSegment(segmentData, segmentFile, true);
+
+ // Uploads the recently created segment to the Pinot controller
+ uploadSegment(segmentFile);
+
+ // Commit successful
+ return true;
+ } catch (IOException e) {
+ LOG.error("Error while committing segment data stored on shared filesystem.", e);
+
+ // Commit failed
+ return false;
+ } finally {
+ // Finally cleanup all files created on the local filesystem
+ if (segmentData != null) {
+ segmentData.delete();
+ }
+ if (segmentFile != null) {
+ segmentFile.delete();
+ }
+ }
+ }
+
+ /**
+ * Creates a segment from the given parameters.
+ * This method was adapted from {@link org.apache.pinot.tools.admin.command.CreateSegmentCommand}.
+ *
+ * @param dataFile File containing the JSON data
+ * @param outDir Segment target path
+ * @param _postCreationVerification Verify segment after generation
+ */
+ private void generateSegment(File dataFile, File outDir, Boolean _postCreationVerification) {
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, tableSchema);
+ segmentGeneratorConfig.setSegmentName(segmentName);
+ segmentGeneratorConfig.setSegmentTimeUnit(segmentTimeUnit);
+ segmentGeneratorConfig.setTimeColumnName(timeColumnName);
+ segmentGeneratorConfig.setInputFilePath(dataFile.getPath());
+ segmentGeneratorConfig.setFormat(FileFormat.JSON);
+ segmentGeneratorConfig.setOutDir(outDir.getPath());
+ segmentGeneratorConfig.setTableName(tableConfig.getTableName());
+
+ try {
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+ File indexDir = new File(outDir, segmentName);
+ LOG.debug("Successfully created segment: {} in directory: {}", segmentName, indexDir);
+ if (_postCreationVerification) {
+ LOG.debug("Verifying the segment by loading it");
+ ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+ LOG.debug("Successfully loaded segment: {} of size: {} bytes", segmentName,
+ segment.getSegmentSizeBytes());
+ segment.destroy();
+ }
+ }
+ // SegmentIndexCreationDriverImpl throws generic Exceptions during init and build
+ // ImmutableSegmentLoader throws generic Exception during load
+ catch (Exception e) {
+ String message = String.format("Error while generating segment from file %s", dataFile.getAbsolutePath());
+ LOG.error(message, e);
+ throw new RuntimeException(message);
+ }
+ LOG.debug("Successfully created 1 segment from data file: {}", dataFile);
+ }
+
+ /**
+ * Uploads a segment using the Pinot admin tool.
+ *
+ * @param segmentFile File containing the segment to upload
+ * @throws IOException
+ */
+ private void uploadSegment(File segmentFile) throws IOException {
+ try {
+ UploadSegmentCommand cmd = new UploadSegmentCommand();
+ cmd.setControllerHost(pinotControllerHost);
+ cmd.setControllerPort(pinotControllerPort);
+ cmd.setSegmentDir(segmentFile.getAbsolutePath());
+ cmd.execute();
+ } catch (Exception e) {
+ // UploadSegmentCommand.execute() throws generic Exception
+ LOG.error("Could not upload segment {}", segmentFile.getAbsolutePath(), e);
+ throw new IOException(e.getMessage());
+ }
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/exceptions/PinotControllerApiException.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/exceptions/PinotControllerApiException.java
new file mode 100644
index 00000000..c5283d6c
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/exceptions/PinotControllerApiException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.exceptions;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * Pinot controller API exception wrapper
+ */
+@Internal
+public class PinotControllerApiException extends IOException {
+
+ public PinotControllerApiException(String reason) {
+ super(reason);
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java
new file mode 100644
index 00000000..fe2647d0
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/EventTimeExtractor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.external;
+
+import org.apache.flink.api.connector.sink.SinkWriter;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Defines the interface for event time extractors
+ *
+ * @param <IN> Type of incoming elements
+ */
+public interface EventTimeExtractor<IN> extends Serializable {
+
+ /**
+ * Extracts event time from incoming elements.
+ *
+ * @param element Incoming element
+ * @param context Context of SinkWriter
+ * @return timestamp
+ */
+ long getEventTime(IN element, SinkWriter.Context context);
+
+ /**
+ * @return Name of column in Pinot target table that contains the timestamp.
+ */
+ String getTimeColumn();
+
+ /**
+ * @return Unit of the time column in the Pinot target table.
+ */
+ TimeUnit getSegmentTimeUnit();
+}
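For orientation, a minimal implementation of this interface might look like the following sketch. `MyEvent` and its `getTimestampMillis()` accessor are assumed example types for illustration only and are not part of this patch.

```java
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;

import java.util.concurrent.TimeUnit;

// Hypothetical sketch: extracts event time from an assumed MyEvent POJO
public class MyEventTimeExtractor implements EventTimeExtractor<MyEvent> {

    @Override
    public long getEventTime(MyEvent element, SinkWriter.Context context) {
        // MyEvent#getTimestampMillis() is assumed to return an epoch-millisecond timestamp
        return element.getTimestampMillis();
    }

    @Override
    public String getTimeColumn() {
        // Must match the time column defined in the Pinot table schema
        return "timestamp";
    }

    @Override
    public TimeUnit getSegmentTimeUnit() {
        return TimeUnit.MILLISECONDS;
    }
}
```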
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/JsonSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/JsonSerializer.java
new file mode 100644
index 00000000..8774ac19
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/external/JsonSerializer.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.external;
+
+import java.io.Serializable;
+
+/**
+ * Defines the interface for serializing incoming elements to JSON format.
+ * The JSON format is expected during Pinot segment creation.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public abstract class JsonSerializer<IN> implements Serializable {
+
+ public abstract String toJson(IN element);
+}
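A possible implementation, sketched under the assumption that Jackson is available on the classpath and that `MyEvent` is a Jackson-serializable POJO (both are assumptions for the example, not part of this patch):

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;

// Hypothetical sketch: serializes the assumed MyEvent POJO via Jackson
public class MyEventJsonSerializer extends JsonSerializer<MyEvent> {

    // Static, so the non-serializable ObjectMapper is not captured when the serializer is shipped to tasks
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String toJson(MyEvent element) {
        try {
            return MAPPER.writeValueAsString(element);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Could not serialize element to JSON", e);
        }
    }
}
```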
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java
new file mode 100644
index 00000000..42610a84
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.filesystem;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Defines the interaction with a shared filesystem. The shared filesystem must be accessible from all
+ * nodes within the cluster that run a partition of the {@link org.apache.flink.streaming.connectors.pinot.PinotSink}.
+ */
+public interface FileSystemAdapter extends Serializable {
+
+ /**
+ * Writes a list of serialized elements to the shared filesystem.
+ *
+ * @param elements List of serialized elements
+ * @return Path identifying the remote file
+ * @throws IOException
+ */
+ String writeToSharedFileSystem(List<String> elements) throws IOException;
+
+ /**
+ * Reads a previously written list of serialized elements from the shared filesystem.
+ *
+ * @param path Path returned by {@link #writeToSharedFileSystem}
+ * @return List of serialized elements read from the shared filesystem
+ * @throws IOException
+ */
+ List<String> readFromSharedFileSystem(String path) throws IOException;
+
+ /**
+ * Deletes a file from the shared filesystem
+ *
+ * @param path Path returned by {@link #writeToSharedFileSystem}
+ * @throws IOException
+ */
+ void deleteFromSharedFileSystem(String path) throws IOException;
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemUtils.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemUtils.java
new file mode 100644
index 00000000..b61d9cf1
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/filesystem/FileSystemUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.UUID;
+
+@Internal
+public class FileSystemUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtils.class);
+
+ /**
+ * Writes a list of serialized elements to a new file in the given target directory
+ * on the local filesystem.
+ *
+ * @param elements List of serialized elements
+ * @param targetDir Directory to create file in
+ * @return File containing the written data
+ * @throws IOException
+ */
+ public static File writeToLocalFile(List<String> elements, File targetDir) throws IOException {
+ File dataFile = createFileInDir(targetDir);
+
+ Files.write(dataFile.toPath(), elements, Charset.defaultCharset());
+ LOG.debug("Successfully written data to file {}", dataFile.getAbsolutePath());
+
+ return dataFile;
+ }
+
+ /**
+ * Creates file with random name in targetDir.
+ *
+ * @param targetDir Directory to create file in
+ * @return New File
+ */
+ public static File createFileInDir(File targetDir) {
+ String fileName = String.format("%s.json", UUID.randomUUID().toString());
+ return new File(targetDir.toString(), fileName);
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/PinotSinkSegmentNameGenerator.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/PinotSinkSegmentNameGenerator.java
new file mode 100644
index 00000000..ee3a9081
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/PinotSinkSegmentNameGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.segment.name;
+
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+
+import java.io.Serializable;
+
+/**
+ * Serializable segment name generator interface for the Pinot sink. Users who need a custom
+ * name generator are expected to implement {@link PinotSinkSegmentNameGenerator}.
+ */
+public interface PinotSinkSegmentNameGenerator extends SegmentNameGenerator, Serializable {
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/SimpleSegmentNameGenerator.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/SimpleSegmentNameGenerator.java
new file mode 100644
index 00000000..666673ca
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/segment/name/SimpleSegmentNameGenerator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.segment.name;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapted from {@link org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator}.
+ *
+ * Simple segment name generator which does not perform time conversion.
+ *
+ * The segment name is built by joining the following fields with '_', skipping any {@code null} values:
+ *
+ * - Table name
+ * - Minimum time value
+ * - Maximum time value
+ * - Segment name postfix
+ * - Sequence id
+ *
+ */
+public class SimpleSegmentNameGenerator implements PinotSinkSegmentNameGenerator {
+
+ private final String tableName;
+ private final String segmentNamePostfix;
+
+ public SimpleSegmentNameGenerator(String tableName, String segmentNamePostfix) {
+ this.tableName = checkNotNull(tableName);
+ this.segmentNamePostfix = checkNotNull(segmentNamePostfix);
+ }
+
+ @Override
+ public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+ return JOINER
+ .join(tableName, minTimeValue, maxTimeValue, segmentNamePostfix, sequenceId >= 0 ? sequenceId : null);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder("SimpleSegmentNameGenerator: tableName=").append(tableName);
+ if (segmentNamePostfix != null) {
+ stringBuilder.append(", segmentNamePostfix=").append(segmentNamePostfix);
+ }
+ return stringBuilder.toString();
+ }
+}
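A quick illustration of the resulting naming scheme; the timestamps below are arbitrary example values:

```java
PinotSinkSegmentNameGenerator generator =
        new SimpleSegmentNameGenerator("myTable", "flink-connector");

// Fields are joined with '_' and null fields are skipped, yielding:
// "myTable_1612951200000_1612954800000_flink-connector_0"
String segmentName = generator.generateSegmentName(0, 1612951200000L, 1612954800000L);
```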
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
new file mode 100644
index 00000000..ed61de26
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkCommittableSerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+
+import java.io.*;
+
+/**
+ * Serializer for {@link PinotSinkCommittable}
+ */
+@Internal
+public class PinotSinkCommittableSerializer implements SimpleVersionedSerializer<PinotSinkCommittable> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(PinotSinkCommittable pinotSinkCommittable) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeLong(pinotSinkCommittable.getMinTimestamp());
+ out.writeLong(pinotSinkCommittable.getMaxTimestamp());
+ out.writeUTF(pinotSinkCommittable.getDataFilePath());
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public PinotSinkCommittable deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private PinotSinkCommittable deserializeV1(byte[] serialized) throws IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ long minTimestamp = in.readLong();
+ long maxTimestamp = in.readLong();
+ String dataFilePath = in.readUTF();
+ return new PinotSinkCommittable(dataFilePath, minTimestamp, maxTimestamp);
+ }
+ }
+}
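A short round-trip sketch of how this serializer is used; the data file path is a made-up example value:

```java
import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
import org.apache.flink.streaming.connectors.pinot.serializer.PinotSinkCommittableSerializer;

import java.io.IOException;

// Hypothetical round-trip demo for the committable serializer
public class CommittableSerializerDemo {

    public static void main(String[] args) throws IOException {
        PinotSinkCommittableSerializer serializer = new PinotSinkCommittableSerializer();
        PinotSinkCommittable original =
                new PinotSinkCommittable("/shared/fs/data-0.json", 1000L, 2000L);

        // Serialize with the current version and restore the committable from its byte representation
        byte[] bytes = serializer.serialize(original);
        PinotSinkCommittable restored = serializer.deserialize(serializer.getVersion(), bytes);

        // restored carries the same data file path and min/max timestamps as original
        System.out.println(restored.getDataFilePath());
    }
}
```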
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
new file mode 100644
index 00000000..8e456206
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkGlobalCommittableSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommittable;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for {@link PinotSinkGlobalCommittable}
+ */
+@Internal
+public class PinotSinkGlobalCommittableSerializer implements SimpleVersionedSerializer<PinotSinkGlobalCommittable> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(PinotSinkGlobalCommittable pinotSinkGlobalCommittable) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeLong(pinotSinkGlobalCommittable.getMinTimestamp());
+ out.writeLong(pinotSinkGlobalCommittable.getMaxTimestamp());
+
+ int size = pinotSinkGlobalCommittable.getDataFilePaths().size();
+ out.writeInt(size);
+ for (String dataFilePath : pinotSinkGlobalCommittable.getDataFilePaths()) {
+ out.writeUTF(dataFilePath);
+ }
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public PinotSinkGlobalCommittable deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private PinotSinkGlobalCommittable deserializeV1(byte[] serialized) throws IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ long minTimestamp = in.readLong();
+ long maxTimestamp = in.readLong();
+
+ int size = in.readInt();
+ List<String> dataFilePaths = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ dataFilePaths.add(in.readUTF());
+ }
+ return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp);
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
new file mode 100644
index 00000000..6dc7efaa
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/serializer/PinotSinkWriterStateSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.serializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriterState;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serializer for {@link PinotSinkWriterState}
+ */
+@Internal
+public class PinotSinkWriterStateSerializer implements SimpleVersionedSerializer<PinotSinkWriterState> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(PinotSinkWriterState writerState) throws IOException {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeLong(writerState.getMinTimestamp());
+ out.writeLong(writerState.getMaxTimestamp());
+
+ out.writeInt(writerState.getSerializedElements().size());
+ for (String serialized : writerState.getSerializedElements()) {
+ out.writeUTF(serialized);
+ }
+
+ out.flush();
+ return baos.toByteArray();
+ }
+ }
+
+ @Override
+ public PinotSinkWriterState deserialize(int version, byte[] serialized) throws IllegalStateException, IOException {
+ switch (version) {
+ case 1:
+ return deserializeV1(serialized);
+ default:
+ throw new IllegalStateException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ private PinotSinkWriterState deserializeV1(byte[] serialized) throws IOException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ long minTimestamp = in.readLong();
+ long maxTimestamp = in.readLong();
+
+ int size = in.readInt();
+ List<String> serializedElements = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ serializedElements.add(in.readUTF());
+ }
+ return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
new file mode 100644
index 00000000..1a84e022
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriter.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import com.google.common.collect.Iterables;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Accepts incoming elements and creates {@link PinotSinkCommittable}s out of them on request.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotSinkWriter<IN> implements SinkWriter<IN, PinotSinkCommittable, PinotSinkWriterState> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PinotSinkWriter.class);
+
+ private final int maxRowsPerSegment;
+ private final EventTimeExtractor<IN> eventTimeExtractor;
+ private final JsonSerializer<IN> jsonSerializer;
+
+ private final List<PinotWriterSegment<IN>> activeSegments;
+ private final FileSystemAdapter fsAdapter;
+
+ private final int subtaskId;
+
+ /**
+ * @param subtaskId Subtask id provided by Flink
+ * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+ * @param eventTimeExtractor Defines the way event times are extracted from received objects
+ * @param jsonSerializer Serializer used to convert elements to JSON
+ * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+ */
+ public PinotSinkWriter(int subtaskId, int maxRowsPerSegment,
+ EventTimeExtractor<IN> eventTimeExtractor,
+ JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+ this.subtaskId = subtaskId;
+ this.maxRowsPerSegment = maxRowsPerSegment;
+ this.eventTimeExtractor = checkNotNull(eventTimeExtractor);
+ this.jsonSerializer = checkNotNull(jsonSerializer);
+ this.fsAdapter = checkNotNull(fsAdapter);
+ this.activeSegments = new ArrayList<>();
+ }
+
+ /**
+ * Takes elements from upstream tasks and writes them into a {@link PinotWriterSegment}.
+ *
+ * @param element Object from upstream task
+ * @param context SinkWriter context
+ * @throws IOException
+ */
+ @Override
+ public void write(IN element, Context context) throws IOException {
+ final PinotWriterSegment<IN> inProgressSegment = getOrCreateInProgressSegment();
+ inProgressSegment.write(element, eventTimeExtractor.getEventTime(element, context));
+ }
+
+ /**
+ * Creates {@link PinotSinkCommittable}s from elements previously received via {@link #write}.
+ * If flush is set, all {@link PinotWriterSegment}s are transformed into
+ * {@link PinotSinkCommittable}s. If flush is not set, only {@link PinotWriterSegment}s that
+ * no longer accept elements are transformed into {@link PinotSinkCommittable}s.
+ * To convert a {@link PinotWriterSegment} into a {@link PinotSinkCommittable} the data gets
+ * written to the shared filesystem. Moreover, minimum and maximum timestamps are identified.
+ * Finally, all {@link PinotWriterSegment}s transformed into {@link PinotSinkCommittable}s are
+ * removed from {@link #activeSegments}.
+ *
+ * @param flush Flush all currently known elements into the {@link PinotSinkCommittable}s
+ * @return List of {@link PinotSinkCommittable} to process in {@link org.apache.flink.streaming.connectors.pinot.committer.PinotSinkGlobalCommitter}
+ * @throws IOException
+ */
+ @Override
+ public List<PinotSinkCommittable> prepareCommit(boolean flush) throws IOException {
+ // Identify segments to commit. If the flush argument is set all segments shall be committed.
+ // Otherwise, take only those PinotWriterSegments that do not accept any more elements.
+ List<PinotWriterSegment<IN>> segmentsToCommit = activeSegments.stream()
+ .filter(s -> flush || !s.acceptsElements())
+ .collect(Collectors.toList());
+ LOG.debug("Identified {} segments to commit [subtaskId={}]", segmentsToCommit.size(), subtaskId);
+
+ LOG.debug("Creating committables... [subtaskId={}]", subtaskId);
+ List<PinotSinkCommittable> committables = new ArrayList<>();
+ for (final PinotWriterSegment<IN> segment : segmentsToCommit) {
+ committables.add(segment.prepareCommit());
+ }
+ LOG.debug("Created {} committables [subtaskId={}]", committables.size(), subtaskId);
+
+ // Remove all PinotWriterSegments that will be emitted within the committables.
+ activeSegments.removeAll(segmentsToCommit);
+ return committables;
+ }
+
+ /**
+ * Gets the {@link PinotWriterSegment} still accepting elements or creates a new one.
+ *
+ * @return {@link PinotWriterSegment} accepting at least one more element
+ */
+ private PinotWriterSegment<IN> getOrCreateInProgressSegment() {
+ final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+ if (latestSegment == null || !latestSegment.acceptsElements()) {
+ final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+ activeSegments.add(inProgressSegment);
+ return inProgressSegment;
+ }
+ return latestSegment;
+ }
+
+ /**
+ * Snapshots the latest PinotWriterSegment (if existent), so that the contained (and not yet
+ * committed) elements can be recovered later on in case of a failure.
+ *
+ * @return A list containing at most one PinotSinkWriterState
+ */
+ @Override
+ public List<PinotSinkWriterState> snapshotState() {
+ final PinotWriterSegment<IN> latestSegment = Iterables.getLast(activeSegments, null);
+ if (latestSegment == null || !latestSegment.acceptsElements()) {
+ return new ArrayList<>();
+ }
+
+ return Collections.singletonList(latestSegment.snapshotState());
+ }
+
+ /**
+ * Initializes the writer according to a previously taken snapshot.
+ *
+ * @param state PinotSinkWriterState extracted from snapshot
+ */
+ public void initializeState(PinotSinkWriterState state) {
+ if (activeSegments.size() != 0) {
+ throw new IllegalStateException("Please call the initialization before creating the first PinotWriterSegment.");
+ }
+ // Create a new PinotWriterSegment and recover its state from the given PinotSinkWriterState
+ final PinotWriterSegment<IN> inProgressSegment = new PinotWriterSegment<>(maxRowsPerSegment, jsonSerializer, fsAdapter);
+ inProgressSegment.initializeState(state.getSerializedElements(), state.getMinTimestamp(), state.getMaxTimestamp());
+ activeSegments.add(inProgressSegment);
+ }
+
+ /**
+ * Empty method, as we do not open any connections.
+ */
+ @Override
+ public void close() {
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
new file mode 100644
index 00000000..0e23e2fa
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotSinkWriterState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class PinotSinkWriterState implements Serializable {
+
+ private final List<String> serializedElements;
+ private final long minTimestamp;
+ private final long maxTimestamp;
+
+ public PinotSinkWriterState(List<String> serializedElements, long minTimestamp, long maxTimestamp) {
+ this.serializedElements = serializedElements;
+ this.minTimestamp = minTimestamp;
+ this.maxTimestamp = maxTimestamp;
+ }
+
+ public List<String> getSerializedElements() {
+ return serializedElements;
+ }
+
+ public long getMinTimestamp() {
+ return minTimestamp;
+ }
+
+ public long getMaxTimestamp() {
+ return maxTimestamp;
+ }
+}
diff --git a/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
new file mode 100644
index 00000000..50be1459
--- /dev/null
+++ b/flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot
+ * cluster once the commit has been completed.
+ *
+ * @param <IN> Type of incoming elements
+ */
+@Internal
+public class PinotWriterSegment<IN> implements Serializable {
+
+ private final int maxRowsPerSegment;
+ private final JsonSerializer<IN> jsonSerializer;
+ private final FileSystemAdapter fsAdapter;
+
+ private boolean acceptsElements = true;
+
+ private final List<String> serializedElements;
+ private String dataPathOnSharedFS;
+ private long minTimestamp = Long.MAX_VALUE;
+ private long maxTimestamp = Long.MIN_VALUE;
+
+ /**
+ * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+ * @param jsonSerializer Serializer used to convert elements to JSON
+ * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes
+ */
+ PinotWriterSegment(int maxRowsPerSegment, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+ checkArgument(maxRowsPerSegment > 0L);
+ this.maxRowsPerSegment = maxRowsPerSegment;
+ this.jsonSerializer = checkNotNull(jsonSerializer);
+ this.fsAdapter = checkNotNull(fsAdapter);
+ this.serializedElements = new ArrayList<>();
+ }
+
+ /**
+ * Takes elements and stores them in memory until either {@link #maxRowsPerSegment} is reached
+ * or {@link #prepareCommit} is called.
+ *
+ * @param element Object from upstream task
+ * @param timestamp Timestamp assigned to element
+ * @throws IOException
+ */
+ public void write(IN element, long timestamp) throws IOException {
+ if (!acceptsElements()) {
+ throw new IllegalStateException("This PinotSegmentWriter does not accept any elements anymore.");
+ }
+ // Store serialized element in serializedElements
+ serializedElements.add(jsonSerializer.toJson(element));
+ minTimestamp = Long.min(minTimestamp, timestamp);
+ maxTimestamp = Long.max(maxTimestamp, timestamp);
+
+ // Writes elements to the shared filesystem once the maximum number of items is reached
+ if (serializedElements.size() == maxRowsPerSegment) {
+ acceptsElements = false;
+ dataPathOnSharedFS = writeToSharedFilesystem();
+ serializedElements.clear();
+ }
+ }
+
+ /**
+ * Writes the collected elements to the shared filesystem defined via {@link FileSystemAdapter}
+ * (if not already done) and creates a {@link PinotSinkCommittable} pointing to the written file.
+ *
+ * @return {@link PinotSinkCommittable} pointing to file on shared filesystem
+ * @throws IOException
+ */
+ public PinotSinkCommittable prepareCommit() throws IOException {
+ if (dataPathOnSharedFS == null) {
+ dataPathOnSharedFS = writeToSharedFilesystem();
+ }
+ return new PinotSinkCommittable(dataPathOnSharedFS, minTimestamp, maxTimestamp);
+ }
+
+ /**
+ * Takes elements from {@link #serializedElements} and writes them to the shared filesystem.
+ *
+ * @return Path pointing to just written data on shared filesystem
+ * @throws IOException
+ */
+ private String writeToSharedFilesystem() throws IOException {
+ return fsAdapter.writeToSharedFileSystem(serializedElements);
+ }
+
+ /**
+ * Determines whether this segment can accept at least one more element.
+ *
+ * @return True if at least one more element will be accepted
+ */
+ public boolean acceptsElements() {
+ return acceptsElements;
+ }
+
+ /**
+ * Recovers a previously written state.
+ *
+ * @param _serializedElements List of serialized elements that were received but not yet committed
+ * @param _minTimestamp Minimum event timestamp of all elements
+ * @param _maxTimestamp Maximum event timestamp of all elements
+ */
+ public void initializeState(List<String> _serializedElements, long _minTimestamp, long _maxTimestamp) {
+ if (!serializedElements.isEmpty()) {
+ throw new IllegalStateException("Cannot initialize a PinotWriterSegment that has already received elements.");
+ }
+
+ serializedElements.addAll(_serializedElements);
+ minTimestamp = _minTimestamp;
+ maxTimestamp = _maxTimestamp;
+ }
+
+ /**
+ * Snapshots the current state of an active {@link PinotWriterSegment}.
+ *
+ * @return {@link PinotSinkWriterState} containing the elements currently stored within this {@link PinotWriterSegment}
+ */
+ public PinotSinkWriterState snapshotState() {
+ if (!acceptsElements()) {
+ throw new IllegalStateException("Snapshots can only be created of in-progress segments.");
+ }
+
+ return new PinotSinkWriterState(serializedElements, minTimestamp, maxTimestamp);
+ }
+}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/LocalFileSystemAdapter.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/LocalFileSystemAdapter.java
new file mode 100644
index 00000000..069daa39
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/LocalFileSystemAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The LocalFileSystemAdapter is used when sharing files via the local filesystem.
+ * Keep in mind that using this FileSystemAdapter requires running the Flink app on a single node.
+ */
+public class LocalFileSystemAdapter implements FileSystemAdapter {
+
+ private final String tempDirPrefix;
+
+ public LocalFileSystemAdapter(String tempDirPrefix) {
+ this.tempDirPrefix = checkNotNull(tempDirPrefix);
+ }
+
+ /**
+ * Writes a list of serialized elements to the local filesystem.
+ *
+ * @param elements List of serialized elements
+ * @return Path identifying the written file
+ * @throws IOException
+ */
+ @Override
+ public String writeToSharedFileSystem(List<String> elements) throws IOException {
+ File tempDir = Files.createTempDirectory(tempDirPrefix).toFile();
+ return FileSystemUtils.writeToLocalFile(elements, tempDir).getAbsolutePath();
+ }
+
+ /**
+ * Reads a previously written list of serialized elements from the local filesystem.
+ *
+ * @param path Path returned by {@link #writeToSharedFileSystem}
+ * @return List of serialized elements read from the local filesystem
+ * @throws IOException
+ */
+ @Override
+ public List<String> readFromSharedFileSystem(String path) throws IOException {
+ File dataFile = new File(path);
+ return Files.readAllLines(dataFile.toPath(), Charset.defaultCharset());
+ }
+
+ /**
+ * Deletes a file from the local filesystem
+ *
+ * @param path Path returned by {@link #writeToSharedFileSystem}
+ * @throws IOException
+ */
+ @Override
+ public void deleteFromSharedFileSystem(String path) {
+ new File(path).delete();
+ }
+}
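A small usage sketch of the adapter's write/read/delete cycle; the JSON strings are arbitrary example payloads:

```java
import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo of the LocalFileSystemAdapter round trip
public class LocalFileSystemAdapterDemo {

    public static void main(String[] args) throws IOException {
        FileSystemAdapter fsAdapter = new LocalFileSystemAdapter("flink-pinot-example");
        List<String> elements = Arrays.asList("{\"col\": \"a\"}", "{\"col\": \"b\"}");

        // Writes the elements into a fresh temp directory and returns the absolute path of the created file
        String path = fsAdapter.writeToSharedFileSystem(elements);

        // Reads back the same two JSON strings, then removes the file again
        List<String> readBack = fsAdapter.readFromSharedFileSystem(path);
        System.out.println(readBack);
        fsAdapter.deleteFromSharedFileSystem(path);
    }
}
```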
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
new file mode 100644
index 00000000..8649eebe
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotSinkTest.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.flink.streaming.connectors.pinot.external.EventTimeExtractor;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.flink.streaming.connectors.pinot.segment.name.PinotSinkSegmentNameGenerator;
+import org.apache.flink.streaming.connectors.pinot.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.client.PinotClientException;
+import org.apache.pinot.client.ResultSet;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opentest4j.AssertionFailedError;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * E2e tests for Pinot Sink using BATCH and STREAMING execution mode
+ */
+public class PinotSinkTest extends PinotTestBase {
+
+ private static final int MAX_ROWS_PER_SEGMENT = 5;
+ private static final long STREAMING_CHECKPOINTING_INTERVAL = 50;
+ private static final int DATA_CHECKING_TIMEOUT_SECONDS = 60;
+ private static final AtomicBoolean hasFailedOnce = new AtomicBoolean(false);
+ private static CountDownLatch latch;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ super.setUp();
+ // Reset hasFailedOnce flag used during failure recovery testing before each test.
+ hasFailedOnce.set(false);
+ // Reset latch used to keep the generator streaming source up until the test is completed.
+ latch = new CountDownLatch(1);
+ }
+
+ /**
+ * Tests the BATCH execution of the {@link PinotSink}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBatchSink() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setParallelism(2);
+
+ List<String> rawData = getRawTestData(12);
+ DataStream<SingleColumnTableRow> dataStream = setupBatchDataSource(env, rawData);
+ setupSink(dataStream);
+
+ // Run
+ env.execute();
+
+ // Check for data in Pinot
+ checkForDataInPinotWithRetry(rawData);
+ }
+
+ /**
+ * Tests failure recovery of the {@link PinotSink} using BATCH execution mode.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testFailureRecoveryInBatchingSink() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10));
+ env.setParallelism(2);
+
+ List<String> rawData = getRawTestData(12);
+ DataStream<SingleColumnTableRow> dataStream = setupBatchDataSource(env, rawData);
+ dataStream = setupFailingMapper(dataStream, 8);
+ setupSink(dataStream);
+
+ // Run
+ env.execute();
+
+ // Check for data in Pinot
+ checkForDataInPinotWithRetry(rawData);
+ }
+
+ /**
+ * Tests the STREAMING execution of the {@link PinotSink}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testStreamingSink() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setParallelism(2);
+ env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+ List<String> rawData = getRawTestData(20);
+ DataStream<SingleColumnTableRow> dataStream = setupStreamingDataSource(env, rawData);
+ setupSink(dataStream);
+
+ // Start execution of job
+ env.executeAsync();
+
+ // Check for data in Pinot
+ checkForDataInPinotWithRetry(rawData);
+
+ // Generator source can now shut down
+ latch.countDown();
+ }
+
+ /**
+ * Tests failure recovery of the {@link PinotSink} using STREAMING execution mode.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testFailureRecoveryInStreamingSink() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.setParallelism(1);
+ env.enableCheckpointing(STREAMING_CHECKPOINTING_INTERVAL);
+
+ List<String> rawData = getRawTestData(20);
+ DataStream<SingleColumnTableRow> dataStream = setupFailingStreamingDataSource(env, rawData, 12);
+ setupSink(dataStream);
+
+ // Start execution of job
+ env.executeAsync();
+
+ // Check for data in Pinot
+ checkForDataInPinotWithRetry(rawData);
+
+ // Generator source can now shut down
+ latch.countDown();
+ }
+
+ /**
+ * Generates the raw String values used to build {@link SingleColumnTableRow}s in the tests.
+ *
+ * @param numItems Number of values to generate
+ * @return List of raw String values
+ */
+ private List<String> getRawTestData(int numItems) {
+ return IntStream.range(1, numItems + 1)
+ .mapToObj(num -> "ColValue" + num)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Setup the data source for STREAMING tests.
+ *
+ * @param env Stream execution environment
+ * @param rawDataValues Data values to send
+ * @return resulting data stream
+ */
+ private DataStream<SingleColumnTableRow> setupStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
+ StreamingSource source = new StreamingSource.Builder(rawDataValues, 10).build();
+ return env.addSource(source)
+ .name("Test input");
+ }
+
+ /**
+ * Setup the data source for STREAMING tests.
+ *
+ * @param env Stream execution environment
+ * @param rawDataValues Data values to send
+ * @param failOnceAtNthElement Number of elements to process before raising the exception
+ * @return resulting data stream
+ */
+ private DataStream<SingleColumnTableRow> setupFailingStreamingDataSource(StreamExecutionEnvironment env, List<String> rawDataValues, int failOnceAtNthElement) {
+ StreamingSource source = new StreamingSource.Builder(rawDataValues, 10)
+ .raiseFailureOnce(failOnceAtNthElement)
+ .build();
+ return env.addSource(source)
+ .name("Test input");
+ }
+
+ /**
+ * Setup the data source for BATCH tests.
+ *
+ * @param env Stream execution environment
+ * @param rawDataValues Data values to send
+ * @return resulting data stream
+ */
+ private DataStream<SingleColumnTableRow> setupBatchDataSource(StreamExecutionEnvironment env, List<String> rawDataValues) {
+ return env.fromCollection(rawDataValues)
+ .map(value -> new SingleColumnTableRow(value, System.currentTimeMillis()))
+ .name("Test input");
+ }
+
+ /**
+ * Setup a mapper that fails when processing the nth element with n = failOnceAtNthElement.
+ *
+ * @param dataStream Input data stream
+ * @param failOnceAtNthElement Number of elements to process before raising the exception
+ * @return resulting data stream
+ */
+ private DataStream<SingleColumnTableRow> setupFailingMapper(DataStream<SingleColumnTableRow> dataStream, int failOnceAtNthElement) {
+ AtomicInteger messageCounter = new AtomicInteger(0);
+
+ return dataStream.map(element -> {
+ if (!hasFailedOnce.get() && messageCounter.incrementAndGet() == failOnceAtNthElement) {
+ hasFailedOnce.set(true);
+ throw new Exception(String.format("Mapper was expected to fail after %d elements", failOnceAtNthElement));
+ }
+ return element;
+ });
+ }
+
+ /**
+ * Attaches the {@link PinotSink} to the provided data stream.
+ *
+ * @param dataStream Data stream to write to Pinot
+ */
+ private void setupSink(DataStream<SingleColumnTableRow> dataStream) {
+ String tempDirPrefix = "flink-pinot-connector-test";
+ PinotSinkSegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(getTableName(), "flink-connector");
+ FileSystemAdapter fsAdapter = new LocalFileSystemAdapter(tempDirPrefix);
+ JsonSerializer<SingleColumnTableRow> jsonSerializer = new SingleColumnTableRowSerializer();
+
+ EventTimeExtractor<SingleColumnTableRow> eventTimeExtractor = new SingleColumnTableRowEventTimeExtractor();
+
+ PinotSink<SingleColumnTableRow> sink = new PinotSink.Builder<SingleColumnTableRow>(getPinotHost(), getPinotControllerPort(), getTableName())
+ .withMaxRowsPerSegment(MAX_ROWS_PER_SEGMENT)
+ .withTempDirectoryPrefix(tempDirPrefix)
+ .withJsonSerializer(jsonSerializer)
+ .withEventTimeExtractor(eventTimeExtractor)
+ .withSegmentNameGenerator(segmentNameGenerator)
+ .withFileSystemAdapter(fsAdapter)
+ .withNumCommitThreads(2)
+ .build();
+
+ // Sink into Pinot
+ dataStream.sinkTo(sink).name("Pinot sink");
+ }
+
+ /**
+ * As Pinot might take some time to index the recently pushed segments we might need to retry
+ * the {@link #checkForDataInPinot} method multiple times. This method provides a simple wrapper
+ * using linear retry backoff delay.
+ *
+ * @param rawData Data to expect in the Pinot table
+ * @throws InterruptedException
+ */
+ private void checkForDataInPinotWithRetry(List<String> rawData) throws InterruptedException, PinotControllerApiException {
+ long endTime = System.currentTimeMillis() + 1000L * DATA_CHECKING_TIMEOUT_SECONDS;
+ // Use a constant delay that allows for at most 10 retries within the timeout
+ long retryDelay = 1000L / 10 * DATA_CHECKING_TIMEOUT_SECONDS;
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ checkForDataInPinot(rawData);
+ // In case of no error, we can skip further retries
+ return;
+ } catch (AssertionFailedError | PinotControllerApiException | PinotClientException e) {
+ // In case of an error retry after delay
+ Thread.sleep(retryDelay);
+ }
+ }
+
+ // Final check for data in Pinot after DATA_CHECKING_TIMEOUT_SECONDS was exceeded
+ checkForDataInPinot(rawData);
+ }
+
+ /**
+ * Checks whether all expected elements are present in the Pinot target table.
+ *
+ * @param rawData Data to expect in the Pinot table
+ * @throws AssertionFailedError in case the assertion fails
+ * @throws PinotControllerApiException in case there aren't any rows in the Pinot table
+ */
+ private void checkForDataInPinot(List<String> rawData) throws AssertionFailedError, PinotControllerApiException, PinotClientException {
+ // Now get the result from Pinot and verify if everything is there
+ ResultSet resultSet = pinotHelper.getTableEntries(getTableName(), rawData.size() + 5);
+
+ Assertions.assertEquals(rawData.size(), resultSet.getRowCount(),
+ String.format("Expected %d elements in Pinot but saw %d", rawData.size(), resultSet.getRowCount()));
+
+ // Check output strings
+ List<String> output = IntStream.range(0, resultSet.getRowCount())
+ .mapToObj(i -> resultSet.getString(i, 0))
+ .collect(Collectors.toList());
+
+ for (String test : rawData) {
+ Assertions.assertTrue(output.contains(test), "Missing " + test);
+ }
+ }
+
+ /**
+ * EventTimeExtractor for {@link SingleColumnTableRow} used in e2e tests.
+ * Extracts the timestamp column from {@link SingleColumnTableRow}.
+ */
+ private static class SingleColumnTableRowEventTimeExtractor implements EventTimeExtractor<SingleColumnTableRow> {
+
+ @Override
+ public long getEventTime(SingleColumnTableRow element, SinkWriter.Context context) {
+ return element.getTimestamp();
+ }
+
+ @Override
+ public String getTimeColumn() {
+ return "timestamp";
+ }
+
+ @Override
+ public TimeUnit getSegmentTimeUnit() {
+ return TimeUnit.MILLISECONDS;
+ }
+ }
+
+ /**
+ * Simple source that publishes data and finally waits for {@link #latch}.
+ * By setting {@link #failOnceAtNthElement} > -1, one can define the number of elements to
+ * process before raising an exception. If configured, the exception will only be raised once.
+ */
+ private static class StreamingSource implements SourceFunction<SingleColumnTableRow>, CheckpointedFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<String> rawDataValues;
+ private final int sleepDurationMs;
+ private final int failOnceAtNthElement;
+
+ private int numElementsEmitted = 0;
+
+ private final AtomicBoolean waitingForNextSnapshot;
+ private final AtomicBoolean awaitedSnapshotCreated;
+
+ private ListState<Integer> state = null;
+
+ private StreamingSource(final List<String> rawDataValues, final int sleepDurationMs, int failOnceAtNthElement) {
+ this.rawDataValues = rawDataValues;
+ checkArgument(sleepDurationMs > 0);
+ this.sleepDurationMs = sleepDurationMs;
+ checkArgument(failOnceAtNthElement == -1 || failOnceAtNthElement > MAX_ROWS_PER_SEGMENT);
+ this.failOnceAtNthElement = failOnceAtNthElement;
+
+ // Initializes exception raising logic
+ this.waitingForNextSnapshot = new AtomicBoolean(false);
+ this.awaitedSnapshotCreated = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void run(final SourceContext<SingleColumnTableRow> ctx) throws Exception {
+ while (numElementsEmitted < rawDataValues.size()) {
+ if (!hasFailedOnce.get() && failOnceAtNthElement == numElementsEmitted) {
+ failAfterNextSnapshot();
+ }
+
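+ // Emit the element and increment the counter under the checkpoint lock so that snapshots see a consistent count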
+ synchronized (ctx.getCheckpointLock()) {
+ SingleColumnTableRow element = new SingleColumnTableRow(
+ rawDataValues.get(numElementsEmitted), System.currentTimeMillis());
+ ctx.collect(element);
+ numElementsEmitted++;
+ }
+ Thread.sleep(sleepDurationMs);
+ }
+
+ // Keep the source running until the test has completed.
+ latch.await();
+ }
+
+ /**
+ * Once {@link #failOnceAtNthElement} elements have been received, an exception is raised after
+ * the next checkpoint has been created. By requiring {@link #failOnceAtNthElement} to be greater
+ * than {@link #MAX_ROWS_PER_SEGMENT} (at a parallelism of 1), at least one segment has been
+ * committed to Pinot by that point. This allows checking whether snapshot creation and failure
+ * recovery in
+ * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} work properly,
+ * respecting both the already committed elements and those stored in an active
+ * {@link org.apache.flink.streaming.connectors.pinot.writer.PinotWriterSegment}. Committed
+ * elements must not be part of the snapshot, whereas elements in an active segment must be
+ * included so that they can be recovered later on.
+ *
+ * @throws Exception
+ */
+ private void failAfterNextSnapshot() throws Exception {
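+ // Remember that the failure has been raised so that it is not triggered again after recovery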
+ hasFailedOnce.set(true);
+ waitingForNextSnapshot.set(true);
+
+ // Waiting for the next snapshot ensures that
+ // at least one segment has been committed to Pinot
+ while (!awaitedSnapshotCreated.get()) {
+ Thread.sleep(50);
+ }
+ throw new Exception(String.format("Source was expected to fail after %d elements", failOnceAtNthElement));
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ state = context.getOperatorStateStore()
+ .getListState(new ListStateDescriptor<>("state", IntSerializer.INSTANCE));
+
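+ // Restore the number of already emitted elements (summed over all restored state entries)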
+ for (Integer i : state.get()) {
+ numElementsEmitted += i;
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ state.clear();
+ state.add(numElementsEmitted);
+
+ // Notify that the awaited snapshot has been created
+ if (waitingForNextSnapshot.get()) {
+ awaitedSnapshotCreated.set(true);
+ }
+ }
+
+ static class Builder {
+ final List<String> rawDataValues;
+ final int sleepDurationMs;
+ int failOnceAtNthElement = -1;
+
+ Builder(List<String> rawDataValues, int sleepDurationMs) {
+ this.rawDataValues = rawDataValues;
+ this.sleepDurationMs = sleepDurationMs;
+ }
+
+ public Builder raiseFailureOnce(int failOnceAtNthElement) {
+ checkArgument(failOnceAtNthElement > MAX_ROWS_PER_SEGMENT,
+ "failOnceAtNthElement (if set) is required to be larger than the number of elements per segment (MAX_ROWS_PER_SEGMENT).");
+ this.failOnceAtNthElement = failOnceAtNthElement;
+ return this;
+ }
+
+ public StreamingSource build() {
+ return new StreamingSource(rawDataValues, sleepDurationMs, failOnceAtNthElement);
+ }
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
new file mode 100644
index 00000000..a5f50215
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestBase.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.util.TestLogger;
+import org.apache.pinot.spi.config.table.*;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/**
+ * Base class for PinotSink e2e tests
+ */
+@Testcontainers
+public class PinotTestBase extends TestLogger {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(PinotTestBase.class);
+
+ private static final String DOCKER_IMAGE_NAME = "apachepinot/pinot:0.6.0";
+ private static final Integer PINOT_INTERNAL_BROKER_PORT = 8000;
+ private static final Integer PINOT_INTERNAL_CONTROLLER_PORT = 9000;
+
+ protected static TableConfig TABLE_CONFIG;
+ protected static final Schema TABLE_SCHEMA = PinotTableConfig.getTableSchema();
+ protected static PinotTestHelper pinotHelper;
+
+ /**
+ * Creates the Pinot testcontainer. We delay the start of tests until Pinot has started all
+ * internal components. This is determined by polling the controller's /instances endpoint
+ * until the controller, broker and server instances are available.
+ */
+ @Container
+ public static GenericContainer<?> pinot = new GenericContainer<>(DockerImageName.parse(DOCKER_IMAGE_NAME))
+ .withCommand("QuickStart", "-type", "batch")
+ .withExposedPorts(PINOT_INTERNAL_BROKER_PORT, PINOT_INTERNAL_CONTROLLER_PORT)
+ .waitingFor(
+ // Wait for controller, server and broker instances to be available
+ new HttpWaitStrategy()
+ .forPort(PINOT_INTERNAL_CONTROLLER_PORT)
+ .forPath("/instances")
+ .forStatusCode(200)
+ .forResponsePredicate(res -> {
+ try {
+ JsonNode instances = JsonUtils.stringToJsonNode(res).get("instances");
+ // Expect 3 instances to be up and running (controller, broker and server)
+ return instances.size() == 3;
+ } catch (IOException e) {
+ LOG.error("Error while reading json response in HttpWaitStrategy.", e);
+ }
+ return false;
+ })
+ // Allow Pinot to take up to 180s for starting up
+ .withStartupTimeout(Duration.ofSeconds(180))
+ );
+
+ /**
+ * Creates a new instance of the {@link PinotTestHelper} using the testcontainer port mappings
+ * and creates the test table.
+ *
+ * @throws IOException
+ */
+ @BeforeEach
+ public void setUp() throws IOException {
+ TABLE_CONFIG = PinotTableConfig.getTableConfig();
+ pinotHelper = new PinotTestHelper(getPinotHost(), getPinotControllerPort(), getPinotBrokerPort());
+ pinotHelper.createTable(TABLE_CONFIG, TABLE_SCHEMA);
+ }
+
+ /**
+ * Delete the test table after each test.
+ *
+ * @throws Exception
+ */
+ @AfterEach
+ public void tearDown() throws Exception {
+ pinotHelper.deleteTable(TABLE_CONFIG, TABLE_SCHEMA);
+ }
+
+ /**
+ * Returns the current Pinot table name
+ *
+ * @return Pinot table name
+ */
+ protected String getTableName() {
+ return TABLE_CONFIG.getTableName();
+ }
+
+ /**
+ * Returns the host the Pinot container is available at
+ *
+ * @return Pinot container host
+ */
+ protected String getPinotHost() {
+ return pinot.getHost();
+ }
+
+
+ /**
+ * Returns the Pinot controller port from the container ports.
+ *
+ * @return Pinot controller port
+ */
+ protected String getPinotControllerPort() {
+ return pinot.getMappedPort(PINOT_INTERNAL_CONTROLLER_PORT).toString();
+ }
+
+ /**
+ * Returns the Pinot broker port from the container ports.
+ *
+ * @return Pinot broker port
+ */
+ private String getPinotBrokerPort() {
+ return pinot.getMappedPort(PINOT_INTERNAL_BROKER_PORT).toString();
+ }
+
+ /**
+ * Class defining the elements passed to the {@link PinotSink} during the tests.
+ */
+ protected static class SingleColumnTableRow {
+
+ private String _col1;
+ private Long _timestamp;
+
+ SingleColumnTableRow(@JsonProperty(value = "col1", required = true) String col1,
+ @JsonProperty(value = "timestamp", required = true) Long timestamp) {
+ this._col1 = col1;
+ this._timestamp = timestamp;
+ }
+
+ @JsonProperty("col1")
+ public String getCol1() {
+ return this._col1;
+ }
+
+ public void setCol1(String _col1) {
+ this._col1 = _col1;
+ }
+
+ @JsonProperty("timestamp")
+ public Long getTimestamp() {
+ return this._timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this._timestamp = timestamp;
+ }
+ }
+
+ /**
+ * Serializes {@link SingleColumnTableRow} to JSON.
+ */
+ protected static class SingleColumnTableRowSerializer extends JsonSerializer<SingleColumnTableRow> {
+
+ @Override
+ public String toJson(SingleColumnTableRow element) {
+ return JsonUtils.objectToJsonNode(element).toString();
+ }
+ }
+
+ /**
+ * Pinot table configuration helpers.
+ */
+ private static class PinotTableConfig {
+
+ static final String TABLE_NAME_PREFIX = "FLTable";
+ static final String SCHEMA_NAME = "FLTableSchema";
+
+ private static SegmentsValidationAndRetentionConfig getValidationConfig() {
+ SegmentsValidationAndRetentionConfig validationConfig = new SegmentsValidationAndRetentionConfig();
+ validationConfig.setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy");
+ validationConfig.setSegmentPushType("APPEND");
+ validationConfig.setSchemaName(SCHEMA_NAME);
+ validationConfig.setReplication("1");
+ return validationConfig;
+ }
+
+ private static TenantConfig getTenantConfig() {
+ TenantConfig tenantConfig = new TenantConfig("DefaultTenant", "DefaultTenant", null);
+ return tenantConfig;
+ }
+
+ private static IndexingConfig getIndexingConfig() {
+ IndexingConfig indexingConfig = new IndexingConfig();
+ return indexingConfig;
+ }
+
+ private static TableCustomConfig getCustomConfig() {
+ TableCustomConfig customConfig = new TableCustomConfig(null);
+ return customConfig;
+ }
+
+ private static String generateTableName() {
+ // Use a new table name for each test in order to prevent interference with segments
+ // that were pushed in a previous test but whose indexing by Pinot was delayed
+ // (in which case that previous test would already have failed).
+ return String.format("%s_%d", TABLE_NAME_PREFIX, System.currentTimeMillis());
+ }
+
+ static TableConfig getTableConfig() {
+ return new TableConfig(
+ generateTableName(),
+ TableType.OFFLINE.name(),
+ getValidationConfig(),
+ getTenantConfig(),
+ getIndexingConfig(),
+ getCustomConfig(),
+ null, null, null, null, null,
+ null, null, null, null
+ );
+ }
+
+ static Schema getTableSchema() {
+ Schema schema = new Schema();
+ schema.setSchemaName(SCHEMA_NAME);
+ schema.addField(new DimensionFieldSpec("col1", FieldSpec.DataType.STRING, true));
+ schema.addField(new DimensionFieldSpec("timestamp", FieldSpec.DataType.STRING, true));
+ return schema;
+ }
+ }
+}
diff --git a/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
new file mode 100644
index 00000000..73a4403b
--- /dev/null
+++ b/flink-connector-pinot/src/test/java/org/apache/flink/streaming/connectors/pinot/PinotTestHelper.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot;
+
+import org.apache.flink.streaming.connectors.pinot.exceptions.PinotControllerApiException;
+import org.apache.pinot.client.*;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Helper class to interact with the Pinot controller and broker in the e2e tests
+ */
+public class PinotTestHelper implements Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PinotTestHelper.class);
+ private final String host;
+ private final String brokerPort;
+ private final PinotControllerHttpClient httpClient;
+
+ /**
+ * @param host Host the Pinot controller and broker are accessible at
+ * @param controllerPort The Pinot controller's external port at {@code host}
+ * @param brokerPort A Pinot broker's external port at {@code host}
+ */
+ public PinotTestHelper(String host, String controllerPort, String brokerPort) {
+ this.host = host;
+ this.brokerPort = brokerPort;
+ this.httpClient = new PinotControllerHttpClient(host, controllerPort);
+ }
+
+ /**
+ * Adds a Pinot table schema.
+ *
+ * @param tableSchema Pinot table schema to add
+ * @throws IOException
+ */
+ private void addSchema(Schema tableSchema) throws IOException {
+ PinotControllerHttpClient.ApiResponse res = httpClient.post("/schemas", JsonUtils.objectToString(tableSchema));
+ LOG.debug("Schema add request for schema {} returned {}", tableSchema.getSchemaName(), res.responseBody);
+ if (res.statusLine.getStatusCode() != 200) {
+ throw new PinotControllerApiException(res.responseBody);
+ }
+ }
+
+ /**
+ * Deletes a Pinot table schema.
+ *
+ * @param tableSchema Pinot table schema to delete
+ * @throws IOException
+ */
+ private void deleteSchema(Schema tableSchema) throws IOException {
+ PinotControllerHttpClient.ApiResponse res = httpClient.delete(String.format("/schemas/%s", tableSchema.getSchemaName()));
+ LOG.debug("Schema delete request for schema {} returned {}", tableSchema.getSchemaName(), res.responseBody);
+ if (res.statusLine.getStatusCode() != 200) {
+ throw new PinotControllerApiException(res.responseBody);
+ }
+ }
+
+ /**
+ * Creates a Pinot table.
+ *
+ * @param tableConfig Pinot table configuration of table to create
+ * @throws IOException
+ */
+ private void addTable(TableConfig tableConfig) throws IOException {
+ PinotControllerHttpClient.ApiResponse res = httpClient.post("/tables", JsonUtils.objectToString(tableConfig));
+ LOG.debug("Table creation request for table {} returned {}", tableConfig.getTableName(), res.responseBody);
+ if (res.statusLine.getStatusCode() != 200) {
+ throw new PinotControllerApiException(res.responseBody);
+ }
+ }
+
+ /**
+ * Deletes a Pinot table with all its segments.
+ *
+ * @param tableConfig Pinot table configuration of table to delete
+ * @throws IOException
+ */
+ private void removeTable(TableConfig tableConfig) throws IOException {
+ PinotControllerHttpClient.ApiResponse res = httpClient.delete(String.format("/tables/%s", tableConfig.getTableName()));
+ LOG.debug("Table deletion request for table {} returned {}", tableConfig.getTableName(), res.responseBody);
+ if (res.statusLine.getStatusCode() != 200) {
+ throw new PinotControllerApiException(res.responseBody);
+ }
+ }
+
+ /**
+ * Creates a Pinot table by first adding a schema and then creating the actual table using the
+ * Pinot table configuration
+ *
+ * @param tableConfig Pinot table configuration
+ * @param tableSchema Pinot table schema
+ * @throws IOException
+ */
+ public void createTable(TableConfig tableConfig, Schema tableSchema) throws IOException {
+ this.addSchema(tableSchema);
+ this.addTable(tableConfig);
+ }
+
+ /**
+ * Deletes a Pinot table by first deleting the table and its segments and then deleting the
+ * table's schema.
+ *
+ * @param tableConfig Pinot table configuration
+ * @param tableSchema Pinot table schema
+ * @throws IOException
+ */
+ public void deleteTable(TableConfig tableConfig, Schema tableSchema) throws IOException {
+ this.removeTable(tableConfig);
+ this.deleteSchema(tableSchema);
+ }
+
+ /**
+ * Fetch table entries via the Pinot broker.
+ *
+ * @param tableName Target table's name
+ * @param maxNumberOfEntries Max number of entries to fetch
+ * @return ResultSet
+ * @throws PinotControllerApiException
+ */
+ public ResultSet getTableEntries(String tableName, Integer maxNumberOfEntries) throws PinotControllerApiException {
+ Connection brokerConnection = null;
+ try {
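+ // Connect to the Pinot broker and fetch up to maxNumberOfEntries rows via a SQL query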
+ String brokerHostPort = String.format("%s:%s", this.host, this.brokerPort);
+ brokerConnection = ConnectionFactory.fromHostList(brokerHostPort);
+ String query = String.format("SELECT * FROM %s LIMIT %d", tableName, maxNumberOfEntries);
+
+ Request pinotClientRequest = new Request("sql", query);
+ ResultSetGroup pinotResultSetGroup = brokerConnection.execute(pinotClientRequest);
+
+ if (pinotResultSetGroup.getResultSetCount() != 1) {
+ throw new PinotControllerApiException("Could not find any data in Pinot cluster.");
+ }
+ return pinotResultSetGroup.getResultSet(0);
+ } finally {
+ if (brokerConnection != null) {
+ brokerConnection.close();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ httpClient.close();
+ }
+}
diff --git a/flink-connector-pinot/src/test/resources/log4j.properties b/flink-connector-pinot/src/test/resources/log4j.properties
new file mode 100644
index 00000000..b15f2bef
--- /dev/null
+++ b/flink-connector-pinot/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=DEBUG, console
+
+# Log all infos in the given file
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
index c56ac14e..0840deb5 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisClusterConfig.java
@@ -48,12 +48,15 @@ public class FlinkJedisClusterConfig extends FlinkJedisConfigBase {
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
* @param password the password of redis cluster
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code nodes} is {@code null}
*/
private FlinkJedisClusterConfig(Set<InetSocketAddress> nodes, int connectionTimeout, int maxRedirections,
- int maxTotal, int maxIdle, int minIdle,
- String password) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+ int maxTotal, int maxIdle, int minIdle, String password,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
Objects.requireNonNull(nodes, "Node information should be presented");
Util.checkArgument(!nodes.isEmpty(), "Redis cluster hosts should not be empty");
@@ -96,6 +99,9 @@ public static class Builder {
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
private String password;
/**
@@ -179,25 +185,68 @@ public Builder setPassword(String password) {
return this;
}
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * Setting this to true will also set default idle-testing parameters provided in Jedis
+ * @see redis.clients.jedis.JedisPoolConfig
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
/**
* Builds JedisClusterConfig.
*
* @return JedisClusterConfig
*/
public FlinkJedisClusterConfig build() {
- return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password);
+ return new FlinkJedisClusterConfig(nodes, timeout, maxRedirections, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@Override
public String toString() {
return "FlinkJedisClusterConfig{" +
- "nodes=" + nodes +
- ", timeout=" + connectionTimeout +
- ", maxRedirections=" + maxRedirections +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
+ "nodes=" + nodes +
+ ", maxRedirections=" + maxRedirections +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ ", connectionTimeout=" + connectionTimeout +
+ ", password=" + password +
+ ", testOnBorrow=" + testOnBorrow +
+ ", testOnReturn=" + testOnReturn +
+ ", testWhileIdle=" + testWhileIdle +
+ '}';
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
index 84b1bf23..a41b0e03 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBase.java
@@ -33,7 +33,12 @@ public abstract class FlinkJedisConfigBase implements Serializable {
protected final int connectionTimeout;
protected final String password;
- protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password) {
+ protected final boolean testOnBorrow;
+ protected final boolean testOnReturn;
+ protected final boolean testWhileIdle;
+
+ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle, String password, boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+
Util.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
Util.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
Util.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
@@ -43,6 +48,9 @@ protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle,
this.maxTotal = maxTotal;
this.maxIdle = maxIdle;
this.minIdle = minIdle;
+ this.testOnBorrow = testOnBorrow;
+ this.testOnReturn = testOnReturn;
+ this.testWhileIdle = testWhileIdle;
this.password = password;
}
@@ -99,4 +107,40 @@ public int getMinIdle() {
public String getPassword() {
return password;
}
+
+ /**
+ * Get the value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code testOnBorrow} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getTestOnBorrow()
+ */
+ public boolean getTestOnBorrow() {
+ return testOnBorrow;
+ }
+
+ /**
+ * Get the value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code testOnReturn} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getTestOnReturn()
+ */
+ public boolean getTestOnReturn() {
+ return testOnReturn;
+ }
+
+ /**
+ * Get the value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @return The current setting of {@code testWhileIdle} for this
+ * configuration instance
+ * @see GenericObjectPoolConfig#getTestWhileIdle()
+ */
+ public boolean getTestWhileIdle() {
+ return testWhileIdle;
+ }
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
index 3f8fc2ff..5012da1f 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisPoolConfig.java
@@ -45,11 +45,16 @@ public class FlinkJedisPoolConfig extends FlinkJedisConfigBase {
* @param maxTotal the maximum number of objects that can be allocated by the pool, default value is 8
* @param maxIdle the cap on the number of "idle" instances in the pool, default value is 8
* @param minIdle the minimum number of idle objects to maintain in the pool, default value is 0
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if parameter {@code host} is {@code null}
*/
private FlinkJedisPoolConfig(String host, int port, int connectionTimeout, String password, int database,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+ int maxTotal, int maxIdle, int minIdle,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+
Objects.requireNonNull(host, "Host information should be presented");
this.host = host;
this.port = port;
@@ -96,6 +101,9 @@ public static class Builder {
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
/**
* Sets value for the {@code maxTotal} configuration attribute
@@ -188,6 +196,44 @@ public Builder setPassword(String password) {
return this;
}
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * Setting this to true will also set default idle-testing parameters provided in Jedis
+ * @see redis.clients.jedis.JedisPoolConfig
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
/**
* Builds JedisPoolConfig.
@@ -195,20 +241,24 @@ public Builder setPassword(String password) {
* @return JedisPoolConfig
*/
public FlinkJedisPoolConfig build() {
- return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle);
+ return new FlinkJedisPoolConfig(host, port, timeout, password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@Override
public String toString() {
return "FlinkJedisPoolConfig{" +
- "host='" + host + '\'' +
- ", port=" + port +
- ", timeout=" + connectionTimeout +
- ", database=" + database +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
+ "host=" + host +
+ ", port=" + port +
+ ", database=" + database +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ ", connectionTimeout=" +
+ ", password=" + password +
+ ", testOnBorrow=" + testOnBorrow +
+ ", testOnReturn=" + testOnReturn +
+ ", testWhileIdle=" + testWhileIdle +
+ '}';
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
index 928f5e8c..340eb4e4 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisSentinelConfig.java
@@ -52,15 +52,19 @@ public class FlinkJedisSentinelConfig extends FlinkJedisConfigBase {
* @param maxTotal maxTotal the maximum number of objects that can be allocated by the pool
* @param maxIdle the cap on the number of "idle" instances in the pool
* @param minIdle the minimum number of idle objects to maintain in the pool
- *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned, default value is false
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool, default value is false
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor, default value is false
* @throws NullPointerException if {@code masterName} or {@code sentinels} is {@code null}
* @throws IllegalArgumentException if {@code sentinels} are empty
*/
private FlinkJedisSentinelConfig(String masterName, Set<String> sentinels,
- int connectionTimeout, int soTimeout,
- String password, int database,
- int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, password);
+ int connectionTimeout, int soTimeout,
+ String password, int database,
+ int maxTotal, int maxIdle, int minIdle,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, password, testOnBorrow, testOnReturn, testWhileIdle);
+
Objects.requireNonNull(masterName, "Master name should be presented");
Objects.requireNonNull(sentinels, "Sentinels information should be presented");
Util.checkArgument(!sentinels.isEmpty(), "Sentinel hosts should not be empty");
@@ -120,6 +124,9 @@ public static class Builder {
private int maxTotal = GenericObjectPoolConfig.DEFAULT_MAX_TOTAL;
private int maxIdle = GenericObjectPoolConfig.DEFAULT_MAX_IDLE;
private int minIdle = GenericObjectPoolConfig.DEFAULT_MIN_IDLE;
+ private boolean testOnBorrow = GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW;
+ private boolean testOnReturn = GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN;
+ private boolean testWhileIdle = GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE;
/**
* Sets master name of the replica set.
@@ -223,6 +230,45 @@ public Builder setMinIdle(int minIdle) {
return this;
}
+ /**
+ * Sets value for the {@code testOnBorrow} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnBorrow Whether objects borrowed from the pool will be validated before being returned
+ * @return Builder itself
+ */
+ public Builder setTestOnBorrow(boolean testOnBorrow) {
+ this.testOnBorrow = testOnBorrow;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testOnReturn} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * @param testOnReturn Whether objects borrowed from the pool will be validated when they are returned to the pool
+ * @return Builder itself
+ */
+ public Builder setTestOnReturn(boolean testOnReturn) {
+ this.testOnReturn = testOnReturn;
+ return this;
+ }
+
+ /**
+ * Sets value for the {@code testWhileIdle} configuration attribute
+ * for pools to be created with this configuration instance.
+ *
+ * Setting this to true will also set default idle-testing parameters provided in Jedis
+ * @see redis.clients.jedis.JedisPoolConfig
+ *
+ * @param testWhileIdle Whether objects sitting idle in the pool will be validated by the idle object evictor
+ * @return Builder itself
+ */
+ public Builder setTestWhileIdle(boolean testWhileIdle) {
+ this.testWhileIdle = testWhileIdle;
+ return this;
+ }
+
/**
* Builds JedisSentinelConfig.
*
@@ -230,20 +276,25 @@ public Builder setMinIdle(int minIdle) {
*/
public FlinkJedisSentinelConfig build(){
return new FlinkJedisSentinelConfig(masterName, sentinels, connectionTimeout, soTimeout,
- password, database, maxTotal, maxIdle, minIdle);
+ password, database, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, testWhileIdle);
}
}
@Override
public String toString() {
return "FlinkJedisSentinelConfig{" +
- "masterName='" + masterName + '\'' +
- ", connectionTimeout=" + connectionTimeout +
- ", soTimeout=" + soTimeout +
- ", database=" + database +
- ", maxTotal=" + maxTotal +
- ", maxIdle=" + maxIdle +
- ", minIdle=" + minIdle +
- '}';
+ "masterName=" + masterName +
+ ", sentinels=" + sentinels +
+ ", soTimeout=" + soTimeout +
+ ", database=" + database +
+ ", maxTotal=" + maxTotal +
+ ", maxIdle=" + maxIdle +
+ ", minIdle=" + minIdle +
+ ", connectionTimeout=" + connectionTimeout +
+ ", password=" + password +
+ ", testOnBorrow=" + testOnBorrow +
+ ", testOnReturn=" + testOnReturn +
+ ", testWhileIdle=" + testWhileIdle +
+ '}';
}
}
diff --git a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
index bdb9fed1..b06a6e97 100644
--- a/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
+++ b/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilder.java
@@ -23,6 +23,7 @@
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisSentinelPool;
import java.util.Objects;
@@ -65,8 +66,8 @@ public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig)
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisPoolConfig);
JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
- jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
- jedisPoolConfig.getDatabase());
+ jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
+ jedisPoolConfig.getDatabase());
return new RedisContainer(jedisPool);
}
@@ -83,11 +84,11 @@ public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterC
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisClusterConfig);
JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(),
- jedisClusterConfig.getConnectionTimeout(),
- jedisClusterConfig.getConnectionTimeout(),
- jedisClusterConfig.getMaxRedirections(),
- jedisClusterConfig.getPassword(),
- genericObjectPoolConfig);
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getConnectionTimeout(),
+ jedisClusterConfig.getMaxRedirections(),
+ jedisClusterConfig.getPassword(),
+ genericObjectPoolConfig);
return new RedisClusterContainer(jedisCluster);
}
@@ -104,17 +105,20 @@ public static RedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentine
GenericObjectPoolConfig genericObjectPoolConfig = getGenericObjectPoolConfig(jedisSentinelConfig);
JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(),
- jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
- jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
- jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
+ jedisSentinelConfig.getSentinels(), genericObjectPoolConfig,
+ jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(),
+ jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
return new RedisContainer(jedisSentinelPool);
}
- private static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
- GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
+ public static GenericObjectPoolConfig getGenericObjectPoolConfig(FlinkJedisConfigBase jedisConfig) {
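+ // When idle testing is requested, start from JedisPoolConfig, which enables testWhileIdle
+ // and ships with Jedis' default idle-eviction settings; otherwise use a plain GenericObjectPoolConfig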
+ GenericObjectPoolConfig genericObjectPoolConfig = jedisConfig.getTestWhileIdle() ? new JedisPoolConfig(): new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(jedisConfig.getMaxIdle());
genericObjectPoolConfig.setMaxTotal(jedisConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisConfig.getMinIdle());
+ genericObjectPoolConfig.setTestOnBorrow(jedisConfig.getTestOnBorrow());
+ genericObjectPoolConfig.setTestOnReturn(jedisConfig.getTestOnReturn());
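+ // testWhileIdle is already reflected by the choice of pool config above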
+
return genericObjectPoolConfig;
}
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
index 6f519ed7..80189dff 100644
--- a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/config/FlinkJedisConfigBaseTest.java
@@ -23,28 +23,28 @@ public class FlinkJedisConfigBaseTest extends TestLogger {
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfTimeOutIsNegative(){
- new TestConfig(-1, 0, 0, 0);
+ new TestConfig(-1, 0, 0, 0, false, false, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfMaxTotalIsNegative(){
- new TestConfig(1, -1, 0, 0);
+ new TestConfig(1, -1, 0, 0, false, false, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfMaxIdleIsNegative(){
- new TestConfig(0, 0, -1, 0);
+ new TestConfig(0, 0, -1, 0, false, false, false);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowIllegalArgumentExceptionIfMinIdleIsNegative(){
- new TestConfig(0, 0, 0, -1);
+ new TestConfig(0, 0, 0, -1, false, false, false);
}
private class TestConfig extends FlinkJedisConfigBase {
-
- protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle) {
- super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy");
+ protected TestConfig(int connectionTimeout, int maxTotal, int maxIdle, int minIdle,
+ boolean testOnBorrow, boolean testOnReturn, boolean testWhileIdle) {
+ super(connectionTimeout, maxTotal, maxIdle, minIdle, "dummy", testOnBorrow, testOnReturn, testWhileIdle);
}
}
}
diff --git a/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
new file mode 100644
index 00000000..eac5ca04
--- /dev/null
+++ b/flink-connector-redis/src/test/java/org/apache/flink/streaming/connectors/redis/common/container/RedisCommandsContainerBuilderTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.redis.common.container;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Test;
+import redis.clients.jedis.JedisPoolConfig;
+
+public class RedisCommandsContainerBuilderTest extends AbstractTestBase {
+
+ @Test
+ public void testNotTestWhileIdle() {
+ FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).build();
+ GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+ assertFalse(genericObjectPoolConfig.getTestWhileIdle());
+ assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+ }
+
+ @Test
+ public void testTestWhileIdle() {
+ FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("host").setPort(0).setDatabase(0).setTestWhileIdle(true).build();
+ GenericObjectPoolConfig genericObjectPoolConfig = RedisCommandsContainerBuilder.getGenericObjectPoolConfig(flinkJedisPoolConfig);
+ assertTrue(genericObjectPoolConfig.getTestWhileIdle());
+ assertEqualConfig(flinkJedisPoolConfig, genericObjectPoolConfig);
+
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ assertEquals(genericObjectPoolConfig.getMinEvictableIdleTimeMillis(), jedisPoolConfig.getMinEvictableIdleTimeMillis());
+ assertEquals(genericObjectPoolConfig.getTimeBetweenEvictionRunsMillis(), jedisPoolConfig.getTimeBetweenEvictionRunsMillis());
+ assertEquals(genericObjectPoolConfig.getNumTestsPerEvictionRun(), jedisPoolConfig.getNumTestsPerEvictionRun());
+ }
+
+ private void assertEqualConfig(FlinkJedisPoolConfig flinkJedisPoolConfig, GenericObjectPoolConfig genericObjectPoolConfig) {
+ assertEquals(genericObjectPoolConfig.getMaxIdle(), flinkJedisPoolConfig.getMaxIdle());
+ assertEquals(genericObjectPoolConfig.getMinIdle(), flinkJedisPoolConfig.getMinIdle());
+ assertEquals(genericObjectPoolConfig.getMaxTotal(), flinkJedisPoolConfig.getMaxTotal());
+ assertEquals(genericObjectPoolConfig.getTestWhileIdle(), flinkJedisPoolConfig.getTestWhileIdle());
+ assertEquals(genericObjectPoolConfig.getTestOnBorrow(), flinkJedisPoolConfig.getTestOnBorrow());
+ assertEquals(genericObjectPoolConfig.getTestOnReturn(), flinkJedisPoolConfig.getTestOnReturn());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 505eda4c..26a8d981 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
flink-connector-influxdb2
flink-connector-kudu
flink-connector-netty
+ flink-connector-pinot
flink-connector-redis
flink-library-siddhi