From 418bf05024d83ba7fc53a47658113f87a761e52e Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Mon, 6 Jan 2025 05:13:06 -0800 Subject: [PATCH] Create `AfterInitializeMemento` for `Initializer`s for GoT to tunnel state between `GenerateWorkUnits` and `CommitActivity` --- gobblin-api/build.gradle | 2 + .../configuration/ConfigurationKeys.java | 2 + .../gobblin/initializer/Initializer.java | 98 +++++++++- .../MultiConverterInitializer.java | 20 +- .../gobblin/initializer/MultiInitializer.java | 49 ++++- .../initializer/MultiWriterInitializer.java | 24 ++- .../initializer/MultiInitializerTest.java | 175 ++++++++++++++++++ .../initializer/JdbcWriterInitializer.java | 59 ++++-- .../ddm/activity/impl/CommitActivityImpl.java | 31 +++- .../activity/impl/GenerateWorkUnitsImpl.java | 52 ++++-- .../temporal/util/nesting/work/Workload.java | 3 +- 11 files changed, 469 insertions(+), 46 deletions(-) create mode 100644 gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java diff --git a/gobblin-api/build.gradle b/gobblin-api/build.gradle index b083bb7f8c3..f725db86340 100644 --- a/gobblin-api/build.gradle +++ b/gobblin-api/build.gradle @@ -20,6 +20,8 @@ apply plugin: 'java' dependencies { compile externalDependency.guava compile externalDependency.gson + compile externalDependency.jacksonCore + compile externalDependency.jacksonMapper compile externalDependency.jasypt compile externalDependency.jodaTime compile externalDependency.commonsLang3 diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 13a145a3e92..b6569af2d2b 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -203,6 +203,7 @@ public class ConfigurationKeys { public static final String TASK_DATA_ROOT_DIR_KEY = "task.data.root.dir"; public static final String SOURCE_CLASS_KEY = "source.class"; public static final String CONVERTER_CLASSES_KEY = "converter.classes"; + public static final String CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY = "converter.initializers.serialized.mementos"; public static final String RECORD_STREAM_PROCESSOR_CLASSES_KEY = "recordStreamProcessor.classes"; public static final String FORK_OPERATOR_CLASS_KEY = "fork.operator.class"; public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator"; @@ -434,6 +435,7 @@ public class ConfigurationKeys { public static final String WRITER_TRUNCATE_STAGING_TABLE = WRITER_PREFIX + ".truncate.staging.table"; public static final String WRITER_OUTPUT_DIR = WRITER_PREFIX + ".output.dir"; public static final String WRITER_BUILDER_CLASS = WRITER_PREFIX + ".builder.class"; + public static final String WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY = "writer.initializer.serialized.memento"; public static final String DEFAULT_WRITER_BUILDER_CLASS = "org.apache.gobblin.writer.AvroDataWriterBuilder"; public static final String WRITER_FILE_NAME = WRITER_PREFIX + ".file.name"; public static final String WRITER_FILE_PATH = WRITER_PREFIX + ".file.path"; diff --git a/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java b/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java index 4454c8ae775..6be58a1a75e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/initializer/Initializer.java @@ -18,14 +18,90 @@ package org.apache.gobblin.initializer; import java.io.Closeable; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + public interface Initializer extends Closeable { /** - * Initialize for the writer. + * Marker interface to convey an opaque snapshot of the internal state of any concrete {@link Initializer}, thus affording state serialization for + * eventual "revival" as a new `Initializer` holding equivalent internal state. {@link #commemorate()} the memento after {@link #initialize()} + * and subsequently {@link #recall(AfterInitializeMemento)} before {@link #close()}ing it. + * + * Whereas the "Initializer Lifecycle", when synchronous and with the same instance, is: + * [concrete `? implements Initializer` instance A] `.initialize()` -> DO PROCESSING -> `.close()` + * When using `AfterInitializer` across instances and even memory-space boundaries it becomes: + * [concrete `T0 implements Initializer` instance A] `.initialize()` -> `.commemorate()` -> PERSIST/TRANSMIT MEMENTO + * -> DO PROCESSING -> + * [concrete `T0 implements Initializer` instance B] RECEIVE MEMENTO -> `.recall()` -> `.close()` * - * @param state - * @param workUnits WorkUnits created by Source + * For both backwards compatibility and because not every concrete `Initializer` has internal state worth capturing, not every `Initializer` + * impl will implement an `AfterInitializeMemento`. Those that do will supply a unique impl cultivating self-aware impl details of their + * `Initializer`. An `AfterInitializeMemento` impl needs simply be (de)serializable by {@link ObjectMapper}. An `Initializer` impl with an + * `AfterInitializeMemento` impl MUST NOT (re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s during its {@link #close()} + * method: `WorkUnit` processing MUST proceed entirely within {@link #initialize()}. + */ + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle variety of concrete impls + public interface AfterInitializeMemento { + static Logger logger = LoggerFactory.getLogger(AfterInitializeMemento.class); + + /** + * Convey attempt to work with a concrete {@link AfterInitializeMemento} of type other than the single expected companion type known to `forInitializer`. + * @see #castAsOrThrow(Class, Initializer) + */ + static class MismatchedMementoException extends RuntimeException { + public MismatchedMementoException(AfterInitializeMemento memento, Class asClass, Initializer forInitializer) { + super(String.format("Memento '%s' for Initializer '%s' of class '%s' - NOT '%s'", memento, forInitializer.getClass().getName(), + memento.getClass().getName(), asClass.getName())); + } + } + + /** stringify as JSON */ + static String serialize(AfterInitializeMemento memento) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + String result = objectMapper.writeValueAsString(memento); + logger.info("Serializing AfterInitializeMemento {} as '{}'", memento, result); + return result; + } catch (JsonProcessingException e) { + logger.error("Failed to serialize AfterInitializeMemento '" + memento + "'", e); + throw new RuntimeException(e); + } + } + + /** destringify JSON */ + static AfterInitializeMemento deserialize(String serialized) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + AfterInitializeMemento result = objectMapper.readValue(serialized, AfterInitializeMemento.class); + logger.info("Deserializing AfterInitializeMemento '{}' as {}", serialized, result); + return result; + } catch (JsonProcessingException e) { + logger.error("Failed to deserialize AfterInitializeMemento '" + serialized + "'", e); + throw new RuntimeException(e); + } + } + + /** cast `this` concrete `AfterInitializeMemento` to `castClass`, else {@link MismatchedMementoException} */ + default T castAsOrThrow(Class castClass, Initializer forInitializer) + throws MismatchedMementoException { + if (castClass.isAssignableFrom(this.getClass())) { + return (T) this; + } else { + throw new AfterInitializeMemento.MismatchedMementoException(this, castClass, forInitializer); + } + } + } + + /** + * Initialize the writer/converter (e.g. using the state and/or {@link org.apache.gobblin.source.workunit.WorkUnit}s provided when constructing the instance) */ public void initialize(); @@ -33,7 +109,23 @@ public interface Initializer extends Closeable { * Removed checked exception. * {@inheritDoc} * @see java.io.Closeable#close() + * + * NOTE: An `Initializer` impl with an `AfterInitializeMemento` impl MUST NOT (re-)process any {@link org.apache.gobblin.source.workunit.WorkUnit}s + * during its {@link #close()} method: `WorkUnit` processing MUST proceed entirely within {@link #initialize()}. */ @Override public void close(); + + /** @return any `Initializer`-specific companion memento, as required to convey internal state after {@link #initialize()}, as needed to {@link #close()} */ + default Optional commemorate() { + return Optional.empty(); + } + + /** + * "reinitialize" a fresh instance with (equiv.) post {@link #initialize()} internal state, per `Initializer`-specific companion `memento` + * to {@link #close()} + */ + default void recall(AfterInitializeMemento memento) { + // noop + } } diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java index 14ac3e1d6b0..7fcc53e9e8c 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/initializer/MultiConverterInitializer.java @@ -18,27 +18,39 @@ package org.apache.gobblin.converter.initializer; import java.util.List; +import java.util.Optional; import lombok.ToString; + import org.apache.gobblin.initializer.Initializer; import org.apache.gobblin.initializer.MultiInitializer; @ToString public class MultiConverterInitializer implements ConverterInitializer { - private final Initializer intializer; + private final Initializer initializer; public MultiConverterInitializer(List converterInitializers) { - this.intializer = new MultiInitializer(converterInitializers); + this.initializer = new MultiInitializer(converterInitializers); } @Override public void initialize() { - this.intializer.initialize(); + this.initializer.initialize(); } @Override public void close() { - this.intializer.close(); + this.initializer.close(); + } + + @Override + public Optional commemorate() { + return this.initializer.commemorate(); + } + + @Override + public void recall(AfterInitializeMemento memento) { + this.initializer.recall(memento); } } diff --git a/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java b/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java index 44fa8b4931e..69c60b5d7da 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/initializer/MultiInitializer.java @@ -19,18 +19,46 @@ import java.io.IOException; import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Setter; import lombok.ToString; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Streams; import com.google.common.io.Closer; /** - * Wraps multiple writer initializer behind its interface. This is useful when there're more than one branch. + * Wraps multiple writer initializers, which is useful when more than one branch. */ @ToString public class MultiInitializer implements Initializer { + + /** Commemorate each (`Optional`) {@link org.apache.gobblin.initializer.Initializer.AfterInitializeMemento} of every wrapped {@link Initializer} */ + @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor + private static class Memento implements AfterInitializeMemento { + // WARNING: not possible to use `List>`, as first attempted, due to: + // com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "present" (class java.util.Optional), not marked as + // ignorable (0 known properties: ]) + // at [Source:(String)"{\"@class\":\"org.apache.gobblin.initializer.MultiInitializer$Memento\",\"orderedInitializersMementos\":[{\"present\":false}]}"] + // (through reference chain: org.apache.gobblin.initializer.MultiInitializer$Memento[\"orderedInitializersMementos\"]->java.util.ArrayList[0] + // ->java.util.Optional[\"present\"])", + // the following does NOT fix, probably due to `Optional`'s nesting with `List`: + // @JsonIgnoreProperties(ignoreUnknown = true) + @NonNull private List orderedInitializersMementos; + } + + private final List initializers; private final Closer closer; @@ -57,4 +85,21 @@ public void close() { throw new RuntimeException(e); } } -} + + @Override + public Optional commemorate() { + return Optional.of(new MultiInitializer.Memento(this.initializers.stream() + .map(Initializer::commemorate) + .map(opt -> opt.orElse(null)) + .collect(Collectors.toList()))); + } + + @Override + public void recall(AfterInitializeMemento memento) { + Memento recollection = memento.castAsOrThrow(MultiInitializer.Memento.class, this); + Streams.zip(this.initializers.stream(), recollection.orderedInitializersMementos.stream(), (initializer, nullableInitializerMemento) -> { + Optional.ofNullable(nullableInitializerMemento).ifPresent(initializer::recall); + return null; + }).count(); // force evaluation, since `Streams.zip` used purely for side effects + } +} \ No newline at end of file diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java index 7b3ba3e27b1..38c54313533 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/initializer/MultiWriterInitializer.java @@ -17,31 +17,41 @@ package org.apache.gobblin.writer.initializer; -import org.apache.gobblin.initializer.Initializer; -import org.apache.gobblin.initializer.MultiInitializer; - +import java.util.Optional; import java.util.List; import lombok.ToString; +import org.apache.gobblin.initializer.Initializer; +import org.apache.gobblin.initializer.MultiInitializer; + @ToString public class MultiWriterInitializer implements WriterInitializer { - private final Initializer intializer; + private final Initializer initializer; public MultiWriterInitializer(List writerInitializers) { - this.intializer = new MultiInitializer(writerInitializers); + this.initializer = new MultiInitializer(writerInitializers); } @Override public void initialize() { - this.intializer.initialize(); + this.initializer.initialize(); } @Override public void close() { - this.intializer.close(); + this.initializer.close(); } + @Override + public Optional commemorate() { + return this.initializer.commemorate(); + } + + @Override + public void recall(AfterInitializeMemento memento) { + this.initializer.recall(memento); + } } diff --git a/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java b/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java new file mode 100644 index 00000000000..ebcce0a4c6f --- /dev/null +++ b/gobblin-core/src/test/java/org/apache/gobblin/initializer/MultiInitializerTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.initializer; + +import java.util.Arrays; +import java.util.Optional; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Test {@link MultiInitializer} */ +public class MultiInitializerTest { + /** Concrete Initializer A - implements `AfterInitializeMemento` */ + private static class InitializerImplA implements Initializer { + private static int instanceCounter = 0; + + @Data + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor + private static class Memento implements Initializer.AfterInitializeMemento { + @NonNull private String state; + } + + @Getter private String state; + + @Override + public void initialize() { + this.state = "initialized-" + (++instanceCounter); + } + + @Override + public void close() { + // noop + } + + @Override + public Optional commemorate() { + return Optional.of(new Memento(this.state)); + } + + @Override + public void recall(AfterInitializeMemento memento) { + Memento recollection = memento.castAsOrThrow(Memento.class, this); + this.state = recollection.getState(); + } + } + + + /** Concrete Initializer B - DOES NOT implement `AfterInitializeMemento`! */ + private static class InitializerImplBNoMemento implements Initializer { + private static int instanceCounter = 0; + + @Getter private String state; + + @Override + public void initialize() { + this.state = "ignore-" + (++instanceCounter); + } + + @Override + public void close() { + // noop + } + } + + + /** Concrete Initializer C - implements `AfterInitializeMemento` */ + private static class InitializerImplC implements Initializer { + private static int instanceCounter = 0; + + @Data + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @RequiredArgsConstructor + private static class MyMemento implements AfterInitializeMemento { + @NonNull private int state; + } + + @Getter private int state; + + @Override + public void initialize() { + this.state = 41 + (++instanceCounter); + } + + @Override + public void close() { + // noop + } + + @Override + public Optional commemorate() { + return Optional.of(new MyMemento(this.state)); + } + + @Override + public void recall(AfterInitializeMemento memento) { + MyMemento recollection = memento.castAsOrThrow(MyMemento.class, this); + this.state = recollection.getState(); + } + } + + @Test + public void testMementoCommemorateToSerializeAndDeserializeForRecall() { + // create "first generation" of concrete initializers + InitializerImplA initializerA_1 = new InitializerImplA(); + InitializerImplBNoMemento initializerB_1 = new InitializerImplBNoMemento(); + InitializerImplC initializerC_1 = new InitializerImplC(); + + // create the 1st-gen `MultiInitializer` and `initialize` all wrapped initializers + MultiInitializer multiInitializer1G = new MultiInitializer(Arrays.asList(initializerA_1, initializerB_1, initializerC_1)); + multiInitializer1G.initialize(); + + // `commemorate` and `serialize` 1st-gen state + Optional optMemento1G = multiInitializer1G.commemorate(); + Assert.assertTrue(optMemento1G.isPresent()); + String serializedMemento = Initializer.AfterInitializeMemento.serialize(optMemento1G.get()); + + // create a new 2nd-gen `MultiInitializer` using a "second generation" of concrete initializers + InitializerImplA initializerA_2 = new InitializerImplA(); + InitializerImplBNoMemento initializerB_2 = new InitializerImplBNoMemento(); + InitializerImplC initializerC_2 = new InitializerImplC(); + MultiInitializer multiInitializer2G = new MultiInitializer(Arrays.asList(initializerA_2, initializerB_2, initializerC_2)); + + // verify that state differs between 1st-gen and 2nd-gen `Initializer`s + Assert.assertNotEquals(initializerA_1.getState(), initializerA_2.getState()); + Assert.assertNotEquals(initializerB_1.getState(), initializerB_2.getState()); + Assert.assertNotEquals(initializerC_1.getState(), initializerC_2.getState()); + + try { + // verify not possible to `commemorate` prior to `recall()` + multiInitializer2G.commemorate(); + Assert.fail("`commemorate()` somehow possible even before `Initializer.initialize()` or `recall()`, despite `@NotNull` annotation on `state`"); + } catch (NullPointerException npe) { + // expected + } + + // now `deserialize` 1st-gen state and `recall` it to the 2nd-gen `MultiInitializer` + Initializer.AfterInitializeMemento deserializedMemento = Initializer.AfterInitializeMemento.deserialize(serializedMemento); + multiInitializer2G.recall(deserializedMemento); + Optional optMemento2G = multiInitializer2G.commemorate(); + Assert.assertTrue(optMemento2G.isPresent()); + + // verify that post-`recall`ed memento equivalent to post-`initialize`d one + Assert.assertEquals(optMemento1G.get(), optMemento2G.get()); + + // WARNING: in real code, DO NOT `initialize` following `recall`, as it would reset the state of the wrapped initializers, negating the `recall` + multiInitializer2G.initialize(); + Optional optMemento2G_alt = multiInitializer2G.commemorate(); + Assert.assertTrue(optMemento2G_alt.isPresent()); + // verify not simply that mementos always equal + Assert.assertNotEquals(optMemento2G.get(), optMemento2G_alt.get()); + } +} diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java index 8226de7698e..ca757421ba7 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/initializer/JdbcWriterInitializer.java @@ -17,28 +17,23 @@ package org.apache.gobblin.writer.initializer; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.publisher.JdbcPublisher; -import org.apache.gobblin.source.workunit.WorkUnit; -import org.apache.gobblin.util.ForkOperatorUtils; -import org.apache.gobblin.util.jdbc.DataSourceBuilder; -import org.apache.gobblin.writer.Destination; -import org.apache.gobblin.writer.Destination.DestinationType; -import org.apache.gobblin.writer.commands.JdbcWriterCommands; -import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory; -import org.apache.gobblin.source.extractor.JobCommitPolicy; - import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.sql.DataSource; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.Setter; import lombok.ToString; import org.apache.commons.lang3.StringUtils; @@ -50,12 +45,37 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.publisher.JdbcPublisher; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.ForkOperatorUtils; +import org.apache.gobblin.util.jdbc.DataSourceBuilder; +import org.apache.gobblin.writer.Destination; +import org.apache.gobblin.writer.Destination.DestinationType; +import org.apache.gobblin.writer.commands.JdbcWriterCommands; +import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory; +import org.apache.gobblin.source.extractor.JobCommitPolicy; + /** * Initialize for JDBC writer and also performs clean up. */ @ToString public class JdbcWriterInitializer implements WriterInitializer { + /** Commemorate all and exactly those fields, for which to preserve instance state - the name(s) of temporary staging DB tables */ + @Data + @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization + @NoArgsConstructor // IMPORTANT: for jackson (de)serialization + @AllArgsConstructor + private static class Memento implements AfterInitializeMemento { + // NOTE: as this clearly MAY be `null` (below), DO NOT mark `@NonNull`, to avoid: + // userCreatedStagingTable is marked non-null but is null + private String userCreatedStagingTable; + @NonNull private Set createdStagingTables; + } + + private static final Logger LOG = LoggerFactory.getLogger(JdbcWriterInitializer.class); private static final String STAGING_TABLE_FORMAT = "stage_%d"; private static final int NAMING_STAGING_TABLE_TRIAL = 10; @@ -95,7 +115,7 @@ public JdbcWriterInitializer(State state, Collection workUnits, * Drop table if it's created by this instance. * Truncate staging tables passed by user. * {@inheritDoc} - * @see org.apache.gobblin.Initializer#close() + * @see org.apache.gobblin.initializer.Initializer#close() */ @Override public void close() { @@ -200,7 +220,6 @@ private static boolean getPropAsBoolean(State state, String key, int branches, i * 3.1. Create staging table with unique name. * 3.2. Try to drop and recreate the table to confirm if we can drop it later. * 4. Update Workunit state with staging table information. - * @param state */ @Override public void initialize() { @@ -277,6 +296,18 @@ public void initialize() { } } + @Override + public Optional commemorate() { + return Optional.of(new JdbcWriterInitializer.Memento(this.userCreatedStagingTable, Sets.newHashSet(this.createdStagingTables))); + } + + @Override + public void recall(AfterInitializeMemento memento) { + Memento recollection = memento.castAsOrThrow(JdbcWriterInitializer.Memento.class, this); + this.userCreatedStagingTable = recollection.userCreatedStagingTable; + this.createdStagingTables = Sets.newHashSet(recollection.createdStagingTables); + } + private JdbcWriterCommands createJdbcWriterCommands(Connection conn) { String destKey = ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_DESTINATION_TYPE_KEY, this.branches, this.branchId); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 6346e08df36..b43c079c101 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -40,7 +40,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; - +import com.google.common.io.Closer; import io.temporal.failure.ApplicationFailure; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; @@ -48,6 +48,8 @@ import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.initializer.ConverterInitializerFactory; +import org.apache.gobblin.initializer.Initializer; import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.runtime.JobContext; @@ -58,6 +60,8 @@ import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter; import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory; import org.apache.gobblin.source.extractor.JobCommitPolicy; +import org.apache.gobblin.source.workunit.BasicWorkUnitStream; +import org.apache.gobblin.source.workunit.WorkUnitStream; import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.CommitStats; @@ -69,6 +73,7 @@ import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.executors.IteratorExecutor; +import org.apache.gobblin.writer.initializer.WriterInitializerFactory; @Slf4j @@ -76,7 +81,7 @@ public class CommitActivityImpl implements CommitActivity { static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10; static int DEFAULT_NUM_COMMIT_THREADS = 1; - static String UNDEFINED_JOB_NAME = ""; + static String UNDEFINED_JOB_NAME = "<>"; @Override public CommitStats commit(WUProcessingSpec workSpec) { @@ -105,6 +110,22 @@ public CommitStats commit(WUProcessingSpec workSpec) { } catch (FailedDatasetUrnsException exception) { log.warn("Some datasets failed to be committed, proceeding with publishing commit step", exception); optFailure = Optional.of(exception); + } finally { + // if Work Discovery transmitted any writer/converter `Initializer.AfterInitializeMemento`s within `jobState`, deserialize them now to + // `.recall()` and "re-initialize" equivalent writer and/or converter(s) `Initializer`s, to complete their `.close()` + // NOTE: the "revived" `Initializer`s are constructed with empty placeholder WUs + Closer closer = Closer.create(); // (purely to suppress exceptions) + Optional.ofNullable(jobState.getProp(ConfigurationKeys.WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY)).map(mementoProp -> + Initializer.AfterInitializeMemento.deserialize(mementoProp) + ).ifPresent(memento -> + closer.register(WriterInitializerFactory.newInstace(jobState, createEmptyWorkUnitStream())).recall(memento) + ); + Optional.ofNullable(jobState.getProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY)).map(mementoProp -> + Initializer.AfterInitializeMemento.deserialize(mementoProp) + ).ifPresent(memento -> + closer.register(ConverterInitializerFactory.newInstance(jobState, createEmptyWorkUnitStream())).recall(memento) + ); + closer.close(); } boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"); @@ -247,4 +268,8 @@ private Map summarizeDatasetOutcomes(Map getQuantiles(TDigest digest, int numQuantiles) { } + /** [Internal, impl class] Intermediate result of generated work units with insightful analysis extracted by pre-processing them */ + @Data + private static class WorkUnitsWithInsights { + private final List workUnits; + private final Set pathsToCleanUp; + private final Optional optWriterMemento; + private final Optional optConverterMemento; + } + + @Override public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) { // TODO: decide whether to acquire a job lock (as MR did)! @@ -124,8 +138,8 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState)); fs.mkdirs(workDirRoot); - Set pathsToCleanUp = new HashSet<>(); - List workUnits = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer, pathsToCleanUp); + WorkUnitsWithInsights genWUsInsights = generateWorkUnitsForJobStateAndCollectCleanupPaths(jobState, eventSubmitterContext, closer); + List workUnits = genWUsInsights.getWorkUnits(); int numSizeSummaryQuantiles = getConfiguredNumSizeSummaryQuantiles(jobState); WorkUnitsSizeSummary wuSizeSummary = digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles); @@ -134,11 +148,20 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi // exceed available memory and this activity execution were to fail, a subsequent re-attempt would know the amount of work, to guide re-config/attempt createWorkPreparedSizeDistillationTimer(wuSizeSummary, eventSubmitterContext).stop(); + // add any (serialized) mementos before serializing `jobState`, for later `recall` during `CommitActivityImpl` + genWUsInsights.optWriterMemento.ifPresent(memento -> + jobState.setProp(ConfigurationKeys.WRITER_INITIALIZER_SERIALIZED_MEMENTO_KEY, + Initializer.AfterInitializeMemento.serialize(memento)) + ); + genWUsInsights.optConverterMemento.ifPresent(memento -> + jobState.setProp(ConfigurationKeys.CONVERTER_INITIALIZERS_SERIALIZED_MEMENTOS_KEY, + Initializer.AfterInitializeMemento.serialize(memento)) + ); JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs); JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: the writing of `JobState` after all WUs signifies WU gen+serialization now complete String sourceClassName = JobStateUtils.getSourceClassName(jobState); - return new GenerateWorkUnitsResult(jobState.getTaskCount(), sourceClassName, wuSizeSummary, pathsToCleanUp); + return new GenerateWorkUnitsResult(jobState.getTaskCount(), sourceClassName, wuSizeSummary, genWUsInsights.getPathsToCleanUp()); } catch (ReflectiveOperationException roe) { String errMsg = "Unable to construct a source for generating workunits for job " + jobState.getJobId(); log.error(errMsg, roe); @@ -153,8 +176,7 @@ public GenerateWorkUnitsResult generateWorkUnits(Properties jobProps, EventSubmi } } - protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer, - Set pathsToCleanUp) + protected WorkUnitsWithInsights generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer) throws ReflectiveOperationException { // report (timer) metrics for "Work Discovery", *planning only* - NOT including WU prep, like serialization, `DestinationDatasetHandlerService`ing, etc. // IMPORTANT: for accurate timing, SEPARATELY emit `.createWorkPreparationTimer`, to record time prior to measuring the WU size required for that one @@ -175,18 +197,20 @@ protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure) log.warn("No work units created for job " + jobState.getJobId()); - return Lists.newArrayList(); + return new WorkUnitsWithInsights(Lists.newArrayList(), new HashSet<>(), Optional.empty(), Optional.empty()); } boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for cleanup DestinationDatasetHandlerService datasetHandlerService = closer.register( new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, eventSubmitterContext.create())); WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream); - pathsToCleanUp.addAll(calculateWorkDirsToCleanup(handledWorkUnitStream)); - // initialize writer and converter(s) - // TODO: determine whether registration here is effective, or the lifecycle of this activity is too brief (as is likely!) - closer.register(WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream)).initialize(); - closer.register(ConverterInitializerFactory.newInstance(jobState, handledWorkUnitStream)).initialize(); + Set pathsToCleanUp = new HashSet<>(calculateWorkDirsToCleanup(handledWorkUnitStream)); + + // initialize writer and converter(s), but DO NOT `.close()` them here; rather `.commemorate()` for later `.recall()` during Commit + WriterInitializer writerInitializer = WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream); + writerInitializer.initialize(); + ConverterInitializer converterInitializer = ConverterInitializerFactory.newInstance(jobState, handledWorkUnitStream); + converterInitializer.initialize(); // update jobState before it gets serialized long startTime = System.currentTimeMillis(); @@ -202,7 +226,11 @@ protected List generateWorkUnitsForJobStateAndCollectCleanupPaths(JobS // dump the work unit if tracking logs are enabled (post any materialization for counting) WorkUnitStream trackedWorkUnitStream = AbstractJobLauncher.addWorkUnitTrackingPerConfig(preparedWorkUnitStream, jobState, log); - return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream); + return new WorkUnitsWithInsights( + AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream), + pathsToCleanUp, + writerInitializer.commemorate(), + converterInitializer.commemorate()); } protected static Set calculateWorkDirsToCleanup(WorkUnitStream workUnitStream) { diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java index 239825f7cff..0bb9192cc6f 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java @@ -17,10 +17,11 @@ package org.apache.gobblin.temporal.util.nesting.work; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import java.util.Iterator; import java.util.Optional; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + /** * {@link Workload} models a logical collection of homogenous inputs over which a "foreach" operation can asynchronously apply