Adding the QueryExecutor implementation #19515

Merged · 5 commits · Mar 5, 2021
12 changes: 10 additions & 2 deletions sdk/cosmos/azure-cosmos-benchmark/ctl/linkedin/run_benchmark.sh
@@ -10,6 +10,7 @@
## - ctl_concurrency (optional: default 50)
## - ctl_consistency_level (optional: default Session)
## - ctl_number_of_precreated_documents (optional: default 100,000)
## - ctl_bulk_load_batch_size (optional: default 200,000)
## - ctl_number_of_operations (optional: default 1,000,000)
## - ctl_max_running_time_duration (optional: default 10 minutes)
## - ctl_printing_interval (optional: default 30 seconds)
@@ -63,6 +64,13 @@ else
number_of_precreated_documents=$ctl_number_of_precreated_documents
fi

if [ -z "$ctl_bulk_load_batch_size" ]
then
bulk_load_batch_size=200000
else
bulk_load_batch_size=$ctl_bulk_load_batch_size
fi
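# Example (hypothetical invocation, not part of this diff): the default can be
# overridden by setting the variable when launching the script, e.g.
#   ctl_bulk_load_batch_size=500000 ./run_benchmark.sh
# Any value greater than 0 passes the Java-side validation; 500000 is illustrative only.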

if [ -z "$ctl_number_of_operations" ]
then
number_of_operations=-1
@@ -98,9 +106,9 @@ jvm_opt=""

if [ -z "$ctl_graphite_endpoint" ]
then
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -printingInterval $printing_interval -manageResources 2>&1 | tee -a "$log_filename"
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -numberOfPreCreatedDocuments $number_of_precreated_documents -bulkloadBatchSize $bulk_load_batch_size -printingInterval $printing_interval 2>&1 | tee -a "$log_filename"
else
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -printingInterval $printing_interval -manageResources 2>&1 | tee -a "$log_filename"
java -Xmx8g -Xms8g $jvm_opt -Dcosmos.directModeProtocol=$protocol -Dazure.cosmos.directModeProtocol=$protocol -jar "$jar_file" -serviceEndpoint "$service_endpoint" -masterKey "$master_key" -databaseId "$database_name" -collectionId "$collection_name" -numberOfCollectionForCtl "$number_Of_collection" -throughput $throughput -consistencyLevel $consistency_level -concurrency $concurrency -numberOfOperations $number_of_operations -operation $operation -connectionMode $connection_mode -maxRunningTimeDuration $max_running_time_duration -graphiteEndpoint $ctl_graphite_endpoint -numberOfPreCreatedDocuments $number_of_precreated_documents -bulkloadBatchSize $bulk_load_batch_size -printingInterval $printing_interval 2>&1 | tee -a "$log_filename"
fi

end=`date +%s`
@@ -89,8 +89,8 @@ public class Configuration {
@Parameter(names = "-readWriteQueryPct", description = "Comma separated read write query workload percent")
private String readWriteQueryPct = "90,9,1";

@Parameter(names = "-manageResources", description = "Control switch for creating/deleting underlying test resources")
private boolean manageResources = false;
@Parameter(names = "-manageDatabase", description = "Control switch for creating/deleting underlying database resource")
private boolean manageDatabase = false;

@Parameter(names = "-operation", description = "Type of Workload:\n"
+ "\tReadThroughput- run a READ workload that prints only throughput *\n"
@@ -161,6 +161,9 @@ public Duration convert(String value) {
@Parameter(names = "-contentResponseOnWriteEnabled", description = "if set to false, does not returns content response on document write operations")
private String contentResponseOnWriteEnabled = String.valueOf(true);

@Parameter(names = "-bulkloadBatchSize", description = "Control the number of documents uploaded in each BulkExecutor load iteration (Only supported for the LinkedInCtlWorkload)")
private int bulkloadBatchSize = 200000;

@Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
private boolean help = false;

@@ -393,8 +396,12 @@ public String getReadWriteQueryPct() {
return this.readWriteQueryPct;
}

public boolean shouldManageResources() {
return this.manageResources;
public boolean shouldManageDatabase() {
return this.manageDatabase;
}

public int getBulkloadBatchSize() {
return this.bulkloadBatchSize;
}

public String toString() {
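These @Parameter annotations are JCommander-style CLI bindings; the bootstrap that actually parses the flags is not part of this diff, so the wiring below is a minimal sketch under that assumption:

import com.beust.jcommander.JCommander;

// Hypothetical bootstrap: bind -bulkloadBatchSize and -manageDatabase onto Configuration.
Configuration cfg = new Configuration();
JCommander.newBuilder()
        .addObject(cfg)
        .build()
        .parse("-bulkloadBatchSize", "500000", "-manageDatabase");
// cfg.getBulkloadBatchSize() == 500000, cfg.shouldManageDatabase() == true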
@@ -10,83 +10,69 @@
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.CollectionAttributes;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.benchmark.linkedin.impl.Constants;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.ThroughputProperties;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.azure.cosmos.benchmark.linkedin.impl.Constants.PARTITION_KEY_PATH;
import static com.azure.cosmos.models.ThroughputProperties.createManualThroughput;


public class ResourceManagerImpl implements ResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerImpl.class);
/**
* Implementation for managing only the Collections for this test. This class facilitates
* container creation after the CTL environment has provisioned the database with the
* required throughput
*/
public class CollectionResourceManager implements ResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(CollectionResourceManager.class);
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);

private final Configuration _configuration;
private final EntityConfiguration _entityConfiguration;
private final CosmosAsyncClient _client;

public ResourceManagerImpl(final Configuration configuration,
public CollectionResourceManager(final Configuration configuration,
final EntityConfiguration entityConfiguration,
final CosmosAsyncClient client) {
Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
Preconditions.checkNotNull(entityConfiguration,
"The Test Entity specific configuration can not be null");
Preconditions.checkNotNull(client, "Need a non-null client for "
+ "setting up the Database and containers for the test");
+ "setting up the Database and collections for the test");
_configuration = configuration;
_entityConfiguration = entityConfiguration;
_client = client;
}

@Override
public void createDatabase() throws CosmosException {
try {
LOGGER.info("Creating database {} for the ctl workload if one doesn't exist", _configuration.getDatabaseId());
final ThroughputProperties throughputProperties = createManualThroughput(_configuration.getThroughput());
_client.createDatabaseIfNotExists(_configuration.getDatabaseId(), throughputProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception while creating database {}", _configuration.getDatabaseId(), e);
throw e;
}

deleteExistingContainers();
}

@Override
public void createContainer() throws CosmosException {
public void createResources() throws CosmosException {
final String containerName = _configuration.getCollectionId();
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
final CollectionAttributes collectionAttributes = _entityConfiguration.collectionAttributes();
try {
LOGGER.info("Creating container {} in the database {}", containerName, database.getId());
final CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerName, PARTITION_KEY_PATH)
new CosmosContainerProperties(containerName, Constants.PARTITION_KEY_PATH)
.setIndexingPolicy(collectionAttributes.indexingPolicy());
database.createContainerIfNotExists(containerProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception while creating container {}", containerName, e);
LOGGER.error("Exception while creating collection {}", containerName, e);
throw e;
}
}

@Override
public void deleteResources() {
// Delete all the containers in the database
deleteExistingContainers();
deleteExistingCollections();

LOGGER.info("Resource cleanup completed");
LOGGER.info("Collection resource cleanup completed");
}

private void deleteExistingContainers() {
private void deleteExistingCollections() {
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
final List<CosmosAsyncContainer> cosmosAsyncContainers = database.readAllContainers()
.byPage()
@@ -98,12 +84,12 @@ private void deleteExistingContainers() {

// Run a best effort attempt to delete all existing containers and data there-in
for (CosmosAsyncContainer cosmosAsyncContainer : cosmosAsyncContainers) {
LOGGER.info("Deleting container {} in the Database {}", cosmosAsyncContainer.getId(), _configuration.getDatabaseId());
LOGGER.info("Deleting collection {} in the Database {}", cosmosAsyncContainer.getId(), _configuration.getDatabaseId());
try {
cosmosAsyncContainer.delete()
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Error deleting container {} in the Database {}",
LOGGER.error("Error deleting collection {} in the Database {}",
cosmosAsyncContainer.getId(), _configuration.getDatabaseId(), e);
}
}
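The diff collapses the middle of the readAllContainers() chain above. Assuming the standard CosmosPagedFlux API, the elided portion presumably flattens the pages and resolves each CosmosContainerProperties entry to a container handle, roughly (a reconstruction, not the verbatim source):

// Requires reactor.core.publisher.Flux.
final List<CosmosAsyncContainer> cosmosAsyncContainers = database.readAllContainers()
    .byPage()
    .flatMap(page -> Flux.fromIterable(page.getResults()))        // FeedResponse<CosmosContainerProperties> -> entries
    .map(properties -> database.getContainer(properties.getId())) // resolve ids to CosmosAsyncContainer handles
    .collectList()
    .block(RESOURCE_CRUD_WAIT_TIME);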
@@ -9,29 +9,32 @@
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;


/**
* This class facilitates generating data in batches.
*/
public class DataGenerationIterator implements Iterator<Map<Key, ObjectNode>> {

public static final int BATCH_SIZE = 200000;

private final DataGenerator _dataGenerator;
private final int _totalRecordCount;
private int _totalDataGenerated;
private final int _batchLoadBatchSize;

/**
* @param dataGenerator The underlying DataGenerator capable of generating a batch of records
* @param recordCount Number of records we want to generate for this test.
* Actual data generation happens in batches of batchLoadBatchSize
* @param batchLoadBatchSize The number of documents to generate, and load, in each BulkExecutor iteration
*/
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount) {
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount, int batchLoadBatchSize) {
Preconditions.checkArgument(recordCount > 0,
"The number of documents to generate must be greater than 0");
Preconditions.checkArgument(batchLoadBatchSize > 0,
"The number of documents to generate and load on each BulkExecutor load iteration must be greater than 0");
_dataGenerator = Preconditions.checkNotNull(dataGenerator,
"The underlying DataGenerator for this iterator can not be null");
_totalRecordCount = recordCount;
_batchLoadBatchSize = batchLoadBatchSize;
_totalDataGenerated = 0;
}

@@ -42,7 +45,7 @@ public boolean hasNext() {

@Override
public Map<Key, ObjectNode> next() {
final int recordsToGenerate = Math.min(BATCH_SIZE, _totalRecordCount - _totalDataGenerated);
final int recordsToGenerate = Math.min(_batchLoadBatchSize, _totalRecordCount - _totalDataGenerated);

// Filter Keys in case there are duplicates
final Map<Key, ObjectNode> newDocuments = _dataGenerator.generate(recordsToGenerate);
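To make the batching concrete: with recordCount = 450,000 and batchLoadBatchSize = 200,000 (illustrative values), the iterator yields batches of up to 200,000, 200,000, and 50,000 records. A minimal consumption sketch:

DataGenerationIterator it = new DataGenerationIterator(dataGenerator, 450_000, 200_000);
while (it.hasNext()) {
    // Each batch holds up to 200,000 generated documents, keyed so duplicates can be filtered.
    Map<Key, ObjectNode> batch = it.next();
}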
@@ -29,9 +29,9 @@
public class DataLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataLoader.class);

private static final int MAX_BATCH_SIZE = 20000;
private static final int BULK_OPERATION_CONCURRENCY = 5;
private static final Duration BULK_LOAD_WAIT_DURATION = Duration.ofSeconds(120);
private static final int MAX_BATCH_SIZE = 10000;
private static final int BULK_OPERATION_CONCURRENCY = 10;
private static final Duration VALIDATE_DATA_WAIT_DURATION = Duration.ofSeconds(120);
private static final String COUNT_ALL_QUERY = "SELECT COUNT(1) FROM c";
private static final String COUNT_ALL_QUERY_RESULT_FIELD = "$1";

@@ -48,11 +48,13 @@ public DataLoader(final Configuration configuration,
_client = Preconditions.checkNotNull(client,
"The CosmosAsyncClient needed for data loading can not be null");
_dataGenerator = new DataGenerationIterator(entityConfiguration.dataGenerator(),
_configuration.getNumberOfPreCreatedDocuments());
_configuration.getNumberOfPreCreatedDocuments(),
_configuration.getBulkloadBatchSize());
}

public void loadData() {
LOGGER.info("Starting batched data loading, loading {} documents in each iteration", DataGenerationIterator.BATCH_SIZE);
LOGGER.info("Starting batched data loading, loading {} documents in each iteration",
_configuration.getBulkloadBatchSize());
while (_dataGenerator.hasNext()) {
final Map<Key, ObjectNode> newDocuments = _dataGenerator.next();
bulkCreateItems(newDocuments);
@@ -71,11 +73,14 @@ private void bulkCreateItems(final Map<Key, ObjectNode> records) {
database.getId(),
containerName);

// We want to wait longer depending on the number of documents in each iteration
final Duration blockingWaitTime = Duration.ofSeconds(120 *
(((_configuration.getBulkloadBatchSize() - 1) / 200000) + 1));
final BulkProcessingOptions<Object> bulkProcessingOptions = new BulkProcessingOptions<>(Object.class);
bulkProcessingOptions.setMaxMicroBatchSize(MAX_BATCH_SIZE)
.setMaxMicroBatchConcurrency(BULK_OPERATION_CONCURRENCY);
container.processBulkOperations(Flux.fromIterable(cosmosItemOperations), bulkProcessingOptions)
.blockLast(BULK_LOAD_WAIT_DURATION);
.blockLast(blockingWaitTime);

LOGGER.info("Completed loading {} documents into [{}:{}]", cosmosItemOperations.size(),
database.getId(),
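The scaled wait amounts to 120 seconds per started block of 200,000 documents (integer division plus one behaves like a ceiling). Checking the arithmetic with a few sample batch sizes:

// 200,000 docs -> 120 * ((199,999 / 200,000) + 1) = 120s
// 200,001 docs -> 120 * ((200,000 / 200,000) + 1) = 240s
// 500,000 docs -> 120 * ((499,999 / 200,000) + 1) = 360s
Duration wait = Duration.ofSeconds(120 * (((500_000 - 1) / 200_000) + 1)); // 360s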
@@ -93,7 +98,7 @@ private void validateDataCreation(int expectedSize) {
.queryItems(COUNT_ALL_QUERY, ObjectNode.class)
.byPage()
.collectList()
.block(BULK_LOAD_WAIT_DURATION);
.block(VALIDATE_DATA_WAIT_DURATION);
final int resultCount = Optional.ofNullable(queryItemsResponseList)
.map(responseList -> responseList.get(0))
.map(FeedResponse::getResults)
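The tail of this Optional chain is cut off in the diff. Given COUNT_ALL_QUERY_RESULT_FIELD = "$1", the count is presumably pulled out of the single aggregate row along these lines (a reconstruction, not the verbatim source):

final int resultCount = Optional.ofNullable(queryItemsResponseList)
    .map(responseList -> responseList.get(0))
    .map(FeedResponse::getResults)
    .map(results -> results.get(0))                            // SELECT COUNT(1) returns one row
    .map(row -> row.get(COUNT_ALL_QUERY_RESULT_FIELD).asInt()) // the count lands in field "$1"
    .orElse(0);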
@@ -0,0 +1,81 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.benchmark.linkedin;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.models.ThroughputProperties;
import com.google.common.base.Preconditions;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* For local testing, the database creation needs to happen as part of the test setup. This class
* manages the database AND collection setup, and is useful for ensuring the database and other
* resources are not left unused after local testing
*/
public class DatabaseResourceManager implements ResourceManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseResourceManager.class);
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);

private final Configuration _configuration;
private final CosmosAsyncClient _client;
private final CollectionResourceManager _collectionResourceManager;

public DatabaseResourceManager(final Configuration configuration,
final EntityConfiguration entityConfiguration,
final CosmosAsyncClient client) {
Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
Preconditions.checkNotNull(entityConfiguration,
"The Test Entity specific configuration can not be null");
Preconditions.checkNotNull(client, "Need a non-null client for "
+ "setting up the Database and collections for the test");
_configuration = configuration;
_client = client;
_collectionResourceManager = new CollectionResourceManager(_configuration, entityConfiguration, _client);
}

@Override
public void createResources() throws CosmosException {
try {
LOGGER.info("Creating database {} for the ctl workload if one doesn't exist", _configuration.getDatabaseId());
final ThroughputProperties throughputProperties =
ThroughputProperties.createManualThroughput(_configuration.getThroughput());
_client.createDatabaseIfNotExists(_configuration.getDatabaseId(), throughputProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception while creating database {}", _configuration.getDatabaseId(), e);
throw e;
}

// Delete any existing collections/containers in this database
_collectionResourceManager.deleteResources();

// And recreate the collections for this test
_collectionResourceManager.createResources();
}

@Override
public void deleteResources() {
// Delete the main database used for testing
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
try {
LOGGER.info("Deleting the main database {} used in this test. Collection", _configuration.getDatabaseId());
database.delete()
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
LOGGER.error("Exception deleting the database {}", _configuration.getDatabaseId(), e);
throw e;
}

LOGGER.info("Database resource cleanup completed");
}
}
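This PR does not show the call site that chooses between the two ResourceManager implementations, but the new shouldManageDatabase() switch suggests wiring along these lines (an assumption, not code from this diff):

final ResourceManager resourceManager = configuration.shouldManageDatabase()
    ? new DatabaseResourceManager(configuration, entityConfiguration, client)    // local runs own the database
    : new CollectionResourceManager(configuration, entityConfiguration, client); // CTL pre-provisions the database
resourceManager.createResources();
try {
    // run the workload
} finally {
    resourceManager.deleteResources();
}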