Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support of durable delete #1651

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Language;
import com.aerospike.client.Value;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.query.Statement;
import com.aerospike.client.task.ExecuteTask;
import com.aerospike.client.task.RegisterTask;
Expand All @@ -22,16 +23,22 @@ public class AerospikeExpiredDocumentsCleaner implements ExpiredDocumentsCleaner

private final AerospikeClient client;
private final String namespace;
private final boolean durableDelete;

public AerospikeExpiredDocumentsCleaner(AerospikeClient client, String namespace) {
public AerospikeExpiredDocumentsCleaner(AerospikeClient client, String namespace, boolean durableDelete) {
Assert.notNull(client, "Aerospike client can not be null");
Assert.notNull(namespace, "Namespace can not be null");
this.client = client;
this.namespace = namespace;
this.durableDelete = durableDelete;
wolfchkov marked this conversation as resolved.
Show resolved Hide resolved

registerUdf();
}

public AerospikeExpiredDocumentsCleaner(AerospikeClient client, String namespace) {
this(client, namespace, false);
}

private void registerUdf() {
ClassLoader classLoader = AerospikeExpiredDocumentsCleaner.class.getClassLoader();
RegisterTask registerTask = client.register(null, classLoader, RESOURCE_PATH, SERVER_PATH, Language.LUA);
Expand All @@ -51,8 +58,9 @@ public void cleanExpiredDocumentsBefore(long expireTimeMillis) {

Statement statement = new Statement();
statement.setNamespace(namespace);

ExecuteTask executeTask = client.execute(null, statement, PACKAGE_NAME, FUNC_NAME, value);
WritePolicy writePolicy = new WritePolicy(client.getWritePolicyDefault());
writePolicy.durableDelete = durableDelete;
ExecuteTask executeTask = client.execute(writePolicy, statement, PACKAGE_NAME, FUNC_NAME, value);
executeTask.waitTillComplete(SLEEP_INTERVAL, TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@ConfigurationProperties("embedded.aerospike")
public class AerospikeProperties extends CommonContainerProperties {

static final String BEAN_NAME_AEROSPIKE = "aerospike";
public static final String BEAN_NAME_AEROSPIKE = "aerospike";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need public? I dont see usages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usage on other library

static final String BEAN_NAME_AEROSPIKE_BEAN_NAME = "aerospikePackageProperties";

boolean enabled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public PackageInstaller aerospikePackageInstaller(@Qualifier(BEAN_NAME_AEROSPIKE
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "embedded.aerospike.time-travel.enabled", havingValue = "true", matchIfMissing = true)
public ExpiredDocumentsCleaner expiredDocumentsCleaner(AerospikeClient client,
AerospikeProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.playtika.testcontainer.aerospike;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(
classes = AerospikeExpiredDocumentsCleanerWithDurableDeleteTest.TestConfiguration.class
)
class AerospikeExpiredDocumentsCleanerWithDurableDeleteTest {

static final String SET_NAME = "some-set";

@Value("${embedded.aerospike.namespace}")
String namespace;
@Autowired
ExpiredDocumentsCleaner durableDeleteExpiredDocumentsCleaner;
@Autowired
AerospikeClient aerospikeClient;

@Test
public void shouldNotRemoveExpiredWithDurableDeleteFlagBecauseOfAerospikeCommunityEdition() {
Key key = new Key(namespace, "set", "shouldRemoveExpired");
putBin(key, (int) TimeUnit.DAYS.toSeconds(1));

Instant plus23 = Instant.now().plus(23, ChronoUnit.HOURS);
durableDeleteExpiredDocumentsCleaner.cleanExpiredDocumentsBefore(plus23);
assertThat(aerospikeClient.get(null, key)).isNotNull();

Instant plus25 = Instant.now().plus(25, ChronoUnit.HOURS);
durableDeleteExpiredDocumentsCleaner.cleanExpiredDocumentsBefore(plus25);
assertThat(aerospikeClient.get(null, key)).isNotNull();
}
private void putBin(Key key, int expiration) {
Bin bin = new Bin("mybin", "myvalue");

WritePolicy writePolicy = new WritePolicy(aerospikeClient.getWritePolicyDefault());
writePolicy.expiration = expiration;
aerospikeClient.put(writePolicy, key, bin);
}

@EnableAutoConfiguration
@Configuration
static class TestConfiguration {

@Primary
@Bean(destroyMethod = "close")
public AerospikeClient aerospikeClient(@Value("${embedded.aerospike.host}") String host,
@Value("${embedded.aerospike.port}") int port) {
ClientPolicy clientPolicy = new ClientPolicy();
clientPolicy.timeout = 10_000;//in millis
return new AerospikeClient(clientPolicy, host, port);
}

@Bean
public ExpiredDocumentsCleaner durableDeleteExpiredDocumentsCleaner(AerospikeClient client,
AerospikeProperties properties) {
return new AerospikeExpiredDocumentsCleaner(client, properties.getNamespace(), true);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.playtika.testcontainer.aerospike;

import com.aerospike.client.AerospikeClient;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static org.assertj.core.api.Assertions.assertThat;

public class ReplaceExpiredDocumentsCleanerBeanTest {

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(TestConfiguration.class)
.withConfiguration(AutoConfigurations.of(
EmbeddedAerospikeBootstrapConfiguration.class,
EmbeddedAerospikeTestOperationsAutoConfiguration.class));

@Test
public void contextLoadsWithOtherExpiredDocumentsCleaner() {
contextRunner.run(context -> assertThat(context)
.hasNotFailed()
.doesNotHaveBean("expiredDocumentsCleaner")
.hasBean("otherExpiredDocumentsCleaner")
);
}

@Configuration
public static class TestConfiguration {

@Bean
public ExpiredDocumentsCleaner otherExpiredDocumentsCleaner() {
return Mockito.mock(ExpiredDocumentsCleaner.class);
}

@Bean
public AerospikeClient mockAerospikeClient() {
return Mockito.mock(AerospikeClient.class);
}

}

}
Loading