rootDirectory;
@@ -117,18 +117,18 @@ public final class BoundedContextBuilder implements Logging {
}
/**
- * Creates a new builder with the given spec and system features.
+ * Creates a new builder with the given spec and system settings.
*
* @param spec
* the context spec for the built context
- * @param systemFeatures
- * system feature flags; can be changed later via {@link #systemFeatures()}
+ * @param systemSettings
+ * settings of the System context; may be changed later via {@link #systemSettings()}
* @see BoundedContext#singleTenant
* @see BoundedContext#multitenant
*/
- private BoundedContextBuilder(ContextSpec spec, SystemSettings systemFeatures) {
+ private BoundedContextBuilder(ContextSpec spec, SystemSettings systemSettings) {
this.spec = checkNotNull(spec);
- this.systemFeatures = checkNotNull(systemFeatures);
+ this.systemSettings = checkNotNull(systemSettings);
}
/**
@@ -515,14 +515,12 @@ AggregateRootDirectory aggregateRootDirectory() {
}
/**
- * Obtains the system context feature configuration.
+ * Obtains the configuration of the System context.
*
- * Users may enable or disable some features of the system context.
- *
- * @see SystemSettings
+ *
With it, users are able to change the behavior of the system context.
*/
- public SystemSettings systemFeatures() {
- return systemFeatures;
+ public SystemSettings systemSettings() {
+ return systemSettings;
}
/**
@@ -590,7 +588,7 @@ private BoundedContext buildDomain(SystemContext system) {
}
private SystemContext buildSystem() {
- BoundedContextBuilder system = new BoundedContextBuilder(systemSpec(), systemFeatures);
+ BoundedContextBuilder system = new BoundedContextBuilder(systemSpec(), systemSettings);
Optional extends TenantIndex> tenantIndex = tenantIndex();
tenantIndex.ifPresent(system::setTenantIndex);
SystemContext result =
@@ -600,7 +598,7 @@ private SystemContext buildSystem() {
private ContextSpec systemSpec() {
ContextSpec systemSpec = this.spec.toSystem();
- if (!systemFeatures.includePersistentEvents()) {
+ if (!systemSettings.includePersistentEvents()) {
systemSpec = systemSpec.notStoringEvents();
}
return systemSpec;
diff --git a/server/src/main/java/io/spine/server/event/model/EventReactorClass.java b/server/src/main/java/io/spine/server/event/model/EventReactorClass.java
index 77817a550bb..e965b921cb4 100644
--- a/server/src/main/java/io/spine/server/event/model/EventReactorClass.java
+++ b/server/src/main/java/io/spine/server/event/model/EventReactorClass.java
@@ -67,7 +67,7 @@ private EventReactorClass(Class extends S> cls) {
@SuppressWarnings("unchecked")
EventReactorClass result = (EventReactorClass)
get(cls, EventReactorClass.class, () -> new EventReactorClass<>(cls));
- return (result);
+ return result;
}
@Override
diff --git a/server/src/main/java/io/spine/server/integration/ThirdPartyContext.java b/server/src/main/java/io/spine/server/integration/ThirdPartyContext.java
index ef7d0e31d96..dea1801b0e4 100644
--- a/server/src/main/java/io/spine/server/integration/ThirdPartyContext.java
+++ b/server/src/main/java/io/spine/server/integration/ThirdPartyContext.java
@@ -90,7 +90,7 @@ private static ThirdPartyContext newContext(String name, boolean multitenant) {
BoundedContextBuilder contextBuilder = multitenant
? BoundedContext.multitenant(name)
: BoundedContext.singleTenant(name);
- contextBuilder.systemFeatures()
+ contextBuilder.systemSettings()
.disableCommandLog()
.disableAggregateQuerying()
.forgetEvents();
diff --git a/server/src/main/java/io/spine/system/server/DefaultSystemWriteSide.java b/server/src/main/java/io/spine/system/server/DefaultSystemWriteSide.java
index 62f7f98e24e..7fa3709fce7 100644
--- a/server/src/main/java/io/spine/system/server/DefaultSystemWriteSide.java
+++ b/server/src/main/java/io/spine/system/server/DefaultSystemWriteSide.java
@@ -35,7 +35,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.grpc.StreamObservers.noOpObserver;
import static io.spine.system.server.SystemEventFactory.forMessage;
-import static java.util.concurrent.ForkJoinPool.commonPool;
/**
* The default implementation of {@link SystemWriteSide}.
@@ -61,9 +60,12 @@ final class DefaultSystemWriteSide implements SystemWriteSide {
public Event postEvent(EventMessage systemEvent, Origin origin) {
checkNotNull(systemEvent);
checkNotNull(origin);
+
Event event = event(systemEvent, origin);
- if (system.config().postEventsInParallel()) {
- commonPool().execute(() -> postEvent(event));
+ SystemConfig config = system.config();
+
+ if (config.postEventsInParallel()) {
+ config.postingExecutor().execute(() -> postEvent(event));
} else {
postEvent(event);
}
diff --git a/server/src/main/java/io/spine/system/server/SystemConfig.java b/server/src/main/java/io/spine/system/server/SystemConfig.java
index a8635976947..bbbb250494b 100644
--- a/server/src/main/java/io/spine/system/server/SystemConfig.java
+++ b/server/src/main/java/io/spine/system/server/SystemConfig.java
@@ -26,8 +26,14 @@
package io.spine.system.server;
-import com.google.common.base.Objects;
-import com.google.errorprone.annotations.Immutable;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.hash;
+import static java.util.Objects.nonNull;
/**
* An immutable set of features of a {@link SystemContext}.
@@ -38,16 +44,16 @@ final class SystemConfig implements SystemFeatures {
private final boolean commandLog;
private final boolean aggregateMirrors;
private final boolean storeEvents;
- private final boolean parallelPosting;
+ private final @Nullable Executor postingExecutor;
SystemConfig(boolean commandLog,
boolean aggregateMirrors,
boolean storeEvents,
- boolean parallelPosting) {
+ @Nullable Executor postingExecutor) {
this.commandLog = commandLog;
this.aggregateMirrors = aggregateMirrors;
this.storeEvents = storeEvents;
- this.parallelPosting = parallelPosting;
+ this.postingExecutor = postingExecutor;
}
@Override
@@ -67,7 +73,20 @@ public boolean includePersistentEvents() {
@Override
public boolean postEventsInParallel() {
- return parallelPosting;
+ return nonNull(postingExecutor);
+ }
+
+ /**
+ * Returns an {@code Executor} to be used to post system events in parallel.
+ *
+ *
Before calling this method, make sure parallel posting
+ * {@linkplain #postEventsInParallel() is enabled}.
+ *
+ * @throws IllegalStateException if parallel posting of system events is disabled
+ */
+ Executor postingExecutor() {
+ checkState(postEventsInParallel());
+ return postingExecutor;
}
@SuppressWarnings("OverlyComplexBooleanExpression")
@@ -83,11 +102,11 @@ public boolean equals(Object o) {
return commandLog == config.commandLog &&
aggregateMirrors == config.aggregateMirrors &&
storeEvents == config.storeEvents &&
- parallelPosting == config.parallelPosting;
+ Objects.equals(postingExecutor, config.postingExecutor);
}
@Override
public int hashCode() {
- return Objects.hashCode(commandLog, aggregateMirrors, storeEvents, parallelPosting);
+ return hash(commandLog, aggregateMirrors, storeEvents);
}
}
diff --git a/server/src/main/java/io/spine/system/server/SystemContext.java b/server/src/main/java/io/spine/system/server/SystemContext.java
index dd63115cd95..4b6caceb71c 100644
--- a/server/src/main/java/io/spine/system/server/SystemContext.java
+++ b/server/src/main/java/io/spine/system/server/SystemContext.java
@@ -60,7 +60,7 @@ public final class SystemContext extends BoundedContext {
private SystemContext(BoundedContextBuilder builder) {
super(builder);
- this.config = builder.systemFeatures()
+ this.config = builder.systemSettings()
.freeze();
}
diff --git a/server/src/main/java/io/spine/system/server/SystemFeatures.java b/server/src/main/java/io/spine/system/server/SystemFeatures.java
index 91d2ec3338d..d098420e462 100644
--- a/server/src/main/java/io/spine/system/server/SystemFeatures.java
+++ b/server/src/main/java/io/spine/system/server/SystemFeatures.java
@@ -51,10 +51,11 @@ interface SystemFeatures {
* @return {@code true} if system events should be stored, {@code false} otherwise
*/
boolean includePersistentEvents();
+
/**
* Checks if the system events are allowed to be posted in parallel.
*
- * @return {@code true} if it's OK to post system event is parallel, {@code false} otherwise
+ * @return {@code true} if it's OK to post system events in parallel, {@code false} otherwise
*/
boolean postEventsInParallel();
}
diff --git a/server/src/main/java/io/spine/system/server/SystemSettings.java b/server/src/main/java/io/spine/system/server/SystemSettings.java
index 093df3cacc5..e39206d49b2 100644
--- a/server/src/main/java/io/spine/system/server/SystemSettings.java
+++ b/server/src/main/java/io/spine/system/server/SystemSettings.java
@@ -32,6 +32,13 @@
import io.spine.base.Environment;
import io.spine.base.Tests;
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
/**
* A configuration of features of a system context.
*
@@ -40,6 +47,26 @@
*/
public final class SystemSettings implements SystemFeatures {
+ /**
+ * A default executor for parallel posting of system events.
+ *
+ *
This one is used when a custom executor is not provided.
+ */
+ private static final Executor defaultExecutor = ForkJoinPool.commonPool();
+
+ /**
+ * An executor for parallel posting of system events.
+ *
+ *
This property can have the following values:
+ *
+ *
+ * - {@code null}, when parallel posting of system events is disabled.
+ *
- The {@linkplain #defaultExecutor default} executor.
+ *
- A {@linkplain #useCustomPostingExecutor(Executor) user-provided} executor.
+ *
+ */
+ private @Nullable Executor postingExecutor;
+
private boolean commandLog;
private boolean aggregateMirrors;
private boolean storeEvents;
@@ -89,7 +116,7 @@ public SystemSettings enableCommandLog() {
}
/**
- * Disables {@linkplain io.spine.system.server.CommandLog CommandLog}.
+ * Disables {@link io.spine.system.server.CommandLog CommandLog}.
*
* This is the default setting.
*
@@ -106,7 +133,7 @@ public SystemSettings disableCommandLog() {
* Enables querying of the latest domain {@code Aggregate} states.
*
*
The system context stores domain {@code Aggregate} states in the form of
- * {@link io.spine.system.server.Mirror} projections.
+ * {@link io.spine.system.server.Mirror Mirror} projections.
*
*
This is the default setting.
*
@@ -132,54 +159,100 @@ public SystemSettings disableAggregateQuerying() {
}
/**
- * Configures the the system context to store system events.
+ * Configures the system context to store system events.
*
* @return self for method chaining
+ * @see #forgetEvents()
*/
+ @CanIgnoreReturnValue
public SystemSettings persistEvents() {
this.storeEvents = true;
return this;
}
/**
- * Configures the the system context NOT to store system events for better performance.
+ * Configures the system context NOT to store system events for better performance.
*
*
This is the default setting.
*
* @return self for method chaining
+ * @see #persistEvents()
*/
+ @CanIgnoreReturnValue
public SystemSettings forgetEvents() {
this.storeEvents = false;
return this;
}
/**
- * Configures the system context clients to post system events in parallel.
+ * Configures the system context to post system events in parallel.
*
- *
The events are posted using {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+ *
The events are posted using the {@link ForkJoinPool#commonPool() common pool}.
*
*
This is the default setting in production environment.
*
* @return self for method chaining
+ * @see #disableParallelPosting()
*/
@CanIgnoreReturnValue
public SystemSettings enableParallelPosting() {
this.parallelPosting = true;
+ this.postingExecutor = defaultExecutor;
return this;
}
/**
- * Configures the system context clients NOT to post system events in parallel.
+ * Configures the system context NOT to post system events in parallel.
*
- *
Choosing this configuration option may effect performance.
+ *
Choosing this configuration option may affect performance.
*
*
This is the default setting in test environment.
*
* @return self for method chaining
+ * @see #enableParallelPosting()
*/
@CanIgnoreReturnValue
public SystemSettings disableParallelPosting() {
this.parallelPosting = false;
+ this.postingExecutor = null;
+ return this;
+ }
+
+ /**
+ * Configures the system context to post system events using the given {@code Executor}.
+ *
+ *
Please note, this setting can be configured only if parallel posting of events
+ * {@linkplain #postEventsInParallel() is enabled}.
+ *
+ * @return self for method chaining
+ *
+ * @see #enableParallelPosting()
+ * @see #useDefaultPostingExecutor()
+ */
+ @CanIgnoreReturnValue
+ public SystemSettings useCustomPostingExecutor(Executor executor) {
+ checkNotNull(executor);
+ checkState(parallelPosting);
+ this.postingExecutor = executor;
+ return this;
+ }
+
+ /**
+ * Configures the system context to post system events using
+ * the {@link ForkJoinPool#commonPool() common pool}.
+ *
+ *
Please note, this setting can be configured only if parallel posting of events
+ * {@linkplain #postEventsInParallel() is enabled}.
+ *
+ * @return self for method chaining
+ *
+ * @see #enableParallelPosting()
+ * @see #useCustomPostingExecutor(Executor)
+ */
+ @CanIgnoreReturnValue
+ public SystemSettings useDefaultPostingExecutor() {
+ checkState(parallelPosting);
+ this.postingExecutor = defaultExecutor;
return this;
}
@@ -211,7 +284,7 @@ public boolean postEventsInParallel() {
* Copies these settings into an immutable feature set.
*/
SystemConfig freeze() {
- return new SystemConfig(commandLog, aggregateMirrors, storeEvents, parallelPosting);
+ return new SystemConfig(commandLog, aggregateMirrors, storeEvents, postingExecutor);
}
@SuppressWarnings({"OverlyComplexBooleanExpression", "NonFinalFieldReferenceInEquals"})
diff --git a/server/src/test/java/io/spine/server/aggregate/AggregateRepositoryTest.java b/server/src/test/java/io/spine/server/aggregate/AggregateRepositoryTest.java
index 4571e56d82f..c535ec1a010 100644
--- a/server/src/test/java/io/spine/server/aggregate/AggregateRepositoryTest.java
+++ b/server/src/test/java/io/spine/server/aggregate/AggregateRepositoryTest.java
@@ -763,7 +763,7 @@ void ifVisibilityIsNone() {
@DisplayName("if aggregate querying is disabled")
void ifAggregateQueryingDisabled() {
BoundedContextBuilder builder = BoundedContextBuilder.assumingTests();
- builder.systemFeatures()
+ builder.systemSettings()
.disableAggregateQuerying();
BoundedContext context = builder.add(ProjectAggregate.class)
.build();
diff --git a/server/src/test/java/io/spine/system/server/ScheduledCommandTest.java b/server/src/test/java/io/spine/system/server/ScheduledCommandTest.java
index ed866bba463..96f0a9ac3c0 100644
--- a/server/src/test/java/io/spine/system/server/ScheduledCommandTest.java
+++ b/server/src/test/java/io/spine/system/server/ScheduledCommandTest.java
@@ -76,7 +76,7 @@ void setUp() {
ServerEnvironment.instance()
.scheduleCommandsUsing(() -> scheduler);
BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
- contextBuilder.systemFeatures()
+ contextBuilder.systemSettings()
.enableCommandLog();
context = contextBuilder.build();
context.internalAccess()
diff --git a/server/src/test/java/io/spine/system/server/SystemContextFeaturesTest.java b/server/src/test/java/io/spine/system/server/SystemContextFeaturesTest.java
deleted file mode 100644
index f7e927e9a0e..00000000000
--- a/server/src/test/java/io/spine/system/server/SystemContextFeaturesTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Copyright 2021, TeamDev. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Redistribution and use in source and/or binary forms, with or without
- * modification, must retain the above copyright notice and the following
- * disclaimer.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package io.spine.system.server;
-
-import com.google.protobuf.Empty;
-import com.google.protobuf.StringValue;
-import io.spine.base.Identifier;
-import io.spine.core.CommandId;
-import io.spine.core.Event;
-import io.spine.core.MessageId;
-import io.spine.grpc.MemoizingObserver;
-import io.spine.server.BoundedContext;
-import io.spine.server.BoundedContextBuilder;
-import io.spine.server.event.EventBus;
-import io.spine.server.event.EventFilter;
-import io.spine.server.event.EventStreamQuery;
-import io.spine.system.server.event.EntityCreated;
-import io.spine.system.server.event.EntityStateChanged;
-import io.spine.system.server.given.entity.HistoryEventWatcher;
-import io.spine.testing.server.TestEventFactory;
-import io.spine.type.TypeUrl;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import static com.google.common.truth.Truth.assertThat;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import static io.spine.base.Identifier.newUuid;
-import static io.spine.grpc.StreamObservers.memoizingObserver;
-import static io.spine.option.EntityOption.Kind.ENTITY;
-import static io.spine.protobuf.AnyPacker.pack;
-import static io.spine.system.server.SystemBoundedContexts.systemOf;
-import static java.time.Duration.ofSeconds;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@DisplayName("System Bounded Context should")
-class SystemContextFeaturesTest {
-
- private static final TestEventFactory events =
- TestEventFactory.newInstance(SystemContextFeaturesTest.class);
-
- @Test
- @DisplayName("not store events by default")
- void notStoreEvents() {
- BoundedContext domain = BoundedContextBuilder
- .assumingTests()
- .build();
- BoundedContext system = systemOf(domain);
- Event event = createEvent();
- MemoizingObserver observer = postSystemEvent(system.eventBus(), event);
- assertTrue(observer.isCompleted());
- assertThat(observer.responses()).isEmpty();
- }
-
- @Test
- @DisplayName("store events if required")
- void storeEvents() {
- BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
- contextBuilder.systemFeatures()
- .persistEvents();
- BoundedContext domain = contextBuilder.build();
- BoundedContext system = systemOf(domain);
- Event event = createEvent();
- MemoizingObserver observer = postSystemEvent(system.eventBus(), event);
- assertTrue(observer.isCompleted());
- assertThat(observer.responses()).containsExactly(event);
- }
-
- @Test
- @DisplayName("not store domain commands")
- void notStoreDomainCommands() {
- BoundedContext domain = BoundedContextBuilder
- .assumingTests()
- .build();
- BoundedContext system = systemOf(domain);
- assertFalse(system.hasEntitiesWithState(CommandLog.class));
- }
-
- @Test
- @DisplayName("store domain commands if required")
- void storeDomainCommands() {
- BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
- contextBuilder.systemFeatures()
- .enableCommandLog();
- BoundedContext domain = contextBuilder.build();
- BoundedContext system = systemOf(domain);
- assertTrue(system.hasEntitiesWithState(CommandLog.class));
- }
-
- @Test
- @DisplayName("mirror domain aggregates")
- void mirror() {
- BoundedContext domain = BoundedContextBuilder
- .assumingTests()
- .build();
- BoundedContext system = systemOf(domain);
- assertTrue(system.hasEntitiesWithState(Mirror.class));
- }
-
- @Test
- @DisplayName("not mirror domain aggregates if disabled")
- void notMirror() {
- BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
- contextBuilder.systemFeatures()
- .disableAggregateQuerying();
- BoundedContext domain = contextBuilder.build();
- BoundedContext system = systemOf(domain);
- assertFalse(system.hasEntitiesWithState(Mirror.class));
- }
-
- @Test
- @DisplayName("post system events in parallel")
- void asyncEvents() {
- BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
- contextBuilder.systemFeatures()
- .enableParallelPosting();
- BoundedContext domain = contextBuilder.build();
- BoundedContext system = systemOf(domain);
- HistoryEventWatcher watcher = new HistoryEventWatcher();
- system.eventBus().register(watcher);
- MessageId messageId = MessageId.newBuilder()
- .setTypeUrl(TypeUrl.of(Empty.class).value())
- .setId(Identifier.pack(newUuid()))
- .build();
- EntityCreated event = EntityCreated
- .newBuilder()
- .setEntity(messageId)
- .setKind(ENTITY)
- .build();
- domain.systemClient()
- .writeSide()
- .postEvent(event);
- sleepUninterruptibly(ofSeconds(1));
- watcher.assertReceivedEvent(EntityCreated.class);
- }
-
- private static MemoizingObserver postSystemEvent(EventBus systemBus, Event event) {
- systemBus.post(event);
- EventFilter filter = EventFilter
- .newBuilder()
- .setEventType(event.enclosedTypeUrl()
- .toTypeName().value())
- .vBuild();
- EventStreamQuery query = EventStreamQuery
- .newBuilder()
- .addFilter(filter)
- .vBuild();
- MemoizingObserver observer = memoizingObserver();
- systemBus.eventStore()
- .read(query, observer);
- return observer;
- }
-
- private static Event createEvent() {
- EntityStateChanged eventMessage = EntityStateChanged
- .newBuilder()
- .setEntity(MessageId.newBuilder()
- .setId(Identifier.pack(42))
- .setTypeUrl(TypeUrl.of(EmptyEntityState.class)
- .value()))
- .setOldState(pack(StringValue.of("0")))
- .setNewState(pack(StringValue.of("42")))
- .addSignalId(MessageId.newBuilder()
- .setId(Identifier.pack(CommandId.generate()))
- .setTypeUrl(TypeUrl.of(EntityStateChanged.class)
- .value()))
- .vBuild();
- Event event = events.createEvent(eventMessage);
- return event;
- }
-}
diff --git a/server/src/test/java/io/spine/system/server/SystemContextSettingsTest.java b/server/src/test/java/io/spine/system/server/SystemContextSettingsTest.java
new file mode 100644
index 00000000000..01ef44c4f70
--- /dev/null
+++ b/server/src/test/java/io/spine/system/server/SystemContextSettingsTest.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2021, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.system.server;
+
+import io.spine.base.EventMessage;
+import io.spine.core.Event;
+import io.spine.grpc.MemoizingObserver;
+import io.spine.server.BoundedContext;
+import io.spine.server.BoundedContextBuilder;
+import io.spine.system.server.event.EntityCreated;
+import io.spine.system.server.given.entity.HistoryEventWatcher;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static io.spine.system.server.SystemBoundedContexts.systemOf;
+import static io.spine.system.server.given.SystemContextSettingsTestEnv.entityCreated;
+import static io.spine.system.server.given.SystemContextSettingsTestEnv.entityStateChanged;
+import static io.spine.system.server.given.SystemContextSettingsTestEnv.postSystemEvent;
+import static java.time.Duration.ofSeconds;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisplayName("System Bounded Context should")
+class SystemContextSettingsTest {
+
+ @Nested
+ @DisplayName("by default")
+ class ByDefault {
+
+ @Test
+ @DisplayName("not store events")
+ void notStoreEvents() {
+ BoundedContext domain = BoundedContextBuilder.assumingTests().build();
+ BoundedContext system = systemOf(domain);
+ Event event = entityStateChanged();
+ MemoizingObserver observer = postSystemEvent(system.eventBus(), event);
+ assertTrue(observer.isCompleted());
+ assertThat(observer.responses()).isEmpty();
+ }
+
+ @Test
+ @DisplayName("not store domain commands")
+ void notStoreDomainCommands() {
+ BoundedContext domain = BoundedContextBuilder.assumingTests().build();
+ BoundedContext system = systemOf(domain);
+ assertFalse(system.hasEntitiesWithState(CommandLog.class));
+ }
+
+ @Test
+ @DisplayName("mirror domain aggregates")
+ void mirror() {
+ BoundedContext domain = BoundedContextBuilder.assumingTests().build();
+ BoundedContext system = systemOf(domain);
+ assertTrue(system.hasEntitiesWithState(Mirror.class));
+ }
+
+ @Test
+ @DisplayName("post system events in parallel")
+ void postEventsInParallel() {
+ BoundedContext domain = BoundedContextBuilder.assumingTests().build();
+ BoundedContext system = systemOf(domain);
+ HistoryEventWatcher watcher = new HistoryEventWatcher();
+ system.eventBus().register(watcher);
+
+ EventMessage event = entityCreated();
+ domain.systemClient()
+ .writeSide()
+ .postEvent(event);
+
+ sleepUninterruptibly(ofSeconds(1));
+ watcher.assertReceivedEvent(EntityCreated.class);
+ }
+ }
+
+ @Nested
+ @DisplayName("if required")
+ class IfRequired {
+
+ @Test
+ @DisplayName("store events")
+ void storeEvents() {
+ BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
+ contextBuilder.systemSettings()
+ .persistEvents();
+ BoundedContext domain = contextBuilder.build();
+ BoundedContext system = systemOf(domain);
+ Event event = entityStateChanged();
+ MemoizingObserver observer = postSystemEvent(system.eventBus(), event);
+ assertTrue(observer.isCompleted());
+ assertThat(observer.responses()).containsExactly(event);
+ }
+
+ @Test
+ @DisplayName("store domain commands")
+ void storeDomainCommands() {
+ BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
+ contextBuilder.systemSettings()
+ .enableCommandLog();
+ BoundedContext domain = contextBuilder.build();
+ BoundedContext system = systemOf(domain);
+ assertTrue(system.hasEntitiesWithState(CommandLog.class));
+ }
+
+ @Test
+ @DisplayName("not mirror domain aggregates")
+ void notMirror() {
+ BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
+ contextBuilder.systemSettings()
+ .disableAggregateQuerying();
+ BoundedContext domain = contextBuilder.build();
+ BoundedContext system = systemOf(domain);
+ assertFalse(system.hasEntitiesWithState(Mirror.class));
+ }
+
+ @Test
+ @DisplayName("post system events directly in the current thread")
+ void postEventsInCurrentThread() {
+ BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
+ contextBuilder.systemSettings().disableParallelPosting();
+ BoundedContext domain = contextBuilder.build();
+ BoundedContext system = systemOf(domain);
+ HistoryEventWatcher watcher = new HistoryEventWatcher();
+ system.eventBus().register(watcher);
+
+ EventMessage event = entityCreated();
+ domain.systemClient()
+ .writeSide()
+ .postEvent(event);
+
+ watcher.assertReceivedEvent(EntityCreated.class);
+ }
+
+ @Test
+ @DisplayName("post system events using the passed `Executor`")
+ void postEventsWithExecutor() {
+ AtomicInteger calls = new AtomicInteger();
+ Executor executor = (command) -> {
+ calls.incrementAndGet();
+ command.run();
+ };
+
+ BoundedContextBuilder contextBuilder = BoundedContextBuilder.assumingTests();
+ contextBuilder.systemSettings()
+ .enableParallelPosting()
+ .useCustomPostingExecutor(executor);
+
+ BoundedContext domain = contextBuilder.build();
+ BoundedContext system = systemOf(domain);
+ HistoryEventWatcher watcher = new HistoryEventWatcher();
+ system.eventBus().register(watcher);
+
+ EventMessage event = entityCreated();
+ domain.systemClient()
+ .writeSide()
+ .postEvent(event);
+
+ watcher.assertReceivedEvent(EntityCreated.class);
+ assertThat(calls.get()).isEqualTo(1);
+ }
+ }
+}
diff --git a/server/src/test/java/io/spine/system/server/SystemSettingsTest.java b/server/src/test/java/io/spine/system/server/SystemSettingsTest.java
index 6ec1739af2a..ca77dbcddc6 100644
--- a/server/src/test/java/io/spine/system/server/SystemSettingsTest.java
+++ b/server/src/test/java/io/spine/system/server/SystemSettingsTest.java
@@ -35,75 +35,81 @@
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
-@DisplayName("SystemFeatures should")
+@DisplayName("`SystemSettings` should")
class SystemSettingsTest {
+ private final Environment env = Environment.instance();
+
+ @AfterEach
+ void resetEnv() {
+ Environment.instance()
+ .reset();
+ }
+
@Nested
@DisplayName("by default")
- class Defaults {
-
- @AfterEach
- void resetEnv() {
- Environment.instance()
- .reset();
- }
+ class ByDefault {
@Test
@DisplayName("enable aggregate mirroring")
void mirrors() {
- SystemSettings features = SystemSettings.defaults();
- assertTrue(features.includeAggregateMirroring());
+ SystemSettings settings = SystemSettings.defaults();
+ assertTrue(settings.includeAggregateMirroring());
}
@Test
@DisplayName("disable command log")
void commands() {
- SystemSettings features = SystemSettings.defaults();
- assertFalse(features.includeCommandLog());
+ SystemSettings settings = SystemSettings.defaults();
+ assertFalse(settings.includeCommandLog());
}
@Test
@DisplayName("disable event store")
void events() {
- SystemSettings features = SystemSettings.defaults();
- assertFalse(features.includePersistentEvents());
+ SystemSettings settings = SystemSettings.defaults();
+ assertFalse(settings.includePersistentEvents());
+ }
+
+ @Test
+ @DisplayName("disallow parallel posting of system events in the test environment")
+ void disallowParallelPostingForTest() {
+ assumeTrue(env.is(Tests.class));
+ SystemSettings settings = SystemSettings.defaults();
+ assertFalse(settings.postEventsInParallel());
}
@Nested
@DisplayName("allow parallel posting of system events")
class AllowParallelPosting {
- private final Environment env = Environment.instance();
-
@Test
@DisplayName("in the `Production` environment")
void forProductionEnv() {
env.setTo(Production.class);
- assertTrue(SystemSettings.defaults()
- .postEventsInParallel());
+ SystemSettings settings = SystemSettings.defaults();
+ assertTrue(settings.postEventsInParallel());
}
@Test
@DisplayName("in a custom environment")
void forCustomEnv() {
env.setTo(Local.class);
- assertTrue(SystemSettings.defaults()
- .postEventsInParallel());
+ SystemSettings settings = SystemSettings.defaults();
+ assertTrue(settings.postEventsInParallel());
}
}
-
- @Test
- @DisplayName("disallow parallel posting of system events in the test environment")
- void disallowParallelPostingForTest() {
- Environment env = Environment.instance();
- assumeTrue(env.is(Tests.class));
- assertFalse(SystemSettings.defaults()
- .postEventsInParallel());
- }
}
@Nested
@@ -113,37 +119,95 @@ class Configure {
@Test
@DisplayName("aggregate mirroring")
void mirrors() {
- SystemSettings features = SystemSettings
- .defaults()
- .disableAggregateQuerying();
- assertFalse(features.includeAggregateMirroring());
+ SystemSettings settings = SystemSettings.defaults();
+ settings.disableAggregateQuerying();
+ assertFalse(settings.includeAggregateMirroring());
}
@Test
@DisplayName("command log")
void commands() {
- SystemSettings features = SystemSettings
- .defaults()
- .enableCommandLog();
- assertTrue(features.includeCommandLog());
+ SystemSettings settings = SystemSettings.defaults();
+ settings.enableCommandLog();
+ assertTrue(settings.includeCommandLog());
}
@Test
@DisplayName("event store")
void events() {
- SystemSettings features = SystemSettings
- .defaults()
- .persistEvents();
- assertTrue(features.includePersistentEvents());
+ SystemSettings settings = SystemSettings.defaults();
+ settings.persistEvents();
+ assertTrue(settings.includePersistentEvents());
+ }
+
+ @Nested
+ @DisplayName("system events to be posted")
+ class SystemEventsPosted {
+
+ @Test
+ @DisplayName("directly in the current thread")
+ void usingCurrentThread() {
+ env.setTo(Production.class);
+ SystemSettings settings = SystemSettings.defaults();
+ assumeTrue(settings.postEventsInParallel());
+
+ settings.disableParallelPosting();
+ assertFalse(settings.postEventsInParallel());
+ }
+
+ @Test
+ @DisplayName("using the passed `Executor`")
+ void usingPassedExecutor() {
+ env.setTo(Production.class);
+ SystemSettings settings = SystemSettings.defaults();
+ assumeTrue(settings.postEventsInParallel());
+ assertDefaultExecutor(settings);
+
+ Executor executor = command -> { };
+ settings.useCustomPostingExecutor(executor);
+ Executor newExecutor = settings.freeze().postingExecutor();
+ assertSame(newExecutor, executor);
+ }
+
+ @Test
+ @DisplayName("using the default `Executor`")
+ void usingDefaultExecutor() {
+ env.setTo(Production.class);
+ SystemSettings settings = SystemSettings.defaults();
+ assumeTrue(settings.postEventsInParallel());
+
+ Executor executor = command -> { };
+ settings.useCustomPostingExecutor(executor);
+ Executor currentExecutor = settings.freeze().postingExecutor();
+ assertSame(currentExecutor, executor);
+
+ settings.useDefaultPostingExecutor();
+ assertDefaultExecutor(settings);
+ }
}
+ }
+
+ @Nested
+ @DisplayName("not configure posting executor")
+ class NotConfigurePostingExecutor {
@Test
- @DisplayName("system events to be posted in synch")
- void parallelism() {
- SystemSettings features = SystemSettings
- .defaults()
- .disableParallelPosting();
- assertFalse(features.postEventsInParallel());
+ @DisplayName("when parallel posting is disabled")
+ void whenParallelPostingDisabled() {
+ env.setTo(Tests.class);
+ SystemSettings settings = SystemSettings.defaults();
+ assumeFalse(settings.postEventsInParallel());
+
+ Executor executor = command -> { };
+ assertThrows(
+ IllegalStateException.class,
+ () -> settings.useCustomPostingExecutor(executor)
+ );
}
}
+
+ private static void assertDefaultExecutor(SystemSettings settings) {
+ Executor postingExecutor = settings.freeze().postingExecutor();
+ assertEquals(postingExecutor.getClass(), ForkJoinPool.class);
+ }
}
diff --git a/server/src/test/java/io/spine/system/server/given/SystemContextSettingsTestEnv.java b/server/src/test/java/io/spine/system/server/given/SystemContextSettingsTestEnv.java
new file mode 100644
index 00000000000..2514393ba00
--- /dev/null
+++ b/server/src/test/java/io/spine/system/server/given/SystemContextSettingsTestEnv.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2022, TeamDev. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Redistribution and use in source and/or binary forms, with or without
+ * modification, must retain the above copyright notice and the following
+ * disclaimer.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.spine.system.server.given;
+
+import com.google.protobuf.Empty;
+import com.google.protobuf.StringValue;
+import io.spine.base.EventMessage;
+import io.spine.base.Identifier;
+import io.spine.core.CommandId;
+import io.spine.core.Event;
+import io.spine.core.MessageId;
+import io.spine.grpc.MemoizingObserver;
+import io.spine.server.event.EventBus;
+import io.spine.server.event.EventFilter;
+import io.spine.server.event.EventStreamQuery;
+import io.spine.system.server.EmptyEntityState;
+import io.spine.system.server.event.EntityCreated;
+import io.spine.system.server.event.EntityStateChanged;
+import io.spine.testing.server.TestEventFactory;
+import io.spine.type.TypeUrl;
+
+import static io.spine.base.Identifier.newUuid;
+import static io.spine.grpc.StreamObservers.memoizingObserver;
+import static io.spine.option.EntityOption.Kind.ENTITY;
+import static io.spine.protobuf.AnyPacker.pack;
+
+/**
+ * Test environment for testing interop between
+ * {@link io.spine.system.server.SystemContext SystemContext} and
+ * {@link io.spine.system.server.SystemSettings SystemSettings}.
+ */
+public final class SystemContextSettingsTestEnv {
+
+ private static final TestEventFactory events =
+ TestEventFactory.newInstance(SystemContextSettingsTestEnv.class);
+
+ private SystemContextSettingsTestEnv() {
+ }
+
+ public static MemoizingObserver postSystemEvent(EventBus systemBus, Event event) {
+ systemBus.post(event);
+ EventFilter filter = EventFilter
+ .newBuilder()
+ .setEventType(event.enclosedTypeUrl()
+ .toTypeName().value())
+ .vBuild();
+ EventStreamQuery query = EventStreamQuery
+ .newBuilder()
+ .addFilter(filter)
+ .vBuild();
+ MemoizingObserver observer = memoizingObserver();
+ systemBus.eventStore()
+ .read(query, observer);
+ return observer;
+ }
+
+ public static Event entityStateChanged() {
+ EntityStateChanged eventMessage = EntityStateChanged
+ .newBuilder()
+ .setEntity(MessageId.newBuilder()
+ .setId(Identifier.pack(42))
+ .setTypeUrl(TypeUrl.of(EmptyEntityState.class)
+ .value()))
+ .setOldState(pack(StringValue.of("0")))
+ .setNewState(pack(StringValue.of("42")))
+ .addSignalId(MessageId.newBuilder()
+ .setId(Identifier.pack(CommandId.generate()))
+ .setTypeUrl(TypeUrl.of(EntityStateChanged.class)
+ .value()))
+ .vBuild();
+ Event event = events.createEvent(eventMessage);
+ return event;
+ }
+
+ public static EventMessage entityCreated() {
+ MessageId messageId = MessageId.newBuilder()
+ .setTypeUrl(TypeUrl.of(Empty.class).value())
+ .setId(Identifier.pack(newUuid()))
+ .build();
+ EntityCreated event = EntityCreated
+ .newBuilder()
+ .setEntity(messageId)
+ .setKind(ENTITY)
+ .build();
+ return event;
+ }
+}
diff --git a/version.gradle.kts b/version.gradle.kts
index aa14c77dfc5..278bdfd08de 100644
--- a/version.gradle.kts
+++ b/version.gradle.kts
@@ -34,7 +34,7 @@
/**
* Version of this library.
*/
-val coreJava = "1.8.1"
+val coreJava = "1.8.2-SNAPSHOT.1"
/**
* Versions of the Spine libraries that `core-java` depends on.