Skip to content

Commit

Permalink
Refactor imports in SqsBulkExportProcessor and SqsBulkExportProcessor…
Browse files Browse the repository at this point in the history
…Lambda for consistency and readability; add DynamoDB and S3 permissions to BulkExportStack
  • Loading branch information
ab295382 committed Feb 18, 2025
1 parent eef069d commit c2940fd
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
*/
package sleeper.bulkexport.lambda;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.sqs.AmazonSQS;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.bulkexport.core.model.BulkExportLeafPartitionQuery;
import sleeper.bulkexport.core.model.BulkExportQuery;
Expand All @@ -37,6 +34,8 @@
import sleeper.parquet.utils.HadoopConfigurationProvider;
import sleeper.statestore.StateStoreFactory;

import java.util.List;

/**
* Lambda to start the bulk export job.
*/
Expand Down Expand Up @@ -88,9 +87,9 @@ public static Builder builder() {
}

/**
* Builder class for constructing instances of {@link SqsBulkExportProcessor}.
* Builder class for constructing instances of SqsBulkExportProcessor.
* This builder allows for the configuration of various clients and properties
* required by the {@link SqsBulkExportProcessor}.
* required by the SqsBulkExportProcessor.
*
* <p>Example usage:</p>
* <pre>{@code
Expand Down Expand Up @@ -123,7 +122,7 @@ public Builder s3Client(AmazonS3 s3Client) {
this.s3Client = s3Client;
return this;
}

/**
* Sets the AmazonSQS client to be used by the SqsBulkExportProcessor.
*
Expand Down Expand Up @@ -169,7 +168,7 @@ public Builder tablePropertiesProvider(TablePropertiesProvider tablePropertiesPr
}

/**
* Builds and returns a new instance of {@link SqsBulkExportProcessor}.
* Builds and returns a new instance of SqsBulkExportProcessor.
*
* @return A new instance of {@link SqsBulkExportProcessor}.
* @throws ObjectFactoryException If there is an error during the creation of the {@link SqsBulkExportProcessor} instance.
Expand All @@ -178,4 +177,4 @@ public SqsBulkExportProcessor build() throws ObjectFactoryException {
return new SqsBulkExportProcessor(this);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
*/
package sleeper.bulkexport.lambda;

import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.lambda.runtime.Context;
Expand All @@ -29,18 +24,22 @@
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.bulkexport.core.model.BulkExportQuery;
import sleeper.bulkexport.core.model.BulkExportQuerySerDe;
import sleeper.bulkexport.core.model.BulkExportQueryValidationException;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.util.ObjectFactoryException;

import java.util.Optional;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;

/**
* A lambda that is triggered when a serialised export query arrives on an SQS
* queue. A processor executes the request and publishes the results to S3 based
Expand Down Expand Up @@ -82,10 +81,10 @@ public SqsBulkExportProcessorLambda(AmazonSQS sqsClient, AmazonS3 s3Client, Amaz
this.sqsClient = sqsClient;
this.s3Client = s3Client;
this.dynamoClient = dynamoClient;

String bucket = System.getenv(CONFIG_BUCKET.toEnvironmentVariable());
instanceProperties = S3InstanceProperties.loadFromBucket(s3Client, bucket);
TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient);

bulkExportQuerySerDe = new BulkExportQuerySerDe();
processor = SqsBulkExportProcessor.builder()
.sqsClient(sqsClient)
Expand Down Expand Up @@ -143,4 +142,4 @@ public Optional<BulkExportQuery> deserialiseAndValidate(String message) {
return Optional.empty();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package sleeper.cdk.stack.bulkexport;

import com.amazonaws.auth.policy.actions.DynamoDBv2Actions;
import com.amazonaws.auth.policy.actions.S3Actions;
import com.amazonaws.auth.policy.actions.SQSActions;
import com.amazonaws.auth.policy.actions.DynamoDBv2Actions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import software.amazon.awscdk.CfnOutput;
import software.amazon.awscdk.CfnOutputProps;
Expand Down

0 comments on commit c2940fd

Please sign in to comment.