diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java index 9e1e68c199bd..b1198180abbc 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerFactory.java @@ -40,7 +40,7 @@ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int @Override public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set fieldsToRead, String groupId) { - return new PulsarStreamLevelConsumer(clientId, tableName, _streamConfig, fieldsToRead, groupId); + throw new UnsupportedOperationException("Apache pinot no longer support high level consumer"); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java deleted file mode 100644 index 78835f492c5c..000000000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumer.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.stream.pulsar; - -import java.util.Set; -import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.stream.StreamDecoderProvider; -import org.apache.pinot.spi.stream.StreamLevelConsumer; -import org.apache.pinot.spi.stream.StreamMessageDecoder; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Reader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * A {@link StreamLevelConsumer} implementation for the Pulsar stream - */ -// Pinot no longer support high level consumer model since v0.12.* -@Deprecated -public class PulsarStreamLevelConsumer implements StreamLevelConsumer { - private Logger _logger; - - private StreamMessageDecoder _messageDecoder; - - private StreamConfig _streamConfig; - private PulsarConfig _pulsarStreamLevelStreamConfig; - - private Reader _reader; - - private long _lastLogTime = 0; - private long _lastCount = 0; - private long _currentCount = 0L; - - public PulsarStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, - Set sourceFields, String subscriberId) { - _streamConfig = streamConfig; - _pulsarStreamLevelStreamConfig = new PulsarConfig(streamConfig, subscriberId); - - _messageDecoder = StreamDecoderProvider.create(streamConfig, sourceFields); - - _logger = - LoggerFactory.getLogger(PulsarConfig.class.getName() + "_" + tableName + "_" + streamConfig.getTopicName()); - _logger.info("PulsarStreamLevelConsumer: streamConfig : {}", _streamConfig); - } - - @Override - public void start() - throws Exception { - _reader = PulsarStreamLevelConsumerManager.acquirePulsarConsumerForConfig(_pulsarStreamLevelStreamConfig); - } - - /** - * Get next {@link GenericRow} after decoding pulsar {@link Message} - */ - @Override - public GenericRow next(GenericRow destination) { - try { - if (_reader.hasMessageAvailable()) { - final Message record = _reader.readNext(); - destination = _messageDecoder.decode(record.getData(), destination); - - _currentCount++; - - final long now = System.currentTimeMillis(); - // Log every minute or 100k events - if (now - _lastLogTime > 60000 || _currentCount - _lastCount >= 100000) { - if (_lastCount == 0) { - _logger.info("Consumed {} events from pulsar stream {}", _currentCount, _streamConfig.getTopicName()); - } else { - _logger.info("Consumed {} events from pulsar stream {} (rate:{}/s)", _currentCount - _lastCount, - _streamConfig.getTopicName(), (float) (_currentCount - _lastCount) * 1000 / (now - _lastLogTime)); - } - _lastCount = _currentCount; - _lastLogTime = now; - } - return destination; - } - } catch (Exception e) { - _logger.warn("Caught exception while consuming events", e); - } - return null; - } - - @Override - public void commit() { - } - - @Override - public void shutdown() - throws Exception { - if (_reader != null) { - PulsarStreamLevelConsumerManager.releasePulsarConsumer(_reader); - _reader = null; - } - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java deleted file mode 100644 index ff23a0cea6c0..000000000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamLevelConsumerManager.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.stream.pulsar; - -import com.google.common.util.concurrent.Uninterruptibles; -import java.util.HashMap; -import java.util.IdentityHashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationFactory; -import org.apache.pulsar.client.api.ClientBuilder; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Implements pulsar high level connection manager. - */ -public class PulsarStreamLevelConsumerManager { - private PulsarStreamLevelConsumerManager() { - } - - private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStreamLevelConsumerManager.class); - private static final Long IN_USE = -1L; - private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60); // One minute - private static final Map, Reader> CONSUMER_FOR_CONFIG_KEY = - new HashMap<>(); - private static final IdentityHashMap, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>(); - protected static PulsarClient _pulsarClient; - protected static Reader _reader; - - /** - * Get {@link Reader} for {@link PulsarConfig}. If the reader is already created we return the instance, otherwise - * a new reader is created. - */ - public static Reader acquirePulsarConsumerForConfig(PulsarConfig pulsarStreamLevelStreamConfig) { - final ImmutableTriple configKey = - new ImmutableTriple<>(pulsarStreamLevelStreamConfig.getPulsarTopicName(), - pulsarStreamLevelStreamConfig.getSubscriberId(), pulsarStreamLevelStreamConfig.getBootstrapServers()); - - synchronized (PulsarStreamLevelConsumerManager.class) { - // If we have the consumer and it's not already acquired, return it, otherwise error out if it's already acquired - if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) { - Reader pulsarConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey); - if (CONSUMER_RELEASE_TIME.get(pulsarConsumer).equals(IN_USE)) { - throw new RuntimeException("Consumer " + pulsarConsumer + " already in use!"); - } else { - LOGGER.info("Reusing pulsar consumer with id {}", pulsarConsumer); - CONSUMER_RELEASE_TIME.put(pulsarConsumer, IN_USE); - return pulsarConsumer; - } - } - - LOGGER.info("Creating new pulsar consumer and iterator for topic {}", - pulsarStreamLevelStreamConfig.getPulsarTopicName()); - - // Create the consumer - try { - ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl( - pulsarStreamLevelStreamConfig.getBootstrapServers()); - if (pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath() != null) { - pulsarClientBuilder.tlsTrustCertsFilePath(pulsarStreamLevelStreamConfig.getTlsTrustCertsFilePath()); - } - - if (pulsarStreamLevelStreamConfig.getAuthenticationToken() != null) { - Authentication authentication = AuthenticationFactory.token( - pulsarStreamLevelStreamConfig.getAuthenticationToken()); - pulsarClientBuilder.authentication(authentication); - } - - _pulsarClient = pulsarClientBuilder.build(); - - _reader = _pulsarClient.newReader().topic(pulsarStreamLevelStreamConfig.getPulsarTopicName()) - .startMessageId(pulsarStreamLevelStreamConfig.getInitialMessageId()).create(); - - // Mark both the consumer and iterator as acquired - CONSUMER_FOR_CONFIG_KEY.put(configKey, _reader); - CONSUMER_RELEASE_TIME.put(_reader, IN_USE); - - LOGGER.info("Created consumer with id {} for topic {}", _reader, - pulsarStreamLevelStreamConfig.getPulsarTopicName()); - - return _reader; - } catch (PulsarClientException e) { - LOGGER.error("Could not create pulsar consumer", e); - return null; - } - } - } - - /** - * remove the {@link Reader} from consumer pool after closing it. - */ - public static void releasePulsarConsumer(final Reader pulsarConsumer) { - synchronized (PulsarStreamLevelConsumerManager.class) { - // Release the consumer, mark it for shutdown in the future - final long releaseTime = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS; - CONSUMER_RELEASE_TIME.put(pulsarConsumer, releaseTime); - - LOGGER.info("Marking consumer with id {} for release at {}", pulsarConsumer, releaseTime); - - // Schedule the shutdown of the consumer - new Thread() { - @Override - public void run() { - try { - // Await the shutdown time - Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS); - - // Shutdown all consumers that have not been re-acquired - synchronized (PulsarStreamLevelConsumerManager.class) { - LOGGER.info("Executing release check for consumer {} at {}, scheduled at {}", pulsarConsumer, - System.currentTimeMillis(), releaseTime); - - Iterator, Reader>> configIterator = - CONSUMER_FOR_CONFIG_KEY.entrySet().iterator(); - - while (configIterator.hasNext()) { - Map.Entry, Reader> entry = configIterator.next(); - Reader pulsarConsumer = entry.getValue(); - - final Long releaseTime = CONSUMER_RELEASE_TIME.get(pulsarConsumer); - if (!releaseTime.equals(IN_USE) && releaseTime < System.currentTimeMillis()) { - LOGGER.info("Releasing consumer {}", pulsarConsumer); - - try { - pulsarConsumer.close(); - } catch (Exception e) { - LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", pulsarConsumer, e); - } - - configIterator.remove(); - CONSUMER_RELEASE_TIME.remove(pulsarConsumer); - } else { - LOGGER.info("Not releasing consumer {}, it has been reacquired", pulsarConsumer); - } - } - } - } catch (Exception e) { - LOGGER.warn("Caught exception in release of consumer {}", pulsarConsumer, e); - } - } - }.start(); - } - } - - public static void closeAllConsumers() { - try { - // Shutdown all consumers - synchronized (PulsarStreamLevelConsumerManager.class) { - LOGGER.info("Trying to shutdown all the pulsar consumers"); - Iterator> consumerIterator = CONSUMER_FOR_CONFIG_KEY.values().iterator(); - - while (consumerIterator.hasNext()) { - Reader pulsarConsumer = consumerIterator.next(); - LOGGER.info("Trying to shutdown consumer {}", pulsarConsumer); - try { - pulsarConsumer.close(); - } catch (Exception e) { - LOGGER.warn("Caught exception while shutting down Pulsar consumer with id {}", pulsarConsumer, e); - } - consumerIterator.remove(); - } - CONSUMER_FOR_CONFIG_KEY.clear(); - CONSUMER_RELEASE_TIME.clear(); - } - } catch (Exception e) { - LOGGER.warn("Caught exception during shutting down all pulsar consumers", e); - } - } -}