Setting the IndexingPolicy (#19540)
* Setting the IndexingPolicy

* Use longs for holding partitioningKey and ids instead of Strings

* Revert "Use longs for holding partitioningKey and ids instead of Strings"

This reverts commit 3178e6d.

* Refactoring the data generation to not cache the generated keys

* Avoid the map recreation

Co-authored-by: Amar Athavale <[email protected]>
amarathavale and Amar Athavale authored Mar 3, 2021
1 parent a317d24 commit 7b342e3
Showing 18 changed files with 308 additions and 101 deletions.
@@ -3,38 +3,36 @@

package com.azure.cosmos.benchmark.linkedin;

import com.azure.cosmos.benchmark.linkedin.data.InvitationDataGenerator;
import com.azure.cosmos.benchmark.linkedin.data.DataGenerator;
import com.azure.cosmos.benchmark.linkedin.data.Key;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Collections;
import java.util.HashSet;
import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;


/**
* This class facilitates generating data in batches.
*/
public class DataGenerator implements Iterator<Map<Key, ObjectNode>> {
public class DataGenerationIterator implements Iterator<Map<Key, ObjectNode>> {

public static final int BATCH_SIZE = 200000;

private final InvitationDataGenerator _dataGenerator;
private final DataGenerator _dataGenerator;
private final int _totalRecordCount;
private int _totalDataGenerated;
private final Set<Key> _generatedKeys;

/**
* @param recordCount Number of records we want to generate generate for this test
* @param dataGenerator The underlying DataGenerator capable of generating a batch of records
* @param recordCount Number of records we want to generate for this test.
*                    Actual data generation happens in pre-determined batch sizes
*/
public DataGenerator(int recordCount) {
_dataGenerator = new InvitationDataGenerator(recordCount);
public DataGenerationIterator(final DataGenerator dataGenerator, int recordCount) {
_dataGenerator = Preconditions.checkNotNull(dataGenerator,
"The underlying DataGenerator for this iterator can not be null");
_totalRecordCount = recordCount;
_totalDataGenerated = 0;
_generatedKeys = new HashSet<>();
}

@Override
@@ -47,21 +45,8 @@ public Map<Key, ObjectNode> next() {
final int recordsToGenerate = Math.min(BATCH_SIZE, _totalRecordCount - _totalDataGenerated);

// Filter Keys in case there are duplicates
final Map<Key, ObjectNode> newDocuments = _dataGenerator.generate(recordsToGenerate)
.entrySet()
.stream()
.filter(keyObjectNodeEntry -> !_generatedKeys.contains(keyObjectNodeEntry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

_generatedKeys.addAll(newDocuments.keySet());
final Map<Key, ObjectNode> newDocuments = _dataGenerator.generate(recordsToGenerate);
_totalDataGenerated += newDocuments.size();
return newDocuments;
}

/**
* @return Set of Keys representing each document's id and partitioningKey
*/
public Set<Key> getGeneratedKeys() {
return Collections.unmodifiableSet(_generatedKeys);
}
}
@@ -10,6 +10,7 @@
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosItemOperation;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.benchmark.linkedin.data.Key;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.PartitionKey;
@@ -19,7 +20,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,33 +29,37 @@
public class DataLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(DataLoader.class);

private static final int MAX_BATCH_SIZE = 10000;
private static final int MAX_BATCH_SIZE = 20000;
private static final int BULK_OPERATION_CONCURRENCY = 5;
private static final Duration BULK_LOAD_WAIT_DURATION = Duration.ofSeconds(120);
private static final String COUNT_ALL_QUERY = "SELECT COUNT(1) FROM c";
private static final String COUNT_ALL_QUERY_RESULT_FIELD = "$1";

private final Configuration _configuration;
private final CosmosAsyncClient _client;
private final DataGenerator _dataGenerator;
private final DataGenerationIterator _dataGenerator;

public DataLoader(final Configuration configuration, final CosmosAsyncClient client) {
public DataLoader(final Configuration configuration,
final EntityConfiguration entityConfiguration,
final CosmosAsyncClient client) {
Preconditions.checkNotNull(entityConfiguration, "The test entity configuration can not be null");
_configuration = Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
_client = Preconditions.checkNotNull(client,
"The CosmosAsyncClient needed for data loading can not be null");
_dataGenerator = new DataGenerator(_configuration.getNumberOfPreCreatedDocuments());
_dataGenerator = new DataGenerationIterator(entityConfiguration.dataGenerator(),
_configuration.getNumberOfPreCreatedDocuments());
}

public void loadData() {
LOGGER.info("Starting batched data loading, loading {} documents in each iteration", DataGenerator.BATCH_SIZE);
LOGGER.info("Starting batched data loading, loading {} documents in each iteration", DataGenerationIterator.BATCH_SIZE);
while (_dataGenerator.hasNext()) {
final Map<Key, ObjectNode> newDocuments = _dataGenerator.next();
bulkCreateItems(newDocuments);
newDocuments.clear();
}

validateDataCreation(_dataGenerator.getGeneratedKeys().size());
validateDataCreation(_configuration.getNumberOfPreCreatedDocuments());
}

private void bulkCreateItems(final Map<Key, ObjectNode> records) {
@@ -102,6 +106,9 @@ private void validateDataCreation(int expectedSize) {
String.format("Number of documents %d in the container %s is less than the expected threshold %f ",
resultCount, containerName, (expectedSize * 0.90)));
}

LOGGER.info("Validated {} out of the {} expected documents were loaded into [{}:{}]",
resultCount, expectedSize, _configuration.getDatabaseId(), containerName);
}

/**
@@ -120,11 +127,4 @@ private List<CosmosItemOperation> mapToCosmosItemOperation(final Map<Key, Object
})
.collect(Collectors.toList());
}

/**
* @return Set of Keys representing each document loaded into the test collection
*/
public Set<Key> getLoadedDataKeys() {
return _dataGenerator.getGeneratedKeys();
}
}
@@ -8,7 +8,9 @@
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.benchmark.BenchmarkHelper;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.benchmark.linkedin.data.Key;
import com.azure.cosmos.benchmark.linkedin.data.KeyGenerator;
import com.azure.cosmos.benchmark.linkedin.impl.Accessor;
import com.azure.cosmos.benchmark.linkedin.impl.CosmosDBDataAccessor;
import com.azure.cosmos.benchmark.linkedin.impl.DocumentTransformer;
@@ -28,9 +30,6 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -53,6 +52,7 @@ public class GetTestRunner {
private static final Duration TERMINATION_WAIT_DURATION = Duration.ofSeconds(60);

private final Configuration _configuration;
private final EntityConfiguration _entityConfiguration;
private final Accessor<Key, JsonNode> _accessor;
private final ExecutorService _executorService;
private final AtomicLong _successCount;
@@ -61,30 +61,35 @@ public class GetTestRunner {

GetTestRunner(final Configuration configuration,
final CosmosAsyncClient client,
final MetricRegistry metricsRegistry) {
final MetricRegistry metricsRegistry,
final EntityConfiguration entityConfiguration) {
Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
Preconditions.checkNotNull(client,
"Need a non-null client for setting up the Database and containers for the test");
Preconditions.checkNotNull(metricsRegistry,
"The MetricsRegistry can not be null");
Preconditions.checkNotNull(entityConfiguration,
"The Test entity configuration can not be null");

_configuration = configuration;
_entityConfiguration = entityConfiguration;
_accessor = createAccessor(configuration, client, metricsRegistry);
_executorService = Executors.newFixedThreadPool(configuration.getConcurrency());
_successCount = new AtomicLong(0);
_errorCount = new AtomicLong(0);
_semaphore = new Semaphore(configuration.getConcurrency());
}

public void run(final Set<Key> testKeys) {
final ArrayList<Key> keys = new ArrayList<>(testKeys);
Collections.shuffle(keys);
public void run() {
KeyGenerator keyGenerator = getNewKeyGenerator();
final long runStartTime = System.currentTimeMillis();
long i = 0;
for (; BenchmarkHelper.shouldContinue(runStartTime, i, _configuration); i++) {
int index = (int) ((i % keys.size()) % Integer.MAX_VALUE);
final Key key = keys.get(index);
if (i > _configuration.getNumberOfPreCreatedDocuments()) {
keyGenerator = getNewKeyGenerator();
}
final Key key = keyGenerator.key();
try {
_semaphore.acquire();
} catch (InterruptedException e) {
@@ -149,4 +154,8 @@ private StaticDataLocator createDataLocator(Configuration configuration, CosmosA
final CosmosAsyncContainer container = database.getContainer(configuration.getCollectionId());
return new StaticDataLocator(collectionKey, container);
}

private KeyGenerator getNewKeyGenerator() {
return _entityConfiguration.keyGenerator();
}
}
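
The KeyGenerator interface used above is defined in one of the changed files that is not rendered in this excerpt. Based only on its usage in GetTestRunner.run() (one key() call per iteration, with a fresh generator obtained once the loop has covered the pre-created documents), a minimal sketch of its likely shape is shown below; this is an assumption, not the committed interface.

// A minimal sketch (assumption): KeyGenerator as consumed by GetTestRunner.run(),
// producing keys in a repeatable sequence so the GET workload can re-derive the keys
// of the pre-created documents without caching them.
package com.azure.cosmos.benchmark.linkedin.data;

public interface KeyGenerator {

    /**
     * @return the next Key in a deterministic sequence of document keys
     */
    Key key();
}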
@@ -7,6 +7,8 @@
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.ScheduledReporterFactory;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.benchmark.linkedin.data.InvitationsEntityConfiguration;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
import com.google.common.base.Preconditions;
@@ -19,6 +21,7 @@ public class LICtlWorkload {
private static final Logger LOGGER = LoggerFactory.getLogger(LICtlWorkload.class);

private final Configuration _configuration;
private final EntityConfiguration _entityConfiguration;
private final CosmosAsyncClient _client;
private final CosmosAsyncClient _bulkLoadClient;
private final MetricRegistry _metricsRegistry;
@@ -31,15 +34,16 @@ public LICtlWorkload(final Configuration configuration) {
Preconditions.checkNotNull(configuration, "The Workload configuration defining the parameters can not be null");

_configuration = configuration;
_entityConfiguration = new InvitationsEntityConfiguration(configuration);
_client = AsyncClientFactory.buildAsyncClient(configuration);
_bulkLoadClient = AsyncClientFactory.buildBulkLoadAsyncClient(configuration);
_metricsRegistry = new MetricRegistry();
_reporter = ScheduledReporterFactory.create(_configuration, _metricsRegistry);
_resourceManager = _configuration.shouldManageResources()
? new ResourceManagerImpl(_configuration, _client)
? new ResourceManagerImpl(_configuration, _entityConfiguration, _client)
: new NoopResourceManagerImpl();
_dataLoader = new DataLoader(_configuration, _bulkLoadClient);
_getTestRunner = new GetTestRunner(_configuration, _client, _metricsRegistry);
_dataLoader = new DataLoader(_configuration, _entityConfiguration, _bulkLoadClient);
_getTestRunner = new GetTestRunner(_configuration, _client, _metricsRegistry, _entityConfiguration);
}

public void setup() throws CosmosException {
@@ -58,7 +62,7 @@ public void run() {
LOGGER.info("Executing the Get test");
_reporter.start(_configuration.getPrintingInterval(), TimeUnit.SECONDS);

_getTestRunner.run(_dataLoader.getLoadedDataKeys());
_getTestRunner.run();

_reporter.report();
}
@@ -8,6 +8,8 @@
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.linkedin.data.CollectionAttributes;
import com.azure.cosmos.benchmark.linkedin.data.EntityConfiguration;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.ThroughputProperties;
import com.google.common.base.Preconditions;
@@ -26,14 +28,20 @@ public class ResourceManagerImpl implements ResourceManager {
private static final Duration RESOURCE_CRUD_WAIT_TIME = Duration.ofSeconds(30);

private final Configuration _configuration;
private final EntityConfiguration _entityConfiguration;
private final CosmosAsyncClient _client;

public ResourceManagerImpl(final Configuration configuration, final CosmosAsyncClient client) {
public ResourceManagerImpl(final Configuration configuration,
final EntityConfiguration entityConfiguration,
final CosmosAsyncClient client) {
Preconditions.checkNotNull(configuration,
"The Workload configuration defining the parameters can not be null");
Preconditions.checkNotNull(entityConfiguration,
"The Test Entity specific configuration can not be null");
Preconditions.checkNotNull(client, "Need a non-null client for "
+ "setting up the Database and containers for the test");
_configuration = configuration;
_entityConfiguration = entityConfiguration;
_client = client;
}

@@ -56,10 +64,12 @@ public void createDatabase() throws CosmosException {
public void createContainer() throws CosmosException {
final String containerName = _configuration.getCollectionId();
final CosmosAsyncDatabase database = _client.getDatabase(_configuration.getDatabaseId());
final CollectionAttributes collectionAttributes = _entityConfiguration.collectionAttributes();
try {
LOGGER.info("Creating container {} in the database {}", containerName, database.getId());
final CosmosContainerProperties containerProperties =
new CosmosContainerProperties(containerName, PARTITION_KEY_PATH);
new CosmosContainerProperties(containerName, PARTITION_KEY_PATH)
.setIndexingPolicy(collectionAttributes.indexingPolicy());
database.createContainerIfNotExists(containerProperties)
.block(RESOURCE_CRUD_WAIT_TIME);
} catch (CosmosException e) {
@@ -0,0 +1,15 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.benchmark.linkedin.data;

import com.azure.cosmos.models.IndexingPolicy;


public interface CollectionAttributes {

/**
* @return IndexingPolicy definition for a collection used to store a specific entity type
*/
IndexingPolicy indexingPolicy();
}
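
For illustration, a CollectionAttributes implementation would build the IndexingPolicy that ResourceManagerImpl.createContainer() applies above. The class name and the specific index paths below are assumptions; the actual implementation used for the Invitations entity is in one of the files not rendered in this excerpt.

// Hypothetical example implementation; the paths shown here are illustrative only.
package com.azure.cosmos.benchmark.linkedin.data;

import com.azure.cosmos.models.ExcludedPath;
import com.azure.cosmos.models.IncludedPath;
import com.azure.cosmos.models.IndexingMode;
import com.azure.cosmos.models.IndexingPolicy;
import java.util.Collections;

public class ExampleCollectionAttributes implements CollectionAttributes {

    @Override
    public IndexingPolicy indexingPolicy() {
        final IndexingPolicy indexingPolicy = new IndexingPolicy();
        indexingPolicy.setIndexingMode(IndexingMode.CONSISTENT);
        // Index only the id path and exclude everything else, since the GET workload
        // reads documents by id and partitioning key rather than by query.
        indexingPolicy.setIncludedPaths(Collections.singletonList(new IncludedPath("/id/?")));
        indexingPolicy.setExcludedPaths(Collections.singletonList(new ExcludedPath("/*")));
        return indexingPolicy;
    }
}

If verification is needed after container creation, the applied policy can be read back via the container's read() call and getProperties().getIndexingPolicy() on the response.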
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.benchmark.linkedin.data;

import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;


public interface DataGenerator {
/**
* Generates the desired batch of records for a specific entity
*
* @param recordCount Number of records we want to create in this invocation
* @return Map containing desired count of record key to value entries
*/
Map<Key, ObjectNode> generate(int recordCount);
}
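
A minimal sketch of a DataGenerator implementation is shown below. It assumes Key exposes a constructor taking the document id and partitioning key (the Key API is not shown in this excerpt), and the existing InvitationDataGenerator is presumably adapted to this interface in one of the files not rendered here.

// Hypothetical example implementation for illustration only.
package com.azure.cosmos.benchmark.linkedin.data;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class ExampleDataGenerator implements DataGenerator {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public Map<Key, ObjectNode> generate(int recordCount) {
        final Map<Key, ObjectNode> records = new HashMap<>(recordCount);
        // Loop on the map size so duplicate keys cannot shrink the returned batch;
        // DataGenerationIterator now trusts the generator to return the exact count.
        while (records.size() < recordCount) {
            final String id = UUID.randomUUID().toString();
            final String partitioningKey = UUID.randomUUID().toString();
            final ObjectNode document = OBJECT_MAPPER.createObjectNode();
            document.put("id", id);
            document.put("partitioningKey", partitioningKey);
            // Assumed Key constructor (id, partitioningKey); the real Key API may differ.
            records.put(new Key(id, partitioningKey), document);
        }
        return records;
    }
}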
@@ -0,0 +1,27 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.benchmark.linkedin.data;

/**
* Interface for modeling the configurations for each entity, allowing the same implementation
* to be leveraged for different use-cases
*/
public interface EntityConfiguration {

/**
* @return KeyGenerator for this entity
*/
KeyGenerator keyGenerator();

/**
* @return Data Generator for this entity, which facilitates generating documents conforming to this
* entity's schema
*/
DataGenerator dataGenerator();

/**
* @return The configuration for the underlying collection used to store this entity's data
*/
CollectionAttributes collectionAttributes();
}
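
As an illustration of how these pieces compose, an EntityConfiguration implementation can simply hold one instance of each collaborator. The committed InvitationsEntityConfiguration (constructed from the workload Configuration in LICtlWorkload above) is not shown in this excerpt, so the sketch below is an assumed shape, not the actual class.

// Hypothetical example implementation composing the three collaborators.
package com.azure.cosmos.benchmark.linkedin.data;

import com.google.common.base.Preconditions;

public class ExampleEntityConfiguration implements EntityConfiguration {

    private final KeyGenerator _keyGenerator;
    private final DataGenerator _dataGenerator;
    private final CollectionAttributes _collectionAttributes;

    public ExampleEntityConfiguration(final KeyGenerator keyGenerator,
        final DataGenerator dataGenerator,
        final CollectionAttributes collectionAttributes) {
        _keyGenerator = Preconditions.checkNotNull(keyGenerator, "The KeyGenerator can not be null");
        _dataGenerator = Preconditions.checkNotNull(dataGenerator, "The DataGenerator can not be null");
        _collectionAttributes =
            Preconditions.checkNotNull(collectionAttributes, "The CollectionAttributes can not be null");
    }

    @Override
    public KeyGenerator keyGenerator() {
        return _keyGenerator;
    }

    @Override
    public DataGenerator dataGenerator() {
        return _dataGenerator;
    }

    @Override
    public CollectionAttributes collectionAttributes() {
        return _collectionAttributes;
    }
}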