From abb103d17ca55fa99e54ebf2859d3cdbc57aadfc Mon Sep 17 00:00:00 2001 From: "Ryan Br..." Date: Wed, 29 Jan 2025 12:54:19 -0800 Subject: [PATCH] Rbroughan/augustin/01 23 destination s3 use the non root base image (#52579) Co-authored-by: alafanechere Co-authored-by: Octavia Squidington III --- .../airbyte/cdk/extensions/PathExtensions.kt | 26 +++++++++++++++++ .../cdk/load/file/SpillFileProvider.kt | 4 +-- .../DockerizedDestination.kt | 29 ++++++++++--------- .../airbyte/cdk/extensions/PathExtensions.kt | 26 +++++++++++++++++ .../BaseDestinationAcceptanceTest.kt | 15 ++++++---- .../connectors/destination-s3/metadata.yaml | 4 +-- .../s3/S3V2FileTransferDestinationTest.kt | 16 +++++----- docs/integrations/destinations/s3.md | 1 + 8 files changed, 90 insertions(+), 31 deletions(-) create mode 100644 airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt create mode 100644 airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt diff --git a/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt new file mode 100644 index 000000000000..b4330f59ff29 --- /dev/null +++ b/airbyte-cdk/bulk/core/base/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.extensions + +import java.nio.file.Path +import java.nio.file.attribute.PosixFilePermission +import kotlin.io.path.setPosixFilePermissions + +fun Path.grantAllPermissions(): Path { + this.setPosixFilePermissions( + setOf( + PosixFilePermission.OWNER_READ, + PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_EXECUTE, + PosixFilePermission.GROUP_READ, + PosixFilePermission.GROUP_WRITE, + PosixFilePermission.GROUP_EXECUTE, + PosixFilePermission.OTHERS_READ, + PosixFilePermission.OTHERS_WRITE, + PosixFilePermission.OTHERS_EXECUTE, + ), + ) + return this +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/SpillFileProvider.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/SpillFileProvider.kt index 0d5607099f01..dc1ea0529bbb 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/SpillFileProvider.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/file/SpillFileProvider.kt @@ -18,8 +18,6 @@ interface SpillFileProvider { @Secondary class DefaultSpillFileProvider(val config: DestinationConfiguration) : SpillFileProvider { override fun createTempFile(): Path { - val directory = config.tmpFileDirectory - Files.createDirectories(directory) - return Files.createTempFile(directory, "staged-raw-records", ".jsonl") + return Files.createTempFile("staged-raw-records", ".jsonl") } } diff --git a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt index 056f8e5f553d..ca13266c99f8 100644 --- a/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt +++ b/airbyte-cdk/bulk/core/load/src/testFixtures/kotlin/io/airbyte/cdk/load/test/util/destination_process/DockerizedDestination.kt @@ -5,6 +5,7 @@ package io.airbyte.cdk.load.test.util.destination_process import io.airbyte.cdk.command.FeatureFlag +import io.airbyte.cdk.extensions.grantAllPermissions import io.airbyte.cdk.load.command.Property import io.airbyte.cdk.load.util.deserializeToClass import io.airbyte.cdk.load.util.serializeToJsonBytes @@ -59,7 +60,8 @@ class DockerizedDestination( private val stdoutDrained = CompletableDeferred() private val stderrDrained = CompletableDeferred() - private val fileTransferMountSource = Files.createTempDirectory("tmp") + // Mainly, used for file transfer but there are other consumers, name the AWS CRT HTTP client. + private val tmpDir = Files.createTempDirectory("tmp").grantAllPermissions() init { // This is largely copied from the old cdk's DockerProcessFactory / @@ -69,25 +71,23 @@ class DockerizedDestination( // the actual platform, and we don't need it here. val testDir = Path.of("/tmp/airbyte_tests/") Files.createDirectories(testDir) + // Allow ourselves and our connector access to our test dir + testDir.grantAllPermissions() val workspaceRoot = Files.createTempDirectory(testDir, "test") - // This directory gets mounted to the docker container, - // presumably so that we can extract some files out of it? - // It's unclear to me that we actually need to do this... - // Certainly nothing in the bulk CDK's test suites is reading back - // anything in this directory. - val localRoot = Files.createTempDirectory(testDir, "output") + workspaceRoot.grantAllPermissions() // This directory will contain the actual inputs to the connector (config+catalog), // and is also mounted as a volume. val jobDir = "job" val jobRoot = Files.createDirectories(workspaceRoot.resolve(jobDir)) + jobRoot.grantAllPermissions() val containerDataRoot = "/data" val containerJobRoot = "$containerDataRoot/$jobDir" // This directory is being used for the file transfer feature. if (useFileTransfer) { - val file = Files.createFile(fileTransferMountSource.resolve("test_file")) + val file = Files.createFile(tmpDir.resolve("test_file")) file.writeText("123") } // Extract the string "destination-foo" from "gcr.io/airbyte/destination-foo:1.2.3". @@ -117,7 +117,7 @@ class DockerizedDestination( "-i", "-w", // In real syncs, platform changes the workdir to /dest for destinations. - "/dest", + testDir.toString(), "--log-driver", "none", "--name", @@ -127,9 +127,7 @@ class DockerizedDestination( "-v", String.format("%s:%s", workspaceRoot, containerDataRoot), "-v", - String.format("%s:%s", localRoot, "/local"), - "-v", - "$fileTransferMountSource:/tmp", + "$tmpDir:/tmp", ) + additionalEnvEntries + featureFlags.flatMap { listOf("-e", it.envVarBindingDeclaration) } + @@ -146,10 +144,13 @@ class DockerizedDestination( .toMutableList() fun addInput(paramName: String, fileContents: ByteArray) { + val path = jobRoot.resolve("destination_$paramName.json") Files.write( - jobRoot.resolve("destination_$paramName.json"), + path, fileContents, ) + path.grantAllPermissions() + cmd.add("--$paramName") cmd.add("$containerJobRoot/destination_$paramName.json") } @@ -272,7 +273,7 @@ class DockerizedDestination( } override fun verifyFileDeleted() { - val file = File(fileTransferMountSource.resolve("test_file").toUri()) + val file = File(tmpDir.resolve("test_file").toUri()) assertFalse(file.exists()) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt new file mode 100644 index 000000000000..b4330f59ff29 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/PathExtensions.kt @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.extensions + +import java.nio.file.Path +import java.nio.file.attribute.PosixFilePermission +import kotlin.io.path.setPosixFilePermissions + +fun Path.grantAllPermissions(): Path { + this.setPosixFilePermissions( + setOf( + PosixFilePermission.OWNER_READ, + PosixFilePermission.OWNER_WRITE, + PosixFilePermission.OWNER_EXECUTE, + PosixFilePermission.GROUP_READ, + PosixFilePermission.GROUP_WRITE, + PosixFilePermission.GROUP_EXECUTE, + PosixFilePermission.OTHERS_READ, + PosixFilePermission.OTHERS_WRITE, + PosixFilePermission.OTHERS_EXECUTE, + ), + ) + return this +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt index 791cce057db3..5ac9d3d76cde 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/BaseDestinationAcceptanceTest.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.standardtest.destination import com.fasterxml.jackson.databind.JsonNode import edu.umd.cs.findbugs.annotations.SuppressFBWarnings +import io.airbyte.cdk.extensions.grantAllPermissions import io.airbyte.commons.features.EnvVariableFeatureFlags import io.airbyte.commons.features.FeatureFlags import io.airbyte.commons.features.FeatureFlagsWrapper @@ -247,10 +248,12 @@ abstract class BaseDestinationAcceptanceTest( @Throws(Exception::class) open fun setUpInternal() { val testDir = Path.of("/tmp/airbyte_tests/") - Files.createDirectories(testDir) - val workspaceRoot = Files.createTempDirectory(testDir, "test") - jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job")) - localRoot = Files.createTempDirectory(testDir, "output") + // Allow ourselves and our connector access to our test dir + Files.createDirectories(testDir).grantAllPermissions() + val workspaceRoot = Files.createTempDirectory(testDir, "test").grantAllPermissions() + jobRoot = + Files.createDirectories(Path.of(workspaceRoot.toString(), "job")).grantAllPermissions() + localRoot = Files.createTempDirectory(testDir, "output").grantAllPermissions() LOGGER.info { "${"jobRoot: {}"} $jobRoot" } LOGGER.info { "${"localRoot: {}"} $localRoot" } testEnv = DestinationAcceptanceTest.TestDestinationEnv(localRoot) @@ -258,7 +261,9 @@ abstract class BaseDestinationAcceptanceTest( testSchemas = HashSet() setup(testEnv, testSchemas) fileTransferMountSource = - if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null + if (supportsFileTransfer) + Files.createTempDirectory(testDir, "file_transfer").grantAllPermissions() + else null processFactory = DockerProcessFactory( diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index fcf4034d094f..ab469471c635 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,14 +2,14 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.5.0-rc.14 + dockerImageTag: 1.5.0-rc.15 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg license: ELv2 name: S3 connectorBuildOptions: - baseImage: docker.io/airbyte/java-connector-base:1.0.0@sha256:be86e5684e1e6d9280512d3d8071b47153698fe08ad990949c8eeff02803201a + baseImage: docker.io/airbyte/java-connector-base:2.0.0@sha256:5a1a21c75c5e1282606de9fa539ba136520abe2fbd013058e988bb0297a9f454 registryOverrides: cloud: enabled: true diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt index 0795cca4ad31..c96478817a33 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration-legacy/kotlin/io/airbyte/integrations/destination/s3/S3V2FileTransferDestinationTest.kt @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.JsonNodeFactory import com.fasterxml.jackson.databind.node.ObjectNode import com.google.common.collect.ImmutableMap +import io.airbyte.cdk.extensions.grantAllPermissions import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat import io.airbyte.cdk.integrations.destination.s3.S3BaseDestinationAcceptanceTest @@ -79,12 +80,13 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() { val filePath = "$dirPath/$fileName" val fileSize = 1_024 * 1_024 - fileTransferMountSource!!.resolve(dirPath).createDirectories() - val absoluteFilePath = - fileTransferMountSource!! - .resolve(filePath) - .createFile() - .writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize)) + val absoluteDirPath = fileTransferMountSource!!.resolve(dirPath).createDirectories() + val absoluteFilePath = fileTransferMountSource!!.resolve(filePath).createFile() + + absoluteDirPath.grantAllPermissions() + absoluteFilePath.grantAllPermissions() + absoluteFilePath.writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize)) + return Path.of(filePath) } @@ -197,7 +199,6 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() { .getUserMetaDataOf(S3StorageOperations.GENERATION_ID_USER_META_KEY) .toLong() assertEquals(generationId, 32L) - assertFalse(file.exists(), "file should have been deleted by the connector") assertEquals(fileLength, objectInStore.size) assertEquals("$testBucketPath/$streamName/${filePath.toString()}", objectInStore.key) assertContentEquals( @@ -207,5 +208,6 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() { .objectContent .readBytes() ) + assertFalse(file.exists(), "file should have been deleted by the connector") } } diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index fe78c43ee91c..bbc10bcf7260 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.5.0-rc.15 | 2025-01-23 | [52103](https://github.com/airbytehq/airbyte/pull/52103) | Make the connector use our non root base image. | | 1.5.0-rc.14 | 2025-01-24 | [51600](https://github.com/airbytehq/airbyte/pull/51600) | Internal refactor | | 1.5.0-rc.13 | 2025-01-22 | [52076](https://github.com/airbytehq/airbyte/pull/52076) | Test improvements. | | 1.5.0-rc.12 | 2025-01-22 | [52072](https://github.com/airbytehq/airbyte/pull/52072) | Bug fix: Configure OpenStreamTask concurrency to handle connection to reduce http connection errors. |