Skip to content

Commit

Permalink
Merge pull request #16091 from camunda/backport-16070-to-stable/8.4
Browse files Browse the repository at this point in the history
[Backport stable/8.4] Exporter: expose partitionId on Context and LastExportedRecordPosition on Controller
  • Loading branch information
npepinpe authored Jan 25, 2024
2 parents 416be13 + 21ce5f2 commit 5022abe
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ public final class ExporterContext implements Context {

private final Logger logger;
private final Configuration configuration;
private final int partitionId;

private RecordFilter filter = DEFAULT_FILTER;

public ExporterContext(final Logger logger, final Configuration configuration) {
public ExporterContext(
final Logger logger, final Configuration configuration, final int partitionId) {
this.logger = logger;
this.configuration = configuration;
this.partitionId = partitionId;
}

@Override
Expand All @@ -38,6 +41,11 @@ public Configuration getConfiguration() {
return configuration;
}

@Override
public int getPartitionId() {
return partitionId;
}

public RecordFilter getFilter() {
return filter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

public final class ExporterRepository {
private static final Logger LOG = Loggers.EXPORTER_LOGGER;
private static final int NULL_PARTITION_ID = Integer.MIN_VALUE;
private final ExternalJarRepository jarRepository;
private final Map<String, ExporterDescriptor> exporters;

Expand Down Expand Up @@ -88,7 +89,8 @@ public ExporterDescriptor load(final String id, final ExporterCfg config)
private void validate(final ExporterDescriptor descriptor) throws ExporterLoadException {
try {
final Exporter instance = descriptor.newInstance();
final ExporterContext context = new ExporterContext(LOG, descriptor.getConfiguration());
final ExporterContext context =
new ExporterContext(LOG, descriptor.getConfiguration(), NULL_PARTITION_ID);

ThreadContextUtil.runCheckedWithClassLoader(
() -> instance.configure(context), instance.getClass().getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ final class ExporterContainer implements Controller {
private ExporterMetrics metrics;
private ActorControl actor;

ExporterContainer(final ExporterDescriptor descriptor) {
ExporterContainer(final ExporterDescriptor descriptor, final int partitionId) {
context =
new ExporterContext(
Loggers.getExporterLogger(descriptor.getId()), descriptor.getConfiguration());
Loggers.getExporterLogger(descriptor.getId()),
descriptor.getConfiguration(),
partitionId);

exporter = descriptor.newInstance();
}
Expand Down Expand Up @@ -131,6 +133,11 @@ public void updateLastExportedRecordPosition(final long position, final byte[] m
actor.run(() -> updateExporterState(position, metadata));
}

@Override
public long getLastExportedRecordPosition() {
return getPosition();
}

@Override
public ScheduledTask scheduleCancellableTask(final Duration delay, final Runnable task) {
final var scheduledTimer = actor.schedule(delay, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ public final class ExporterDirector extends Actor implements HealthMonitorable,

public ExporterDirector(final ExporterDirectorContext context, final boolean shouldPauseOnStart) {
name = context.getName();
containers =
context.getDescriptors().stream().map(ExporterContainer::new).collect(Collectors.toList());

logStream = Objects.requireNonNull(context.getLogStream());
partitionId = logStream.getPartitionId();
containers =
context.getDescriptors().stream()
.map(descriptor -> new ExporterContainer(descriptor, partitionId))
.collect(Collectors.toList());
metrics = new ExporterMetrics(partitionId);
recordExporter = new RecordExporter(metrics, containers, partitionId);
exportingRetryStrategy = new BackOffRetryStrategy(actor, Duration.ofSeconds(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ public ExporterDescriptor loadExternalExporter(final File jarFile, final String
return repository.load("external", exporterCfg);
}

public ExporterContainer newContainer(final ExporterDescriptor descriptor) {
final var container = new ExporterContainer(descriptor);
public ExporterContainer newContainer(
final ExporterDescriptor descriptor, final int partitionId) {
final var container = new ExporterContainer(descriptor, partitionId);
container.initContainer(actor.getActorControl(), metrics, state);

return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
final class ExporterContainerTest {

private static final String EXPORTER_ID = "fakeExporter";
private static final int PARTITION_ID = 123;

private ExporterContainerRuntime runtime;
private FakeExporter exporter;
Expand All @@ -44,7 +45,7 @@ void beforeEach(final @TempDir Path storagePath) throws ExporterLoadException {

final var descriptor =
runtime.getRepository().load(EXPORTER_ID, FakeExporter.class, Map.of("key", "value"));
exporterContainer = runtime.newContainer(descriptor);
exporterContainer = runtime.newContainer(descriptor, PARTITION_ID);
exporter = (FakeExporter) exporterContainer.getExporter();
}

Expand All @@ -60,6 +61,7 @@ void shouldConfigureExporter() throws Exception {
assertThat(exporter.getContext().getLogger()).isNotNull();
assertThat(exporter.getContext().getConfiguration()).isNotNull();
assertThat(exporter.getContext().getConfiguration().getId()).isEqualTo(EXPORTER_ID);
assertThat(exporter.getContext().getPartitionId()).isEqualTo(PARTITION_ID);
assertThat(exporter.getContext().getConfiguration().getArguments())
.isEqualTo(Map.of("key", "value"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void shouldSetTclOnConfigure(final @TempDir File jarDirectory) throws Exception
final var exporterClass = createUnloadedExporter();
final var jarFile = exporterClass.toJar(new File(jarDirectory, "exporter.jar"));
final var descriptor = runtime.loadExternalExporter(jarFile, EXPORTER_CLASS_NAME);
final var container = runtime.newContainer(descriptor);
final var container = runtime.newContainer(descriptor, 0);

// when
container.configureExporter();
Expand All @@ -86,7 +86,7 @@ void shouldSetTclOnOpen(final @TempDir File jarDirectory)
final var jarFile = exporterClass.toJar(new File(jarDirectory, "exporter.jar"));
final var descriptor = runtime.loadExternalExporter(jarFile, EXPORTER_CLASS_NAME);
final var expectedClassLoader = descriptor.newInstance().getClass().getClassLoader();
final var container = runtime.newContainer(descriptor);
final var container = runtime.newContainer(descriptor, 0);

// when
container.openExporter();
Expand All @@ -106,7 +106,7 @@ void shouldSetTclOnExport(final @TempDir File jarDirectory)
final var jarFile = exporterClass.toJar(new File(jarDirectory, "exporter.jar"));
final var descriptor = runtime.loadExternalExporter(jarFile, EXPORTER_CLASS_NAME);
final var expectedClassLoader = descriptor.newInstance().getClass().getClassLoader();
final var container = runtime.newContainer(descriptor);
final var container = runtime.newContainer(descriptor, 0);

// when
final var record = mock(TypedRecord.class);
Expand All @@ -129,7 +129,7 @@ void shouldSetTclOnClose(final @TempDir File jarDirectory)
final var jarFile = exporterClass.toJar(new File(jarDirectory, "exporter.jar"));
final var descriptor = runtime.loadExternalExporter(jarFile, EXPORTER_CLASS_NAME);
final var expectedClassLoader = descriptor.newInstance().getClass().getClassLoader();
final var container = new ExporterContainer(descriptor);
final var container = new ExporterContainer(descriptor, 0);

// when
container.close();
Expand Down
5 changes: 5 additions & 0 deletions exporter-api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
"justification": "The controller is used by the exporters. Extending the controller interface is not a breaking change.",
"code": "java.method.addedToInterface",
"classQualifiedName": "io.camunda.zeebe.exporter.api.context.Controller"
},
{
"justification": "The Context is used by the exporters. Extending the controller interface is not a breaking change.",
"code": "java.method.addedToInterface",
"classQualifiedName": "io.camunda.zeebe.exporter.api.context.Context"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ public interface Context {
*/
Configuration getConfiguration();

/**
* Gets the partition id of the exporter context. During the loading phase, while the
* configuration for each exporter is being validated, this method will return a null value since
* on instantiating the Exporter Context, we pass a null partition id, which will get replaced by
* a valid one at runtime.
*
* <p>* @return the partition id for this exporter.
*/
int getPartitionId();

/**
* Apply the given filter to limit the records which are exported.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public interface Controller {
*/
void updateLastExportedRecordPosition(long position, byte[] metadata);

/** Gets the last acknowledged exported record position. */
long getLastExportedRecordPosition();

/**
* Schedules a cancellable {@param task} to be ran after {@param delay} has expired.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ public Configuration getConfiguration() {
return configuration;
}

public ExporterTestContext setConfiguration(final Configuration configuration) {
this.configuration = Objects.requireNonNull(configuration, "must specify a configuration");
return this;
@Override
public int getPartitionId() {
return 0;
}

@Override
public void setFilter(final RecordFilter filter) {
recordFilter = filter;
}

public ExporterTestContext setConfiguration(final Configuration configuration) {
this.configuration = Objects.requireNonNull(configuration, "must specify a configuration");
return this;
}

public RecordFilter getRecordFilter() {
return recordFilter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public void updateLastExportedRecordPosition(final long position, final byte[] m
}
}

@Override
public long getLastExportedRecordPosition() {
return getPosition();
}

@Override
public synchronized ScheduledTask scheduleCancellableTask(
final Duration delay, final Runnable task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void shouldUpdateLastExportedRecordPosition() {

// then
assertThat(controller.getPosition()).isEqualTo(1);
assertThat(controller.getLastExportedRecordPosition()).isEqualTo(1);
}

@Test
Expand Down

0 comments on commit 5022abe

Please sign in to comment.