Skip to content

Commit

Permalink
MPL-68: make run_schemas updates asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
lampajr committed Apr 9, 2024
1 parent 3849f04 commit b472e90
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,23 @@ public String toString() {
'}';
}
}

public static class CreateOrUpdateEvent {
public int id;
public String uri;

public CreateOrUpdateEvent() {}

public CreateOrUpdateEvent(int id, String uri) {
this.id = id;
this.uri = uri;
}
@Override
public String toString() {
return "CreateOrUpdateEvent{" +
"id=" + id +
", uri=" + uri +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ public enum AsyncEventChannels {
RUN_VALIDATED,
CHANGE_NEW,
EXPERIMENT_RESULT_NEW,
SCHEMA_SYNC,
FOOBAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,25 @@ public Integer add(Schema schemaDTO){
.setParameter(1, schema.id).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schema.id).executeUpdate();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
}
else {
schema.id = null;
schema.persist();
em.flush();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
log.debugf("Added schema %s (%d), URI %s", schema.name, schema.id, schema.uri);
return schema.id;
}

private void newOrUpdatedSchema(SchemaDAO schema) {
Schema.CreateOrUpdateEvent event = new Schema.CreateOrUpdateEvent(schema.id, schema.uri);
log.debugf("Push schema event for async schema synchronization: %s", event);
Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(event));
}

private void verifyNewSchema(Schema schemaDTO) {
if (schemaDTO.uri == null || Arrays.stream(ALL_URNS).noneMatch(scheme -> schemaDTO.uri.startsWith(scheme + ":"))) {
throw ServiceException.badRequest("Please use URI starting with one of these schemes: " + Arrays.toString(ALL_URNS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@

import io.hyperfoil.tools.horreum.api.alerting.Change;
import io.hyperfoil.tools.horreum.api.alerting.DataPoint;
import io.hyperfoil.tools.horreum.api.data.Action;
import io.hyperfoil.tools.horreum.api.data.Dataset;
import io.hyperfoil.tools.horreum.api.data.Run;
import io.hyperfoil.tools.horreum.api.data.Test;
import io.hyperfoil.tools.horreum.api.data.TestExport;
import io.hyperfoil.tools.horreum.api.data.*;
import io.hyperfoil.tools.horreum.api.services.ExperimentService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.entity.data.ActionDAO;
import io.hyperfoil.tools.horreum.entity.data.SchemaDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -85,6 +80,10 @@ public class ServiceMediator {
@Channel("run-recalc-out")
Emitter<Integer> runEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("schema-sync-out")
Emitter<Schema.CreateOrUpdateEvent> schemaEmitter;

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

public ServiceMediator() {
Expand Down Expand Up @@ -168,6 +167,18 @@ void queueRunRecalculation(int runId) {
runEmitter.send(runId);
}

@Incoming("schema-sync-in")
@Blocking(ordered = false, value = "horreum.run.pool")
@ActivateRequestContext
public void processSchemaSync(Schema.CreateOrUpdateEvent event) {
runService.onNewOrUpdatedSchema(event.id);
}

@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(Schema.CreateOrUpdateEvent event) {
schemaEmitter.send(event);
}

void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -216,9 +227,6 @@ void importTestToAll(TestExport test) {
subscriptionService.importSubscriptions(test);
}

public void newOrUpdatedSchema(SchemaDAO schema) {
runService.processNewOrUpdatedSchema(schema);
}
public void updateFingerprints(int testId) {
datasetService.updateFingerprints(testId);
}
Expand Down Expand Up @@ -246,7 +254,7 @@ public <T> void publishEvent(AsyncEventChannels channel, int testId, T payload)
}

public <T> BlockingQueue<T> getEventQueue(AsyncEventChannels channel, Integer id) {
if (testMode ) {
if (testMode) {
events.putIfAbsent(channel, new HashMap<>());
BlockingQueue<?> queue = events.get(channel).computeIfAbsent(id, k -> new LinkedBlockingQueue<>());
return (BlockingQueue<T>) queue;
Expand Down
12 changes: 12 additions & 0 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ mp.messaging.outgoing.run-recalc-out.address=run-recalc
mp.messaging.outgoing.run-recalc-out.durable=true
mp.messaging.outgoing.run-recalc-out.container-id=horreum-broker
mp.messaging.outgoing.run-recalc-out.link-name=run-recalc
# schema-sync incoming
mp.messaging.incoming.schema-sync-in.connector=smallrye-amqp
mp.messaging.incoming.schema-sync-in.address=schema-sync
mp.messaging.incoming.schema-sync-in.durable=true
mp.messaging.incoming.schema-sync-in.container-id=horreum-broker
mp.messaging.incoming.schema-sync-in.link-name=schema-sync
# schema-sync outgoing
mp.messaging.outgoing.schema-sync-out.connector=smallrye-amqp
mp.messaging.outgoing.schema-sync-out.address=schema-sync
mp.messaging.outgoing.schema-sync-out.durable=true
mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker
mp.messaging.outgoing.schema-sync-out.link-name=schema-sync

## Datasource updated by Liquibase - the same as app but always with superuser credentials

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ private void testExportImport(boolean wipe) {
}

@org.junit.jupiter.api.Test
public void testFindUsages(TestInfo info) throws InterruptedException {
public void testFindUsages() throws InterruptedException {
Test test = createTest(createExampleTest("nofilter"));
createComparisonSchema();
uploadExampleRuns(test);
Expand All @@ -273,12 +273,84 @@ public void testFindUsages(TestInfo info) throws InterruptedException {

assertNotEquals(0, report.data.size());

List<SchemaService.LabelLocation> usages = jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);
List<SchemaService.LabelLocation> usages =
jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);

assertNotNull(usages);
}

@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRun() throws InterruptedException {
String schemaUri = "urn:unknown:schema";
Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", schemaUri).put("foo", "bar");
data.addObject().put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());


List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(0, runSchemasBefore.size());

// create the schema afterward
Schema schema = createSchema("Unknown schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<?> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
// two records as the run is an array of two objects, both referencing the same schema
assertEquals(2, runSchemas.size());
return null;
});
});
}

@org.junit.jupiter.api.Test
public void testChangeUriForReferencedSchema() throws InterruptedException {
String schemaUri = "urn:dummy:schema";
Schema schema = createSchema("Dummy schema", schemaUri);
assertNotNull(schema);
assertTrue(schema.id > 0);

Test test = createTest(createExampleTest("dummy-test"));

ArrayNode data = JsonNodeFactory.instance.arrayNode();
data.addObject().put("$schema", schemaUri).put("foo", "bar");
data.addObject().put("$schema", schemaUri).put("foo", "bar");
int runId = uploadRun(data.toString(), test.name);
assertTrue(runId > 0);

// no validation errors
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

List<?> runSchemasBefore = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
assertEquals(2, runSchemasBefore.size());

// update the schema uri afterward
schema.uri = "urn:new-dummy:schema";
Schema updatedSchema = addOrUpdateSchema(schema);
assertNotNull(updatedSchema);
assertEquals(schema.id, updatedSchema.id);

TestUtil.eventually(() -> {
Util.withTx(tm, () -> {
em.clear();
List<?> runSchemas = em.createNativeQuery("SELECT * FROM run_schemas WHERE runid = ?1").setParameter(1, runId).getResultList();
// two records as the run is an array of two objects, both referencing the same schema
assertEquals(0, runSchemas.size());
return null;
});
});
}
}

0 comments on commit b472e90

Please sign in to comment.