diff --git a/README.md b/README.md index ffde1cfb8..4c9b580d7 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha - [In-Memory](#in-memory) - [Memcached](#memcached-using-spymemcached) - [Datastore](#datastore) + - [S3](#s3) + [Multi-tenancy](#multi-tenancy) + [Customization](#customization) + [Duration specification](#duration-specification) @@ -885,6 +886,28 @@ public LockProvider lockProvider(DatabaseClient databaseClient) { } ``` +#### S3 + +Import the project +```xml + + net.javacrumbs.shedlock + shedlock-provider-s3 + 6.0.2 + +``` + +and configure +```java +import net.javacrumbs.shedlock.provider.s3.S3LockProvider; + +... + +@Bean +public LockProvider lockProvider(com.amazonaws.services.s3.AmazonS3 amazonS3) { + return new S3LockProvider(amazonS3, "BUCKET_NAME"); +} +``` ## Multi-tenancy If you have multi-tenancy use-case you can use a lock provider similar to this one diff --git a/pom.xml b/pom.xml index e60664951..4c30dbf88 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ providers/datastore/shedlock-provider-datastore providers/spanner/shedlock-provider-spanner providers/neo4j/shedlock-provider-neo4j + providers/s3/shedlock-provider-s3 diff --git a/providers/s3/shedlock-provider-s3/pom.xml b/providers/s3/shedlock-provider-s3/pom.xml new file mode 100644 index 000000000..f8364f2ec --- /dev/null +++ b/providers/s3/shedlock-provider-s3/pom.xml @@ -0,0 +1,77 @@ + + + + shedlock-parent + net.javacrumbs.shedlock + 6.0.3-SNAPSHOT + ../../../pom.xml + + 4.0.0 + + shedlock-provider-s3 + 6.0.3-SNAPSHOT + + + 1.12.747 + + + + + net.javacrumbs.shedlock + shedlock-core + ${project.version} + + + + com.amazonaws + aws-java-sdk-s3 + ${aws-java-sdk-s3.version} + + + + org.testcontainers + junit-jupiter + ${test-containers.ver} + test + + + + org.testcontainers + localstack + ${test-containers.ver} + test + + + + net.javacrumbs.shedlock + shedlock-test-support + ${project.version} + test + + + + ch.qos.logback + logback-classic + ${logback.ver} + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + + net.javacrumbs.shedlock.provider.s3 + + + + + + + + diff --git a/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/Lock.java b/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/Lock.java new file mode 100644 index 000000000..d62371f6e --- /dev/null +++ b/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/Lock.java @@ -0,0 +1,5 @@ +package net.javacrumbs.shedlock.provider.s3; + +import java.time.Instant; + +record Lock(Instant lockUntil, Instant lockedAt, String lockedBy, String eTag) {} diff --git a/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3LockProvider.java b/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3LockProvider.java new file mode 100644 index 000000000..050fc8c5c --- /dev/null +++ b/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3LockProvider.java @@ -0,0 +1,31 @@ +package net.javacrumbs.shedlock.provider.s3; + +import com.amazonaws.services.s3.AmazonS3; +import net.javacrumbs.shedlock.support.StorageBasedLockProvider; + +/** + * Lock provider implementation for S3. + */ +public class S3LockProvider extends StorageBasedLockProvider { + + /** + * Constructs an S3LockProvider. + * + * @param s3Client Amazon S3 client used to interact with the S3 bucket. + * @param bucketName The name of the S3 bucket where locks are stored. + * @param objectPrefix The prefix of the S3 object lock. + */ + public S3LockProvider(AmazonS3 s3Client, String bucketName, String objectPrefix) { + super(new S3StorageAccessor(s3Client, bucketName, objectPrefix)); + } + + /** + * Constructs an S3LockProvider. + * + * @param s3Client Amazon S3 client used to interact with the S3 bucket. + * @param bucketName The name of the S3 bucket where locks are stored. + */ + public S3LockProvider(AmazonS3 s3Client, String bucketName) { + this(s3Client, bucketName, "shedlock/"); + } +} diff --git a/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3StorageAccessor.java b/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3StorageAccessor.java new file mode 100644 index 000000000..2918e53e0 --- /dev/null +++ b/providers/s3/shedlock-provider-s3/src/main/java/net/javacrumbs/shedlock/provider/s3/S3StorageAccessor.java @@ -0,0 +1,189 @@ +package net.javacrumbs.shedlock.provider.s3; + +import static net.javacrumbs.shedlock.core.ClockProvider.now; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; +import net.javacrumbs.shedlock.core.LockConfiguration; +import net.javacrumbs.shedlock.support.AbstractStorageAccessor; + +/** + * Implementation of StorageAccessor for S3 as a lock storage backend. + * Manages locks using S3 objects with metadata for expiration and conditional writes. + */ +class S3StorageAccessor extends AbstractStorageAccessor { + + private static final String LOCK_UNTIL = "lockUntil"; + private static final String LOCKED_AT = "lockedAt"; + private static final String LOCKED_BY = "lockedBy"; + private static final int PRECONDITION_FAILED = 412; + + private final AmazonS3 s3Client; + private final String bucketName; + private final String objectPrefix; + + public S3StorageAccessor(AmazonS3 s3Client, String bucketName, String objectPrefix) { + this.s3Client = s3Client; + this.bucketName = bucketName; + this.objectPrefix = objectPrefix; + } + + /** + * Finds the lock in the S3 bucket. + */ + Optional find(String name, String action) { + try { + ObjectMetadata metadata = s3Client.getObjectMetadata(bucketName, objectName(name)); + Instant lockUntil = Instant.parse(metadata.getUserMetaDataOf(LOCK_UNTIL)); + Instant lockedAt = Instant.parse(metadata.getUserMetaDataOf(LOCKED_AT)); + String lockedBy = metadata.getUserMetaDataOf(LOCKED_BY); + String eTag = metadata.getETag(); + + logger.debug("Lock found. action: {}, name: {}, lockUntil: {}, e-tag: {}", action, name, lockUntil, eTag); + return Optional.of(new Lock(lockUntil, lockedAt, lockedBy, eTag)); + } catch (AmazonServiceException e) { + if (e.getStatusCode() == 404) { + logger.debug("Lock not found. action: {}, name: {}", action, name); + return Optional.empty(); + } + throw e; + } + } + + @Override + public boolean insertRecord(LockConfiguration lockConfiguration) { + String name = lockConfiguration.getName(); + if (find(name, "insertRecord").isPresent()) { + logger.debug("Lock already exists. name: {}", name); + return false; + } + + try { + var lockContent = getLockContent(); + ObjectMetadata metadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname()); + metadata.setContentLength(lockContent.length); + + PutObjectRequest request = + new PutObjectRequest(bucketName, objectName(name), new ByteArrayInputStream(lockContent), metadata); + request.putCustomRequestHeader("If-None-Match", "*"); + + s3Client.putObject(request); + logger.debug("Lock created successfully. name: {}, metadata: {}", name, metadata.getUserMetadata()); + return true; + } catch (AmazonServiceException e) { + if (e.getStatusCode() == PRECONDITION_FAILED) { + logger.debug("Lock already in use. name: {}", name); + } else { + logger.warn("Failed to create lock. name: {}", name, e); + } + return false; + } + } + + @Override + public boolean updateRecord(LockConfiguration lockConfiguration) { + Optional lock = find(lockConfiguration.getName(), "updateRecord"); + if (lock.isEmpty()) { + logger.warn("Update skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock); + return false; + } + if (lock.get().lockUntil().isAfter(now())) { + logger.debug("Update skipped. Lock still valid. name: {}, lock: {}", lockConfiguration.getName(), lock); + return false; + } + + ObjectMetadata newMetadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname()); + return replaceObjectMetadata( + lockConfiguration.getName(), newMetadata, lock.get().eTag(), "updateRecord"); + } + + @Override + public void unlock(LockConfiguration lockConfiguration) { + Optional lock = find(lockConfiguration.getName(), "unlock"); + if (lock.isEmpty()) { + logger.warn("Unlock skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock); + return; + } + + updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getUnlockTime(), "unlock"); + } + + @Override + public boolean extend(LockConfiguration lockConfiguration) { + Optional lock = find(lockConfiguration.getName(), "extend"); + if (lock.isEmpty() + || lock.get().lockUntil().isBefore(now()) + || !lock.get().lockedBy().equals(getHostname())) { + logger.debug( + "Extend skipped. Lock invalid or not owned by host. name: {}, lock: {}", + lockConfiguration.getName(), + lock); + return false; + } + + return updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getLockAtMostUntil(), "extend"); + } + + private boolean updateUntil(String name, Lock lock, Instant until, String action) { + ObjectMetadata existingMetadata = s3Client.getObjectMetadata(bucketName, objectName(name)); + ObjectMetadata newMetadata = + createMetadata(until, Instant.parse(existingMetadata.getUserMetaDataOf(LOCKED_AT)), getHostname()); + + return replaceObjectMetadata(name, newMetadata, lock.eTag(), action); + } + + private boolean replaceObjectMetadata(String name, ObjectMetadata newMetadata, String eTag, String action) { + var lockContent = getLockContent(); + newMetadata.setContentLength(lockContent.length); + + PutObjectRequest request = + new PutObjectRequest(bucketName, objectName(name), new ByteArrayInputStream(lockContent), newMetadata); + request.putCustomRequestHeader("If-Match", eTag); + + try { + PutObjectResult response = s3Client.putObject(request); + logger.debug( + "Lock {} successfully. name: {}, old e-tag: {}, new e-tag: {}", + action, + name, + eTag, + response.getETag()); + return true; + } catch (AmazonServiceException e) { + if (e.getStatusCode() == PRECONDITION_FAILED) { + logger.debug("Lock not exists to {}. name: {}, e-tag {}", action, name, eTag); + } else { + logger.warn("Failed to {} lock. name: {}", action, name, e); + } + return false; + } + } + + private static byte[] getLockContent() { + var uuid = UUID.randomUUID(); + ByteBuffer bb = ByteBuffer.wrap(new byte[16]); + bb.putLong(uuid.getMostSignificantBits()); + bb.putLong(uuid.getLeastSignificantBits()); + return bb.array(); + } + + private ObjectMetadata createMetadata(Instant lockUntil, Instant lockedAt, String lockedBy) { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.addUserMetadata(LOCK_UNTIL, lockUntil.toString()); + metadata.addUserMetadata(LOCKED_AT, lockedAt.toString()); + metadata.addUserMetadata(LOCKED_BY, lockedBy); + return metadata; + } + + private String objectName(String name) { + return objectPrefix + name; + } +} diff --git a/providers/s3/shedlock-provider-s3/src/test/java/net/javacrumbs/shedlock/provider/s3/S3LockProviderIntegrationTest.java b/providers/s3/shedlock-provider-s3/src/test/java/net/javacrumbs/shedlock/provider/s3/S3LockProviderIntegrationTest.java new file mode 100644 index 000000000..a658eaa71 --- /dev/null +++ b/providers/s3/shedlock-provider-s3/src/test/java/net/javacrumbs/shedlock/provider/s3/S3LockProviderIntegrationTest.java @@ -0,0 +1,91 @@ +package net.javacrumbs.shedlock.provider.s3; + +import static net.javacrumbs.shedlock.core.ClockProvider.now; +import static org.assertj.core.api.Assertions.assertThat; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import net.javacrumbs.shedlock.support.StorageBasedLockProvider; +import net.javacrumbs.shedlock.test.support.AbstractStorageBasedLockProviderIntegrationTest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +/** + * Integration test uses local instance of LocalStack S3 running on localhost at + * port 9042 using bucket shedlock and folder shedlock + * + * @see net.javacrumbs.shedlock.provider.s3.S3LockProvider + */ +@Testcontainers +public class S3LockProviderIntegrationTest extends AbstractStorageBasedLockProviderIntegrationTest { + + @Container + static final MyLocalStackS3Container localStackS3 = new MyLocalStackS3Container(); + + private static AmazonS3 s3Client; + private static final String BUCKET_NAME = "my-bucket"; + private static final String OBJECT_PREFIX = "prefix"; + + @BeforeAll + public static void startLocalStackS3() { + s3Client = AmazonS3ClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + localStackS3.getEndpoint().toString(), localStackS3.getRegion())) + .withCredentials(new AWSStaticCredentialsProvider( + new BasicAWSCredentials(localStackS3.getAccessKey(), localStackS3.getSecretKey()))) + .build(); + } + + @BeforeEach + public void before() { + s3Client.createBucket(BUCKET_NAME); + } + + @AfterEach + public void after() { + s3Client.listObjects(BUCKET_NAME).getObjectSummaries().forEach(obj -> { + s3Client.deleteObject(BUCKET_NAME, obj.getKey()); + }); + } + + @Override + protected StorageBasedLockProvider getLockProvider() { + return new S3LockProvider(s3Client, BUCKET_NAME, OBJECT_PREFIX); + } + + @Override + protected void assertUnlocked(String lockName) { + Lock lock = findLock(lockName); + assertThat(lock.lockUntil()).isBefore(now()); + assertThat(lock.lockedAt()).isBefore(now()); + assertThat(lock.lockedBy()).isNotEmpty(); + } + + @Override + protected void assertLocked(String lockName) { + Lock lock = findLock(lockName); + assertThat(lock.lockUntil()).isAfter(now()); + assertThat(lock.lockedAt()).isBefore(now()); + assertThat(lock.lockedBy()).isNotEmpty(); + } + + private Lock findLock(String lockName) { + return new S3StorageAccessor(s3Client, BUCKET_NAME, OBJECT_PREFIX) + .find(lockName, "test") + .get(); + } + + private static class MyLocalStackS3Container extends LocalStackContainer { + public MyLocalStackS3Container() { + super(DockerImageName.parse("localstack/localstack:4.0.3")); + } + } +} diff --git a/providers/s3/shedlock-provider-s3/src/test/resources/logback-test.xml b/providers/s3/shedlock-provider-s3/src/test/resources/logback-test.xml new file mode 100644 index 000000000..c44bac998 --- /dev/null +++ b/providers/s3/shedlock-provider-s3/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + diff --git a/shedlock-bom/pom.xml b/shedlock-bom/pom.xml index c03e792f1..1c629029c 100644 --- a/shedlock-bom/pom.xml +++ b/shedlock-bom/pom.xml @@ -144,6 +144,11 @@ shedlock-provider-spanner ${project.version} + + net.javacrumbs.shedlock + shedlock-provider-s3 + ${project.version} +