Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for S3 as Lock Storage Backend #2322

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ executed repeatedly. Moreover, the locks are time-based and ShedLock assumes tha
- [In-Memory](#in-memory)
- [Memcached](#memcached-using-spymemcached)
- [Datastore](#datastore)
- [S3](#s3)
+ [Multi-tenancy](#multi-tenancy)
+ [Customization](#customization)
+ [Duration specification](#duration-specification)
Expand Down Expand Up @@ -885,6 +886,28 @@ public LockProvider lockProvider(DatabaseClient databaseClient) {
}
```

#### S3

Import the project
```xml
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-s3</artifactId>
<version>6.0.2</version>
</dependency>
```

and configure
```java
import net.javacrumbs.shedlock.provider.s3.S3LockProvider;

...

@Bean
public LockProvider lockProvider(com.amazonaws.services.s3.AmazonS3 amazonS3) {
return new S3LockProvider(amazonS3, "BUCKET_NAME");
}
```

## Multi-tenancy
If you have multi-tenancy use-case you can use a lock provider similar to this one
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<module>providers/datastore/shedlock-provider-datastore</module>
<module>providers/spanner/shedlock-provider-spanner</module>
<module>providers/neo4j/shedlock-provider-neo4j</module>
<module>providers/s3/shedlock-provider-s3</module>
</modules>

<properties>
Expand Down
77 changes: 77 additions & 0 deletions providers/s3/shedlock-provider-s3/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>shedlock-parent</artifactId>
<groupId>net.javacrumbs.shedlock</groupId>
<version>6.0.3-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>shedlock-provider-s3</artifactId>
<version>6.0.3-SNAPSHOT</version>

<properties>
<aws-java-sdk-s3.version>1.12.747</aws-java-sdk-s3.version>
</properties>

<dependencies>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws-java-sdk-s3.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${test-containers.ver}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-test-support</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.ver}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifestEntries>
<Automatic-Module-Name>
net.javacrumbs.shedlock.provider.s3
</Automatic-Module-Name>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package net.javacrumbs.shedlock.provider.s3;

import java.time.Instant;

record Lock(Instant lockUntil, Instant lockedAt, String lockedBy, String eTag) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package net.javacrumbs.shedlock.provider.s3;

import com.amazonaws.services.s3.AmazonS3;
import net.javacrumbs.shedlock.support.StorageBasedLockProvider;

/**
* Lock provider implementation for S3.
*/
public class S3LockProvider extends StorageBasedLockProvider {

/**
* Constructs an S3LockProvider.
*
* @param s3Client Amazon S3 client used to interact with the S3 bucket.
* @param bucketName The name of the S3 bucket where locks are stored.
* @param objectPrefix The prefix of the S3 object lock.
*/
public S3LockProvider(AmazonS3 s3Client, String bucketName, String objectPrefix) {
super(new S3StorageAccessor(s3Client, bucketName, objectPrefix));
}

/**
* Constructs an S3LockProvider.
*
* @param s3Client Amazon S3 client used to interact with the S3 bucket.
* @param bucketName The name of the S3 bucket where locks are stored.
*/
public S3LockProvider(AmazonS3 s3Client, String bucketName) {
this(s3Client, bucketName, "shedlock/");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package net.javacrumbs.shedlock.provider.s3;

import static net.javacrumbs.shedlock.core.ClockProvider.now;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.support.AbstractStorageAccessor;

/**
* Implementation of StorageAccessor for S3 as a lock storage backend.
* Manages locks using S3 objects with metadata for expiration and conditional writes.
*/
class S3StorageAccessor extends AbstractStorageAccessor {

private static final String LOCK_UNTIL = "lockUntil";
private static final String LOCKED_AT = "lockedAt";
private static final String LOCKED_BY = "lockedBy";
private static final int PRECONDITION_FAILED = 412;

private final AmazonS3 s3Client;
private final String bucketName;
private final String objectPrefix;

public S3StorageAccessor(AmazonS3 s3Client, String bucketName, String objectPrefix) {
this.s3Client = s3Client;
this.bucketName = bucketName;
this.objectPrefix = objectPrefix;
}

/**
* Finds the lock in the S3 bucket.
*/
Optional<Lock> find(String name, String action) {
try {
ObjectMetadata metadata = s3Client.getObjectMetadata(bucketName, objectName(name));
Instant lockUntil = Instant.parse(metadata.getUserMetaDataOf(LOCK_UNTIL));
Instant lockedAt = Instant.parse(metadata.getUserMetaDataOf(LOCKED_AT));
String lockedBy = metadata.getUserMetaDataOf(LOCKED_BY);
String eTag = metadata.getETag();

logger.debug("Lock found. action: {}, name: {}, lockUntil: {}, e-tag: {}", action, name, lockUntil, eTag);
return Optional.of(new Lock(lockUntil, lockedAt, lockedBy, eTag));
} catch (AmazonServiceException e) {
if (e.getStatusCode() == 404) {
logger.debug("Lock not found. action: {}, name: {}", action, name);
return Optional.empty();
}
throw e;
}
}

@Override
public boolean insertRecord(LockConfiguration lockConfiguration) {
String name = lockConfiguration.getName();
if (find(name, "insertRecord").isPresent()) {
logger.debug("Lock already exists. name: {}", name);
return false;
}

try {
var lockContent = getLockContent();
ObjectMetadata metadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname());
metadata.setContentLength(lockContent.length);

PutObjectRequest request =
new PutObjectRequest(bucketName, objectName(name), new ByteArrayInputStream(lockContent), metadata);
request.putCustomRequestHeader("If-None-Match", "*");

s3Client.putObject(request);
logger.debug("Lock created successfully. name: {}, metadata: {}", name, metadata.getUserMetadata());
return true;
} catch (AmazonServiceException e) {
if (e.getStatusCode() == PRECONDITION_FAILED) {
logger.debug("Lock already in use. name: {}", name);
} else {
logger.warn("Failed to create lock. name: {}", name, e);
}
return false;
}
}

@Override
public boolean updateRecord(LockConfiguration lockConfiguration) {
Optional<Lock> lock = find(lockConfiguration.getName(), "updateRecord");
if (lock.isEmpty()) {
logger.warn("Update skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock);
return false;
}
if (lock.get().lockUntil().isAfter(now())) {
logger.debug("Update skipped. Lock still valid. name: {}, lock: {}", lockConfiguration.getName(), lock);
return false;
}

ObjectMetadata newMetadata = createMetadata(lockConfiguration.getLockAtMostUntil(), now(), getHostname());
return replaceObjectMetadata(
lockConfiguration.getName(), newMetadata, lock.get().eTag(), "updateRecord");
}

@Override
public void unlock(LockConfiguration lockConfiguration) {
Optional<Lock> lock = find(lockConfiguration.getName(), "unlock");
if (lock.isEmpty()) {
logger.warn("Unlock skipped. Lock not found. name: {}, lock: {}", lockConfiguration.getName(), lock);
return;
}

updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getUnlockTime(), "unlock");
}

@Override
public boolean extend(LockConfiguration lockConfiguration) {
Optional<Lock> lock = find(lockConfiguration.getName(), "extend");
if (lock.isEmpty()
|| lock.get().lockUntil().isBefore(now())
|| !lock.get().lockedBy().equals(getHostname())) {
logger.debug(
"Extend skipped. Lock invalid or not owned by host. name: {}, lock: {}",
lockConfiguration.getName(),
lock);
return false;
}

return updateUntil(lockConfiguration.getName(), lock.get(), lockConfiguration.getLockAtMostUntil(), "extend");
}

private boolean updateUntil(String name, Lock lock, Instant until, String action) {
ObjectMetadata existingMetadata = s3Client.getObjectMetadata(bucketName, objectName(name));
ObjectMetadata newMetadata =
createMetadata(until, Instant.parse(existingMetadata.getUserMetaDataOf(LOCKED_AT)), getHostname());

return replaceObjectMetadata(name, newMetadata, lock.eTag(), action);
}

private boolean replaceObjectMetadata(String name, ObjectMetadata newMetadata, String eTag, String action) {
var lockContent = getLockContent();
newMetadata.setContentLength(lockContent.length);

PutObjectRequest request =
new PutObjectRequest(bucketName, objectName(name), new ByteArrayInputStream(lockContent), newMetadata);
request.putCustomRequestHeader("If-Match", eTag);

try {
PutObjectResult response = s3Client.putObject(request);
logger.debug(
"Lock {} successfully. name: {}, old e-tag: {}, new e-tag: {}",
action,
name,
eTag,
response.getETag());
return true;
} catch (AmazonServiceException e) {
if (e.getStatusCode() == PRECONDITION_FAILED) {
logger.debug("Lock not exists to {}. name: {}, e-tag {}", action, name, eTag);
} else {
logger.warn("Failed to {} lock. name: {}", action, name, e);
}
return false;
}
}

private static byte[] getLockContent() {
var uuid = UUID.randomUUID();
ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
return bb.array();
}

private ObjectMetadata createMetadata(Instant lockUntil, Instant lockedAt, String lockedBy) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.addUserMetadata(LOCK_UNTIL, lockUntil.toString());
metadata.addUserMetadata(LOCKED_AT, lockedAt.toString());
metadata.addUserMetadata(LOCKED_BY, lockedBy);
return metadata;
}

private String objectName(String name) {
return objectPrefix + name;
}
}
Loading
Loading