From 3e74ad97e4e844affa0dc982b58fbaf762d6f2a2 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 15:33:09 -0700 Subject: [PATCH 1/2] schema => catalog --- .../base/IntegrationCliParser.java | 10 ++-- .../integrations/base/IntegrationConfig.java | 32 +++++------ .../integrations/base/IntegrationRunner.java | 2 +- .../integrations/base/JavaBaseConstants.java | 7 +-- .../integrations/base/StatefulConsumer.java | 54 +++++++++++++++++++ .../base/IntegrationCliParserTest.java | 14 ++--- .../base/IntegrationConfigTest.java | 26 ++++----- .../base/IntegrationRunnerTest.java | 8 +-- 8 files changed, 102 insertions(+), 51 deletions(-) create mode 100644 airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationCliParser.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationCliParser.java index df46d4f51edcf..64671e557cf8d 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationCliParser.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationCliParser.java @@ -64,7 +64,7 @@ public class IntegrationCliParser { .build()); COMMAND_GROUP.addOption(Option.builder() .longOpt(Command.DISCOVER.toString().toLowerCase()) - .desc("outputs a catalog describing the source's schema") + .desc("outputs a catalog describing the source's catalog") .build()); COMMAND_GROUP.addOption(Option.builder() .longOpt(Command.READ.toString().toLowerCase()) @@ -113,7 +113,7 @@ private static IntegrationConfig parseOptions(String[] args, Command command) { options.addOption(Option .builder().longOpt(JavaBaseConstants.ARGS_CONFIG_KEY).desc(JavaBaseConstants.ARGS_CONFIG_DESC).hasArg(true).required(true).build()); options.addOption(Option - .builder().longOpt(JavaBaseConstants.ARGS_SCHEMA_KEY).desc(JavaBaseConstants.ARGS_SCHEMA_DESC).hasArg(true).build()); + .builder().longOpt(JavaBaseConstants.ARGS_CATALOG_KEY).desc(JavaBaseConstants.ARGS_CATALOG_DESC).hasArg(true).build()); options.addOption(Option .builder().longOpt(JavaBaseConstants.ARGS_STATE_KEY).desc(JavaBaseConstants.ARGS_PATH_DESC).hasArg(true).build()); } @@ -121,7 +121,7 @@ private static IntegrationConfig parseOptions(String[] args, Command command) { options.addOption(Option .builder().longOpt(JavaBaseConstants.ARGS_CONFIG_KEY).desc(JavaBaseConstants.ARGS_CONFIG_DESC).hasArg(true).required(true).build()); options.addOption(Option - .builder().longOpt(JavaBaseConstants.ARGS_SCHEMA_KEY).desc(JavaBaseConstants.ARGS_SCHEMA_DESC).hasArg(true).build()); + .builder().longOpt(JavaBaseConstants.ARGS_CATALOG_KEY).desc(JavaBaseConstants.ARGS_CATALOG_DESC).hasArg(true).build()); } default -> throw new IllegalStateException("Unexpected value: " + command); } @@ -147,13 +147,13 @@ private static IntegrationConfig parseOptions(String[] args, Command command) { case READ -> { return IntegrationConfig.read( Path.of(argsMap.get(JavaBaseConstants.ARGS_CONFIG_KEY)), - Path.of(argsMap.get(JavaBaseConstants.ARGS_SCHEMA_KEY)), + Path.of(argsMap.get(JavaBaseConstants.ARGS_CATALOG_KEY)), argsMap.containsKey(JavaBaseConstants.ARGS_STATE_KEY) ? Path.of(argsMap.get(JavaBaseConstants.ARGS_STATE_KEY)) : null); } case WRITE -> { return IntegrationConfig.write( Path.of(argsMap.get(JavaBaseConstants.ARGS_CONFIG_KEY)), - Path.of(argsMap.get(JavaBaseConstants.ARGS_SCHEMA_KEY))); + Path.of(argsMap.get(JavaBaseConstants.ARGS_CATALOG_KEY))); } default -> throw new IllegalStateException("Unexpected value: " + command); } diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationConfig.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationConfig.java index 7ceed4594faaf..8732faf7500d2 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationConfig.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationConfig.java @@ -33,13 +33,13 @@ public class IntegrationConfig { private final Command command; private final Path configPath; - private final Path schemaPath; + private final Path catalogPath; private final Path statePath; - private IntegrationConfig(Command command, Path configPath, Path schemaPath, Path statePath) { + private IntegrationConfig(Command command, Path configPath, Path catalogPath, Path statePath) { this.command = command; this.configPath = configPath; - this.schemaPath = schemaPath; + this.catalogPath = catalogPath; this.statePath = statePath; } @@ -57,16 +57,16 @@ public static IntegrationConfig discover(Path config) { return new IntegrationConfig(Command.DISCOVER, config, null, null); } - public static IntegrationConfig read(Path config, Path schema, Path state) { - Preconditions.checkNotNull(config); - Preconditions.checkNotNull(schema); - return new IntegrationConfig(Command.READ, config, schema, state); + public static IntegrationConfig read(Path configPath, Path catalogPath, Path statePath) { + Preconditions.checkNotNull(configPath); + Preconditions.checkNotNull(catalogPath); + return new IntegrationConfig(Command.READ, configPath, catalogPath, statePath); } - public static IntegrationConfig write(Path config, Path schema) { - Preconditions.checkNotNull(config); - Preconditions.checkNotNull(schema); - return new IntegrationConfig(Command.WRITE, config, schema, null); + public static IntegrationConfig write(Path configPath, Path catalogPath) { + Preconditions.checkNotNull(configPath); + Preconditions.checkNotNull(catalogPath); + return new IntegrationConfig(Command.WRITE, configPath, catalogPath, null); } public Command getCommand() { @@ -78,9 +78,9 @@ public Path getConfigPath() { return configPath; } - public Path getSchemaPath() { + public Path getCatalogPath() { Preconditions.checkState(command == Command.READ || command == Command.WRITE); - return schemaPath; + return catalogPath; } public Optional getStatePath() { @@ -93,7 +93,7 @@ public String toString() { return "IntegrationConfig{" + "command=" + command + ", configPath='" + configPath + '\'' + - ", schemaPath='" + schemaPath + '\'' + + ", catalogPath='" + catalogPath + '\'' + ", statePath='" + statePath + '\'' + '}'; } @@ -109,13 +109,13 @@ public boolean equals(Object o) { IntegrationConfig that = (IntegrationConfig) o; return command == that.command && Objects.equals(configPath, that.configPath) && - Objects.equals(schemaPath, that.schemaPath) && + Objects.equals(catalogPath, that.catalogPath) && Objects.equals(statePath, that.statePath); } @Override public int hashCode() { - return Objects.hash(command, configPath, schemaPath, statePath); + return Objects.hash(command, configPath, catalogPath, statePath); } } diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 0345b4582f9de..e564a74e4c024 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -81,7 +81,7 @@ public void run(String[] args) throws Exception { throw new RuntimeException("Not implemented"); case WRITE -> { final JsonNode config = parseConfig(parsed.getConfigPath()); - final Schema schema = parseConfig(parsed.getSchemaPath(), Schema.class); + final Schema schema = parseConfig(parsed.getCatalogPath(), Schema.class); final DestinationConsumer consumer = destination.write(config, schema); consumeWriteStream(consumer); } diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java index 7adf3107f02f1..cc94543c45112 100644 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/JavaBaseConstants.java @@ -28,15 +28,12 @@ public class JavaBaseConstants { - public static String ENV_DESTINATION_CLASS = "DESTINATION_CLASS"; - public static String ENV_DESTINATION_JAR_PATH = "DESTINATION_JAR_PATH"; - public static String ARGS_CONFIG_KEY = "config"; - public static String ARGS_SCHEMA_KEY = "schema"; + public static String ARGS_CATALOG_KEY = "catalog"; public static String ARGS_STATE_KEY = "state"; public static String ARGS_CONFIG_DESC = "path to the json configuration file"; - public static String ARGS_SCHEMA_DESC = "input path for the schema"; + public static String ARGS_CATALOG_DESC = "input path for the catalog"; public static String ARGS_PATH_DESC = "path to the json-encoded state file"; // todo (cgardens) - this mount path should be passed in by the worker and read as an arg or diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java new file mode 100644 index 0000000000000..9894280ada080 --- /dev/null +++ b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java @@ -0,0 +1,54 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class StatefulConsumer implements DestinationConsumer { + + private static final Logger LOGGER = LoggerFactory.getLogger(StatefulConsumer.class); + + private boolean hasFailed = false; + + protected abstract void acceptInternal(T t) throws Exception; + + public void accept(T t) throws Exception { + try { + acceptInternal(t); + } catch (Exception e) { + hasFailed = true; + throw e; + } + } + + protected abstract void close(boolean hasFailed) throws Exception; + + public void close() throws Exception { + LOGGER.info("hasFailed: {}.", hasFailed); + close(hasFailed); + } + +} diff --git a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationCliParserTest.java b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationCliParserTest.java index 3e8f18eefdf9c..4bdb8db5b0990 100644 --- a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationCliParserTest.java +++ b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationCliParserTest.java @@ -33,7 +33,7 @@ class IntegrationCliParserTest { private static final String CONFIG_FILENAME = "config.json"; - private static final String SCHEMA_FILENAME = "schema.json"; + private static final String CATALOG_FILENAME = "catalog.json"; private static final String STATE_FILENAME = "state.json"; @Test @@ -59,23 +59,23 @@ void testDiscover() { @Test void testWrite() { - final String[] args = new String[] {"--write", "--config", CONFIG_FILENAME, "--schema", SCHEMA_FILENAME}; + final String[] args = new String[] {"--write", "--config", CONFIG_FILENAME, "--catalog", CATALOG_FILENAME}; final IntegrationConfig actual = new IntegrationCliParser().parse(args); - assertEquals(IntegrationConfig.write(Path.of(CONFIG_FILENAME), Path.of(SCHEMA_FILENAME)), actual); + assertEquals(IntegrationConfig.write(Path.of(CONFIG_FILENAME), Path.of(CATALOG_FILENAME)), actual); } @Test void testReadWithoutState() { - final String[] args = new String[] {"--read", "--config", CONFIG_FILENAME, "--schema", SCHEMA_FILENAME}; + final String[] args = new String[] {"--read", "--config", CONFIG_FILENAME, "--catalog", CATALOG_FILENAME}; final IntegrationConfig actual = new IntegrationCliParser().parse(args); - assertEquals(IntegrationConfig.read(Path.of(CONFIG_FILENAME), Path.of(SCHEMA_FILENAME), null), actual); + assertEquals(IntegrationConfig.read(Path.of(CONFIG_FILENAME), Path.of(CATALOG_FILENAME), null), actual); } @Test void testReadWithState() { - final String[] args = new String[] {"--read", "--config", CONFIG_FILENAME, "--schema", SCHEMA_FILENAME, "--state", STATE_FILENAME}; + final String[] args = new String[] {"--read", "--config", CONFIG_FILENAME, "--catalog", CATALOG_FILENAME, "--state", STATE_FILENAME}; final IntegrationConfig actual = new IntegrationCliParser().parse(args); - assertEquals(IntegrationConfig.read(Path.of(CONFIG_FILENAME), Path.of(SCHEMA_FILENAME), Path.of(STATE_FILENAME)), actual); + assertEquals(IntegrationConfig.read(Path.of(CONFIG_FILENAME), Path.of(CATALOG_FILENAME), Path.of(STATE_FILENAME)), actual); } @Test diff --git a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationConfigTest.java b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationConfigTest.java index 82851c6aa4953..d8216919b431a 100644 --- a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationConfigTest.java +++ b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationConfigTest.java @@ -34,7 +34,7 @@ class IntegrationConfigTest { private static final Path CONFIG_PATH = Path.of("config.json"); - private static final Path SCHEMA_PATH = Path.of("schema.json"); + private static final Path CATALOG_PATH = Path.of("catalog.json"); private static final Path STATE_PATH = Path.of("state.json"); @Test @@ -42,7 +42,7 @@ void testSpec() { final IntegrationConfig config = IntegrationConfig.spec(); assertEquals(Command.SPEC, config.getCommand()); assertThrows(IllegalStateException.class, config::getConfigPath); - assertThrows(IllegalStateException.class, config::getSchemaPath); + assertThrows(IllegalStateException.class, config::getCatalogPath); assertThrows(IllegalStateException.class, config::getStatePath); } @@ -53,7 +53,7 @@ void testCheck() { final IntegrationConfig config = IntegrationConfig.check(CONFIG_PATH); assertEquals(Command.CHECK, config.getCommand()); assertEquals(CONFIG_PATH, config.getConfigPath()); - assertThrows(IllegalStateException.class, config::getSchemaPath); + assertThrows(IllegalStateException.class, config::getCatalogPath); assertThrows(IllegalStateException.class, config::getStatePath); } @@ -64,43 +64,43 @@ void testDiscover() { final IntegrationConfig config = IntegrationConfig.discover(CONFIG_PATH); assertEquals(Command.DISCOVER, config.getCommand()); assertEquals(CONFIG_PATH, config.getConfigPath()); - assertThrows(IllegalStateException.class, config::getSchemaPath); + assertThrows(IllegalStateException.class, config::getCatalogPath); assertThrows(IllegalStateException.class, config::getStatePath); } @Test void testWrite() { - assertThrows(NullPointerException.class, () -> IntegrationConfig.write(null, SCHEMA_PATH)); + assertThrows(NullPointerException.class, () -> IntegrationConfig.write(null, CATALOG_PATH)); assertThrows(NullPointerException.class, () -> IntegrationConfig.write(CONFIG_PATH, null)); - final IntegrationConfig config = IntegrationConfig.write(CONFIG_PATH, SCHEMA_PATH); + final IntegrationConfig config = IntegrationConfig.write(CONFIG_PATH, CATALOG_PATH); assertEquals(Command.WRITE, config.getCommand()); assertEquals(CONFIG_PATH, config.getConfigPath()); - assertEquals(SCHEMA_PATH, config.getSchemaPath()); + assertEquals(CATALOG_PATH, config.getCatalogPath()); assertThrows(IllegalStateException.class, config::getStatePath); } @Test void testReadWithState() { - assertThrows(NullPointerException.class, () -> IntegrationConfig.read(null, SCHEMA_PATH, STATE_PATH)); + assertThrows(NullPointerException.class, () -> IntegrationConfig.read(null, CATALOG_PATH, STATE_PATH)); assertThrows(NullPointerException.class, () -> IntegrationConfig.read(CONFIG_PATH, null, STATE_PATH)); - final IntegrationConfig config = IntegrationConfig.read(CONFIG_PATH, SCHEMA_PATH, STATE_PATH); + final IntegrationConfig config = IntegrationConfig.read(CONFIG_PATH, CATALOG_PATH, STATE_PATH); assertEquals(Command.READ, config.getCommand()); assertEquals(CONFIG_PATH, config.getConfigPath()); - assertEquals(SCHEMA_PATH, config.getSchemaPath()); + assertEquals(CATALOG_PATH, config.getCatalogPath()); assertEquals(Optional.of(STATE_PATH), config.getStatePath()); } @Test void testReadWithoutState() { - assertThrows(NullPointerException.class, () -> IntegrationConfig.read(null, SCHEMA_PATH, null)); + assertThrows(NullPointerException.class, () -> IntegrationConfig.read(null, CATALOG_PATH, null)); assertThrows(NullPointerException.class, () -> IntegrationConfig.read(CONFIG_PATH, null, null)); - final IntegrationConfig config = IntegrationConfig.read(CONFIG_PATH, SCHEMA_PATH, null); + final IntegrationConfig config = IntegrationConfig.read(CONFIG_PATH, CATALOG_PATH, null); assertEquals(Command.READ, config.getCommand()); assertEquals(CONFIG_PATH, config.getConfigPath()); - assertEquals(SCHEMA_PATH, config.getSchemaPath()); + assertEquals(CATALOG_PATH, config.getCatalogPath()); assertEquals(Optional.empty(), config.getStatePath()); } diff --git a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 718bdb736387a..6a16bfc179127 100644 --- a/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -57,7 +57,7 @@ class IntegrationRunnerTest { private static final String CONFIG_FILE_NAME = "config.json"; - private static final String SCHEMA_FILE_NAME = "schema.json"; + private static final String CATALOG_FILE_NAME = "catalog.json"; private static final String[] ARGS = new String[] {"args"}; @@ -70,7 +70,7 @@ class IntegrationRunnerTest { private Consumer stdoutConsumer; private Destination destination; private Path configPath; - private Path schemaPath; + private Path catalogPath; @SuppressWarnings("unchecked") @BeforeEach @@ -81,7 +81,7 @@ void setup() throws IOException { Path configDir = Files.createTempDirectory("test"); configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING); - schemaPath = IOs.writeFile(configDir, SCHEMA_FILE_NAME, Jsons.serialize(SCHEMA)); + catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(SCHEMA)); } @Test @@ -129,7 +129,7 @@ void testDiscover() throws Exception { @SuppressWarnings("unchecked") @Test void testWrite() throws Exception { - final IntegrationConfig intConfig = IntegrationConfig.write(Path.of(configPath.toString()), Path.of(schemaPath.toString())); + final IntegrationConfig intConfig = IntegrationConfig.write(Path.of(configPath.toString()), Path.of(catalogPath.toString())); final DestinationConsumer destinationConsumerMock = mock(DestinationConsumer.class); when(cliParser.parse(ARGS)).thenReturn(intConfig); when(destination.write(CONFIG, SCHEMA)).thenReturn(destinationConsumerMock); From 393dc18cac979e5059f3177ab51e79387d33d7b8 Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 7 Oct 2020 15:39:04 -0700 Subject: [PATCH 2/2] remove consumer from rename pr --- .../integrations/base/StatefulConsumer.java | 54 ------------------- 1 file changed, 54 deletions(-) delete mode 100644 airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java diff --git a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java b/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java deleted file mode 100644 index 9894280ada080..0000000000000 --- a/airbyte-integrations/base-java/src/main/java/io/airbyte/integrations/base/StatefulConsumer.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ - -package io.airbyte.integrations.base; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class StatefulConsumer implements DestinationConsumer { - - private static final Logger LOGGER = LoggerFactory.getLogger(StatefulConsumer.class); - - private boolean hasFailed = false; - - protected abstract void acceptInternal(T t) throws Exception; - - public void accept(T t) throws Exception { - try { - acceptInternal(t); - } catch (Exception e) { - hasFailed = true; - throw e; - } - } - - protected abstract void close(boolean hasFailed) throws Exception; - - public void close() throws Exception { - LOGGER.info("hasFailed: {}.", hasFailed); - close(hasFailed); - } - -}