Skip to content

Commit

Permalink
Merge pull request #1353 from SpineEventEngine/fix-empty-dispatching-…
Browse files Browse the repository at this point in the history
…delivery

Fix empty dispatching delivery
  • Loading branch information
yuri-sergiichuk authored Jan 14, 2021
2 parents 967fd9a + 0383a6b commit 69dd272
Show file tree
Hide file tree
Showing 12 changed files with 374 additions and 40 deletions.
12 changes: 12 additions & 0 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final void dispatchTo(I aggregateId) {

private void storeAndPost(A aggregate, DispatchOutcome outcome) {
Success success = outcome.getSuccess();
if (success.hasProducedEvents()) {
if (success.hasEvents()) {
store(aggregate);
List<Event> events = success.getProducedEvents()
.getEventList();
Expand Down Expand Up @@ -126,7 +126,7 @@ private A loadOrCreate(I aggregateId) {
final DispatchOutcome handleAndApplyEvents(A aggregate) {
DispatchOutcome outcome = invokeDispatcher(aggregate);
Success successfulOutcome = outcome.getSuccess();
return successfulOutcome.hasProducedEvents()
return successfulOutcome.hasEvents()
? applyProducedEvents(aggregate, outcome)
: outcome;
}
Expand Down
20 changes: 20 additions & 0 deletions server/src/main/java/io/spine/server/dispatch/SuccessMixin.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,24 @@ default Object readValue(FieldDescriptor field) {
return getField(field);
}
}

/**
* Determines if the outcome has any produced events.
*
* @implNote Prefer using this method over the generated {@code hasProducedEvents}
* while the latter only checks if the message is set.
*/
default boolean hasEvents() {
return hasProducedEvents() && getProducedEvents().getEventCount() > 0;
}

/**
* Determines if the outcome has any produced commands.
*
* @implNote Prefer using this method over the generated {@code hasProducedCommands}
* while the latter only checks if the message is set.
*/
default boolean hasCommands() {
return hasProducedCommands() && getProducedCommands().getCommandCount() > 0;
}
}
103 changes: 73 additions & 30 deletions server/src/test/java/io/spine/server/aggregate/AggregateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.spine.core.Ack;
import io.spine.core.Command;
import io.spine.core.Event;
import io.spine.core.EventContext;
import io.spine.core.MessageId;
import io.spine.core.TenantId;
import io.spine.server.BoundedContext;
Expand All @@ -50,6 +51,11 @@
import io.spine.server.aggregate.given.aggregate.TaskAggregateRepository;
import io.spine.server.aggregate.given.aggregate.TestAggregate;
import io.spine.server.aggregate.given.aggregate.TestAggregateRepository;
import io.spine.server.aggregate.given.thermometer.SafeThermometer;
import io.spine.server.aggregate.given.thermometer.SafeThermometerRepo;
import io.spine.server.aggregate.given.thermometer.Thermometer;
import io.spine.server.aggregate.given.thermometer.ThermometerId;
import io.spine.server.aggregate.given.thermometer.event.TemperatureChanged;
import io.spine.server.commandbus.CommandBus;
import io.spine.server.delivery.MessageEndpoint;
import io.spine.server.dispatch.BatchDispatchOutcome;
Expand Down Expand Up @@ -81,7 +87,7 @@
import io.spine.test.aggregate.rejection.Rejections.AggCannotReassignUnassignedTask;
import io.spine.testing.logging.MuteLogging;
import io.spine.testing.server.EventSubject;
import io.spine.testing.server.blackbox.BlackBoxContext;
import io.spine.testing.server.blackbox.ContextAwareTest;
import io.spine.testing.server.model.ModelTests;
import io.spine.time.testing.TimeTests;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -317,12 +323,13 @@ void writeVersionIntoEventContext() {
dispatchCommand(aggregate, command(createProject));

// Get the first event since the command handler produces only one event message.
Aggregate<?, ?, ?> agg = this.aggregate;
List<Event> uncommittedEvents = agg.getUncommittedEvents().list();
Aggregate<?, ?, ?> agg = aggregate;
List<Event> uncommittedEvents = agg.getUncommittedEvents()
.list();
Event event = uncommittedEvents.get(0);

assertEquals(this.aggregate.version(), event.context()
.getVersion());
EventContext context = event.context();
assertThat(aggregate.version())
.isEqualTo(context.getVersion());
}

@Test
Expand Down Expand Up @@ -719,7 +726,8 @@ void throughNewestEventsFirst() {

private ProtoSubject assertNextCommandId() {
Event event = history.next();
return assertThat(event.rootMessage().asCommandId());
return assertThat(event.rootMessage()
.asCommandId());
}

@Test
Expand Down Expand Up @@ -818,27 +826,20 @@ void checkEventsUponHistory() {
private static void dispatch(TenantId tenant,
Supplier<MessageEndpoint<ProjectId, ?>> endpoint) {
with(tenant).run(
() -> endpoint.get().dispatchTo(ID)
() -> endpoint.get()
.dispatchTo(ID)
);
}

@Nested
@DisplayName("create a single event when emitting a pair without second value")
class CreateSingleEventForPair {

private BlackBoxContext context;

@BeforeEach
void prepareContext() {
context = BlackBoxContext.from(
BoundedContextBuilder.assumingTests()
.add(new TaskAggregateRepository())
);
}
class CreateSingleEventForPair extends ContextAwareTest {

@AfterEach
void closeContext() {
context.close();
@Override
protected BoundedContextBuilder contextBuilder() {
return BoundedContextBuilder
.assumingTests()
.add(new TaskAggregateRepository());
}

/**
Expand All @@ -852,10 +853,10 @@ void closeContext() {
@Test
@DisplayName("when dispatching a command")
void fromCommandDispatch() {
context.receivesCommand(createTask())
.assertEvents()
.withType(AggTaskCreated.class)
.isNotEmpty();
context().receivesCommand(createTask())
.assertEvents()
.withType(AggTaskCreated.class)
.isNotEmpty();
}

/**
Expand All @@ -870,8 +871,8 @@ void fromCommandDispatch() {
@Test
@DisplayName("when reacting on an event")
void fromEventReact() {
EventSubject assertEvents = context.receivesCommand(assignTask())
.assertEvents();
EventSubject assertEvents = context().receivesCommand(assignTask())
.assertEvents();
assertEvents.hasSize(2);
assertEvents.withType(AggTaskAssigned.class)
.hasSize(1);
Expand All @@ -891,13 +892,55 @@ void fromEventReact() {
@Test
@DisplayName("when reacting on a rejection")
void fromRejectionReact() {
EventSubject assertEvents = context.receivesCommand(reassignTask())
.assertEvents();
EventSubject assertEvents = context().receivesCommand(reassignTask())
.assertEvents();
assertEvents.hasSize(2);
assertEvents.withType(AggCannotReassignUnassignedTask.class)
.hasSize(1);
assertEvents.withType(AggUserNotified.class)
.hasSize(1);
}
}

@Nested
@DisplayName("allow having validation on the aggregate state and")
class AllowValidatedAggregates extends ContextAwareTest {

private final ThermometerId thermometer = ThermometerId.generate();

@Override
protected BoundedContextBuilder contextBuilder() {
return BoundedContextBuilder
.assumingTests()
.add(new SafeThermometerRepo(thermometer));
}

@Test
@DisplayName("not change the Aggregate state when there is no reaction on the event")
void notChangeStateIfNoReaction() {
TemperatureChanged booksOnFire =
TemperatureChanged.newBuilder()
.setFahrenheit(451)
.vBuild();
context().receivesExternalEvent(booksOnFire)
.assertEntity(thermometer, SafeThermometer.class)
.doesNotExist();
}

@Test
@DisplayName("save valid aggregate state on change")
void safelySaveValidState() {
TemperatureChanged gettingWarmer =
TemperatureChanged.newBuilder()
.setFahrenheit(72)
.vBuild();
context().receivesExternalEvent(gettingWarmer);
Thermometer expected = Thermometer
.newBuilder()
.setId(thermometer)
.setFahrenheit(72)
.vBuild();
context().assertState(thermometer, expected);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2021, TeamDev. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.spine.server.aggregate.given.thermometer;

import io.spine.core.External;
import io.spine.server.aggregate.Aggregate;
import io.spine.server.aggregate.Apply;
import io.spine.server.aggregate.given.thermometer.event.TemperatureChanged;
import io.spine.server.aggregate.given.thermometer.event.TermTemperatureChanged;
import io.spine.server.event.React;

import java.util.Optional;

/**
* Ignores temperature changes outside its own range.
*/
public final class SafeThermometer extends Aggregate<ThermometerId, Thermometer, Thermometer.Builder> {

private static final double MIN = 0;
private static final double MAX = 120;

@React
Optional<TermTemperatureChanged> on(@External TemperatureChanged e) {
double temperature = e.getFahrenheit();
if (!withinBounds(temperature)) {
return Optional.empty();
}
return Optional.of(
TermTemperatureChanged
.newBuilder()
.setThermometer(id())
.setChange(
TemperatureChange
.newBuilder()
.setNewValue(temperature)
.setPreviousValue(state().getFahrenheit())
)
.vBuild()
);
}

private static boolean withinBounds(double temperature) {
return temperature > MIN && temperature < MAX;
}

@Apply
private void on(TermTemperatureChanged e) {
builder().setFahrenheit(e.getChange()
.getNewValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021, TeamDev. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.spine.server.aggregate.given.thermometer;

import io.spine.server.aggregate.AggregateRepository;
import io.spine.server.aggregate.given.thermometer.event.TemperatureChanged;
import io.spine.server.route.EventRouting;

import static io.spine.util.Preconditions2.checkNotDefaultArg;

/**
* A {@link SafeThermometer thermometer} repository.
*/
public final class SafeThermometerRepo extends AggregateRepository<ThermometerId, SafeThermometer> {

private final ThermometerId thermometer;

/**
* Creates a new repository for the {@code thermometer}.
*/
public SafeThermometerRepo(ThermometerId thermometer) {
this.thermometer = checkNotDefaultArg(thermometer);
}

@Override
protected void setupEventRouting(EventRouting<ThermometerId> routing) {
routing.unicast(TemperatureChanged.class, (e) -> thermometer);
}
}
Loading

0 comments on commit 69dd272

Please sign in to comment.