From dbc05b808b55d5e8463248cca73f3cdfeaf12c4d Mon Sep 17 00:00:00 2001 From: Yandry Perez Clemente <99700024+ypc-faros@users.noreply.github.com> Date: Wed, 12 Feb 2025 13:16:35 -0500 Subject: [PATCH] FAI-14708 Mock data feed should reset models BEFORE writing new records --- .../airbyte-faros-destination/src/destination.ts | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/destinations/airbyte-faros-destination/src/destination.ts b/destinations/airbyte-faros-destination/src/destination.ts index 8d775af15..dc99e4a23 100644 --- a/destinations/airbyte-faros-destination/src/destination.ts +++ b/destinations/airbyte-faros-destination/src/destination.ts @@ -816,6 +816,13 @@ export class FarosDestination extends AirbyteDestination { msg.sourceType === 'faros-feeds' ? msg.sourceMode : msg.sourceType; + + if (sourceModeOrType === 'mock-data-feed') { + this.logger.info('Running a mock data feed sync. Resetting all models before writing records.'); + ctx.markAllStreamsForReset(); + await resetData?.(isResetSync); + } + await updateLocalAccount?.(msg); if (sourceVersion) sourceVersion.version = msg.sourceVersion; ctx.setSourceConfig(msg.redactedConfig); @@ -968,6 +975,7 @@ export class FarosDestination extends AirbyteDestination { isBackfillSync, isFarosSource: sourceConfigReceived, sourceSucceeded, + sourceModeOrType, }) ) { if (!streamStatusReceived) { @@ -1010,6 +1018,7 @@ export class FarosDestination extends AirbyteDestination { isBackfillSync: boolean; isFarosSource: boolean; sourceSucceeded: boolean; + sourceModeOrType: string; }): boolean { const { syncErrors, @@ -1019,6 +1028,7 @@ export class FarosDestination extends AirbyteDestination { isBackfillSync, isFarosSource, sourceSucceeded, + sourceModeOrType, } = syncInfo; if (isBackfillSync) { this.logger.info( @@ -1026,6 +1036,10 @@ export class FarosDestination extends AirbyteDestination { ); return false; } + if (sourceModeOrType === 'mock-data-feed') { + // Reset is performed BEFORE writing records, so we don't need to reset again + return false; + } if (streamStatusReceived) { if (syncErrors.dst.length) { this.logger.warn(