From 0fa2e27babde10484ebb9b3304823b6b4bcf6ad0 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 7 Feb 2020 11:02:35 +0800 Subject: [PATCH] Flink batch support --- pom.xml | 16 +- .../connectors/pulsar/CachedClients.java | 179 ++++++++++++ .../connectors/pulsar/ConnectorUtils.java | 71 +++++ .../batch/connectors/pulsar/InputLedger.java | 70 +++++ .../connectors/pulsar/InputSplitReader.java | 256 ++++++++++++++++++ .../connectors/pulsar/PulsarInputFormat.java | 171 ++++++++++++ .../connectors/pulsar/PulsarInputSplit.java | 36 +++ .../batch/connectors/pulsar/SplitUtils.java | 191 +++++++++++++ .../flink/pulsar/common/ConnectorConfig.java | 118 ++++++++ .../pulsar/common/ConnectorConfigUtils.java | 72 +++++ .../common}/DateTimeUtils.java | 2 +- .../common}/JSONOptions.java | 2 +- .../common}/JSONOptionsInRead.java | 2 +- .../common}/JacksonRecordParser.java | 2 +- .../internal => pulsar/common}/ParseMode.java | 2 +- .../common}/PulsarDeserializer.java | 4 +- .../common}/PulsarMetadataReader.java | 74 ++--- .../common}/PulsarOptions.java | 10 +- .../common}/PulsarSerializer.java | 2 +- .../common}/SchemaUtils.java | 12 +- .../flink/pulsar/common/SourceSinkUtils.java | 93 +++++++ .../connectors/pulsar/FlinkPulsarRowSink.java | 14 +- .../pulsar/FlinkPulsarRowSource.java | 4 +- .../pulsar/FlinkPulsarSinkBase.java | 19 +- .../connectors/pulsar/FlinkPulsarSource.java | 39 ++- .../connectors/pulsar/PulsarTableSource.java | 14 +- .../pulsar/PulsarTableSourceSinkFactory.java | 2 +- .../pulsar/internal/PulsarFetcher.java | 1 + .../pulsar/internal/PulsarRowFetcher.java | 2 + .../pulsar/internal/ReaderThread.java | 1 + .../pulsar/internal/RowReaderThread.java | 3 + .../pulsar/internal/SourceSinkUtils.java | 123 --------- .../table/catalog/pulsar/PulsarCatalog.java | 17 +- .../descriptors/PulsarCatalogValidator.java | 2 +- .../factories/PulsarCatalogFactory.java | 3 +- .../table/descriptors/PulsarValidator.java | 8 +- .../pulsar/PulsarInputFormatITest.java | 108 ++++++++ .../connectors/pulsar/FlinkPulsarITest.java | 28 +- .../pulsar/FlinkPulsarSinkTest.java | 2 +- .../pulsar/FlinkPulsarSourceTest.java | 18 +- .../pulsar/FlinkPulsarTableITest.java | 7 +- .../connectors/pulsar/PulsarTestBase.java | 12 +- .../connectors/pulsar/SchemaITest.java | 7 +- .../pulsar/internal/DiscovererTest.java | 34 ++- .../pulsar/testutils/TestMetadataReader.java | 8 +- 45 files changed, 1564 insertions(+), 297 deletions(-) create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/CachedClients.java create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/ConnectorUtils.java create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/InputLedger.java create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/InputSplitReader.java create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormat.java create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputSplit.java create mode 100644 src/main/java/org/apache/flink/batch/connectors/pulsar/SplitUtils.java create mode 100644 src/main/java/org/apache/flink/pulsar/common/ConnectorConfig.java create mode 100644 src/main/java/org/apache/flink/pulsar/common/ConnectorConfigUtils.java rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/DateTimeUtils.java (99%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/JSONOptions.java (99%) rename 
src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/JSONOptionsInRead.java (97%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/JacksonRecordParser.java (99%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/ParseMode.java (95%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/PulsarDeserializer.java (99%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/PulsarMetadataReader.java (89%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/PulsarOptions.java (84%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/PulsarSerializer.java (99%) rename src/main/java/org/apache/flink/{streaming/connectors/pulsar/internal => pulsar/common}/SchemaUtils.java (97%) create mode 100644 src/main/java/org/apache/flink/pulsar/common/SourceSinkUtils.java delete mode 100644 src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java create mode 100644 src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormatITest.java diff --git a/pom.xml b/pom.xml index e16aeb076..07d93ee04 100644 --- a/pom.xml +++ b/pom.xml @@ -67,11 +67,11 @@ - 2.4.2 + 2.5.0 2.11.12 2.11 3.0.3 - 2.4.2 + 2.5.1 1.10.6 1.3 @@ -93,6 +93,12 @@ ${pulsar.version} + + org.apache.pulsar + managed-ledger-shaded + ${pulsar.version} + + com.fasterxml.jackson.core jackson-databind @@ -260,12 +266,6 @@ - - commons-lang - commons-lang - 2.6 - compile - diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/CachedClients.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/CachedClients.java new file mode 100644 index 000000000..41e935fee --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/CachedClients.java @@ -0,0 +1,179 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.batch.connectors.pulsar; + +import org.apache.flink.pulsar.common.ConnectorConfig; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.offload.OffloaderUtils; +import org.apache.bookkeeper.mledger.offload.Offloaders; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.shade.com.google.common.collect.ImmutableMap; +import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider; + +import java.io.IOException; +import java.util.Map; + +import static org.apache.flink.batch.connectors.pulsar.ConnectorUtils.createInstance; +import static org.apache.flink.batch.connectors.pulsar.ConnectorUtils.getProperties; +import static org.apache.pulsar.shade.com.google.common.base.Preconditions.checkNotNull; + +/** + * Cached ML / BK client to be shared among threads inside a process. + */ +@Slf4j +public class CachedClients { + + static CachedClients instance; + + private final ManagedLedgerFactoryImpl managedLedgerFactory; + + private final StatsProvider statsProvider; + private OrderedScheduler offloaderScheduler; + private Offloaders offloaderManager; + private LedgerOffloader offloader; + + private static final String OFFLOADERS_DIRECTOR = "offloadersDirectory"; + private static final String MANAGED_LEDGER_OFFLOAD_DRIVER = "managedLedgerOffloadDriver"; + private static final String MANAGED_LEDGER_OFFLOAD_MAX_THREADS = "managedLedgerOffloadMaxThreads"; + + public static CachedClients getInstance(ConnectorConfig config) throws Exception { + synchronized (CachedClients.class) { + if (instance == null) { + instance = new CachedClients(config); + } + } + return instance; + } + + private CachedClients(ConnectorConfig config) throws Exception { + this.managedLedgerFactory = initManagedLedgerFactory(config); + this.statsProvider = createInstance(config.getStatsProvider(), + StatsProvider.class, getClass().getClassLoader()); + + // start stats provider + ClientConfiguration clientConfiguration = new ClientConfiguration(); + config.getStatsProviderConfigs().forEach(clientConfiguration::setProperty); + this.statsProvider.start(clientConfiguration); + + this.offloader = initManagedLedgerOffloader(config); + } + + private ManagedLedgerFactoryImpl initManagedLedgerFactory(ConnectorConfig config) throws Exception { + ClientConfiguration bkClientConfiguration = new ClientConfiguration() + .setZkServers(config.getZookeeperUri()) + .setMetadataServiceUri("zk://" + config.getZookeeperUri() + "/ledgers") + .setClientTcpNoDelay(false) + .setUseV2WireProtocol(true) + .setStickyReadsEnabled(false) + .setAllowShadedLedgerManagerFactoryClass(true) + .setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.") + .setReadEntryTimeout(60) + .setThrottleValue(config.getBookkeeperThrottleValue()) + .setNumIOThreads(config.getBookkeeperNumIOThreads()) + .setNumWorkerThreads(config.getBookkeeperNumWorkerThreads()); + + ManagedLedgerFactoryConfig managedLedgerFactoryConfig 
= new ManagedLedgerFactoryConfig(); + managedLedgerFactoryConfig.setMaxCacheSize(config.getManagedLedgerCacheSizeMB()); + managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads( + config.getManagedLedgerNumWorkerThreads()); + managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads( + config.getManagedLedgerNumSchedulerThreads()); + + return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig); + } + + private synchronized OrderedScheduler getOffloaderScheduler(ConnectorConfig config) { + if (this.offloaderScheduler == null) { + this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() + .numThreads(config.getManagedLedgerOffloadMaxThreads()) + .name("pulsar-offloader").build(); + } + return this.offloaderScheduler; + } + + private LedgerOffloader initManagedLedgerOffloader(ConnectorConfig config) { + + try { + if (StringUtils.isNotBlank(config.getManagedLedgerOffloadDriver())) { + checkNotNull(config.getOffloadersDirectory(), + "Offloader driver is configured to be '%s' but no offloaders directory is configured.", + config.getManagedLedgerOffloadDriver()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(config.getOffloadersDirectory()); + LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( + config.getManagedLedgerOffloadDriver()); + + Map offloaderProperties = config.getOffloaderProperties(); + offloaderProperties.put(OFFLOADERS_DIRECTOR, config.getOffloadersDirectory()); + offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, config.getManagedLedgerOffloadDriver()); + offloaderProperties + .put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(config.getManagedLedgerOffloadMaxThreads())); + + try { + return offloaderFactory.create( + getProperties(offloaderProperties), + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + getOffloaderScheduler(config)); + } catch (IOException ioe) { + log.error("Failed to create offloader: ", ioe); + throw new RuntimeException(ioe.getMessage(), ioe.getCause()); + } + } else { + log.info("No ledger offloader configured, using NULL instance"); + return NullLedgerOffloader.INSTANCE; + } + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + public ManagedLedgerConfig getManagedLedgerConfig() { + + return new ManagedLedgerConfig() + .setLedgerOffloader(this.offloader); + } + + public ManagedLedgerFactoryImpl getManagedLedgerFactory() { + return managedLedgerFactory; + } + + public StatsProvider getStatsProvider() { + return statsProvider; + } + + public static void shutdown() throws Exception { + synchronized (CachedClients.class) { + if (instance != null) { + instance.statsProvider.stop(); + instance.managedLedgerFactory.shutdown(); + instance.offloaderScheduler.shutdown(); + instance.offloaderManager.close(); + instance = null; + } + } + } +} diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/ConnectorUtils.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/ConnectorUtils.java new file mode 100644 index 000000000..6fa3bc761 --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/ConnectorUtils.java @@ -0,0 +1,71 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
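CachedClients above is a process-wide singleton: the first ConnectorConfig passed in wins, and later calls reuse the same ManagedLedgerFactory. A minimal usage sketch, assuming a reachable ZooKeeper at a placeholder address:

```java
// Sketch only: the ZooKeeper address is a hypothetical placeholder,
// and getInstance/shutdown are declared to throw Exception.
ConnectorConfig config = new ConnectorConfig();
config.setZookeeperUri("localhost:2181");

CachedClients clients = CachedClients.getInstance(config);
ManagedLedgerFactoryImpl factory = clients.getManagedLedgerFactory();
// ... open read-only cursors / read ledgers through the factory ...
CachedClients.shutdown(); // stops the stats provider, ML factory and offloader manager
```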
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.pulsar; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.Properties; + +/** + * Utilities for constructing CachedClients. + */ +public class ConnectorUtils { + + /** + * Create an instance of userClassName using the provided classLoader. + * The instance must implement the provided interface xface. + * + * @param userClassName user class name + * @param xface the interface that the reflected instance should implement + * @param classLoader class loader to load the class. + * @return the instance + */ + public static <T> T createInstance(String userClassName, + Class<T> xface, + ClassLoader classLoader) { + Class<?> theCls; + try { + theCls = Class.forName(userClassName, true, classLoader); + } catch (ClassNotFoundException | NoClassDefFoundError cnfe) { + throw new RuntimeException("User class must be in class path", cnfe); + } + if (!xface.isAssignableFrom(theCls)) { + throw new RuntimeException(userClassName + " does not implement " + xface.getName()); + } + Class<T> tCls = (Class<T>) theCls.asSubclass(xface); + try { + Constructor<T> meth = tCls.getDeclaredConstructor(); + return meth.newInstance(); + } catch (InstantiationException ie) { + throw new RuntimeException("User class must be concrete", ie); + } catch (NoSuchMethodException e) { + throw new RuntimeException("User class must have a no-arg constructor", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("User class must have a public constructor", e); + } catch (InvocationTargetException e) { + throw new RuntimeException("User class constructor threw an exception", e); + } + } + + public static Properties getProperties(Map<String, String> configMap) { + Properties properties = new Properties(); + for (Map.Entry<String, String> entry : configMap.entrySet()) { + properties.setProperty(entry.getKey(), entry.getValue()); + } + return properties; + } + +} diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/InputLedger.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/InputLedger.java new file mode 100644 index 000000000..3021e95fa --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/InputLedger.java @@ -0,0 +1,70 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
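As a hedged illustration of the reflection helper above, this is essentially what CachedClients does to instantiate the configured stats provider:

```java
// Instantiate the (shaded) NullStatsProvider through the generic factory method.
StatsProvider statsProvider = ConnectorUtils.createInstance(
        NullStatsProvider.class.getName(),               // class name taken from ConnectorConfig
        StatsProvider.class,                             // interface the class must implement
        Thread.currentThread().getContextClassLoader());
```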
+ */ + +package org.apache.flink.batch.connectors.pulsar; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.pulsar.shade.com.google.common.base.MoreObjects; + +import javax.validation.constraints.NotNull; + +import java.io.Serializable; +import java.util.Map; +import java.util.UUID; + +/** + * A ledger with a range to be read. + */ +@Data +@AllArgsConstructor +public class InputLedger implements Serializable, Comparable<InputLedger> { + + private String topic; + + private long ledgerId; + + private long startEntryId; + + private long endEntryId; + + private UUID uuid; + + private Map<String, String> offloaderDriverMeta; + + public long ledgerSize() { + return endEntryId - startEntryId + 1; + } + + public boolean isOffloadedLedger() { + return uuid != null; + } + + @Override + public int compareTo(@NotNull InputLedger o) { + return Long.compare(ledgerSize(), o.ledgerSize()); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("topic", topic) + .add("ledger", ledgerId) + .add("startEntryId", startEntryId) + .add("endEntryId", endEntryId) + .add("size", ledgerSize()) + .toString(); + } +} diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/InputSplitReader.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/InputSplitReader.java new file mode 100644 index 000000000..229a8ba3e --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/InputSplitReader.java @@ -0,0 +1,256 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.pulsar; + +import org.apache.flink.pulsar.common.ConnectorConfig; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.pulsar.common.api.raw.MessageParser; +import org.apache.pulsar.common.api.raw.RawMessage; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper; +import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.ReadHandle; +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.SpscArrayQueue; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Segment reader for an input split. + * @param <T> the type of emitted records.
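Both entry ids of an InputLedger are inclusive, so ledgerSize() is endEntryId - startEntryId + 1. A quick sanity check with invented values:

```java
// Entries 10..19 of ledger 42 -> 10 entries to read.
InputLedger ledger = new InputLedger(
        "persistent://public/default/topic-partition-0", 42L, 10L, 19L, null, null);
assert ledger.ledgerSize() == 10;
assert !ledger.isOffloadedLedger(); // no offload UUID -> ledger still lives in BookKeeper
```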
+ */ +@Slf4j +public abstract class InputSplitReader<T> { + + private ConnectorConfig connectorConfig; + + private int partitionId; + + private List<InputLedger> ledgersToRead; + + private CachedClients cachedClients; + private BookKeeper bkClient; + private ManagedLedgerConfig mlConfig; + private LedgerOffloader offloader; + + private Executor executor; + + private SpscArrayQueue<RawMessage> messageQueue; + private SpscArrayQueue<Entry> entryQueue; + + private Thread deserializerThread; + private RawMessage currentMessage; + + private AtomicLong outstandingLedgerReads = new AtomicLong(0); + private int ledgerSize; + private int currentLedgerIdx = 0; + + private long partitionSize; + private volatile long entriesProcessed = 0; + + private Map<Long, TopicName> ledger2Topic = new ConcurrentHashMap<>(); + + public InputSplitReader(ConnectorConfig connectorConfig, int partitionId, List<InputLedger> ledgersToRead) throws Exception { + this.connectorConfig = connectorConfig; + this.partitionId = partitionId; + this.ledgersToRead = ledgersToRead; + + this.cachedClients = CachedClients.getInstance(connectorConfig); + this.bkClient = cachedClients.getManagedLedgerFactory().getBookKeeper(); + this.mlConfig = cachedClients.getManagedLedgerConfig(); + this.offloader = mlConfig.getLedgerOffloader(); + + this.executor = Executors.newSingleThreadExecutor(); + + this.messageQueue = new SpscArrayQueue<>(connectorConfig.getMaxSplitMessageQueueSize()); + this.entryQueue = new SpscArrayQueue<>(connectorConfig.getMaxSplitEntryQueueSize()); + + this.ledgerSize = ledgersToRead.size(); + + this.partitionSize = ledgersToRead.stream().mapToLong(InputLedger::ledgerSize).sum(); + } + + public boolean next() throws IOException { + if (deserializerThread == null) { + deserializerThread = new DeserializeEntries(); + deserializerThread.start(); + + getEntries(ledgersToRead.get(currentLedgerIdx)); + currentLedgerIdx++; + } + + if (currentMessage != null) { + currentMessage.release(); + currentMessage = null; + } + + while (true) { + if (messageQueue.isEmpty() && entriesProcessed >= partitionSize) { + return false; + } + + if (currentLedgerIdx < ledgerSize && outstandingLedgerReads.get() == 0) { + getEntries(ledgersToRead.get(currentLedgerIdx)); + currentLedgerIdx++; + } + + currentMessage = messageQueue.poll(); + if (currentMessage != null) { + return true; + } else { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + public abstract T deserialize(RawMessage currentMessage) throws IOException; + + public T get() throws IOException { + return deserialize(currentMessage); + } + + public void close() throws Exception { + if (currentMessage != null) { + currentMessage.release(); + } + + if (messageQueue != null) { + messageQueue.drain(RawMessage::release); + } + + if (entryQueue != null) { + entryQueue.drain(Entry::release); + } + + if (deserializerThread != null) { + deserializerThread.interrupt(); + } + } + + class DeserializeEntries extends Thread { + + protected volatile boolean isRunning = false; + + DeserializeEntries() { + super("deserialize-thread-split-" + partitionId); + } + + @Override + public void interrupt() { + isRunning = false; + } + + @Override + public void run() { + isRunning = true; + while (isRunning) { + + int read = entryQueue.drain(entry -> { + TopicName tp = ledger2Topic.getOrDefault(entry.getLedgerId(), TopicName.get("DUMMY")); + try { + MessageParser.parseMessage(tp, entry.getLedgerId(), entry.getEntryId(), entry.getDataBuffer(), + (message) -> { + try { + // enqueue deserialized messages from this entry + while (!messageQueue.offer(message)) { + Thread.sleep(1); + } + } catch (InterruptedException e) { + //no-op + } + }, connectorConfig.getMaxMessageSize()); + } catch (IOException e) { + log.error(String.format("Failed to parse message from pulsar topic %s", tp.toString()), e); + } finally { + entriesProcessed++; + entry.release(); + } + }); + + if (read <= 0) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } + } + } + } + } + + CompletableFuture<Void> getEntries(InputLedger info) { + outstandingLedgerReads.incrementAndGet(); + + return getLedgerHandle(info).thenComposeAsync(readHandle -> { + try (LedgerEntries entries = readHandle.read(info.getStartEntryId(), info.getEndEntryId())) { + + entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() { + // LedgerEntries is keyed by entry id, so supply the split's inclusive id range + private long entryId = info.getStartEntryId(); + @Override + public Entry get() { + return EntryImpl.create(entries.getEntry(entryId++)); + } + }, (int) info.ledgerSize()); + + } catch (Exception e) { + throw new CompletionException(e); + } + return CompletableFuture.completedFuture(null); + }, executor).whenComplete((ignore, throwable) -> { + outstandingLedgerReads.decrementAndGet(); + if (throwable != null) { + log.error(String.format("Failed to extract entries for ledger %s", info), throwable); + } else { + log.info(String.format("Finished extracting entries for ledger %s", info)); + } + }); + } + + CompletableFuture<ReadHandle> getLedgerHandle(InputLedger ledger) { + ledger2Topic.put(ledger.getLedgerId(), TopicName.get(ledger.getTopic())); + if (ledger.getUuid() != null) { + return offloader.readOffloaded(ledger.getLedgerId(), ledger.getUuid(), ledger.getOffloaderDriverMeta()); + } else { + return bkClient.newOpenLedgerOp() + .withRecovery(false) + .withLedgerId(ledger.getLedgerId()) + .withDigestType(mlConfig.getDigestType()) + .withPassword(mlConfig.getPassword()) + .execute(); + } + } + +} diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormat.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormat.java new file mode 100644 index 000000000..6edb702a7 --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormat.java @@ -0,0 +1,171 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
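The reader above follows a next()/get() pull contract. A hedged sketch of a concrete subclass draining one split; config and ledgers are assumed to exist, and error handling is elided:

```java
// Assumes the imports of InputSplitReader.java plus java.nio.charset.StandardCharsets.
InputSplitReader<String> splitReader =
        new InputSplitReader<String>(config, 0, ledgers) {
            @Override
            public String deserialize(RawMessage message) {
                return new String(ByteBufUtil.getBytes(message.getData()),
                        StandardCharsets.UTF_8);
            }
        };
while (splitReader.next()) {  // false once all ledger entries were delivered
    String record = splitReader.get();
    // process record ...
}
splitReader.close();
```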
+ */ + +package org.apache.flink.batch.connectors.pulsar; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.PulsarMetadataReader; + +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.api.raw.RawMessage; +import org.apache.pulsar.shade.io.netty.buffer.ByteBufUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * Read Pulsar data in batch mode. + * Bypasses the broker and reads segments directly for efficiency. + * @param <T> the type of record. + */ +@Slf4j +@ToString +public class PulsarInputFormat<T> extends RichInputFormat<T, PulsarInputSplit> implements ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + // global fields + + private ConnectorConfig connectorConfig; + + private DeserializationSchema<T> deserializer; + + // instance-wise fields + + private transient Configuration parameters; + + private transient InputSplitReader<T> reader; + + public PulsarInputFormat( + ConnectorConfig connectorConfig, + DeserializationSchema<T> deserializer) { + this.connectorConfig = connectorConfig; + this.deserializer = deserializer; + } + + @Override + public void configure(Configuration parameters) { + this.parameters = parameters; + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { + return null; + } + + @Override + public PulsarInputSplit[] createInputSplits(int minNumSplits) throws IOException { + List<PulsarInputSplit> pulsarSplits = new ArrayList<>(); + + try (PulsarMetadataReader metadataReader = new PulsarMetadataReader(connectorConfig)) { + + Set<String> topics = metadataReader.getTopicPartitionsAll(); + + List<InputLedger> allLedgers = new ArrayList<>(); + + for (String topic : topics) { + Collection<InputLedger> ledgers = SplitUtils.getLedgersInBetween( + topic, + (MessageIdImpl) MessageId.earliest, + (MessageIdImpl) MessageId.latest, + CachedClients.getInstance(connectorConfig)); + allLedgers.addAll(ledgers); + } + + List<List<InputLedger>> ldSplits = + SplitUtils.partitionToNSplits(allLedgers, connectorConfig.getTargetNumSplits()); + + for (int i = 0; i < ldSplits.size(); i++) { + pulsarSplits.add(genSplit(i, ldSplits.get(i))); + } + + return pulsarSplits.toArray(new PulsarInputSplit[0]); + + } catch (Exception e) { + throw new IOException("Failed to create input splits", e); + } + } + + protected PulsarInputSplit genSplit(int index, List<InputLedger> ledgers) { + return new PulsarInputSplit(index, ledgers); + } + + @Override + public InputSplitAssigner getInputSplitAssigner(PulsarInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void open(PulsarInputSplit split) throws IOException { + try { + reader = new GenericSplitReader(connectorConfig, split.getSplitNumber(), split.getLedgersToRead()); + } catch (Exception e) { + throw new IOException(String.format("Failed to open split %d to read", split.getSplitNumber()), e); + } + } + + @Override + public boolean reachedEnd() throws IOException { + return !reader.next(); + } + + @Override + public T nextRecord(T reuse) throws IOException { + return reader.get(); + } + + @Override + public void close() throws IOException { + if (reader != null) { + try { + reader.close(); + } catch (Exception e) { + throw new IOException("Failed to close the split reader", e); + } + } + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializer.getProducedType(); + } + + private class GenericSplitReader extends InputSplitReader<T> { + + public GenericSplitReader(ConnectorConfig connectorConfig, int partitionId, List<InputLedger> ledgersToRead) throws Exception { + super(connectorConfig, partitionId, ledgersToRead); + } + + @Override + public T deserialize(RawMessage currentMessage) throws IOException { + return deserializer.deserialize(ByteBufUtil.getBytes(currentMessage.getData())); + } + } +} diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputSplit.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputSplit.java new file mode 100644 index 000000000..7da940830 --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarInputSplit.java @@ -0,0 +1,36 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.pulsar; + +import org.apache.flink.core.io.InputSplit; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * Pulsar InputSplit. + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class PulsarInputSplit implements InputSplit { + + private int splitNumber; + + private List<InputLedger> ledgersToRead; +} diff --git a/src/main/java/org/apache/flink/batch/connectors/pulsar/SplitUtils.java b/src/main/java/org/apache/flink/batch/connectors/pulsar/SplitUtils.java new file mode 100644 index 000000000..1c7dfe76f --- /dev/null +++ b/src/main/java/org/apache/flink/batch/connectors/pulsar/SplitUtils.java @@ -0,0 +1,191 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
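End to end, the format plugs into the DataSet API through ExecutionEnvironment.createInput. A minimal sketch of a batch read job; the endpoints are placeholders and SimpleStringSchema stands in for a real deserializer:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.batch.connectors.pulsar.PulsarInputFormat;
import org.apache.flink.pulsar.common.ConnectorConfig;

public class PulsarBatchReadJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ConnectorConfig config = new ConnectorConfig();
        config.setAdminUrl("http://localhost:8080"); // placeholder admin endpoint
        config.setZookeeperUri("localhost:2181");    // segment reads go via ZK/BK, not the broker
        config.setTopic("persistent://public/default/input");
        config.setTargetNumSplits(4);

        DataSet<String> records = env.createInput(
                new PulsarInputFormat<>(config, new SimpleStringSchema()));
        records.print();
    }
}
```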
+ */ + +package org.apache.flink.batch.connectors.pulsar; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl; +import org.apache.bookkeeper.mledger.offload.OffloadUtils; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.shade.com.google.common.collect.ImmutableList; +import org.apache.pulsar.shade.com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Utilities for split generation logic. + */ +@Slf4j +public class SplitUtils { + + public static Collection<InputLedger> getLedgersInBetween( + String topic, + MessageIdImpl start, + MessageIdImpl end, + CachedClients cachedClients) throws Exception { + + ReadOnlyCursorImpl readOnlyCursor = null; + + try { + + ManagedLedgerFactory mlFactory = cachedClients.getManagedLedgerFactory(); + readOnlyCursor = (ReadOnlyCursorImpl) mlFactory.openReadOnlyCursor( + TopicName.get(topic).getPersistenceNamingEncoding(), PositionImpl.earliest, new ManagedLedgerConfig()); + ManagedLedgerImpl ml = (ManagedLedgerImpl) readOnlyCursor.getManagedLedger(); + + List<MLDataFormats.ManagedLedgerInfo.LedgerInfo> allLedgers = + Lists.newArrayList(ml.getLedgersInfo().subMap(start.getLedgerId(), true, end.getLedgerId(), true).values()); + + long actualStartLedger = allLedgers.get(0).getLedgerId(); + long actualStartEntry = start.getEntryId() > 0 ? start.getEntryId() : 0; + + MLDataFormats.ManagedLedgerInfo.LedgerInfo endLedger = allLedgers.get(allLedgers.size() - 1); + long actualEndLedger = endLedger.getLedgerId(); + // TODO: the last entry added cannot be seen because of https://github.com/apache/pulsar/pull/5822. + // when we bump Pulsar version to 2.5.1, this comment can be removed. + long lastEntry = endLedger.getEntries() - 1; + long actualEndEntry = + end.getEntryId() >= lastEntry ? + lastEntry : + (end.getEntryId() >= 0 ? end.getEntryId() : 0); + + Map<Long, InputLedger> ledgersToRead = new HashMap<>(); + + for (MLDataFormats.ManagedLedgerInfo.LedgerInfo li : allLedgers) { + if (li.hasOffloadContext() && li.getOffloadContext().getComplete()) { + UUID uid = new UUID(li.getOffloadContext().getUidMsb(), li.getOffloadContext().getUidLsb()); + Map<String, String> metadata = OffloadUtils.getOffloadDriverMetadata(li); + metadata.put("ManagedLedgerName", topic); + ledgersToRead.put(li.getLedgerId(), + new InputLedger(topic, li.getLedgerId(), 0L, li.getEntries() - 1, uid, metadata)); + } else { + ledgersToRead.put(li.getLedgerId(), + new InputLedger(topic, li.getLedgerId(), 0L, li.getEntries() - 1, null, null)); + } + } + + ledgersToRead.get(actualStartLedger).setStartEntryId(actualStartEntry); + ledgersToRead.get(actualEndLedger).setEndEntryId(actualEndEntry); + + return ledgersToRead.values(); + + } finally { + + if (readOnlyCursor != null) { + try { + readOnlyCursor.close(); + } catch (Exception e) { + log.error("Failed to close readOnly cursor", e); + } + } + } + } + + /** + * Best-effort partitioning of ledgers into splits by ledger size (using backtracking). + * If the number of ledgers is less than or equal to the target parallelism, each ledger becomes its own partition. + * @param ledgers The list of ledgers to be partitioned. + * @param parallelism The target number of partitions. + * @return Ledgers grouped into partitions. + */ + public static List<List<InputLedger>> partitionToNSplits(List<InputLedger> ledgers, int parallelism) { + + if (ledgers.size() <= parallelism) { + return ledgers.stream().map(l -> ImmutableList.of(l)).collect(Collectors.toList()); + } + + long totalSize = sizeOfLedgerList(ledgers); + long avgSizePerSplit = totalSize / parallelism; + + ledgers.sort(null); + + int li = ledgers.size() - 1; + int k = parallelism; + + @SuppressWarnings("unchecked") + List<InputLedger>[] outputGroups = new List[k]; + for (int i = 0; i < outputGroups.length; i++) { + outputGroups[i] = new ArrayList<>(); + } + + while (li >= 0 && ledgers.get(li).ledgerSize() >= avgSizePerSplit) { + outputGroups[k - 1].add(ledgers.get(li)); + li--; + k--; + } + + search(outputGroups, li, ledgers, avgSizePerSplit); + + return Arrays.asList(outputGroups); + } + + private static boolean search( + List<InputLedger>[] outputGroups, + int li, + List<InputLedger> ledgers, + long avgSizePerSplit) { + + if (li < 0) { + return true; + } + + InputLedger currentLedger = ledgers.get(li); + li -= 1; + + long smallestGroupSize = Long.MAX_VALUE; + int smallestGroupIndex = 0; + + for (int i = 0; i < outputGroups.length; i++) { + long groupISize = sizeOfLedgerList(outputGroups[i]); + + if (groupISize + currentLedger.ledgerSize() <= avgSizePerSplit) { + outputGroups[i].add(currentLedger); + if (search(outputGroups, li, ledgers, avgSizePerSplit)) { + return true; + } + outputGroups[i].remove(outputGroups[i].size() - 1); + } + + if (groupISize < smallestGroupSize) { + smallestGroupSize = groupISize; + smallestGroupIndex = i; + } + } + + // put the current ledger into the smallest group and restart the search from there + outputGroups[smallestGroupIndex].add(currentLedger); + search(outputGroups, li, ledgers, avgSizePerSplit); + + return true; + } + + public static long sizeOfLedgerList(List<InputLedger> ledgers) { + return ledgers.stream().mapToLong(InputLedger::ledgerSize).sum(); + } +} diff --git a/src/main/java/org/apache/flink/pulsar/common/ConnectorConfig.java b/src/main/java/org/apache/flink/pulsar/common/ConnectorConfig.java new file mode 100644 index 000000000..27dad0171 --- --- /dev/null +++ b/src/main/java/org/apache/flink/pulsar/common/ConnectorConfig.java @@ -0,0 +1,118 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.pulsar.common; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +/** + * Configuration for the connector.
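A small worked example of the partitioner, with invented entry counts: ledgers of sizes 9, 5, 4, 3 and 3 split two ways average 12 entries per split, and the backtracking settles on groups of [9, 3] and [5, 4, 3]:

```java
List<InputLedger> ledgers = new ArrayList<>();
long[][] ranges = {{1, 0, 8}, {2, 0, 4}, {3, 0, 3}, {4, 0, 2}, {5, 0, 2}};
for (long[] r : ranges) { // ledgerId, startEntryId, endEntryId (all hypothetical)
    ledgers.add(new InputLedger("persistent://public/default/t", r[0], r[1], r[2], null, null));
}

List<List<InputLedger>> splits = SplitUtils.partitionToNSplits(ledgers, 2);
// Both splits end up with 12 entries: ledger sizes [9, 3] and [5, 4, 3].
```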
+ */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ConnectorConfig implements Serializable { + + // ------------------------------------------------------------------------ + // connection configurations + // ------------------------------------------------------------------------ + + private String serviceUrl; + + private String adminUrl; + + private String zookeeperUri; + + private String authPluginClassName; + + private String authParams; + + private String tlsTrustCertsFilePath; + + private Boolean tlsAllowInsecureConnection; + + private Boolean tlsHostnameVerificationEnable; + + // ------------------------------------------------------------------------ + // common configurations + // ------------------------------------------------------------------------ + + private String topic; + + private String topics; + + private String topicsPattern; + + // ------------------------------------------------------------------------ + // segment reader configurations + // ------------------------------------------------------------------------ + + private int entryReadBatchSize = 100; + private int targetNumSplits = 2; + private int maxSplitMessageQueueSize = 10000; + private int maxSplitEntryQueueSize = 1000; + private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; + private String statsProvider = NullStatsProvider.class.getName(); + + private Map statsProviderConfigs = new HashMap<>(); + + private boolean namespaceDelimiterRewriteEnable = false; + private String rewriteNamespaceDelimiter = "/"; + + // --- Ledger Offloading --- + private String managedLedgerOffloadDriver = null; + private int managedLedgerOffloadMaxThreads = 2; + private String offloadersDirectory = "./offloaders"; + private Map offloaderProperties = new HashMap<>(); + + // --- Bookkeeper + private int bookkeeperThrottleValue = 0; + private int bookkeeperNumIOThreads = 2 * Runtime.getRuntime().availableProcessors(); + private int bookkeeperNumWorkerThreads = Runtime.getRuntime().availableProcessors(); + + // --- ManagedLedger + private long managedLedgerCacheSizeMB = 0L; + private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors(); + private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors(); + + // ------------------------------------------------------------------------ + // stream reader configurations + // ------------------------------------------------------------------------ + + private long partitionDiscoveryIntervalMillis = -1; + + private boolean failOnDataLoss = true; + + private int clientCacheSize = 5; + + private boolean flushOnCheckpoint = true; + + private boolean failOnWrite = false; + + private int pollTimeoutMs = 120000; + + private int commitMaxRetries = 3; + + public ConnectorConfig(String adminUrl) { + this.adminUrl = adminUrl; + } +} diff --git a/src/main/java/org/apache/flink/pulsar/common/ConnectorConfigUtils.java b/src/main/java/org/apache/flink/pulsar/common/ConnectorConfigUtils.java new file mode 100644 index 000000000..8f20f3cca --- /dev/null +++ b/src/main/java/org/apache/flink/pulsar/common/ConnectorConfigUtils.java @@ -0,0 +1,72 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
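Because ConnectorConfig is a Lombok @Data bean, jobs configure it through plain setters; a sketch with placeholder endpoints:

```java
ConnectorConfig config = new ConnectorConfig();
config.setServiceUrl("pulsar://localhost:6650"); // placeholder
config.setAdminUrl("http://localhost:8080");     // placeholder
config.setZookeeperUri("localhost:2181");        // required only by the batch segment reader
config.setTopics("persistent://public/default/a,persistent://public/default/b");
config.setFailOnDataLoss(false);
```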
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.pulsar.common; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocal; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.Map; + +/** + * Utils on loading ConnectorConfig from Map. + */ +public final class ConnectorConfigUtils { + + public static ObjectMapper create() { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, false); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + return mapper; + } + + private static final FastThreadLocal mapper = new FastThreadLocal() { + @Override + protected ObjectMapper initialValue() throws Exception { + return create(); + } + }; + + public static ObjectMapper getThreadLocal() { + return mapper.get(); + } + + private ConnectorConfigUtils() {} + + public static ConnectorConfig loadData(Map config) { + return loadData(config, new ConnectorConfig()); + } + + public static ConnectorConfig loadData(Map config, + ConnectorConfig existingData) { + ObjectMapper mapper = getThreadLocal(); + try { + String existingConfigJson = mapper.writeValueAsString(existingData); + Map existingConfig = mapper.readValue(existingConfigJson, Map.class); + Map newConfig = Maps.newHashMap(); + newConfig.putAll(existingConfig); + newConfig.putAll(config); + String configJson = mapper.writeValueAsString(newConfig); + return mapper.readValue(configJson, ConnectorConfig.class); + } catch (IOException e) { + throw new RuntimeException("Failed to load config into existing configuration data", e); + } + } +} diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/DateTimeUtils.java b/src/main/java/org/apache/flink/pulsar/common/DateTimeUtils.java similarity index 99% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/DateTimeUtils.java rename to src/main/java/org/apache/flink/pulsar/common/DateTimeUtils.java index d89e81dc2..4e1bb2582 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/DateTimeUtils.java +++ b/src/main/java/org/apache/flink/pulsar/common/DateTimeUtils.java @@ -12,7 +12,7 @@ * limitations under the License. 
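loadData merges a plain map into an existing config by round-tripping both through Jackson, so the keys must match ConnectorConfig field names; a hedged sketch (the exact map value type is an assumption here):

```java
Map<String, Object> overrides = new HashMap<>();
overrides.put("topic", "persistent://public/default/input"); // keys are ConnectorConfig field names
overrides.put("pollTimeoutMs", 60000);

ConnectorConfig config = ConnectorConfigUtils.loadData(overrides);
```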
*/ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.flink.api.java.tuple.Tuple2; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java b/src/main/java/org/apache/flink/pulsar/common/JSONOptions.java similarity index 99% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java rename to src/main/java/org/apache/flink/pulsar/common/JSONOptions.java index c5d213c4b..338ea4025 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptions.java +++ b/src/main/java/org/apache/flink/pulsar/common/JSONOptions.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.flink.util.Preconditions; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptionsInRead.java b/src/main/java/org/apache/flink/pulsar/common/JSONOptionsInRead.java similarity index 97% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptionsInRead.java rename to src/main/java/org/apache/flink/pulsar/common/JSONOptionsInRead.java index 9cdbae76f..169b586b7 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JSONOptionsInRead.java +++ b/src/main/java/org/apache/flink/pulsar/common/JSONOptionsInRead.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java b/src/main/java/org/apache/flink/pulsar/common/JacksonRecordParser.java similarity index 99% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java rename to src/main/java/org/apache/flink/pulsar/common/JacksonRecordParser.java index 2dadd658d..0120c6974 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/JacksonRecordParser.java +++ b/src/main/java/org/apache/flink/pulsar/common/JacksonRecordParser.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.FieldsDataType; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ParseMode.java b/src/main/java/org/apache/flink/pulsar/common/ParseMode.java similarity index 95% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ParseMode.java rename to src/main/java/org/apache/flink/pulsar/common/ParseMode.java index 02dc423d2..b605711d8 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ParseMode.java +++ b/src/main/java/org/apache/flink/pulsar/common/ParseMode.java @@ -12,7 +12,7 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import java.util.Collections; import java.util.Map; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java b/src/main/java/org/apache/flink/pulsar/common/PulsarDeserializer.java similarity index 99% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java rename to src/main/java/org/apache/flink/pulsar/common/PulsarDeserializer.java index 6d52a1799..b34bbe9a9 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarDeserializer.java +++ b/src/main/java/org/apache/flink/pulsar/common/PulsarDeserializer.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.CollectionDataType; @@ -58,7 +58,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.META_FIELD_NAMES; +import static org.apache.flink.pulsar.common.PulsarOptions.META_FIELD_NAMES; import static org.apache.pulsar.shade.org.apache.avro.Schema.Type.ARRAY; import static org.apache.pulsar.shade.org.apache.avro.Schema.Type.DOUBLE; import static org.apache.pulsar.shade.org.apache.avro.Schema.Type.FLOAT; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java b/src/main/java/org/apache/flink/pulsar/common/PulsarMetadataReader.java similarity index 89% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java rename to src/main/java/org/apache/flink/pulsar/common/PulsarMetadataReader.java index 01bc582c1..de38ec4b5 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarMetadataReader.java +++ b/src/main/java/org/apache/flink/pulsar/common/PulsarMetadataReader.java @@ -12,9 +12,9 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; -import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils.IncompatibleSchemaException; +import org.apache.flink.pulsar.common.SchemaUtils.IncompatibleSchemaException; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.ListUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; @@ -59,11 +60,11 @@ @Slf4j public class PulsarMetadataReader implements AutoCloseable { - private final String adminUrl; + private final ConnectorConfig connectorConfig; private final String subscriptionName; - private final Map caseInsensitiveParams; + private final boolean useExternalSubscription; private final int indexOfThisSubtask; @@ -75,33 +76,40 @@ public class PulsarMetadataReader implements AutoCloseable { private Set seenTopics = new HashSet<>(); - private final boolean useExternalSubscription; + public PulsarMetadataReader(ConnectorConfig connectorConfig) throws PulsarClientException { + this(connectorConfig, -1, -1); + } public PulsarMetadataReader( - String adminUrl, + ConnectorConfig connectorConfig, + int indexOfThisSubtask, + int numParallelSubtasks) throws PulsarClientException { + + this(connectorConfig, "", indexOfThisSubtask, numParallelSubtasks); + } + + public PulsarMetadataReader( + ConnectorConfig connectorConfig, String subscriptionName, - Map caseInsensitiveParams, int indexOfThisSubtask, - int numParallelSubtasks, - boolean useExternalSubscription) throws PulsarClientException { + int numParallelSubtasks) throws PulsarClientException { - this.adminUrl = adminUrl; - this.subscriptionName = subscriptionName; - this.caseInsensitiveParams = caseInsensitiveParams; - this.indexOfThisSubtask = indexOfThisSubtask; - this.numParallelSubtasks = numParallelSubtasks; - this.useExternalSubscription = useExternalSubscription; - this.admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build(); + this(connectorConfig, subscriptionName, false, indexOfThisSubtask, numParallelSubtasks); } public PulsarMetadataReader( - String adminUrl, + ConnectorConfig connectorConfig, String subscriptionName, - Map caseInsensitiveParams, + boolean useExternalSubscription, int indexOfThisSubtask, int numParallelSubtasks) throws PulsarClientException { - this(adminUrl, subscriptionName, caseInsensitiveParams, indexOfThisSubtask, numParallelSubtasks, false); + this.subscriptionName = subscriptionName; + this.connectorConfig = connectorConfig; + this.indexOfThisSubtask = indexOfThisSubtask; + this.numParallelSubtasks = numParallelSubtasks; + this.useExternalSubscription = useExternalSubscription; + this.admin = PulsarAdmin.builder().serviceHttpUrl(connectorConfig.getAdminUrl()).build(); } @Override @@ -354,22 +362,20 @@ public Set getTopicPartitionsAll() throws PulsarAdminException { } public List getTopics() throws PulsarAdminException { - for (Map.Entry e : caseInsensitiveParams.entrySet()) { - if (PulsarOptions.TOPIC_OPTION_KEYS.contains(e.getKey())) { - String key = e.getKey(); - if (key.equals("topic")) { - return Collections.singletonList(TopicName.get(e.getValue()).toString()); - } else if (key.equals("topics")) { - return 
Arrays.asList(e.getValue().split(",")).stream() - .filter(s -> !s.isEmpty()) - .map(t -> TopicName.get(t).toString()) - .collect(Collectors.toList()); - } else { // topicspattern - return getTopicsWithPattern(e.getValue()); - } - } + if (connectorConfig.getTopic() != null) { + return Collections.singletonList(TopicName.get(connectorConfig.getTopic()).toString()); + } else if (connectorConfig.getTopics() != null) { + return Arrays.asList(connectorConfig.getTopics().split(",")).stream() + .filter(s -> !s.isEmpty()) + .map(t -> TopicName.get(t).toString()) + .collect(Collectors.toList()); + } else if (connectorConfig.getTopicsPattern() != null) { + return getTopicsWithPattern(connectorConfig.getTopicsPattern()); + } else { + throw new IllegalArgumentException( + "You should specify topic(s) using one of the topic options: " + + StringUtils.join(PulsarOptions.TOPIC_OPTION_KEYS, ",")); } - return null; } private List getTopicsWithPattern(String topicsPattern) throws PulsarAdminException { diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java b/src/main/java/org/apache/flink/pulsar/common/PulsarOptions.java similarity index 84% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java rename to src/main/java/org/apache/flink/pulsar/common/PulsarOptions.java index 1c02d42cc..8b7acae43 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarOptions.java +++ b/src/main/java/org/apache/flink/pulsar/common/PulsarOptions.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.shade.com.google.common.collect.ImmutableSet; @@ -46,14 +46,6 @@ public class PulsarOptions { public static final String ADMIN_URL_OPTION_KEY = "admin-url"; public static final String STARTUP_MODE_OPTION_KEY = "startup-mode"; - public static final String PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY = "partitiondiscoveryintervalmillis"; - public static final String CLIENT_CACHE_SIZE_OPTION_KEY = "clientcachesize"; - public static final String FLUSH_ON_CHECKPOINT_OPTION_KEY = "flushoncheckpoint"; - public static final String FAIL_ON_WRITE_OPTION_KEY = "failonwrite"; - public static final String POLL_TIMEOUT_MS_OPTION_KEY = "polltimeoutms"; - public static final String COMMIT_MAX_RETRIES = "commitmaxretries"; - public static final String FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"; - public static final String INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE = "Some data may have been lost because they are not available in Pulsar any more; either the\n" + " data was aged out by Pulsar or the topic may have been deleted before all the data in the\n" + diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarSerializer.java b/src/main/java/org/apache/flink/pulsar/common/PulsarSerializer.java similarity index 99% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarSerializer.java rename to src/main/java/org/apache/flink/pulsar/common/PulsarSerializer.java index 8c2601eb4..13bb81a3f 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarSerializer.java +++ b/src/main/java/org/apache/flink/pulsar/common/PulsarSerializer.java @@ -12,7 +12,7 @@ * limitations under the License. 
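With the refactor above, a metadata reader is built straight from a ConnectorConfig; since it is AutoCloseable, the batch input format uses it in a try-with-resources, roughly like this sketch (config is an assumed ConnectorConfig instance):

```java
try (PulsarMetadataReader metadataReader = new PulsarMetadataReader(config)) {
    Set<String> partitions = metadataReader.getTopicPartitionsAll();
    partitions.forEach(p -> System.out.println("discovered partition " + p));
}
```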
*/ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.flink.table.types.CollectionDataType; import org.apache.flink.table.types.DataType; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java b/src/main/java/org/apache/flink/pulsar/common/SchemaUtils.java similarity index 97% rename from src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java rename to src/main/java/org/apache/flink/pulsar/common/SchemaUtils.java index c1071abb1..af2a5adc6 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SchemaUtils.java +++ b/src/main/java/org/apache/flink/pulsar/common/SchemaUtils.java @@ -12,7 +12,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.connectors.pulsar.internal; +package org.apache.flink.pulsar.common; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; @@ -59,11 +59,11 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.EVENT_TIME_NAME; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.KEY_ATTRIBUTE_NAME; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.MESSAGE_ID_NAME; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.PUBLISH_TIME_NAME; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_ATTRIBUTE_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.EVENT_TIME_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.KEY_ATTRIBUTE_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.MESSAGE_ID_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.PUBLISH_TIME_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.TOPIC_ATTRIBUTE_NAME; import static org.apache.flink.util.Preconditions.checkNotNull; /** diff --git a/src/main/java/org/apache/flink/pulsar/common/SourceSinkUtils.java b/src/main/java/org/apache/flink/pulsar/common/SourceSinkUtils.java new file mode 100644 index 000000000..fe086178e --- /dev/null +++ b/src/main/java/org/apache/flink/pulsar/common/SourceSinkUtils.java @@ -0,0 +1,93 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.pulsar.common; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utilities for source sink options parsing. 
+ */
+public class SourceSinkUtils {
+
+    public static ConnectorConfig validateSourceOptions(ConnectorConfig connectorConfig) {
+
+        int numTopicSettings = 0;
+
+        if (connectorConfig.getTopic() != null) {
+            numTopicSettings += 1;
+        }
+
+        if (connectorConfig.getTopics() != null) {
+            numTopicSettings += 1;
+        }
+
+        if (connectorConfig.getTopicsPattern() != null) {
+            numTopicSettings += 1;
+        }
+
+        if (numTopicSettings == 0 || numTopicSettings > 1) {
+            throw new IllegalArgumentException(
+                "You should specify topic(s) using one of the topic options: " +
+                    StringUtils.join(PulsarOptions.TOPIC_OPTION_KEYS, ","));
+        }
+
+        if (connectorConfig.getTopic() != null) {
+            String topic = connectorConfig.getTopic();
+            if (topic.contains(",")) {
+                throw new IllegalArgumentException(
+                    "Use `topics` instead of `topic` for multi-topic read");
+            }
+        }
+
+        if (connectorConfig.getTopics() != null) {
+            String tps = connectorConfig.getTopics();
+            List<String> topics = Arrays.asList(tps.split(",")).stream()
+                .map(String::trim).filter(t -> !t.isEmpty()).collect(Collectors.toList());
+            if (topics.isEmpty()) {
+                throw new IllegalArgumentException(
+                    "No topic is specified for reading with option " + tps);
+            }
+        }
+        if (connectorConfig.getTopicsPattern() != null) {
+            if (connectorConfig.getTopicsPattern().trim().isEmpty()) {
+                throw new IllegalArgumentException("TopicsPattern is empty");
+            }
+        }
+
+        return connectorConfig;
+    }
+
+    public static boolean belongsTo(String topic, int numParallelSubtasks, int index) {
+        return (topic.hashCode() * 31 & Integer.MAX_VALUE) % numParallelSubtasks == index;
+    }
+
+    public static Map<String, String> getReaderParams(Map<String, String> parameters) {
+        return parameters.keySet().stream()
+            .filter(k -> k.startsWith(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX))
+            .collect(Collectors.toMap(k -> k.substring(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX.length()), k -> parameters.get(k)));
+    }
+
+    public static Map<String, String> getProducerParams(Map<String, String> parameters) {
+        return parameters.keySet().stream()
+            .filter(k -> k.startsWith(PulsarOptions.PULSAR_PRODUCER_OPTION_KEY_PREFIX))
+            .collect(Collectors.toMap(k -> k.substring(PulsarOptions.PULSAR_PRODUCER_OPTION_KEY_PREFIX.length()), k -> parameters.get(k)));
+    }
+
+}
diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSink.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSink.java
index 5066e6e95..dc3f7d7b5 100644
--- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSink.java
+++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSink.java
@@ -16,9 +16,9 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.pulsar.internal.DateTimeUtils;
-import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSerializer;
-import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
+import org.apache.flink.pulsar.common.DateTimeUtils;
+import org.apache.flink.pulsar.common.PulsarSerializer;
+import org.apache.flink.pulsar.common.SchemaUtils;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.FieldsDataType;
@@ -39,10 +39,10 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.EVENT_TIME_NAME;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.KEY_ATTRIBUTE_NAME;
-import
static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.META_FIELD_NAMES; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_ATTRIBUTE_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.EVENT_TIME_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.KEY_ATTRIBUTE_NAME; +import static org.apache.flink.pulsar.common.PulsarOptions.META_FIELD_NAMES; +import static org.apache.flink.pulsar.common.PulsarOptions.TOPIC_ATTRIBUTE_NAME; /** * Write Flink Row to Pulsar. diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java index 593c47bb3..c67a5bcfd 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarRowSource.java @@ -15,11 +15,11 @@ package org.apache.flink.streaming.connectors.pulsar; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.pulsar.common.PulsarMetadataReader; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarRowFetcher; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.table.types.FieldsDataType; @@ -50,7 +50,7 @@ public FlinkPulsarRowSource(String serviceUrl, String adminUrl, Properties prope @Override public TypeInformation getProducedType() { if (typeInformation == null) { - try (PulsarMetadataReader reader = new PulsarMetadataReader(adminUrl, "", caseInsensitiveParams, -1, -1)) { + try (PulsarMetadataReader reader = new PulsarMetadataReader(connectorConfig)) { List topics = reader.getTopics(); FieldsDataType schema = reader.getSchema(topics); typeInformation = (TypeInformation) LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(schema); diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java index 5f13a01f2..5f0ab0306 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.java @@ -17,14 +17,16 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.ConnectorConfigUtils; +import org.apache.flink.pulsar.common.SchemaUtils; +import org.apache.flink.pulsar.common.SourceSinkUtils; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient; -import 
org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils; -import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializableObject; @@ -64,7 +66,7 @@ abstract class FlinkPulsarSinkBase extends RichSinkFunction implements Che protected ClientConfigurationData clientConfigurationData; - protected final Map caseInsensitiveParams; + protected final ConnectorConfig connectorConfig; protected final Map producerConf; @@ -121,19 +123,16 @@ public FlinkPulsarSinkBase( this.properties = checkNotNull(properties); - this.caseInsensitiveParams = - SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties)); + this.connectorConfig = ConnectorConfigUtils.loadData(Maps.fromProperties(properties)); this.producerConf = SourceSinkUtils.getProducerParams(Maps.fromProperties(properties)); - this.flushOnCheckpoint = - SourceSinkUtils.flushOnCheckpoint(caseInsensitiveParams); + this.flushOnCheckpoint = connectorConfig.isFlushOnCheckpoint(); - this.failOnWrite = - SourceSinkUtils.failOnWrite(caseInsensitiveParams); + this.failOnWrite = connectorConfig.isFailOnWrite(); - CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams)); + CachedPulsarClient.setCacheSize(connectorConfig.getClientCacheSize()); if (this.clientConfigurationData.getServiceUrl() == null) { throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration"); diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java index 0910cc508..30eb2eadd 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSource.java @@ -26,6 +26,11 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.ConnectorConfigUtils; +import org.apache.flink.pulsar.common.PulsarMetadataReader; +import org.apache.flink.pulsar.common.PulsarOptions; +import org.apache.flink.pulsar.common.SourceSinkUtils; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -38,9 +43,6 @@ import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; -import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; @@ -91,11 +93,9 @@ public class FlinkPulsarSource // configuration state, set on the client relevant for all subtasks // ------------------------------------------------------------------------ - protected String adminUrl; - protected ClientConfigurationData clientConfigurationData; - protected final Map caseInsensitiveParams; + protected final 
ConnectorConfig connectorConfig; protected final Map readerConf; @@ -190,22 +190,22 @@ public FlinkPulsarSource( ClientConfigurationData clientConf, DeserializationSchema deserializer, Properties properties) { - this.adminUrl = checkNotNull(adminUrl); + + this.connectorConfig = SourceSinkUtils.validateSourceOptions( + ConnectorConfigUtils.loadData( + Maps.fromProperties(properties), + new ConnectorConfig(checkNotNull(adminUrl)))); + this.clientConfigurationData = checkNotNull(clientConf); this.deserializer = deserializer; this.properties = properties; - this.caseInsensitiveParams = - SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties)); this.readerConf = SourceSinkUtils.getReaderParams(Maps.fromProperties(properties)); - this.discoveryIntervalMillis = - SourceSinkUtils.getPartitionDiscoveryIntervalInMillis(caseInsensitiveParams); - this.pollTimeoutMs = - SourceSinkUtils.getPollTimeoutMs(caseInsensitiveParams); - this.commitMaxRetries = - SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams); + this.discoveryIntervalMillis = connectorConfig.getPartitionDiscoveryIntervalMillis(); + this.pollTimeoutMs = connectorConfig.getPollTimeoutMs(); + this.commitMaxRetries = connectorConfig.getCommitMaxRetries(); - CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams)); + CachedPulsarClient.setCacheSize(connectorConfig.getClientCacheSize()); if (this.clientConfigurationData.getServiceUrl() == null) { throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration"); @@ -404,12 +404,11 @@ protected String getSubscriptionName() { protected PulsarMetadataReader createMetadataReader() throws PulsarClientException { return new PulsarMetadataReader( - adminUrl, + connectorConfig, getSubscriptionName(), - caseInsensitiveParams, + startupMode == StartupMode.EXTERNAL_SUBSCRIPTION, taskIndex, - numParallelTasks, - startupMode == StartupMode.EXTERNAL_SUBSCRIPTION); + numParallelTasks); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSource.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSource.java index 42010bd64..5fac104d9 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSource.java @@ -15,12 +15,13 @@ package org.apache.flink.streaming.connectors.pulsar; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.ConnectorConfigUtils; +import org.apache.flink.pulsar.common.PulsarMetadataReader; +import org.apache.flink.pulsar.common.SchemaUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.pulsar.config.StartupMode; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; -import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils; -import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.ValidationException; @@ -69,7 +70,7 @@ public class PulsarTableSource private final String externalSubscriptionName; - private final Map caseInsensitiveParams; + private final ConnectorConfig connectorConfig; private final 
Optional providedSchema; private final Optional proctimeAttribute; @@ -97,8 +98,7 @@ public PulsarTableSource( this.specificStartupOffsets = specificStartupOffsets; this.externalSubscriptionName = externalSubscriptionName; - this.caseInsensitiveParams = - SourceSinkUtils.validateStreamSourceOptions(Maps.fromProperties(properties)); + this.connectorConfig = ConnectorConfigUtils.loadData(Maps.fromProperties(properties)); this.schema = inferTableSchema(); @@ -167,7 +167,7 @@ private TableSchema inferTableSchema() { return providedSchema.get(); } else { try { - PulsarMetadataReader reader = new PulsarMetadataReader(adminUrl, "", caseInsensitiveParams, -1, -1); + PulsarMetadataReader reader = new PulsarMetadataReader(connectorConfig); List topics = reader.getTopics(); FieldsDataType schema = reader.getSchema(topics); return SchemaUtils.toTableSchema(schema); diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSourceSinkFactory.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSourceSinkFactory.java index 25b0fe348..173fa708d 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSourceSinkFactory.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSourceSinkFactory.java @@ -14,8 +14,8 @@ package org.apache.flink.streaming.connectors.pulsar; +import org.apache.flink.pulsar.common.PulsarMetadataReader; import org.apache.flink.streaming.connectors.pulsar.config.StartupMode; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.CatalogTable; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java index b1565a1dc..578de9fd6 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.java @@ -15,6 +15,7 @@ package org.apache.flink.streaming.connectors.pulsar.internal; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.pulsar.common.PulsarMetadataReader; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarRowFetcher.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarRowFetcher.java index 865b97593..f741a9508 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarRowFetcher.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarRowFetcher.java @@ -15,6 +15,8 @@ package org.apache.flink.streaming.connectors.pulsar.internal; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.pulsar.common.PulsarMetadataReader; +import org.apache.flink.pulsar.common.SchemaUtils; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction; diff --git 
a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java index 7b21df866..dc0a3d4be 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/ReaderThread.java @@ -15,6 +15,7 @@ package org.apache.flink.streaming.connectors.pulsar.internal; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.pulsar.common.PulsarOptions; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/RowReaderThread.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/RowReaderThread.java index 942f09768..9d965f128 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/RowReaderThread.java +++ b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/RowReaderThread.java @@ -14,6 +14,9 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.pulsar.common.JSONOptionsInRead; +import org.apache.flink.pulsar.common.PulsarDeserializer; +import org.apache.flink.pulsar.common.SchemaUtils; import org.apache.flink.types.Row; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java b/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java deleted file mode 100644 index 93f7315ab..000000000 --- a/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/SourceSinkUtils.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.pulsar.internal; - -import org.apache.commons.lang3.StringUtils; - -import java.util.Arrays; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Utilities for source sink options parsing. 
- */ -public class SourceSinkUtils { - - public static Map validateStreamSourceOptions(Map parameters) { - Map caseInsensitiveParams = parameters.entrySet().stream() - .collect(Collectors.toMap(t -> t.getKey().toLowerCase(Locale.ROOT), t -> t.getValue())); - - return validateSourceOptions(caseInsensitiveParams); - } - - private static Map validateSourceOptions(Map caseInsensitiveParams) { - Map topicOptions = caseInsensitiveParams.entrySet().stream() - .filter(t -> PulsarOptions.TOPIC_OPTION_KEYS.contains(t.getKey())) - .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); - - if (topicOptions.isEmpty() || topicOptions.size() > 1) { - throw new IllegalArgumentException( - "You should specify topic(s) using one of the topic options: " + - StringUtils.join(PulsarOptions.TOPIC_OPTION_KEYS, ",")); - } - - for (Map.Entry topicEntry : topicOptions.entrySet()) { - String key = topicEntry.getKey(); - String value = topicEntry.getValue(); - if (key.equals("topic")) { - if (value.contains(",")) { - throw new IllegalArgumentException( - "Use `topics` instead of `topic` for multi topic read"); - } - } else if (key.equals("topics")) { - List topics = Arrays.asList(value.split(",")).stream() - .map(String::trim).filter(t -> !t.isEmpty()).collect(Collectors.toList()); - if (topics.isEmpty()) { - throw new IllegalArgumentException( - "No topics is specified for read with option" + value); - } - } else { - if (value.trim().length() == 0) { - throw new IllegalArgumentException("TopicsPattern is empty"); - } - } - } - return caseInsensitiveParams; - } - - public static boolean belongsTo(String topic, int numParallelSubtasks, int index) { - return (topic.hashCode() * 31 & Integer.MAX_VALUE) % numParallelSubtasks == index; - } - - public static long getPartitionDiscoveryIntervalInMillis(Map parameters) { - String interval = parameters.getOrDefault(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "-1"); - return Long.parseLong(interval); - } - - public static int getPollTimeoutMs(Map parameters) { - String interval = parameters.getOrDefault(PulsarOptions.POLL_TIMEOUT_MS_OPTION_KEY, "120000"); - return Integer.parseInt(interval); - } - - public static int getCommitMaxRetries(Map parameters) { - String interval = parameters.getOrDefault(PulsarOptions.POLL_TIMEOUT_MS_OPTION_KEY, "120000"); - return Integer.parseInt(interval); - } - - public static int getClientCacheSize(Map parameters) { - String size = parameters.getOrDefault(PulsarOptions.CLIENT_CACHE_SIZE_OPTION_KEY, "5"); - return Integer.parseInt(size); - } - - public static boolean flushOnCheckpoint(Map parameters) { - String b = parameters.getOrDefault(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true"); - return Boolean.parseBoolean(b); - } - - public static boolean failOnWrite(Map parameters) { - String b = parameters.getOrDefault(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "false"); - return Boolean.parseBoolean(b); - } - - public static Map getReaderParams(Map parameters) { - return parameters.keySet().stream() - .filter(k -> k.startsWith(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX)) - .collect(Collectors.toMap(k -> k.substring(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX.length()), k -> parameters.get(k))); - } - - public static Map toCaceInsensitiveParams(Map parameters) { - return parameters.entrySet().stream() - .collect(Collectors.toMap(t -> t.getKey().toLowerCase(Locale.ROOT), t -> t.getValue())); - } - - public static Map getProducerParams(Map parameters) { - return parameters.keySet().stream() - .filter(k -> 
k.startsWith(PulsarOptions.PULSAR_PRODUCER_OPTION_KEY_PREFIX)) - .collect(Collectors.toMap(k -> k.substring(PulsarOptions.PULSAR_PRODUCER_OPTION_KEY_PREFIX.length()), k -> parameters.get(k))); - } - -} diff --git a/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java b/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java index 7c276b7a7..a3e1e7009 100644 --- a/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java +++ b/src/main/java/org/apache/flink/table/catalog/pulsar/PulsarCatalog.java @@ -14,10 +14,11 @@ package org.apache.flink.table.catalog.pulsar; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.PulsarMetadataReader; +import org.apache.flink.pulsar.common.PulsarOptions; +import org.apache.flink.pulsar.common.SchemaUtils; import org.apache.flink.streaming.connectors.pulsar.PulsarTableSourceSinkFactory; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; -import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -62,15 +63,15 @@ @Slf4j public class PulsarCatalog extends AbstractCatalog { - private String adminUrl; + private ConnectorConfig connectorConfig; private Map properties; private PulsarMetadataReader metadataReader; - public PulsarCatalog(String adminUrl, String catalogName, Map props, String defaultDatabase) { + public PulsarCatalog(ConnectorConfig connectorConfig, String catalogName, Map props, String defaultDatabase) { super(catalogName, defaultDatabase); - this.adminUrl = adminUrl; + this.connectorConfig = connectorConfig; this.properties = new HashMap<>(); for (Map.Entry kv : props.entrySet()) { properties.put(CONNECTOR + "." 
+ kv.getKey(), kv.getValue()); @@ -90,9 +91,9 @@ public Optional getTableFactory() { public void open() throws CatalogException { if (metadataReader == null) { try { - metadataReader = new PulsarMetadataReader(adminUrl, "", new HashMap<>(), -1, -1); + metadataReader = new PulsarMetadataReader(connectorConfig); } catch (PulsarClientException e) { - throw new CatalogException("Failed to create Pulsar admin using " + adminUrl, e); + throw new CatalogException("Failed to create Pulsar admin using " + connectorConfig.getAdminUrl(), e); } } } diff --git a/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java b/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java index 6eb162cf2..1a38c25c9 100644 --- a/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java +++ b/src/main/java/org/apache/flink/table/catalog/pulsar/descriptors/PulsarCatalogValidator.java @@ -14,7 +14,7 @@ package org.apache.flink.table.catalog.pulsar.descriptors; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; +import org.apache.flink.pulsar.common.PulsarOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.descriptors.CatalogDescriptorValidator; import org.apache.flink.table.descriptors.DescriptorProperties; diff --git a/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java b/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java index 560150f86..529a3edea 100644 --- a/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java +++ b/src/main/java/org/apache/flink/table/catalog/pulsar/factories/PulsarCatalogFactory.java @@ -14,6 +14,7 @@ package org.apache.flink.table.catalog.pulsar.factories; +import org.apache.flink.pulsar.common.ConnectorConfig; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.pulsar.PulsarCatalog; import org.apache.flink.table.catalog.pulsar.descriptors.PulsarCatalogValidator; @@ -46,7 +47,7 @@ public Catalog createCatalog(String name, Map properties) { String defaultDB = dp.getOptionalString(CATALOG_DEFAULT_DATABASE).orElse("public/default"); String adminUrl = dp.getString(CATALOG_ADMIN_URL); - return new PulsarCatalog(adminUrl, name, dp.asMap(), defaultDB); + return new PulsarCatalog(new ConnectorConfig(adminUrl), name, dp.asMap(), defaultDB); } @Override diff --git a/src/main/java/org/apache/flink/table/descriptors/PulsarValidator.java b/src/main/java/org/apache/flink/table/descriptors/PulsarValidator.java index b712eb621..b5dc7e0db 100644 --- a/src/main/java/org/apache/flink/table/descriptors/PulsarValidator.java +++ b/src/main/java/org/apache/flink/table/descriptors/PulsarValidator.java @@ -20,10 +20,10 @@ import java.util.Map; import java.util.function.Consumer; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.ADMIN_URL_OPTION_KEY; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.SERVICE_URL_OPTION_KEY; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.STARTUP_MODE_OPTION_KEY; -import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_SINGLE_OPTION_KEY; +import static org.apache.flink.pulsar.common.PulsarOptions.ADMIN_URL_OPTION_KEY; +import static org.apache.flink.pulsar.common.PulsarOptions.SERVICE_URL_OPTION_KEY; +import static 
org.apache.flink.pulsar.common.PulsarOptions.STARTUP_MODE_OPTION_KEY; +import static org.apache.flink.pulsar.common.PulsarOptions.TOPIC_SINGLE_OPTION_KEY; import static org.apache.flink.table.descriptors.DescriptorProperties.noValidation; /** diff --git a/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormatITest.java b/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormatITest.java new file mode 100644 index 000000000..a22058cc7 --- /dev/null +++ b/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarInputFormatITest.java @@ -0,0 +1,108 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.connectors.pulsar; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.streaming.connectors.pulsar.PulsarTestBaseWithFlink; + +import org.apache.pulsar.common.schema.SchemaType; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Integration test for PulsarInputFormat. + */ +public class PulsarInputFormatITest extends PulsarTestBaseWithFlink { + + @Ignore + public void testBatchSimpleFormat() throws Exception { + + String tp = newTopic(); + List messages = + IntStream.range(0, 50).mapToObj(t -> Integer.valueOf(t)).collect(Collectors.toList()); + + sendTypedMessages(tp, SchemaType.INT32, messages, Optional.empty()); + + ConnectorConfig config = new ConnectorConfig(); + config.setTopic(tp); + config.setAdminUrl(adminUrl); + config.setZookeeperUri(zkUrl); + + PulsarInputFormat inputFormat = new PulsarInputFormat<>(config, new IntegerDeserializer()); + inputFormat.openInputFormat(); + InputSplit[] splits = inputFormat.createInputSplits(3); + Assert.assertTrue(splits.length <= 3); + List ilist = new ArrayList<>(50); + + for (InputSplit split : splits) { + inputFormat.open((PulsarInputSplit) split); + while (!inputFormat.reachedEnd()) { + ilist.add(inputFormat.nextRecord(1)); + } + inputFormat.close(); + } + + inputFormat.closeInputFormat(); + // TODO: the last entry added cannot be seen because of https://github.com/apache/pulsar/pull/5822. + // when we bump Pulsar version to 2.5.1, this comment can be removed. 
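+        // Hence the expected array below excludes the last message that was sent.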
+        Assert.assertArrayEquals(
+            messages.subList(0, messages.size() - 1).toArray(new Integer[0]),
+            ilist.toArray(new Integer[0]));
+    }
+
+    private static class IntegerDeserializer implements DeserializationSchema<Integer> {
+        private final TypeInformation<Integer> ti;
+        private final TypeSerializer<Integer> ser;
+
+        public IntegerDeserializer() {
+            this.ti = org.apache.flink.api.scala.typeutils.Types.INT();
+            this.ser = ti.createSerializer(new ExecutionConfig());
+        }
+
+        @Override
+        public Integer deserialize(byte[] message) throws IOException {
+
+            DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
+            Integer i = ser.deserialize(in);
+
+            return i;
+        }
+
+        @Override
+        public boolean isEndOfStream(Integer nextElement) {
+            return false;
+        }
+
+        @Override
+        public TypeInformation<Integer> getProducedType() {
+            return ti;
+        }
+    }
+
+}
diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java
index 1396ba44c..86dcd2c53 100644
--- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java
+++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarITest.java
@@ -31,7 +31,6 @@
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -58,6 +57,7 @@
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -93,14 +93,10 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import static org.apache.flink.pulsar.common.PulsarOptions.TOPIC_ATTRIBUTE_NAME;
+import static org.apache.flink.pulsar.common.PulsarOptions.TOPIC_MULTI_OPTION_KEY;
+import static org.apache.flink.pulsar.common.PulsarOptions.TOPIC_SINGLE_OPTION_KEY;
 import static org.apache.flink.streaming.connectors.pulsar.SchemaData.fooList;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.CLIENT_CACHE_SIZE_OPTION_KEY;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.FAIL_ON_WRITE_OPTION_KEY;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_ATTRIBUTE_NAME;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_MULTI_OPTION_KEY;
-import static org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions.TOPIC_SINGLE_OPTION_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -238,8 +234,8 @@ public void testClientCacheParameterPassedToTasks() throws Exception {
 DataStream stream = see.addSource(new MultiTopicSource(topics, numElements));
 Properties sinkProp = sinkProperties();
-
sinkProp.setProperty(FLUSH_ON_CHECKPOINT_OPTION_KEY, "true"); - sinkProp.setProperty(CLIENT_CACHE_SIZE_OPTION_KEY, "7"); + sinkProp.setProperty("flushOnCheckpoint", "true"); + sinkProp.setProperty("clientCacheSize", "7"); stream.addSink(new AssertSink(serviceUrl, adminUrl, 7, sinkProp, intRowWithTopicType())); see.execute("write with topics"); } @@ -880,19 +876,19 @@ private TypeInformation intRowWithTopicTypeInfo() { private Properties sinkProperties() { Properties props = new Properties(); - props.setProperty(FLUSH_ON_CHECKPOINT_OPTION_KEY, "true"); - props.setProperty(FAIL_ON_WRITE_OPTION_KEY, "true"); + props.setProperty("flushOnCheckpoint", "true"); + props.setProperty("failOnWrite", "true"); return props; } private Properties sourceProperties() { Properties props = new Properties(); - props.setProperty(PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000"); + props.setProperty("partitionDiscoveryIntervalMillis", "5000"); return props; } private void produceIntoPulsar(DataStream stream, DataType dt, Properties props) { - props.setProperty(FLUSH_ON_CHECKPOINT_OPTION_KEY, "true"); + props.setProperty("flushOnCheckpoint", "true"); stream.addSink(new FlinkPulsarRowSink(serviceUrl, adminUrl, Optional.empty(), props, dt)); } @@ -1141,8 +1137,8 @@ public InfiniteStringGenerator(String tp) { public void run() { try { Properties props = new Properties(); - props.setProperty(FLUSH_ON_CHECKPOINT_OPTION_KEY, "true"); - props.setProperty(FAIL_ON_WRITE_OPTION_KEY, "true"); + props.setProperty("flushOnCheckpoint", "true"); + props.setProperty("failOnWrite", "true"); StreamSink sink = new StreamSink<>( new FlinkPulsarSinkBase(serviceUrl, adminUrl, Optional.of(tp), props, new TopicKeyExtractor() { diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkTest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkTest.java index 180f6fd0c..323b48858 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkTest.java @@ -252,7 +252,7 @@ public void go() throws Exception { @Test//(timeout = 5000) public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable { Properties props = dummyProperties(); - props.setProperty("flushoncheckpoint", "false"); + props.setProperty("flushOnCheckpoint", "false"); final DummyFlinkPulsarSink sink = new DummyFlinkPulsarSink<>(dummyClientConf(), props, mock(TopicKeyExtractor.class), null); diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java index c45e4706d..afe4f10d8 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSourceTest.java @@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.PulsarMetadataReader; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; @@ -36,7 +38,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import 
org.apache.flink.streaming.connectors.pulsar.internal.PulsarCommitCallback; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; import org.apache.flink.streaming.connectors.pulsar.testutils.TestMetadataReader; import org.apache.flink.streaming.connectors.pulsar.testutils.TestSourceContext; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -100,6 +101,13 @@ public class FlinkPulsarSourceTest extends TestLogger { private static final int maxParallelism = Short.MAX_VALUE / 2; + private static ConnectorConfig ccWithTopic(String topic) { + ConnectorConfig connectorConfig = new ConnectorConfig(); + connectorConfig.setTopic(topic); + connectorConfig.setAdminUrl(""); + return connectorConfig; + } + @Test @SuppressWarnings("unchecked") public void testEitherWatermarkExtractor() { @@ -379,7 +387,7 @@ private void testRescaling( for (int i = 0; i < initialParallelism; i++) { TestMetadataReader discoverer = new TestMetadataReader( - Collections.singletonMap("topic", "test-topic"), + ccWithTopic("test-topic"), i, initialParallelism, TestMetadataReader.createMockGetAllTopicsSequenceFromFixedReturn(Sets.newHashSet(startupTopics))); @@ -430,7 +438,7 @@ private void testRescaling( mergedState, maxParallelism, initialParallelism, restoredParallelism, i); TestMetadataReader discoverer = new TestMetadataReader( - Collections.singletonMap("topic", "test-topic"), + ccWithTopic("test-topic"), i, restoredParallelism, TestMetadataReader.createMockGetAllTopicsSequenceFromFixedReturn(Sets.newHashSet(restoredTopics))); @@ -498,7 +506,7 @@ private static class FailingPartitionDiscoverer extends PulsarMetadataReader { private final RuntimeException failureCause; public FailingPartitionDiscoverer(RuntimeException failureCause) throws PulsarClientException { - super("", "", Collections.singletonMap("topic", "foo"), 0, 1); + super(ccWithTopic("test-topic"), 0, 1); this.failureCause = failureCause; } @@ -528,7 +536,7 @@ private static class DummyPartitionDiscoverer extends PulsarMetadataReader { private static Set allPartitions = Sets.newHashSet("foo"); public DummyPartitionDiscoverer() throws PulsarClientException { - super("", "", Collections.singletonMap("topic", "foo"), 0, 1); + super(ccWithTopic("foo"), 0, 1); } @Override diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java index 0a0329c9d..dafd429c3 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarTableITest.java @@ -17,7 +17,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.pulsar.testutils.SingletonStreamSink; import org.apache.flink.table.api.Table; @@ -240,13 +239,13 @@ private ConnectorDescriptor getPulsarDescriptor(String tableName) { .urls(getServiceUrl(), getAdminUrl()) .topic(tableName) .startFromEarliest() - .property(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000"); + 
.property("partitionDiscoveryIntervalMillis", "5000"); } private Properties getSinkProperties() { Properties properties = new Properties(); - properties.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true"); - properties.setProperty(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "true"); + properties.setProperty("flushOnCheckpoint", "true"); + properties.setProperty("failOnWrite", "true"); return properties; } } diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java index 270e0faaf..26f0f7885 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarTestBase.java @@ -18,13 +18,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.jmx.JMXReporter; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; +import org.apache.flink.pulsar.common.PulsarOptions; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.util.TestLogger; import io.streamnative.tests.pulsar.service.PulsarService; import io.streamnative.tests.pulsar.service.PulsarServiceFactory; import io.streamnative.tests.pulsar.service.PulsarServiceSpec; +import io.streamnative.tests.pulsar.service.testcontainers.PulsarStandaloneContainerService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.NotImplementedException; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -56,6 +57,8 @@ public abstract class PulsarTestBase extends TestLogger { protected static String adminUrl; + protected static String zkUrl; + public static String getServiceUrl() { return serviceUrl; } @@ -64,6 +67,10 @@ public static String getAdminUrl() { return adminUrl; } + public static String getZkUrl() { + return zkUrl; + } + @BeforeClass public static void prepare() throws Exception { @@ -87,6 +94,9 @@ public static void prepare() throws Exception { } } + PulsarStandaloneContainerService service = (PulsarStandaloneContainerService) pulsarService; + zkUrl = service.getZkUrl(); + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { admin.namespaces().createNamespace("public/default", Sets.newHashSet("standalone")); } diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java index cc48696df..5adb74b12 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/SchemaITest.java @@ -17,7 +17,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; import org.apache.flink.streaming.connectors.pulsar.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.pulsar.testutils.SingletonStreamSink; import org.apache.flink.table.api.DataTypes; @@ -298,14 +297,14 @@ private ConnectorDescriptor getPulsarSourceDescriptor(String tableName) { .urls(getServiceUrl(), getAdminUrl()) .topic(tableName) .startFromEarliest() - .property(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000"); + 
.property("partitionDiscoveryIntervalMillis", "5000"); } private ConnectorDescriptor getPulsarSinkDescriptor(String tableName) { return new Pulsar() .urls(getServiceUrl(), getAdminUrl()) .topic(tableName) - .property(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true") - .property(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "true"); + .property("flushOnCheckpoint", "true") + .property("failOnWrite", "true"); } } diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/DiscovererTest.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/DiscovererTest.java index 663d6028a..f3668fbec 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/DiscovererTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/internal/DiscovererTest.java @@ -14,6 +14,7 @@ package org.apache.flink.streaming.connectors.pulsar.internal; +import org.apache.flink.pulsar.common.ConnectorConfig; import org.apache.flink.streaming.connectors.pulsar.testutils.TestMetadataReader; import org.apache.flink.util.TestLogger; @@ -29,7 +30,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -45,17 +45,25 @@ public class DiscovererTest extends TestLogger { private static final String TEST_TOPIC = "test-topic"; private static final String TEST_TOPIC_PATTERN = "^" + TEST_TOPIC + "[0-9]*$"; - private final Map params; + private final ConnectorConfig config; - public DiscovererTest(Map params) { - this.params = params; + public DiscovererTest(ConnectorConfig config) { + this.config = config; } @Parameterized.Parameters(name = "params = {0}") - public static Collection[]> pattern() { + public static Collection pattern() { + ConnectorConfig c1 = new ConnectorConfig(); + c1.setTopic(TEST_TOPIC); + c1.setAdminUrl(""); + + ConnectorConfig c2 = new ConnectorConfig(); + c2.setTopicsPattern(TEST_TOPIC_PATTERN); + c2.setAdminUrl(""); + return Arrays.asList( - new Map[]{Collections.singletonMap("topic", TEST_TOPIC)}, - new Map[]{Collections.singletonMap("topicspattern", TEST_TOPIC_PATTERN)}); + new ConnectorConfig[]{c1}, + new ConnectorConfig[]{c2}); } String topicName(String topic, int partition) { @@ -75,7 +83,7 @@ public void testPartitionEqualConsumerNumber() { for (int i = 0; i < numSubTasks; i++) { TestMetadataReader discoverer = new TestMetadataReader( - params, i, numSubTasks, + config, i, numSubTasks, TestMetadataReader.createMockGetAllTopicsSequenceFromFixedReturn(mockAllTopics)); Set initials = discoverer.discoverTopicChanges(); @@ -112,7 +120,7 @@ public void testPartitionGreaterThanConsumerNumber() { for (int i = 0; i < numTasks; i++) { TestMetadataReader discoverer = new TestMetadataReader( - params, i, numTasks, + config, i, numTasks, TestMetadataReader.createMockGetAllTopicsSequenceFromFixedReturn(mockAllTopics)); Set initials = discoverer.discoverTopicChanges(); @@ -153,7 +161,7 @@ public void testPartitionLessThanConsumerNumber() throws Exception { for (int i = 0; i < numTasks; i++) { TestMetadataReader discoverer = new TestMetadataReader( - params, i, numTasks, + config, i, numTasks, TestMetadataReader.createMockGetAllTopicsSequenceFromFixedReturn(mockAllTopics)); Set initials = discoverer.discoverTopicChanges(); @@ -205,13 +213,13 @@ public void testGrowingPartitions() { int minAll = allTopics.size() / numTasks; int maxAll = allTopics.size() / numTasks + 1; - TestMetadataReader discover1 = new TestMetadataReader(params, 
0, numTasks, + TestMetadataReader discover1 = new TestMetadataReader(config, 0, numTasks, TestMetadataReader.createMockGetAllTopicsSequenceFromTwoReturns(mockGet)); - TestMetadataReader discover2 = new TestMetadataReader(params, 1, numTasks, + TestMetadataReader discover2 = new TestMetadataReader(config, 1, numTasks, TestMetadataReader.createMockGetAllTopicsSequenceFromTwoReturns(mockGet)); - TestMetadataReader discover3 = new TestMetadataReader(params, 2, numTasks, + TestMetadataReader discover3 = new TestMetadataReader(config, 2, numTasks, TestMetadataReader.createMockGetAllTopicsSequenceFromTwoReturns(mockGet)); Set initials1 = discover1.discoverTopicChanges(); diff --git a/src/test/java/org/apache/flink/streaming/connectors/pulsar/testutils/TestMetadataReader.java b/src/test/java/org/apache/flink/streaming/connectors/pulsar/testutils/TestMetadataReader.java index 54d510b06..c2c699ccf 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/pulsar/testutils/TestMetadataReader.java +++ b/src/test/java/org/apache/flink/streaming/connectors/pulsar/testutils/TestMetadataReader.java @@ -14,13 +14,13 @@ package org.apache.flink.streaming.connectors.pulsar.testutils; -import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; +import org.apache.flink.pulsar.common.ConnectorConfig; +import org.apache.flink.pulsar.common.PulsarMetadataReader; import org.apache.pulsar.client.api.PulsarClientException; import org.mockito.stubbing.Answer; import java.util.List; -import java.util.Map; import java.util.Set; import static org.mockito.Matchers.anyInt; @@ -37,11 +37,11 @@ public class TestMetadataReader extends PulsarMetadataReader { private int getAllTopicsInvCount = 0; public TestMetadataReader( - Map caseInsensitiveParams, + ConnectorConfig connectorConfig, int indexOfThisSubtask, int numParallelSubtasks, List> mockGetAllTopicsReturnSequence) throws PulsarClientException { - super("", "", caseInsensitiveParams, indexOfThisSubtask, numParallelSubtasks); + super(connectorConfig, "", indexOfThisSubtask, numParallelSubtasks); this.mockGetAllTopicsReturnSequence = mockGetAllTopicsReturnSequence; }
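
Note on the new configuration surface: the lower-case option constants removed above (flushoncheckpoint, failonwrite, clientcachesize, polltimeoutms, commitmaxretries, partitiondiscoveryintervalmillis) are superseded by camelCase keys bound onto the typed ConnectorConfig, as the updated tests show. A minimal sketch of building a validated config this way; it assumes ConnectorConfigUtils.loadData maps these keys onto ConnectorConfig exactly as the source and sink constructors in this patch do, and borrows the shaded Guava Maps those constructors already use:

    import org.apache.flink.pulsar.common.ConnectorConfig;
    import org.apache.flink.pulsar.common.ConnectorConfigUtils;
    import org.apache.flink.pulsar.common.SourceSinkUtils;

    import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;

    import java.util.Properties;

    public class ConnectorConfigSketch {
        public static ConnectorConfig build() {
            Properties props = new Properties();
            // Exactly one of topic / topics / topicsPattern may be set.
            props.setProperty("topic", "persistent://public/default/my-topic");
            // Keys are camelCase now, not the removed lower-case constants.
            props.setProperty("partitionDiscoveryIntervalMillis", "5000");
            props.setProperty("flushOnCheckpoint", "true");
            props.setProperty("failOnWrite", "true");
            props.setProperty("clientCacheSize", "7");

            // validateSourceOptions enforces the one-topic-option rule and rejects
            // comma lists in `topic`, empty `topics`, and blank `topicsPattern`.
            return SourceSinkUtils.validateSourceOptions(
                    ConnectorConfigUtils.loadData(Maps.fromProperties(props)));
        }
    }

A side benefit of the typed config is that the deleted per-key helpers can no longer drift apart; the removed getCommitMaxRetries, for instance, read the poll-timeout key instead of commitmaxretries.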
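For the new batch path, usage follows PulsarInputFormatITest above: the format takes a ConnectorConfig carrying the admin URL, a ZooKeeper URI (the test wires zkUrl alongside adminUrl), and one topic option, plus a DeserializationSchema. A rough sketch under those assumptions; the URLs are placeholders, the generic parameter on PulsarInputFormat is assumed, and IntegerDeserializer is the schema class from the test:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.batch.connectors.pulsar.PulsarInputFormat;
    import org.apache.flink.pulsar.common.ConnectorConfig;

    public class BatchReadSketch {
        public static void main(String[] args) throws Exception {
            ConnectorConfig config = new ConnectorConfig();
            config.setTopic("persistent://public/default/my-topic");
            config.setAdminUrl("http://localhost:8080");
            config.setZookeeperUri("localhost:2181");

            // IntegerDeserializer is the DeserializationSchema<Integer> defined in
            // PulsarInputFormatITest; any DeserializationSchema<T> works here.
            PulsarInputFormat<Integer> inputFormat =
                    new PulsarInputFormat<>(config, new IntegerDeserializer());

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> records = env.createInput(inputFormat, Types.INT);
            records.print();
        }
    }

createInput leaves split assignment to the runtime, so each parallel instance opens only the PulsarInputSplits handed to it.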
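The topic-to-subtask assignment kept in the shared SourceSinkUtils.belongsTo is a stable hash of the topic name, so every subtask independently computes the same ownership during discovery. A small illustration with hypothetical values:

    // Exactly one subtask prints true for a given topic.
    int numParallelSubtasks = 3;
    String topic = "persistent://public/default/topic-a";
    for (int subtask = 0; subtask < numParallelSubtasks; subtask++) {
        System.out.println("subtask " + subtask + " owns " + topic + ": "
                + SourceSinkUtils.belongsTo(topic, numParallelSubtasks, subtask));
    }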