Skip to content

Commit

Permalink
Rbroughan/augustin/01 23 destination s3 use the non root base image (#…
Browse files Browse the repository at this point in the history
…52579)

Co-authored-by: alafanechere <[email protected]>
Co-authored-by: Octavia Squidington III <[email protected]>
  • Loading branch information
3 people authored Jan 29, 2025
1 parent 530fe82 commit abb103d
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.extensions

import java.nio.file.Path
import java.nio.file.attribute.PosixFilePermission
import kotlin.io.path.setPosixFilePermissions

fun Path.grantAllPermissions(): Path {
this.setPosixFilePermissions(
setOf(
PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE,
PosixFilePermission.OWNER_EXECUTE,
PosixFilePermission.GROUP_READ,
PosixFilePermission.GROUP_WRITE,
PosixFilePermission.GROUP_EXECUTE,
PosixFilePermission.OTHERS_READ,
PosixFilePermission.OTHERS_WRITE,
PosixFilePermission.OTHERS_EXECUTE,
),
)
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ interface SpillFileProvider {
@Secondary
class DefaultSpillFileProvider(val config: DestinationConfiguration) : SpillFileProvider {
override fun createTempFile(): Path {
val directory = config.tmpFileDirectory
Files.createDirectories(directory)
return Files.createTempFile(directory, "staged-raw-records", ".jsonl")
return Files.createTempFile("staged-raw-records", ".jsonl")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.test.util.destination_process

import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.extensions.grantAllPermissions
import io.airbyte.cdk.load.command.Property
import io.airbyte.cdk.load.util.deserializeToClass
import io.airbyte.cdk.load.util.serializeToJsonBytes
Expand Down Expand Up @@ -59,7 +60,8 @@ class DockerizedDestination(

private val stdoutDrained = CompletableDeferred<Unit>()
private val stderrDrained = CompletableDeferred<Unit>()
private val fileTransferMountSource = Files.createTempDirectory("tmp")
// Mainly, used for file transfer but there are other consumers, name the AWS CRT HTTP client.
private val tmpDir = Files.createTempDirectory("tmp").grantAllPermissions()

init {
// This is largely copied from the old cdk's DockerProcessFactory /
Expand All @@ -69,25 +71,23 @@ class DockerizedDestination(
// the actual platform, and we don't need it here.
val testDir = Path.of("/tmp/airbyte_tests/")
Files.createDirectories(testDir)
// Allow ourselves and our connector access to our test dir
testDir.grantAllPermissions()
val workspaceRoot = Files.createTempDirectory(testDir, "test")
// This directory gets mounted to the docker container,
// presumably so that we can extract some files out of it?
// It's unclear to me that we actually need to do this...
// Certainly nothing in the bulk CDK's test suites is reading back
// anything in this directory.
val localRoot = Files.createTempDirectory(testDir, "output")
workspaceRoot.grantAllPermissions()

// This directory will contain the actual inputs to the connector (config+catalog),
// and is also mounted as a volume.
val jobDir = "job"
val jobRoot = Files.createDirectories(workspaceRoot.resolve(jobDir))
jobRoot.grantAllPermissions()

val containerDataRoot = "/data"
val containerJobRoot = "$containerDataRoot/$jobDir"

// This directory is being used for the file transfer feature.
if (useFileTransfer) {
val file = Files.createFile(fileTransferMountSource.resolve("test_file"))
val file = Files.createFile(tmpDir.resolve("test_file"))
file.writeText("123")
}
// Extract the string "destination-foo" from "gcr.io/airbyte/destination-foo:1.2.3".
Expand Down Expand Up @@ -117,7 +117,7 @@ class DockerizedDestination(
"-i",
"-w",
// In real syncs, platform changes the workdir to /dest for destinations.
"/dest",
testDir.toString(),
"--log-driver",
"none",
"--name",
Expand All @@ -127,9 +127,7 @@ class DockerizedDestination(
"-v",
String.format("%s:%s", workspaceRoot, containerDataRoot),
"-v",
String.format("%s:%s", localRoot, "/local"),
"-v",
"$fileTransferMountSource:/tmp",
"$tmpDir:/tmp",
) +
additionalEnvEntries +
featureFlags.flatMap { listOf("-e", it.envVarBindingDeclaration) } +
Expand All @@ -146,10 +144,13 @@ class DockerizedDestination(
.toMutableList()

fun addInput(paramName: String, fileContents: ByteArray) {
val path = jobRoot.resolve("destination_$paramName.json")
Files.write(
jobRoot.resolve("destination_$paramName.json"),
path,
fileContents,
)
path.grantAllPermissions()

cmd.add("--$paramName")
cmd.add("$containerJobRoot/destination_$paramName.json")
}
Expand Down Expand Up @@ -272,7 +273,7 @@ class DockerizedDestination(
}

override fun verifyFileDeleted() {
val file = File(fileTransferMountSource.resolve("test_file").toUri())
val file = File(tmpDir.resolve("test_file").toUri())
assertFalse(file.exists())
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.extensions

import java.nio.file.Path
import java.nio.file.attribute.PosixFilePermission
import kotlin.io.path.setPosixFilePermissions

fun Path.grantAllPermissions(): Path {
this.setPosixFilePermissions(
setOf(
PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE,
PosixFilePermission.OWNER_EXECUTE,
PosixFilePermission.GROUP_READ,
PosixFilePermission.GROUP_WRITE,
PosixFilePermission.GROUP_EXECUTE,
PosixFilePermission.OTHERS_READ,
PosixFilePermission.OTHERS_WRITE,
PosixFilePermission.OTHERS_EXECUTE,
),
)
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.standardtest.destination

import com.fasterxml.jackson.databind.JsonNode
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.extensions.grantAllPermissions
import io.airbyte.commons.features.EnvVariableFeatureFlags
import io.airbyte.commons.features.FeatureFlags
import io.airbyte.commons.features.FeatureFlagsWrapper
Expand Down Expand Up @@ -247,18 +248,22 @@ abstract class BaseDestinationAcceptanceTest(
@Throws(Exception::class)
open fun setUpInternal() {
val testDir = Path.of("/tmp/airbyte_tests/")
Files.createDirectories(testDir)
val workspaceRoot = Files.createTempDirectory(testDir, "test")
jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"))
localRoot = Files.createTempDirectory(testDir, "output")
// Allow ourselves and our connector access to our test dir
Files.createDirectories(testDir).grantAllPermissions()
val workspaceRoot = Files.createTempDirectory(testDir, "test").grantAllPermissions()
jobRoot =
Files.createDirectories(Path.of(workspaceRoot.toString(), "job")).grantAllPermissions()
localRoot = Files.createTempDirectory(testDir, "output").grantAllPermissions()
LOGGER.info { "${"jobRoot: {}"} $jobRoot" }
LOGGER.info { "${"localRoot: {}"} $localRoot" }
testEnv = DestinationAcceptanceTest.TestDestinationEnv(localRoot)
mConnectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater::class.java)
testSchemas = HashSet()
setup(testEnv, testSchemas)
fileTransferMountSource =
if (supportsFileTransfer) Files.createTempDirectory(testDir, "file_transfer") else null
if (supportsFileTransfer)
Files.createTempDirectory(testDir, "file_transfer").grantAllPermissions()
else null

processFactory =
DockerProcessFactory(
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/destination-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ data:
connectorSubtype: file
connectorType: destination
definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerImageTag: 1.5.0-rc.14
dockerImageTag: 1.5.0-rc.15
dockerRepository: airbyte/destination-s3
githubIssueLabel: destination-s3
icon: s3.svg
license: ELv2
name: S3
connectorBuildOptions:
baseImage: docker.io/airbyte/java-connector-base:1.0.0@sha256:be86e5684e1e6d9280512d3d8071b47153698fe08ad990949c8eeff02803201a
baseImage: docker.io/airbyte/java-connector-base:2.0.0@sha256:5a1a21c75c5e1282606de9fa539ba136520abe2fbd013058e988bb0297a9f454
registryOverrides:
cloud:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
import com.google.common.collect.ImmutableMap
import io.airbyte.cdk.extensions.grantAllPermissions
import io.airbyte.cdk.integrations.destination.async.model.AirbyteRecordMessageFile
import io.airbyte.cdk.integrations.destination.s3.FileUploadFormat
import io.airbyte.cdk.integrations.destination.s3.S3BaseDestinationAcceptanceTest
Expand Down Expand Up @@ -79,12 +80,13 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() {
val filePath = "$dirPath/$fileName"
val fileSize = 1_024 * 1_024

fileTransferMountSource!!.resolve(dirPath).createDirectories()
val absoluteFilePath =
fileTransferMountSource!!
.resolve(filePath)
.createFile()
.writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize))
val absoluteDirPath = fileTransferMountSource!!.resolve(dirPath).createDirectories()
val absoluteFilePath = fileTransferMountSource!!.resolve(filePath).createFile()

absoluteDirPath.grantAllPermissions()
absoluteFilePath.grantAllPermissions()
absoluteFilePath.writeText(RandomStringUtils.insecure().nextAlphanumeric(fileSize))

return Path.of(filePath)
}

Expand Down Expand Up @@ -197,7 +199,6 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() {
.getUserMetaDataOf(S3StorageOperations.GENERATION_ID_USER_META_KEY)
.toLong()
assertEquals(generationId, 32L)
assertFalse(file.exists(), "file should have been deleted by the connector")
assertEquals(fileLength, objectInStore.size)
assertEquals("$testBucketPath/$streamName/${filePath.toString()}", objectInStore.key)
assertContentEquals(
Expand All @@ -207,5 +208,6 @@ class S3V2FileTransferDestinationTest : S3BaseDestinationAcceptanceTest() {
.objectContent
.readBytes()
)
assertFalse(file.exists(), "file should have been deleted by the connector")
}
}
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou

| Version | Date | Pull Request | Subject |
|:------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.5.0-rc.15 | 2025-01-23 | [52103](https://github.com/airbytehq/airbyte/pull/52103) | Make the connector use our non root base image. |
| 1.5.0-rc.14 | 2025-01-24 | [51600](https://github.com/airbytehq/airbyte/pull/51600) | Internal refactor |
| 1.5.0-rc.13 | 2025-01-22 | [52076](https://github.com/airbytehq/airbyte/pull/52076) | Test improvements. |
| 1.5.0-rc.12 | 2025-01-22 | [52072](https://github.com/airbytehq/airbyte/pull/52072) | Bug fix: Configure OpenStreamTask concurrency to handle connection to reduce http connection errors. |
Expand Down

0 comments on commit abb103d

Please sign in to comment.