diff --git a/.env b/.env index fdf824b827315..01a50bb867bec 100644 --- a/.env +++ b/.env @@ -81,7 +81,6 @@ JOB_ERROR_REPORTING_STRATEGY=logging # Although not present as an env var, expected by Log4J configuration. LOG_LEVEL=INFO - ### APPLICATIONS ### # Worker # WORKERS_MICRONAUT_ENVIRONMENTS=control-plane diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 69adc4af4cb6a..67d9f25af6c5f 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2264,6 +2264,26 @@ paths: application/json: schema: $ref: "#/components/schemas/InternalOperationResult" + /v1/attempt/save_stats: + post: + tags: + - attempt + - internal + summary: For worker to set running attempt stats. + operationId: saveStats + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SaveStatsRequestBody" + required: true + responses: + "200": + description: Successful Operation + content: + application/json: + schema: + $ref: "#/components/schemas/InternalOperationResult" components: securitySchemes: @@ -4038,6 +4058,12 @@ components: recordsCommitted: type: integer format: int64 + estimatedRecords: + type: integer + format: int64 + estimatedBytes: + type: integer + format: int64 AttemptStreamStats: type: object required: @@ -4881,6 +4907,23 @@ components: processingTaskQueue: type: string default: "" + SaveStatsRequestBody: + type: object + required: + - jobId + - attemptNumber + - stats + properties: + jobId: + $ref: "#/components/schemas/JobId" + attemptNumber: + $ref: "#/components/schemas/AttemptNumber" + stats: + $ref: "#/components/schemas/AttemptStats" + streamStats: + type: array + items: + $ref: "#/components/schemas/AttemptStreamStats" InternalOperationResult: type: object required: diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index cf2630467ca45..371488b1098a1 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -136,7 +136,7 @@ void testBootloaderAppBlankDb() throws Exception { bootloader.load(); val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway); - assertEquals("0.40.14.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.40.17.001", jobsMigrator.getLatestMigration().getVersion().getVersion()); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway); // this line should change with every new migration diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index a4b654310d00b..7560d968d073c 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -81,6 +81,7 @@ class Config: class TraceType(Enum): ERROR = "ERROR" + ESTIMATE = "ESTIMATE" class FailureType(Enum): @@ -98,6 +99,28 @@ class Config: failure_type: Optional[FailureType] = Field(None, description="The type of error") +class Type1(Enum): + STREAM = "STREAM" + SYNC = "SYNC" + + +class AirbyteEstimateTraceMessage(BaseModel): + class Config: + extra = Extra.allow + + name: str = Field(..., description="The name of the stream") + type: Type1 = Field(..., description="The type of estimate") + namespace: Optional[str] = Field(None, description="The namespace of the stream") + row_estimate: Optional[float] = Field( + None, + description="The estimated number of rows to be emitted by this sync for this stream", + ) + byte_estimate: Optional[float] = Field( + None, + description="The estimated number of bytes to be emitted by this sync for this stream", + ) + + class OrchestratorType(Enum): CONNECTOR_CONFIG = "CONNECTOR_CONFIG" @@ -213,6 +236,10 @@ class Config: type: TraceType = Field(..., description="the type of trace message", title="trace type") emitted_at: float = Field(..., description="the time in ms that the message was emitted") error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object") + estimate: Optional[AirbyteEstimateTraceMessage] = Field( + None, + description="Estimate trace message: a guess at how much data will be produced in this sync", + ) class AirbyteControlMessage(BaseModel): diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java index 7a7c7806d7d05..6fb5efdb332b7 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/sync/OrchestratorConstants.java @@ -46,6 +46,7 @@ public class OrchestratorConstants { EnvConfigs.DD_AGENT_HOST, EnvConfigs.DD_DOGSTATSD_PORT, EnvConfigs.METRIC_CLIENT, + EnvConfigs.INTERNAL_API_HOST, LOG_LEVEL, LogClientSingleton.GCS_LOG_BUCKET, LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS, diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java index 6ab4ef51bab77..61b457635aa65 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java @@ -11,7 +11,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import datadog.trace.api.Trace; +import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.invoker.generated.ApiClient; +import io.airbyte.api.client.invoker.generated.ApiException; +import io.airbyte.api.client.model.generated.AttemptStats; +import io.airbyte.api.client.model.generated.AttemptStreamStats; +import io.airbyte.api.client.model.generated.SaveStatsRequestBody; import io.airbyte.commons.io.LineGobbler; +import io.airbyte.config.Configs; +import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.FailureReason; import io.airbyte.config.ReplicationAttemptSummary; import io.airbyte.config.ReplicationOutput; @@ -80,6 +89,27 @@ public class DefaultReplicationWorker implements ReplicationWorker { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReplicationWorker.class); + private static final Configs CONFIGS = new EnvConfigs(); + private static final AirbyteApiClient CLIENT = getAirbyteApiClient(); + + // Passing env vars to the container orchestrator isn't working properly. Hack around this for now. + // TODO(Davin): This doesn't work for Kube. Need to figure it out. + private static AirbyteApiClient getAirbyteApiClient() { + if (CONFIGS.getWorkerEnvironment() == WorkerEnvironment.DOCKER) { + return new AirbyteApiClient( + new ApiClient().setScheme("http") + .setHost(CONFIGS.getAirbyteApiHost()) + .setPort(CONFIGS.getAirbyteApiPort()) + .setBasePath("/api")); + } + + return new AirbyteApiClient( + new ApiClient().setScheme("http") + .setHost("airbyte-server-svc") + .setPort(8001) + .setBasePath("/api")); + } + private final String jobId; private final int attempt; private final AirbyteSource source; @@ -180,7 +210,8 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path }); final CompletableFuture replicationThreadFuture = CompletableFuture.runAsync( - getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker), + getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker, + Long.parseLong(jobId), attempt), executors) .whenComplete((msg, ex) -> { if (ex != null) { @@ -347,7 +378,9 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, final Map mdc, final RecordSchemaValidator recordSchemaValidator, final WorkerMetricReporter metricReporter, - final ThreadedTimeTracker timeHolder) { + final ThreadedTimeTracker timeHolder, + final Long jobId, + final Integer attemptNumber) { return () -> { MDC.setContextMap(mdc); LOGGER.info("Replication thread started."); @@ -367,8 +400,15 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, validateSchema(recordSchemaValidator, validationErrors, airbyteMessage); final AirbyteMessage message = mapper.mapMessage(airbyteMessage); + // metrics block messageTracker.acceptFromSource(message); + // config/mutating platform state block + if (message.getType() == Type.STATE || message.getType() == Type.TRACE) { + saveStats(messageTracker, jobId, attemptNumber); + } + + // continue processing try { if (message.getType() == Type.RECORD || message.getType() == Type.STATE) { destination.accept(message); @@ -427,6 +467,37 @@ private static Runnable getReplicationRunnable(final AirbyteSource source, }; } + private static void saveStats(MessageTracker messageTracker, Long jobId, Integer attemptNumber) { + final AttemptStats totalStats = new AttemptStats() + .bytesEmitted(messageTracker.getTotalBytesEmitted()) + .recordsEmitted(messageTracker.getTotalRecordsEmitted()) + .estimatedBytes(messageTracker.getTotalBytesEstimated()) + .estimatedRecords(messageTracker.getTotalRecordsEstimated()); + + // calculate per stream stats + List streamStats = messageTracker.getStreamToEstimatedBytes().keySet().stream().map(stream -> { + final var syncStats = new AttemptStats() + .recordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream)) + .bytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream)) + .estimatedBytes(messageTracker.getStreamToEstimatedBytes().get(stream)) + .estimatedRecords(messageTracker.getStreamToEstimatedRecords().get(stream)); + + return new AttemptStreamStats().streamName(stream).stats(syncStats); + }).collect(Collectors.toList());; + + final SaveStatsRequestBody saveStatsRequestBody = new SaveStatsRequestBody() + .jobId(jobId) + .attemptNumber(attemptNumber) + .stats(totalStats) + .streamStats(streamStats); + LOGGER.info("saving stats"); + try { + CLIENT.getAttemptApi().saveStats(saveStatsRequestBody); + } catch (ApiException e) { + LOGGER.warn("error trying to save stats: ", e); + } + } + private static void validateSchema(final RecordSchemaValidator recordSchemaValidator, final Map, Integer>> validationErrors, final AirbyteMessage message) { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index aa4b348887aeb..6afb755a49b07 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -25,7 +25,9 @@ import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.workers.helper.FailureHelper; +import io.airbyte.workers.internal.StateDeltaTracker.StateDeltaTrackerException; import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerNoStateMatchException; +import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerOomException; import io.airbyte.workers.internal.state_aggregator.DefaultStateAggregator; import io.airbyte.workers.internal.state_aggregator.StateAggregator; import java.time.LocalDateTime; @@ -51,6 +53,10 @@ public class AirbyteMessageTracker implements MessageTracker { private final BiMap streamNameToIndex; private final Map streamToTotalBytesEmitted; private final Map streamToTotalRecordsEmitted; + + private final Map streamToTotalBytesEstimated; + + private final Map streamToTotalRecordsEstimated; private final StateDeltaTracker stateDeltaTracker; private final StateMetricsTracker stateMetricsTracker; private final List destinationErrorTraceMessages; @@ -93,6 +99,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, this.hashFunction = Hashing.murmur3_32_fixed(); this.streamToTotalBytesEmitted = new HashMap<>(); this.streamToTotalRecordsEmitted = new HashMap<>(); + this.streamToTotalBytesEstimated = new HashMap<>(); + this.streamToTotalRecordsEstimated = new HashMap<>(); this.stateDeltaTracker = stateDeltaTracker; this.stateMetricsTracker = stateMetricsTracker; this.nextStreamIndex = 0; @@ -173,12 +181,12 @@ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) { if (!unreliableStateTimingMetrics) { stateMetricsTracker.addState(stateMessage, stateHash, timeEmittedStateMessage); } - } catch (final StateDeltaTracker.StateDeltaTrackerException e) { + } catch (final StateDeltaTrackerException e) { log.warn("The message tracker encountered an issue that prevents committed record counts from being reliably computed."); log.warn("This only impacts metadata and does not indicate a problem with actual sync data."); log.warn(e.getMessage(), e); unreliableCommittedCounts = true; - } catch (final StateMetricsTracker.StateMetricsTrackerOomException e) { + } catch (final StateMetricsTrackerOomException e) { log.warn("The StateMetricsTracker encountered an out of memory error that prevents new state metrics from being recorded"); log.warn("This only affects metrics and does not indicate a problem with actual sync data."); unreliableStateTimingMetrics = true; @@ -251,6 +259,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) { switch (traceMessage.getType()) { case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType); + case ESTIMATE -> handleEmittedEstimateTrace(traceMessage); default -> log.warn("Invalid message type for trace message: {}", traceMessage); } } @@ -263,6 +272,19 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage } } + @SuppressWarnings("PMD") // until method is implemented + private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage) { + // Assume the estimate is a whole number and not a sum i.e. each estimate replaces the previous + // estimate. + + log.info("====== saving trace estimates"); + final var estimate = estimateTraceMessage.getEstimate(); + final var index = getStreamIndex(estimate.getName()); + + streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate()); + streamToTotalBytesEstimated.put(index, estimate.getByteEstimate()); + } + private short getStreamIndex(final String streamName) { if (!streamNameToIndex.containsKey(streamName)) { streamNameToIndex.put(streamName, nextStreamIndex); @@ -363,6 +385,13 @@ public Map getStreamToEmittedRecords() { Map.Entry::getValue)); } + @Override + public Map getStreamToEstimatedRecords() { + return streamToTotalRecordsEstimated.entrySet().stream().collect(Collectors.toMap( + entry -> streamNameToIndex.inverse().get(entry.getKey()), + Map.Entry::getValue)); + } + /** * Swap out stream indices for stream names and return total bytes emitted by stream. */ @@ -373,6 +402,13 @@ public Map getStreamToEmittedBytes() { Map.Entry::getValue)); } + @Override + public Map getStreamToEstimatedBytes() { + return streamToTotalBytesEstimated.entrySet().stream().collect(Collectors.toMap( + entry -> streamNameToIndex.inverse().get(entry.getKey()), + Map.Entry::getValue)); + } + /** * Compute sum of emitted record counts across all streams. */ @@ -381,6 +417,11 @@ public long getTotalRecordsEmitted() { return streamToTotalRecordsEmitted.values().stream().reduce(0L, Long::sum); } + @Override + public long getTotalRecordsEstimated() { + return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum); + } + /** * Compute sum of emitted bytes across all streams. */ @@ -389,6 +430,11 @@ public long getTotalBytesEmitted() { return streamToTotalBytesEmitted.values().stream().reduce(0L, Long::sum); } + @Override + public long getTotalBytesEstimated() { + return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum); + } + /** * Compute sum of committed record counts across all streams. If the delta tracker has exceeded its * capacity, return empty because committed record counts cannot be reliably computed. diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java index 86994fd785c85..a76de0c1a6cf6 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java @@ -65,6 +65,8 @@ public interface MessageTracker { */ Map getStreamToEmittedRecords(); + Map getStreamToEstimatedRecords(); + /** * Get the per-stream emitted byte count. This includes messages that were emitted by the source, * but never committed by the destination. @@ -73,6 +75,8 @@ public interface MessageTracker { */ Map getStreamToEmittedBytes(); + Map getStreamToEstimatedBytes(); + /** * Get the overall emitted record count. This includes messages that were emitted by the source, but * never committed by the destination. @@ -81,6 +85,8 @@ public interface MessageTracker { */ long getTotalRecordsEmitted(); + long getTotalRecordsEstimated(); + /** * Get the overall emitted bytes. This includes messages that were emitted by the source, but never * committed by the destination. @@ -89,6 +95,8 @@ public interface MessageTracker { */ long getTotalBytesEmitted(); + long getTotalBytesEstimated(); + /** * Get the overall committed record count. * diff --git a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml index 6410a3695292b..0c60c14f09aad 100644 --- a/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml +++ b/airbyte-config/config-models/src/main/resources/types/SyncStats.yaml @@ -47,3 +47,13 @@ properties: destinationWriteEndTime: type: integer description: The exit time of the destination container/pod + estimatedBytes: + type: integer + description: The total estimated number of bytes for the sync + estimatedRecords: + type: integer + description: The total estimated number of records for the sync + streamStats: + type: array + items: + "$ref": StreamSyncStats.yaml diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 8c82c3ccdcc63..eaea5a9253365 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2288,48 +2288,48 @@ supported_destination_sync_modes: [] - dockerImage: "airbyte/source-coinmarketcap:0.1.0" spec: - documentationUrl: https://docs.airbyte.com/integrations/sources/coinmarketcap + documentationUrl: "https://docs.airbyte.com/integrations/sources/coinmarketcap" connectionSpecification: - $schema: http://json-schema.org/draft-07/schema# - title: Coinmarketcap Spec - type: object + $schema: "http://json-schema.org/draft-07/schema#" + title: "Coinmarketcap Spec" + type: "object" required: - - api_key - - data_type + - "api_key" + - "data_type" additionalProperties: true properties: api_key: - title: API Key - type: string - description: >- - Your API Key. See here. The token is - case sensitive. + title: "API Key" + type: "string" + description: "Your API Key. See here. The token is case sensitive." airbyte_secret: true data_type: - title: Data type - type: string + title: "Data type" + type: "string" enum: - - latest - - historical - description: >- - /latest: Latest market ticker quotes and averages for cryptocurrencies and exchanges. - /historical: Intervals of historic market data like OHLCV data or data for use in charting libraries. See here. + - "latest" + - "historical" + description: "/latest: Latest market ticker quotes and averages for cryptocurrencies\ + \ and exchanges. /historical: Intervals of historic market data like OHLCV\ + \ data or data for use in charting libraries. See here." symbols: - title: Symbol - type: array - items: { - "type": "string" - } - description: Cryptocurrency symbols. (only used for quotes stream) + title: "Symbol" + type: "array" + items: + type: "string" + description: "Cryptocurrency symbols. (only used for quotes stream)" minItems: 1 examples: - - AVAX - - BTC + - "AVAX" + - "BTC" supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] - dockerImage: "airbyte/source-commercetools:0.1.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/commercetools" diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_17_001__AddEstimatedRecordsAndBytesColumns.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_17_001__AddEstimatedRecordsAndBytesColumns.java new file mode 100644 index 0000000000000..316a6d514f5d2 --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_17_001__AddEstimatedRecordsAndBytesColumns.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.jobs.migrations; + +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_40_17_001__AddEstimatedRecordsAndBytesColumns extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_17_001__AddEstimatedRecordsAndBytesColumns.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + ctx.alterTable("sync_stats") + .add( + DSL.field("estimated_records", SQLDataType.BIGINT.nullable(true)), + DSL.field("estimated_bytes", SQLDataType.BIGINT.nullable(true))) + .execute(); + } + +} diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_17_002__AddStreamStatsColumnToSyncStats.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_17_002__AddStreamStatsColumnToSyncStats.java new file mode 100644 index 0000000000000..aa2db496f9a84 --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/instance/jobs/migrations/V0_40_17_002__AddStreamStatsColumnToSyncStats.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.jobs.migrations; + +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// TODO: update migration description in the class name +public class V0_40_17_002__AddStreamStatsColumnToSyncStats extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_17_002__AddStreamStatsColumnToSyncStats.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + ctx.alterTable("sync_stats") + .add( + DSL.field("stream_stats", SQLDataType.JSONB.nullable(true))) + .execute(); + } + +} diff --git a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt index 15cd985a91184..f9b451a6c6700 100644 --- a/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt +++ b/airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt @@ -2,6 +2,16 @@ // It is also not used by any piece of code to generate anything. // It doesn't contain the enums created in the database and the default values might also be buggy. +create table "public"."airbyte_configs"( + "id" int8 generated by default as identity not null, + "config_id" varchar(36) not null, + "config_type" varchar(60) not null, + "config_blob" jsonb not null, + "created_at" timestamptz(35) not null default null, + "updated_at" timestamptz(35) not null default null, + constraint "airbyte_configs_pkey" + primary key ("id") +); create table "public"."airbyte_jobs_migrations"( "installed_rank" int4 not null, "version" varchar(50) null, @@ -75,6 +85,9 @@ create table "public"."sync_stats"( "max_seconds_between_state_message_emitted_and_committed" int8 null, "created_at" timestamptz(35) not null default null, "updated_at" timestamptz(35) not null default null, + "estimated_records" int8 null, + "estimated_bytes" int8 null, + "stream_stats" jsonb null, constraint "sync_stats_pkey" primary key ("id") ); @@ -86,6 +99,12 @@ alter table "public"."sync_stats" add constraint "sync_stats_attempt_id_fkey" foreign key ("attempt_id") references "public"."attempts" ("id"); +create index "airbyte_configs_id_idx" on "public"."airbyte_configs"("config_id" asc); +create unique index "airbyte_configs_pkey" on "public"."airbyte_configs"("id" asc); +create unique index "airbyte_configs_type_id_idx" on "public"."airbyte_configs"( + "config_type" asc, + "config_id" asc +); create unique index "airbyte_jobs_migrations_pk" on "public"."airbyte_jobs_migrations"("installed_rank" asc); create index "airbyte_jobs_migrations_s_idx" on "public"."airbyte_jobs_migrations"("success" asc); create unique index "airbyte_metadata_pkey" on "public"."airbyte_metadata"("key" asc); diff --git a/airbyte-integrations/connectors/source-faker/setup.py b/airbyte-integrations/connectors/source-faker/setup.py index ab62499037f54..ca2136a928a53 100644 --- a/airbyte-integrations/connectors/source-faker/setup.py +++ b/airbyte-integrations/connectors/source-faker/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "mimesis==6.1.1"] +MAIN_REQUIREMENTS = ["airbyte-cdk~=0.2", "mimesis==6.1.1"] TEST_REQUIREMENTS = [ "pytest~=6.1", diff --git a/airbyte-integrations/connectors/source-faker/source_faker/source.py b/airbyte-integrations/connectors/source-faker/source_faker/source.py index 6e664751df249..f4d6f3125c7a3 100644 --- a/airbyte-integrations/connectors/source-faker/source_faker/source.py +++ b/airbyte-integrations/connectors/source-faker/source_faker/source.py @@ -41,8 +41,10 @@ def check(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteConnect :return: AirbyteConnectionStatus indicating a Success or Failure """ - # As this is an in-memory source, it always succeeds - return AirbyteConnectionStatus(status=Status.SUCCEEDED) + if type(config["count"]) == int or type(config["count"]) == float: + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + else: + return AirbyteConnectionStatus(status=Status.FAILED) def discover(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteCatalog: """ @@ -136,6 +138,10 @@ def read( records_in_sync = 0 records_in_page = 0 + users_estimate = count - cursor + yield generate_estimate(stream.stream.name, users_estimate, 450) + yield generate_estimate("Purchases", users_estimate * 1.5, 230) # a fuzzy guess, some users have purchases, some don't + for i in range(cursor, count): user = generate_user(person, dt, i) yield generate_record(stream, user) @@ -162,6 +168,7 @@ def read( elif stream.stream.name == "Products": products = generate_products() + yield generate_estimate(stream.stream.name, len(products), 180) for p in products: yield generate_record(stream, p) yield generate_state(state, stream, {"product_count": len(products)}) @@ -193,6 +200,21 @@ def generate_record(stream: any, data: any): ) +def generate_estimate(stream_name: str, total: int, bytes_per_row: int): + # TODO: Use the updated CDK classes when published, e.g. `return AirbyteMessage`` + + data = { + "type": "TRACE", + "trace": { + "emitted_at": int(datetime.datetime.now().timestamp() * 1000), + "type": "ESTIMATE", + "estimate": {"type": "STREAM", "name": stream_name, "namespace": "", "row_estimate": round(total), "byte_estimate": round(total * bytes_per_row)}, + }, + } + + return HackedAirbyteTraceMessage(data) + + def log_stream(stream_name: str): return AirbyteMessage( type=Type.LOG, @@ -300,3 +322,14 @@ def format_airbyte_time(d: datetime): s = s.replace(" ", "T") s += "+00:00" return s + + +class HackedAirbyteTraceMessage: + data = {} + type = "TRACE" + + def __init__(self, data: dict): + self.data = data + + def json(self, exclude_unset): + return json.dumps(self.data) diff --git a/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py index 0db54325bffaa..68a4351ba2b5c 100644 --- a/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py @@ -44,7 +44,13 @@ def test_read_small_random_data(): logger = None config = {"count": 10} catalog = ConfiguredAirbyteCatalog( - streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}] + streams=[ + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] ) state = {} iterator = source.read(logger, config, catalog, state) @@ -70,8 +76,16 @@ def test_read_big_random_data(): config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000} catalog = ConfiguredAirbyteCatalog( streams=[ - {"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, - {"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + { + "stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, ] ) state = {} @@ -98,9 +112,21 @@ def test_with_purchases(): config = {"count": 1000, "records_per_sync": 1000} catalog = ConfiguredAirbyteCatalog( streams=[ - {"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, - {"stream": {"name": "Products", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, - {"stream": {"name": "Purchases", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + { + "stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, + { + "stream": {"name": "Purchases", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + }, ] ) state = {} @@ -128,7 +154,13 @@ def test_sync_ends_with_limit(): logger = None config = {"count": 100, "records_per_sync": 5} catalog = ConfiguredAirbyteCatalog( - streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}] + streams=[ + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] ) state = {} iterator = source.read(logger, config, catalog, state) @@ -157,7 +189,13 @@ def test_read_with_seed(): logger = None config = {"count": 1, "seed": 100} catalog = ConfiguredAirbyteCatalog( - streams=[{"stream": {"name": "Users", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}] + streams=[ + { + "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite", + } + ] ) state = {} iterator = source.read(logger, config, catalog, state) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 1f5f5b2a23814..8a1b415a8055e 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -32,6 +32,7 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.NormalizationSummary; +import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; @@ -349,21 +350,42 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp attemptNumber).stream().findFirst(); final Long attemptId = record.get().get("id", Long.class); - ctx.insertInto(SYNC_STATS) - .set(SYNC_STATS.ID, UUID.randomUUID()) - .set(SYNC_STATS.UPDATED_AT, now) - .set(SYNC_STATS.CREATED_AT, now) - .set(SYNC_STATS.ATTEMPT_ID, attemptId) - .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) - .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) - .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) - .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) - .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) - .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()) - .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()) - .set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()) - .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()) - .execute(); + final var needToCreate = !ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId)); + // A record might already created by the writeSyncStats method. + // TODO(Davin): This is ugly and can be removed. + if (needToCreate) { + ctx.insertInto(SYNC_STATS) + .set(SYNC_STATS.ID, UUID.randomUUID()) + .set(SYNC_STATS.ATTEMPT_ID, attemptId) + .set(SYNC_STATS.CREATED_AT, now) + .set(SYNC_STATS.UPDATED_AT, now) + .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) + .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) + .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) + .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) + .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()) + .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, + syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()) + .execute(); + } else { + ctx.update(SYNC_STATS) + .set(SYNC_STATS.UPDATED_AT, now) + .set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted()) + .set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted()) + .set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted()) + .set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted()) + .set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()) + .set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()) + .set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, + syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()) + .where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) + .execute(); + } if (normalizationSummary != null) { ctx.insertInto(NORMALIZATION_SUMMARIES) @@ -382,6 +404,56 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp } + @Override + public void writeSyncStats(long jobId, + int attemptNumber, + long estimatedRecords, + long estimatedBytes, + long recordsEmitted, + long bytesEmitted, + List streamStats) + throws IOException { + // Although the attempt table's output has a copy of the sync summary, we do not update it for + // running sync stat updates. + final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); + jobDatabase.transaction(ctx -> { + final Optional record = + ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId, + attemptNumber).stream().findFirst(); + final Long attemptId = record.get().get("id", Long.class); + + final var isExisting = ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId)); + + if (isExisting) { + ctx.update(SYNC_STATS) + .set(SYNC_STATS.BYTES_EMITTED, bytesEmitted) + .set(SYNC_STATS.RECORDS_EMITTED, recordsEmitted) + .set(SYNC_STATS.ESTIMATED_BYTES, estimatedBytes) + .set(SYNC_STATS.ESTIMATED_RECORDS, estimatedRecords) + .set(SYNC_STATS.STREAM_STATS, JSONB.valueOf(Jsons.serialize(streamStats))) + .set(SYNC_STATS.UPDATED_AT, now) + .where(SYNC_STATS.ATTEMPT_ID.eq(attemptId)) + .execute(); + return null; + } + + ctx.insertInto(SYNC_STATS) + .set(SYNC_STATS.ID, UUID.randomUUID()) + .set(SYNC_STATS.UPDATED_AT, now) + .set(SYNC_STATS.CREATED_AT, now) + .set(SYNC_STATS.ATTEMPT_ID, attemptId) + .set(SYNC_STATS.BYTES_EMITTED, bytesEmitted) + .set(SYNC_STATS.RECORDS_EMITTED, recordsEmitted) + .set(SYNC_STATS.ESTIMATED_BYTES, estimatedBytes) + .set(SYNC_STATS.ESTIMATED_RECORDS, estimatedRecords) + .set(SYNC_STATS.STREAM_STATS, JSONB.valueOf(Jsons.serialize(streamStats))) + .execute(); + + // write per stream stat info + return null; + }); + } + @Override public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); @@ -403,6 +475,15 @@ public List getSyncStats(final Long attemptId) throws IOException { .toList()); } + @Override + public Long getAttemptId(Long jobId, Long attemptNumber) throws IOException { + final Optional record = + jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId, + attemptNumber).stream().findFirst()); + + return record.get().get("id", Long.class); + } + @Override public List getNormalizationSummary(final Long attemptId) throws IOException, JsonProcessingException { return jobDatabase @@ -413,14 +494,23 @@ public List getNormalizationSummary(final Long attemptId) } private static RecordMapper getSyncStatsRecordMapper() { - return record -> new SyncStats().withBytesEmitted(record.get(SYNC_STATS.BYTES_EMITTED)).withRecordsEmitted(record.get(SYNC_STATS.RECORDS_EMITTED)) - .withSourceStateMessagesEmitted(record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED)) - .withDestinationStateMessagesEmitted(record.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED)) - .withRecordsCommitted(record.get(SYNC_STATS.RECORDS_COMMITTED)) - .withMeanSecondsBeforeSourceStateMessageEmitted(record.get(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED)) - .withMaxSecondsBeforeSourceStateMessageEmitted(record.get(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED)) - .withMeanSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)) - .withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)); + return record -> { + try { + return new SyncStats().withBytesEmitted(record.get(SYNC_STATS.BYTES_EMITTED)).withRecordsEmitted(record.get(SYNC_STATS.RECORDS_EMITTED)) + .withSourceStateMessagesEmitted(record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED)) + .withDestinationStateMessagesEmitted(record.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED)) + .withRecordsCommitted(record.get(SYNC_STATS.RECORDS_COMMITTED)) + .withEstimatedBytes(record.get(SYNC_STATS.ESTIMATED_BYTES)) + .withEstimatedRecords(record.get(SYNC_STATS.ESTIMATED_RECORDS)) + .withMeanSecondsBeforeSourceStateMessageEmitted(record.get(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED)) + .withMaxSecondsBeforeSourceStateMessageEmitted(record.get(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED)) + .withMeanSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)) + .withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED)) + .withStreamStats(record.get(SYNC_STATS.STREAM_STATS, String.class) == null ? null : deserializeStreamStats(record)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }; } private static RecordMapper getNormalizationSummaryRecordMapper() { @@ -441,6 +531,11 @@ private static List deserializeFailureReasons(final Record record return List.of(mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class)); } + private static List deserializeStreamStats(final Record record) throws JsonProcessingException { + final ObjectMapper mapper = new ObjectMapper(); + return List.of(mapper.readValue(String.valueOf(record.get(SYNC_STATS.STREAM_STATS)), StreamSyncStats[].class)); + } + @Override public Job getJob(final long jobId) throws IOException { return jobDatabase.query(ctx -> getJob(ctx, jobId)); diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index e5b389cae2f45..77fd83eec8084 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -11,6 +11,7 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.NormalizationSummary; +import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.persistence.job.models.AttemptNormalizationStatus; @@ -36,6 +37,17 @@ public interface JobPersistence { List getSyncStats(Long attemptId) throws IOException; + /** + * Return the id of the record in the attempt table corresponding to that job and attempt + * combination. This is useful to index into other attempt-scoped metadata. + * + * @param jobId + * @param attemptNumber + * @return + * @throws IOException + */ + Long getAttemptId(Long jobId, Long attemptNumber) throws IOException; + List getNormalizationSummary(Long attemptId) throws IOException; Job getJob(long jobId) throws IOException; @@ -136,6 +148,15 @@ public interface JobPersistence { */ void writeOutput(long jobId, int attemptNumber, JobOutput output) throws IOException; + void writeSyncStats(long jobId, + int attemptNumber, + long estimatedRecords, + long estimatedBytes, + long recordsEmitted, + long bytesEmitted, + List streamStats) + throws IOException; + /** * Writes a summary of all failures that occurred during the attempt. * diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 06cdf5c4a2c56..d01de4a579e14 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -308,6 +308,64 @@ void testWriteOutput() throws IOException, SQLException { assertEquals(List.of(failureReason1, failureReason2), storedNormalizationSummary.getFailures()); } + @Nested + class SyncStatsTest { + + @Test + @DisplayName("Writing sync stats the first time should only write record and bytes information correctly") + void testWriteSyncStatsFirst() throws IOException, SQLException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + jobPersistence.writeSyncStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null); + + final Optional record = + jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId, + attemptNumber).stream().findFirst()); + final Long attemptId = record.get().get("id", Long.class); + + final var stats = jobPersistence.getSyncStats(attemptId).stream().findFirst().get(); + assertEquals(1000, stats.getBytesEmitted()); + assertEquals(1000, stats.getRecordsEmitted()); + assertEquals(1000, stats.getEstimatedBytes()); + assertEquals(1000, stats.getEstimatedRecords()); + + assertEquals(null, stats.getRecordsCommitted()); + assertEquals(null, stats.getDestinationStateMessagesEmitted()); + } + + @Test + @DisplayName("Writing sync stats multiple times should write record and bytes information correctly without exceptions") + void testWriteSyncStatsRepeated() throws IOException, SQLException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + + jobPersistence.writeSyncStats(jobId, attemptNumber, 1000, 1000, 1000, 1000, null); + + final Optional record = + jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId, + attemptNumber).stream().findFirst()); + final Long attemptId = record.get().get("id", Long.class); + + var stat = jobPersistence.getSyncStats(attemptId).stream().findFirst().get(); + assertEquals(1000, stat.getBytesEmitted()); + assertEquals(1000, stat.getRecordsEmitted()); + assertEquals(1000, stat.getEstimatedBytes()); + assertEquals(1000, stat.getEstimatedRecords()); + + jobPersistence.writeSyncStats(jobId, attemptNumber, 2000, 2000, 2000, 2000, null); + var stats = jobPersistence.getSyncStats(attemptId); + assertEquals(1, stats.size()); + + stat = stats.stream().findFirst().get(); + assertEquals(2000, stat.getBytesEmitted()); + assertEquals(2000, stat.getRecordsEmitted()); + assertEquals(2000, stat.getEstimatedBytes()); + assertEquals(2000, stat.getEstimatedRecords()); + + } + + } + @Test @DisplayName("Should be able to read attemptFailureSummary that was written") void testWriteAttemptFailureSummary() throws IOException { diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 9965bde95825d..8c60e9b6e0a60 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -4,7 +4,7 @@ title: AirbyteProtocol type: object description: AirbyteProtocol structs -version: 0.3.1 +version: 0.3.2 properties: airbyte_message: "$ref": "#/definitions/AirbyteMessage" @@ -174,12 +174,16 @@ definitions: type: string enum: - ERROR + - ESTIMATE emitted_at: description: "the time in ms that the message was emitted" type: number error: description: "error trace message: the error object" "$ref": "#/definitions/AirbyteErrorTraceMessage" + estimate: + description: "Estimate trace message: a guess at how much data will be produced in this sync" + "$ref": "#/definitions/AirbyteEstimateTraceMessage" AirbyteErrorTraceMessage: type: object additionalProperties: true @@ -201,6 +205,31 @@ definitions: enum: - system_error - config_error + AirbyteEstimateTraceMessage: + type: object + additionalProperties: true + required: + - name + - type + properties: + name: + description: The name of the stream + type: string + type: + description: The type of estimate + type: string + enum: + - STREAM + - SYNC + namespace: + description: The namespace of the stream + type: string + row_estimate: + description: The estimated number of rows to be emitted by this sync for this stream + type: integer + byte_estimate: + description: The estimated number of bytes to be emitted by this sync for this stream + type: integer AirbyteControlMessage: type: object additionalProperties: true diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java index 71274154cca41..f2b067023e667 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/AttemptApiController.java @@ -6,11 +6,12 @@ import io.airbyte.api.generated.AttemptApi; import io.airbyte.api.model.generated.InternalOperationResult; +import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; import io.airbyte.server.handlers.AttemptHandler; import javax.ws.rs.Path; -@Path("/v1/attempt/set_workflow_in_attempt") +@Path("/v1/attempt/") public class AttemptApiController implements AttemptApi { private final AttemptHandler attemptHandler; @@ -20,6 +21,13 @@ public AttemptApiController(final AttemptHandler attemptHandler) { } @Override + // @Path("/v1/attempt/save_stats") + public InternalOperationResult saveStats(SaveStatsRequestBody requestBody) { + return ConfigurationApi.execute(() -> attemptHandler.saveStats(requestBody)); + } + + @Override + // @Path("/v1/attempt/set_workflow_in_attempt") public InternalOperationResult setWorkflowInAttempt(final SetWorkflowInAttemptRequestBody requestBody) { return ConfigurationApi.execute(() -> attemptHandler.setWorkflowInAttempt(requestBody)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 6bc240c92a41e..e11cb11822676 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -64,6 +64,7 @@ import io.airbyte.api.model.generated.PrivateDestinationDefinitionReadList; import io.airbyte.api.model.generated.PrivateSourceDefinitionRead; import io.airbyte.api.model.generated.PrivateSourceDefinitionReadList; +import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetInstancewideDestinationOauthParamsRequestBody; import io.airbyte.api.model.generated.SetInstancewideSourceOauthParamsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; @@ -375,6 +376,11 @@ public void revokeSourceDefinitionFromWorkspace(final SourceDefinitionIdWithWork }); } + @Override + public InternalOperationResult saveStats(SaveStatsRequestBody saveStatsRequestBody) { + return null; + } + // SOURCE SPECIFICATION @Override diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 52c28f3640f11..1cf0b25e9be7e 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -172,7 +172,9 @@ private static AttemptStats getTotalAttemptStats(final Attempt attempt) { .bytesEmitted(totalStats.getBytesEmitted()) .recordsEmitted(totalStats.getRecordsEmitted()) .stateMessagesEmitted(totalStats.getSourceStateMessagesEmitted()) - .recordsCommitted(totalStats.getRecordsCommitted()); + .recordsCommitted(totalStats.getRecordsCommitted()) + .estimatedRecords(totalStats.getEstimatedRecords()) + .estimatedBytes(totalStats.getEstimatedBytes()); } private static List getAttemptStreamStats(final Attempt attempt) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java index 83f86861d6b3f..b4cf998da6da9 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/AttemptHandler.java @@ -5,9 +5,13 @@ package io.airbyte.server.handlers; import io.airbyte.api.model.generated.InternalOperationResult; +import io.airbyte.api.model.generated.SaveStatsRequestBody; import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody; +import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncStats; import io.airbyte.persistence.job.JobPersistence; import java.io.IOException; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,4 +37,29 @@ public InternalOperationResult setWorkflowInAttempt( return new InternalOperationResult().succeeded(true); } + public InternalOperationResult saveStats(SaveStatsRequestBody requestBody) { + try { + // This is for the entire sync for now. + final var stats = requestBody.getStats(); + final var streamStats = requestBody.getStreamStats().stream() + .map(s -> new StreamSyncStats() + .withStreamName(s.getStreamName()) + .withStats(new SyncStats() + .withBytesEmitted(s.getStats().getBytesEmitted()) + .withRecordsEmitted(s.getStats().getRecordsEmitted()) + .withEstimatedBytes(s.getStats().getEstimatedBytes()) + .withEstimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + + jobPersistence.writeSyncStats(requestBody.getJobId(), requestBody.getAttemptNumber(), + stats.getEstimatedRecords(), stats.getEstimatedBytes(), stats.getRecordsEmitted(), stats.getBytesEmitted(), streamStats); + + } catch (IOException ioe) { + LOGGER.error("IOException when setting temporal workflow in attempt;", ioe); + return new InternalOperationResult().succeeded(false); + } + + return new InternalOperationResult().succeeded(true); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index e25bee37f04be..4ad271c6aaa47 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -5,7 +5,11 @@ package io.airbyte.server.handlers; import com.google.common.base.Preconditions; +import io.airbyte.api.model.generated.AttemptInfoRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; +import io.airbyte.api.model.generated.AttemptRead; +import io.airbyte.api.model.generated.AttemptStats; +import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionRead; @@ -40,12 +44,16 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class JobHistoryHandler { + private static final Random RANDOM = new Random(); private final ConnectionsHandler connectionsHandler; private final SourceHandler sourceHandler; private final DestinationHandler destinationHandler; @@ -104,6 +112,44 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept .map(JobConverter::getJobWithAttemptsRead) .collect(Collectors.toList()); + for (JobWithAttemptsRead jwar : jobReads) { + for (final AttemptRead a : jwar.getAttempts()) { + // I need the job + final var attemptId = jobPersistence.getAttemptId(jwar.getJob().getId(), a.getId()); + + final var syncStatList = jobPersistence.getSyncStats(attemptId); + if (syncStatList.size() == 0) { + // there should only be one returned. + continue; + } + + final var syncStat = jobPersistence.getSyncStats(attemptId).get(0); + if (a.getTotalStats() == null) { + a.setTotalStats(new AttemptStats()); + } + + // total stats + a.getTotalStats() + .estimatedBytes(syncStat.getEstimatedBytes()) + .estimatedRecords(syncStat.getEstimatedRecords()) + .bytesEmitted(syncStat.getBytesEmitted()) + .recordsEmitted(syncStat.getRecordsEmitted()); + + // stream stats + if (syncStat.getStreamStats() != null) { + final var streamStats = syncStat.getStreamStats().stream().map(s -> new AttemptStreamStats() + .streamName(s.getStreamName()) + .stats(new AttemptStats() + .bytesEmitted(s.getStats().getBytesEmitted()) + .recordsEmitted(s.getStats().getRecordsEmitted()) + .estimatedBytes(s.getStats().getEstimatedBytes()) + .estimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + a.setStreamStats(streamStats); + } + } + } + return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); } @@ -122,6 +168,47 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); final JobInfoRead jobinfoRead = jobConverter.getJobInfoRead(job); + // jobConverter is pulling from the sync summary, so either we write to the sync summary, or we pull + // the information directly from + // the table while the job is running. + // if it's not running, we no longer need to do this. + if (job.getStatus() == JobStatus.RUNNING) { + for (final AttemptInfoRead a : jobinfoRead.getAttempts()) { + + final var attemptId = jobPersistence.getAttemptId(job.getId(), a.getAttempt().getId()); + final var syncStatList = jobPersistence.getSyncStats(attemptId); + if (syncStatList.size() == 0) { + // there should only be one returned. + continue; + } + + final var syncStat = jobPersistence.getSyncStats(attemptId).get(0); + if (a.getAttempt().getTotalStats() == null) { + a.getAttempt().setTotalStats(new AttemptStats()); + } + + // total stats + a.getAttempt().getTotalStats() + .estimatedBytes(syncStat.getEstimatedBytes()) + .estimatedRecords(syncStat.getEstimatedRecords()) + .bytesEmitted(syncStat.getBytesEmitted()) + .recordsEmitted(syncStat.getRecordsEmitted()); + + // stream stats + if (syncStat.getStreamStats() != null) { + final var streamStats = syncStat.getStreamStats().stream().map(s -> new AttemptStreamStats() + .streamName(s.getStreamName()) + .stats(new AttemptStats() + .bytesEmitted(s.getStats().getBytesEmitted()) + .recordsEmitted(s.getStats().getRecordsEmitted()) + .estimatedBytes(s.getStats().getEstimatedBytes()) + .estimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + a.getAttempt().setStreamStats(streamStats); + } + } + } + return buildJobDebugInfoRead(jobinfoRead); } diff --git a/airbyte-webapp/package-lock.json b/airbyte-webapp/package-lock.json index d6a677da4c03c..59f32837ba954 100644 --- a/airbyte-webapp/package-lock.json +++ b/airbyte-webapp/package-lock.json @@ -31,6 +31,7 @@ "lodash": "^4.17.21", "mdast": "^3.0.0", "query-string": "^6.13.1", + "rc-progress": "^3.4.0", "react": "^17.0.2", "react-dom": "^17.0.2", "react-helmet-async": "^1.3.0", @@ -36065,6 +36066,34 @@ "rc": "cli.js" } }, + "node_modules/rc-progress": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/rc-progress/-/rc-progress-3.4.0.tgz", + "integrity": "sha512-ZuMyOzzTkZnn+EKqGQ7YHzrvGzBtcCCVjx1McC/E/pMTvr6GWVfVRSawDlWsscxsJs7MkqSTwCO6Lu4IeoY2zQ==", + "dependencies": { + "@babel/runtime": "^7.10.1", + "classnames": "^2.2.6", + "rc-util": "^5.16.1" + }, + "peerDependencies": { + "react": ">=16.9.0", + "react-dom": ">=16.9.0" + } + }, + "node_modules/rc-util": { + "version": "5.24.4", + "resolved": "https://registry.npmjs.org/rc-util/-/rc-util-5.24.4.tgz", + "integrity": "sha512-2a4RQnycV9eV7lVZPEJ7QwJRPlZNc06J7CwcwZo4vIHr3PfUqtYgl1EkUV9ETAc6VRRi8XZOMFhYG63whlIC9Q==", + "dependencies": { + "@babel/runtime": "^7.18.3", + "react-is": "^16.12.0", + "shallowequal": "^1.1.0" + }, + "peerDependencies": { + "react": ">=16.9.0", + "react-dom": ">=16.9.0" + } + }, "node_modules/rc/node_modules/strip-json-comments": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-2.0.1.tgz", @@ -75387,6 +75416,26 @@ } } }, + "rc-progress": { + "version": "3.4.0", + "resolved": "https://registry.npmjs.org/rc-progress/-/rc-progress-3.4.0.tgz", + "integrity": "sha512-ZuMyOzzTkZnn+EKqGQ7YHzrvGzBtcCCVjx1McC/E/pMTvr6GWVfVRSawDlWsscxsJs7MkqSTwCO6Lu4IeoY2zQ==", + "requires": { + "@babel/runtime": "^7.10.1", + "classnames": "^2.2.6", + "rc-util": "^5.16.1" + } + }, + "rc-util": { + "version": "5.24.4", + "resolved": "https://registry.npmjs.org/rc-util/-/rc-util-5.24.4.tgz", + "integrity": "sha512-2a4RQnycV9eV7lVZPEJ7QwJRPlZNc06J7CwcwZo4vIHr3PfUqtYgl1EkUV9ETAc6VRRi8XZOMFhYG63whlIC9Q==", + "requires": { + "@babel/runtime": "^7.18.3", + "react-is": "^16.12.0", + "shallowequal": "^1.1.0" + } + }, "react": { "version": "17.0.2", "resolved": "https://registry.npmjs.org/react/-/react-17.0.2.tgz", diff --git a/airbyte-webapp/package.json b/airbyte-webapp/package.json index 53e0150b8a8e7..3ae3273048e2a 100644 --- a/airbyte-webapp/package.json +++ b/airbyte-webapp/package.json @@ -47,6 +47,7 @@ "lodash": "^4.17.21", "mdast": "^3.0.0", "query-string": "^6.13.1", + "rc-progress": "^3.4.0", "react": "^17.0.2", "react-dom": "^17.0.2", "react-helmet-async": "^1.3.0", diff --git a/airbyte-webapp/src/components/JobItem/components/MainInfo.tsx b/airbyte-webapp/src/components/JobItem/components/MainInfo.tsx index c6d363b3b4db7..171c7c549415d 100644 --- a/airbyte-webapp/src/components/JobItem/components/MainInfo.tsx +++ b/airbyte-webapp/src/components/JobItem/components/MainInfo.tsx @@ -4,6 +4,7 @@ import classNames from "classnames"; import React, { useMemo } from "react"; import { FormattedDateParts, FormattedMessage, FormattedTimeParts } from "react-intl"; +import { ProgressBar } from "components/ProgressBar"; import { Cell, Row } from "components/SimpleTableComponents"; import { StatusIcon } from "components/ui/StatusIcon"; @@ -87,6 +88,9 @@ const MainInfo: React.FC = ({ job, attempts = [], isOpen, onExpan
{statusIcon}
{label} +
+ +
{attempts.length > 0 && ( <> {jobConfigType === "reset_connection" ? ( diff --git a/airbyte-webapp/src/components/ProgressBar/ProgressBar.module.scss b/airbyte-webapp/src/components/ProgressBar/ProgressBar.module.scss new file mode 100644 index 0000000000000..b2db942f3fedb --- /dev/null +++ b/airbyte-webapp/src/components/ProgressBar/ProgressBar.module.scss @@ -0,0 +1,8 @@ +@use "../../scss/colors"; +@use "../../scss/variables"; + +.container { + font-size: 12px; + line-height: 15px; + color: colors.$grey; +} diff --git a/airbyte-webapp/src/components/ProgressBar/ProgressBar.tsx b/airbyte-webapp/src/components/ProgressBar/ProgressBar.tsx new file mode 100644 index 0000000000000..78f7f40f39e0b --- /dev/null +++ b/airbyte-webapp/src/components/ProgressBar/ProgressBar.tsx @@ -0,0 +1,250 @@ +import classNames from "classnames"; +import { Line } from "rc-progress"; +import { useState } from "react"; +import { useIntl, FormattedMessage } from "react-intl"; + +import { getJobStatus } from "components/JobItem/JobItem"; + +import { AttemptRead, JobConfigType, SynchronousJobRead } from "core/request/AirbyteClient"; +import Status from "core/statuses"; +import { JobsWithJobs } from "pages/ConnectionPage/pages/ConnectionItemPage/JobsList"; + +import styles from "./ProgressBar.module.scss"; + +function isJobsWithJobs(job: JobsWithJobs | SynchronousJobRead): job is JobsWithJobs { + return (job as JobsWithJobs).attempts !== undefined; +} + +const formatBytes = (bytes?: number) => { + if (!bytes) { + return ; + } + + const k = 1024; + const dm = 2; + const sizes = ["Bytes", "KB", "MB", "GB", "TB"]; + const i = Math.floor(Math.log(bytes) / Math.log(k)); + const result = parseFloat((bytes / Math.pow(k, i)).toFixed(dm)); + + return ; +}; + +const buttonUnStyle = { + background: "none", + color: "inherit", + border: "none", + padding: 0, + font: "inherit", + cursor: "pointer", + outline: "inherit", + textDecoration: "underline", +}; + +export const ProgressBar = ({ + job, + jobConfigType, +}: { + job: JobsWithJobs | SynchronousJobRead; + jobConfigType: JobConfigType; +}) => { + const { formatMessage, formatNumber } = useIntl(); + const [showStreams, setShowStreams] = useState(false); + + if (jobConfigType !== "sync") { + return null; + } + + let numeratorRecords = -1; + let denominatorRecords = -1; + let totalPercentRecords = -1; + let numeratorBytes = -1; + let denominatorBytes = -1; + // let totalPercentBytes = -1; + let elapsedTimeMS = -1; + let timeRemaining = -1; + let timeRemainingString = ""; + const unEstimatedStreams: string[] = []; + let latestAttempt: AttemptRead | undefined; + + const jobStatus = getJobStatus(job); + + // colors from `_colors.scss` TODO: Use the SCSS variables maybe? + let color = "white"; + switch (jobStatus) { + case "pending": + color = "#cbc8ff"; + break; + case "running": + color = "#cbc8ff"; + break; + case "incomplete": + color = "#fdf8e1"; + break; + case "failed": + color = "#e64228"; + return null; + case "succeeded": + color = "#67dae1"; + return null; + case "cancelled": + return null; + } + + if (isJobsWithJobs(job)) { + if (job.attempts) { + latestAttempt = job.attempts[job.attempts?.length - 1]; + let countTotalsFromStreams = true; + if ( + latestAttempt.totalStats?.recordsEmitted && + latestAttempt.totalStats?.estimatedRecords && + latestAttempt.totalStats?.bytesEmitted && + latestAttempt.totalStats?.estimatedBytes + ) { + countTotalsFromStreams = false; + numeratorRecords = latestAttempt.totalStats.recordsEmitted; + denominatorRecords = latestAttempt.totalStats.estimatedRecords; + numeratorBytes = latestAttempt.totalStats.bytesEmitted; + denominatorBytes = latestAttempt.totalStats.estimatedBytes; + } + + if (latestAttempt && !latestAttempt.totalStats && latestAttempt.streamStats) { + for (const stream of latestAttempt.streamStats) { + if (!stream.stats.recordsEmitted) { + unEstimatedStreams.push(`${stream.streamName}`); + } + if (countTotalsFromStreams) { + numeratorRecords += stream.stats.recordsEmitted ?? 0; + denominatorRecords += stream.stats.estimatedRecords ?? 0; + numeratorBytes += stream.stats.bytesEmitted ?? 0; + denominatorBytes += stream.stats.estimatedBytes ?? 0; + } + } + } + } + } else { + // TODO... maybe + } + + totalPercentRecords = denominatorRecords > 0 ? Math.floor((numeratorRecords * 100) / denominatorRecords) : 0; + + // chose to estimate time remaining based on records rather than bytes + if (latestAttempt && latestAttempt.status === Status.RUNNING) { + elapsedTimeMS = new Date().getTime() - latestAttempt.createdAt * 1000; + timeRemaining = Math.floor(elapsedTimeMS / totalPercentRecords) * (100 - totalPercentRecords); // in ms + const minutesRemaining = Math.ceil(timeRemaining / 1000 / 60); + const hoursRemaining = Math.ceil(minutesRemaining / 60); + if (minutesRemaining <= 60) { + timeRemainingString = `${minutesRemaining} ${formatMessage({ id: "estimate.minutesRemaining" })}`; + } else { + timeRemainingString = `${hoursRemaining} ${formatMessage({ id: "estimate.hoursRemaining" })}`; + } + } + + return ( +
+ {unEstimatedStreams.length === 0 && } + {latestAttempt?.status === Status.RUNNING && ( + <> + {unEstimatedStreams.length === 0 && ( +
+ {totalPercentRecords}% {timeRemaining < Infinity && timeRemaining > 0 ? `| ~${timeRemainingString}` : ""} +
+ )} + {unEstimatedStreams.length > 0 && ( +
+ {unEstimatedStreams.length} {formatMessage({ id: "estimate.unEstimatedStreams" })} +
+ )} + {denominatorRecords > 0 && ( + <> +
+ {formatNumber(numeratorRecords)}{" "} + {unEstimatedStreams.length > 0 ? "" : `/ ${formatNumber(denominatorRecords)}`}{" "} + {formatMessage({ id: "estimate.recordsSynced" })} @{" "} + {Math.round((numeratorRecords / elapsedTimeMS) * 1000)}{" "} + {formatMessage({ id: "estimate.recordsPerSecond" })} +
+
+ {formatBytes(numeratorBytes)}{" "} + {unEstimatedStreams.length > 0 ? ( + "" + ) : ( + <> + / + {formatBytes(denominatorBytes)} + + )}{" "} + {formatMessage({ id: "estimate.bytesSynced" })} @ {formatBytes((numeratorBytes * 1000) / elapsedTimeMS)} + {formatMessage({ id: "estimate.bytesPerSecond" })} +
+ + )} + + {latestAttempt.streamStats && !showStreams && ( +
+
+ +
+
+ )} + + {latestAttempt.streamStats && showStreams && ( +
+
+
+ {formatMessage({ + id: "estimate.streamStats", + })}{" "} + ( + + ): +
+ {latestAttempt.streamStats?.map((stream, idx) => { + const localNumerator = stream.stats.recordsEmitted; + const localDenominator = stream.stats.estimatedRecords; + + return ( +
+ {" - "} + {stream.streamName} -{" "} + {localNumerator && localDenominator + ? `${Math.round((localNumerator * 100) / localDenominator)}${formatMessage({ + id: "estimate.percentComplete", + })} (${formatNumber(localNumerator)} / ${formatNumber(localDenominator)} ${formatMessage({ + id: "estimate.recordsSynced", + })})` + : `${localNumerator} ${formatMessage({ id: "estimate.recordsSyncedThusFar" })} (no estimate)`} +
+ ); + })} +
+ )} + + )} +
+ ); +}; diff --git a/airbyte-webapp/src/components/ProgressBar/index.tsx b/airbyte-webapp/src/components/ProgressBar/index.tsx new file mode 100644 index 0000000000000..a550d58d84d62 --- /dev/null +++ b/airbyte-webapp/src/components/ProgressBar/index.tsx @@ -0,0 +1 @@ +export * from "./ProgressBar"; diff --git a/airbyte-webapp/src/locales/en.json b/airbyte-webapp/src/locales/en.json index e6f728bcd7c18..3b67de40516d5 100644 --- a/airbyte-webapp/src/locales/en.json +++ b/airbyte-webapp/src/locales/en.json @@ -550,6 +550,19 @@ "errorView.unknown": "Unknown", "errorView.unknownError": "Unknown error occurred", + "estimate.minutesRemaining": "minutes remaining", + "estimate.hoursRemaining": "hours remaining", + "estimate.recordsSynced": "records synced", + "estimate.recordsPerSecond": "records/sec", + "estimate.bytesSynced": "synced", + "estimate.bytesPerSecond": "/sec", + "estimate.unEstimatedStreams": "un-estimated streams", + "estimate.recordsSyncedThusFar": "records synced", + "estimate.viewStreamStats": "view stream stats", + "estimate.percentComplete": "% complete", + "estimate.streamStats": "Stream Stats", + "estimate.hide": "hide", + "frequency.manual": "Manual", "frequency.cron": "Cron", "frequency.minutes": "{value} min", diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 92bd602330f3d..18ef348544474 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -221,6 +221,7 @@

Table of Contents

Attempt

Connection

@@ -281,6 +282,7 @@

Internal

Jobs

@@ -390,6 +392,58 @@

Workspace

Attempt

+
+
+ Up +
post /v1/attempt/save_stats
+
For worker to set running attempt stats. (saveStats)
+
+ + +

Consumes

+ This API call consumes the following media types via the Content-Type request header: +
    +
  • application/json
  • +
+ +

Request body

+
+
SaveStatsRequestBody SaveStatsRequestBody (required)
+ +
Body Parameter
+ +
+ + + + +

Return type

+ + + + +

Example data

+
Content-Type: application/json
+
{
+  "succeeded" : true
+}
+ +

Produces

+ This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
    +
  • application/json
  • +
+ +

Responses

+

200

+ Successful Operation + InternalOperationResult +
+
Up @@ -1157,6 +1211,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -1165,13 +1221,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -1183,6 +1239,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1191,6 +1249,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1208,6 +1268,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -1216,13 +1278,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -1234,6 +1296,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1242,6 +1306,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1515,6 +1581,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -1523,13 +1591,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -1541,6 +1609,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1549,6 +1619,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1566,6 +1638,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -1574,13 +1648,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -1592,6 +1666,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -1600,6 +1676,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -3980,6 +4058,58 @@

200

AttemptNormalizationStatusReadList

+
+
+ Up +
post /v1/attempt/save_stats
+
For worker to set running attempt stats. (saveStats)
+
+ + +

Consumes

+ This API call consumes the following media types via the Content-Type request header: +
    +
  • application/json
  • +
+ +

Request body

+
+
SaveStatsRequestBody SaveStatsRequestBody (required)
+ +
Body Parameter
+ +
+ + + + +

Return type

+ + + + +

Example data

+
Content-Type: application/json
+
{
+  "succeeded" : true
+}
+ +

Produces

+ This API call produces the following media types according to the Accept request header; + the media type will be conveyed by the Content-Type response header. +
    +
  • application/json
  • +
+ +

Responses

+

200

+ Successful Operation + InternalOperationResult +
+
Up @@ -4090,6 +4220,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4098,13 +4230,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4116,6 +4248,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4124,6 +4258,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4141,6 +4277,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4149,13 +4287,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4167,6 +4305,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4175,6 +4315,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4384,6 +4526,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4392,13 +4536,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4410,6 +4554,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4418,6 +4564,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4435,6 +4583,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4443,13 +4593,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4461,6 +4611,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4469,6 +4621,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4559,6 +4713,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4567,13 +4723,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4585,6 +4741,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4593,6 +4751,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4610,6 +4770,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4618,13 +4780,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4636,6 +4798,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4644,6 +4808,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4807,6 +4973,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4815,13 +4983,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4833,6 +5001,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4841,6 +5011,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4853,6 +5025,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4861,13 +5035,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4879,6 +5053,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4887,6 +5063,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4916,6 +5094,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4924,13 +5104,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4942,6 +5122,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4950,6 +5132,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4962,6 +5146,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "failureSummary" : { @@ -4970,13 +5156,13 @@

Example data

"stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 }, { "retryable" : true, "stacktrace" : "stacktrace", "internalMessage" : "internalMessage", "externalMessage" : "externalMessage", - "timestamp" : 1 + "timestamp" : 6 } ], "partialSuccess" : true }, @@ -4988,6 +5174,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -4996,6 +5184,8 @@

Example data

"stateMessagesEmitted" : 7, "recordsCommitted" : 1, "bytesEmitted" : 4, + "estimatedBytes" : 1, + "estimatedRecords" : 1, "recordsEmitted" : 2 }, "streamName" : "streamName" @@ -10094,6 +10284,7 @@

Table of Contents

  • ReleaseStage -
  • ResetConfig -
  • ResourceRequirements -
  • +
  • SaveStatsRequestBody -
  • SchemaChange -
  • SetInstancewideDestinationOauthParamsRequestBody -
  • SetInstancewideSourceOauthParamsRequestBody -
  • @@ -10288,6 +10479,8 @@

    AttemptStats - bytesEmitted (optional)

    Long format: int64
    stateMessagesEmitted (optional)
    Long format: int64
    recordsCommitted (optional)
    Long format: int64
    +
    estimatedRecords (optional)
    Long format: int64
    +
    estimatedBytes (optional)
    Long format: int64
    +
    +

    SaveStatsRequestBody - Up

    +
    +
    +
    jobId
    Long format: int64
    +
    attemptNumber
    Integer format: int32
    +
    stats
    +
    streamStats (optional)
    +
    +

    SchemaChange - Up

    diff --git a/docs/understanding-airbyte/airbyte-protocol.md b/docs/understanding-airbyte/airbyte-protocol.md index 381e03f05aa14..c34dbf726442d 100644 --- a/docs/understanding-airbyte/airbyte-protocol.md +++ b/docs/understanding-airbyte/airbyte-protocol.md @@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the | Version | Date of Change | Pull Request(s) | Subject | | :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- | +| `v0.3.2` | 2022-10-128 | [xxx](https://github.com/airbytehq/airbyte/pull/xxx) | `AirbyteEstimateTraceMessage` added | | `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added | | `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added | | `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |