Skip to content

Commit

Permalink
Merge AwsS3Service and InternalAwsS3Service in a S3Service class (#31580
Browse files Browse the repository at this point in the history
)

The interface and its implementation can be merged into a single class,
which is renamed to S3Service like the other S3BlobStore, S3Repository
classes.
  • Loading branch information
tlrx committed Jun 28, 2018
1 parent 84f4949 commit 509421f
Show file tree
Hide file tree
Showing 12 changed files with 64 additions and 201 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
Expand All @@ -40,7 +39,7 @@

class S3BlobStore extends AbstractComponent implements BlobStore {

private final AwsS3Service service;
private final S3Service service;

private final String clientName;

Expand All @@ -54,7 +53,7 @@ class S3BlobStore extends AbstractComponent implements BlobStore {

private final StorageClass storageClass;

S3BlobStore(Settings settings, AwsS3Service service, String clientName, String bucket, boolean serverSideEncryption,
S3BlobStore(Settings settings, S3Service service, String clientName, String bucket, boolean serverSideEncryption,
ByteSizeValue bufferSize, String cannedACL, String storageClass) {
super(settings);
this.service = service;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.repositories.s3;

import com.amazonaws.auth.BasicAWSCredentials;

import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -156,8 +155,10 @@ class S3Repository extends BlobStoreRepository {
/**
* Constructs an s3 backed repository
*/
S3Repository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry,
AwsS3Service awsService) throws IOException {
S3Repository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service) throws IOException {
super(metadata, settings, namedXContentRegistry);

final String bucket = BUCKET_SETTING.get(metadata.settings());
Expand Down Expand Up @@ -188,9 +189,9 @@ class S3Repository extends BlobStoreRepository {
// deprecated behavior: override client credentials from the cluster state
// (repository settings)
if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
overrideCredentialsFromClusterState(awsService);
overrideCredentialsFromClusterState(service);
}
blobStore = new S3BlobStore(settings, awsService, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);
blobStore = new S3BlobStore(settings, service, clientName, bucket, serverSideEncryption, bufferSize, cannedACL, storageClass);

final String basePath = BASE_PATH_SETTING.get(metadata.settings());
if (Strings.hasLength(basePath)) {
Expand Down Expand Up @@ -220,13 +221,13 @@ protected ByteSizeValue chunkSize() {
return chunkSize;
}

void overrideCredentialsFromClusterState(AwsS3Service awsService) {
void overrideCredentialsFromClusterState(final S3Service s3Service) {
deprecationLogger.deprecated("Using s3 access/secret key from repository settings. Instead "
+ "store these in named clients and the elasticsearch keystore for secure settings.");
final BasicAWSCredentials insecureCredentials = S3ClientSettings.loadDeprecatedCredentials(metadata.settings());
// hack, but that's ok because the whole if branch should be axed
final Map<String, S3ClientSettings> prevSettings = awsService.refreshAndClearCache(S3ClientSettings.load(Settings.EMPTY));
final Map<String, S3ClientSettings> prevSettings = s3Service.refreshAndClearCache(S3ClientSettings.load(Settings.EMPTY));
final Map<String, S3ClientSettings> newSettings = S3ClientSettings.overrideCredentials(prevSettings, insecureCredentials);
awsService.refreshAndClearCache(newSettings);
s3Service.refreshAndClearCache(newSettings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@

package org.elasticsearch.repositories.s3;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.amazonaws.util.json.Jackson;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
Expand All @@ -39,6 +31,15 @@
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;

import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A plugin to add a repository type that writes to and from the AWS S3.
*/
Expand All @@ -60,33 +61,29 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
});
}

private final AwsS3Service awsS3Service;
private final S3Service service;

public S3RepositoryPlugin(Settings settings) {
this.awsS3Service = getAwsS3Service(settings);
// eagerly load client settings so that secure settings are read
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings);
this.awsS3Service.refreshAndClearCache(clientsSettings);
public S3RepositoryPlugin(final Settings settings) {
this(settings, new S3Service(settings));
}

protected S3RepositoryPlugin(AwsS3Service awsS3Service) {
this.awsS3Service = awsS3Service;
}

// proxy method for testing
protected S3Repository getS3Repository(RepositoryMetaData metadata, Settings settings, NamedXContentRegistry namedXContentRegistry)
throws IOException {
return new S3Repository(metadata, settings, namedXContentRegistry, awsS3Service);
S3RepositoryPlugin(final Settings settings, final S3Service service) {
this.service = Objects.requireNonNull(service, "S3 service must not be null");
// eagerly load client settings so that secure settings are read
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings);
this.service.refreshAndClearCache(clientsSettings);
}

// proxy method for testing
protected AwsS3Service getAwsS3Service(Settings settings) {
return new InternalAwsS3Service(settings);
protected S3Repository createRepository(final RepositoryMetaData metadata,
final Settings settings,
final NamedXContentRegistry registry) throws IOException {
return new S3Repository(metadata, settings, registry, service);
}

@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
return Collections.singletonMap(S3Repository.TYPE, (metadata) -> getS3Repository(metadata, env.settings(), namedXContentRegistry));
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry) {
return Collections.singletonMap(S3Repository.TYPE, (metadata) -> createRepository(metadata, env.settings(), registry));
}

@Override
Expand All @@ -112,11 +109,11 @@ public List<Setting<?>> getSettings() {
public void reload(Settings settings) {
// secure settings should be readable
final Map<String, S3ClientSettings> clientsSettings = S3ClientSettings.load(settings);
awsS3Service.refreshAndClearCache(clientsSettings);
service.refreshAndClearCache(clientsSettings);
}

@Override
public void close() throws IOException {
awsS3Service.close();
service.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.elasticsearch.repositories.s3;

import java.util.Map;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
Expand All @@ -30,23 +28,24 @@
import com.amazonaws.internal.StaticCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import static java.util.Collections.emptyMap;

class InternalAwsS3Service extends AbstractComponent implements AwsS3Service {
class S3Service extends AbstractComponent implements Closeable {

private volatile Map<String, AmazonS3Reference> clientsCache = emptyMap();
private volatile Map<String, S3ClientSettings> clientsSettings = emptyMap();

InternalAwsS3Service(Settings settings) {
S3Service(Settings settings) {
super(settings);
}

Expand All @@ -56,7 +55,6 @@ class InternalAwsS3Service extends AbstractComponent implements AwsS3Service {
* clients are usable until released. On release they will be destroyed instead
* to being returned to the cache.
*/
@Override
public synchronized Map<String, S3ClientSettings> refreshAndClearCache(Map<String, S3ClientSettings> clientsSettings) {
// shutdown all unused clients
// others will shutdown on their respective release
Expand All @@ -72,7 +70,6 @@ public synchronized Map<String, S3ClientSettings> refreshAndClearCache(Map<Strin
* Attempts to retrieve a client by name from the cache. If the client does not
* exist it will be created.
*/
@Override
public AmazonS3Reference client(String clientName) {
AmazonS3Reference clientReference = clientsCache.get(clientName);
if ((clientReference != null) && clientReference.tryIncRef()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,82 +65,6 @@ public final void wipeAfter() {
cleanRepositoryFiles(basePath);
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testSimpleWorkflow() {
Client client = client();
Settings.Builder settings = Settings.builder()
.put(S3Repository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(1000, 10000));

// We sometime test getting the base_path from node settings using repositories.s3.base_path
settings.put(S3Repository.BASE_PATH_SETTING.getKey(), basePath);

logger.info("--> creating s3 repository with bucket[{}] and path [{}]", internalCluster().getInstance(Settings.class).get("repositories.s3.bucket"), basePath);
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("s3").setSettings(settings
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx-1", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-2", "doc", Integer.toString(i), "foo", "baz" + i);
index("test-idx-3", "doc", Integer.toString(i), "foo", "baz" + i);
}
refresh();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L));

logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));

logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "doc", Integer.toString(i)).get();
}
refresh();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(50L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(50L));

logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();

logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));

ensureGreen();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(50L));

// Test restore after index deletion
logger.info("--> delete indices");
cluster().wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareSearch("test-idx-1").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-cloud-aws/issues/211")
public void testEncryption() {
Client client = client();
Expand Down Expand Up @@ -179,7 +103,7 @@ public void testEncryption() {

Settings settings = internalCluster().getInstance(Settings.class);
Settings bucket = settings.getByPrefix("repositories.s3.");
try (AmazonS3Reference s3Client = internalCluster().getInstance(AwsS3Service.class).client("default")) {
try (AmazonS3Reference s3Client = internalCluster().getInstance(S3Service.class).client("default")) {
String bucketName = bucket.get("bucket");
logger.info("--> verify encryption for bucket [{}], prefix [{}]", bucketName, basePath);
List<S3ObjectSummary> summaries = s3Client.client().listObjects(bucketName, basePath).getObjectSummaries();
Expand Down Expand Up @@ -442,7 +366,7 @@ public void cleanRepositoryFiles(String basePath) {
// We check that settings has been set in elasticsearch.yml integration test file
// as described in README
assertThat("Your settings in elasticsearch.yml are incorrect. Check README file.", bucketName, notNullValue());
try (AmazonS3Reference s3Client = internalCluster().getInstance(AwsS3Service.class).client("default")) {
try (AmazonS3Reference s3Client = internalCluster().getInstance(S3Service.class).client("default")) {
ObjectListing prevListing = null;
//From http://docs.amazonwebservices.com/AmazonS3/latest/dev/DeletingMultipleObjectsUsingJava.html
//we can do at most 1K objects per delete
Expand Down
Loading

0 comments on commit 509421f

Please sign in to comment.