Skip to content

Commit

Permalink
Fix pulsar collector in round1
Browse files Browse the repository at this point in the history
  • Loading branch information
CodePrometheus committed Jan 26, 2025
1 parent 15983ff commit d313a63
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LazyPulsarInit {
class LazyPulsarInit {

private final Collector collector;
private final CollectorMetrics metrics;
Expand All @@ -33,7 +33,7 @@ public class LazyPulsarInit {
this.consumerProps = builder.consumerProps;
}

public void init() {
void init() {
if (result == null) {
synchronized (this) {
if (result == null) {
Expand All @@ -52,6 +52,7 @@ private PulsarClient subscribe() {
.connectionTimeout(12, TimeUnit.SECONDS)
.build();
} catch (Exception e) {
failure.set(CheckResult.failed(e));
throw new RuntimeException("Pulsar client create failed" + e.getMessage(), e);
}

Expand All @@ -67,12 +68,12 @@ private PulsarClient subscribe() {
} catch (PulsarClientException ex) {
// Nobody cares me.
}
failure.set(CheckResult.failed(e));
throw new RuntimeException("Pulsar unable to subscribe the topic(" + topic + "), please check the pulsar service.", e);
}

}

public void close() throws PulsarClientException {
void close() throws PulsarClientException {
PulsarClient maybe = result;
if (maybe != null) result.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class PulsarSpanConsumer implements Closeable {
private final PulsarClient client;
private final Collector collector;
private final CollectorMetrics metrics;
private Consumer<byte[]> consumer;

public PulsarSpanConsumer(String topic, Map<String, Object> consumerProps, PulsarClient client, Collector collector, CollectorMetrics metrics) {
this.topic = topic;
Expand All @@ -44,7 +45,7 @@ public PulsarSpanConsumer(String topic, Map<String, Object> consumerProps, Pulsa
}

public void startConsumer() throws PulsarClientException {
client.newConsumer()
consumer = client.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.loadConf(consumerProps)
Expand All @@ -54,18 +55,19 @@ public void startConsumer() throws PulsarClientException {

@Override public void close() {
try {
client.close();
if (consumer != null) {
consumer.close();
}
} catch (PulsarClientException e) {
LOG.error("Failed to close Pulsar client", e);
LOG.error("Failed to close Pulsar Consumer client", e);
}
}

record ZipkinMessageListener<T>(Collector collector, CollectorMetrics metrics) implements MessageListener<T> {

@Override public void received(Consumer<T> consumer, Message<T> msg) {
final byte[] serialized;
try {
serialized = msg.getData();
final byte[] serialized = msg.getData();
metrics.incrementMessages();
metrics.incrementBytes(serialized.length);

Expand All @@ -75,7 +77,7 @@ record ZipkinMessageListener<T>(Collector collector, CollectorMetrics metrics) i
consumer.acknowledgeAsync(msg);
} catch (Throwable th) {
metrics.incrementMessagesDropped();
LOG.error("Pulsar failed to process the message.", th);
LOG.error("Pulsar Span Consumer failed to process the message.", th);
consumer.negativeAcknowledge(msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
*/
package zipkin2.server.internal.pulsar;

import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
Expand All @@ -16,9 +18,11 @@
import zipkin2.collector.pulsar.PulsarCollector;
import zipkin2.storage.StorageComponent;

import static io.micrometer.common.util.StringUtils.isEmpty;

/** Auto-configuration for {@link PulsarCollector}. */
@ConditionalOnClass(PulsarCollector.class)
@Conditional(ZipkinPulsarCollectorConfiguration.PulsarServiceUrlSet.class)
@Conditional(ZipkinPulsarCollectorConfiguration.PulsarConditions.class)
@EnableConfigurationProperties(ZipkinPulsarCollectorProperties.class)
public class ZipkinPulsarCollectorConfiguration {

Expand All @@ -43,20 +47,23 @@ PulsarCollector pulsar(
* service-url: ${PULSAR_SERVICE_URL:}
* }</pre>
*/
static final class PulsarServiceUrlSet implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata a) {
return !isEmpty(
context.getEnvironment().getProperty("zipkin.collector.pulsar.service-url")) &&
notFalse(context.getEnvironment().getProperty("zipkin.collector.pulsar.enabled"));
static final class PulsarConditions extends AllNestedConditions {

PulsarConditions() {
super(ConfigurationPhase.REGISTER_BEAN);
}

private static boolean isEmpty(String s) {
return s == null || s.isEmpty();
@ConditionalOnProperty(prefix = "zipkin.collector.pulsar", name = "enabled",
havingValue = "true", matchIfMissing = true)
private static final class PulsarEnabledCondition {
}

private static boolean notFalse(String s) {
return s == null || !s.equals("false");
@Conditional(PulsarServiceUrlCondition.class)
private static final class PulsarServiceUrlCondition implements Condition {
@Override public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
String serviceUrl = context.getEnvironment().getProperty("zipkin.collector.pulsar.service-url");
return !isEmpty(serviceUrl);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

/** Properties for configuring and building a {@link PulsarCollector}. */
@ConfigurationProperties("zipkin.collector.pulsar")
public class ZipkinPulsarCollectorProperties {
class ZipkinPulsarCollectorProperties {

/** The service URL for the Pulsar service. */
private String serviceUrl;
Expand Down

0 comments on commit d313a63

Please sign in to comment.