Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPL-68: make run_schemas updates asynchronous #1581

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ public enum AsyncEventChannels {
RUN_VALIDATED,
CHANGE_NEW,
EXPERIMENT_RESULT_NEW,
SCHEMA_SYNC,
FOOBAR
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,24 @@ public Integer add(Schema schemaDTO){
.setParameter(1, schema.id).executeUpdate();
em.createNativeQuery("DELETE FROM dataset_schemas WHERE schema_id = ?1")
.setParameter(1, schema.id).executeUpdate();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
}
else {
schema.id = null;
schema.persist();
em.flush();
mediator.newOrUpdatedSchema(schema);
newOrUpdatedSchema(schema);
}
log.debugf("Added schema %s (%d), URI %s", schema.name, schema.id, schema.uri);
return schema.id;
}

/**
 * Schedules an asynchronous run_schemas update for the given schema.
 * Logs the schema id and URI, then registers a transaction synchronization callback that asks
 * the mediator to queue a schema-sync message for {@code schema.id}.
 * NOTE(review): the {@code txStatus} handed to the callback is ignored, so the sync looks like it
 * is queued whatever the transaction outcome is - confirm against Util.registerTxSynchronization.
 *
 * @param schema schema that was just created or updated
 */
private void newOrUpdatedSchema(SchemaDAO schema) {
log.debugf("Push schema event for async run schemas update: %d (%s)", schema.id, schema.uri);
// schema.id is read when the callback is invoked, not when it is registered
Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(schema.id));
}

private void verifyNewSchema(Schema schemaDTO) {
if (schemaDTO.uri == null || Arrays.stream(ALL_URNS).noneMatch(scheme -> schemaDTO.uri.startsWith(scheme + ":"))) {
throw ServiceException.badRequest("Please use URI starting with one of these schemes: " + Arrays.toString(ALL_URNS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.hyperfoil.tools.horreum.api.services.ExperimentService;
import io.hyperfoil.tools.horreum.bus.AsyncEventChannels;
import io.hyperfoil.tools.horreum.entity.data.ActionDAO;
import io.hyperfoil.tools.horreum.entity.data.SchemaDAO;
import io.hyperfoil.tools.horreum.events.DatasetChanges;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -85,6 +84,10 @@ public class ServiceMediator {
@Channel("run-recalc-out")
Emitter<Integer> runEmitter;

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("schema-sync-out")
Emitter<Integer> schemaEmitter;

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

public ServiceMediator() {
Expand Down Expand Up @@ -168,6 +171,18 @@ void queueRunRecalculation(int runId) {
runEmitter.send(runId);
}

/**
 * Consumer for the "schema-sync-in" channel: hands every received schema id to
 * {@code runService.onNewOrUpdatedSchema}.
 * Declared as a blocking, unordered consumer on the "horreum.schema.pool" worker pool, with a
 * request context activated for each invocation.
 *
 * @param schemaId id of the schema taken from the incoming message
 */
@Incoming("schema-sync-in")
@Blocking(ordered = false, value = "horreum.schema.pool")
@ActivateRequestContext
public void processSchemaSync(int schemaId) {
runService.onNewOrUpdatedSchema(schemaId);
}

/**
 * Publishes the schema id through {@code schemaEmitter} so that it is processed asynchronously.
 * Declared with {@code TxType.NOT_SUPPORTED}, i.e. the send is meant to run outside of a
 * transaction context.
 *
 * @param schemaId id of the schema to synchronize
 */
@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(int schemaId) {
schemaEmitter.send(schemaId);
}

/**
 * Forwards a dataset-processed event to {@code experimentService.onDatapointsCreated}.
 *
 * @param event event to pass on unchanged
 */
void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
experimentService.onDatapointsCreated(event);
}
Expand Down Expand Up @@ -216,9 +231,6 @@ void importTestToAll(TestExport test) {
subscriptionService.importSubscriptions(test);
}

/**
 * Passes the schema directly to {@code runService.processNewOrUpdatedSchema} on the calling thread.
 *
 * @param schema schema that was created or updated
 */
public void newOrUpdatedSchema(SchemaDAO schema) {
runService.processNewOrUpdatedSchema(schema);
}
/**
 * Delegates to {@code datasetService.updateFingerprints} for the given test.
 *
 * @param testId id of the test passed through to the dataset service
 */
public void updateFingerprints(int testId) {
datasetService.updateFingerprints(testId);
}
Expand Down Expand Up @@ -246,7 +258,7 @@ public <T> void publishEvent(AsyncEventChannels channel, int testId, T payload)
}

public <T> BlockingQueue<T> getEventQueue(AsyncEventChannels channel, Integer id) {
if (testMode ) {
if (testMode) {
events.putIfAbsent(channel, new HashMap<>());
BlockingQueue<?> queue = events.get(channel).computeIfAbsent(id, k -> new LinkedBlockingQueue<>());
return (BlockingQueue<T>) queue;
Expand Down
17 changes: 15 additions & 2 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ mp.messaging.outgoing.run-recalc-out.address=run-recalc
mp.messaging.outgoing.run-recalc-out.durable=true
mp.messaging.outgoing.run-recalc-out.container-id=horreum-broker
mp.messaging.outgoing.run-recalc-out.link-name=run-recalc
# schema-sync incoming
mp.messaging.incoming.schema-sync-in.connector=smallrye-amqp
mp.messaging.incoming.schema-sync-in.address=schema-sync
mp.messaging.incoming.schema-sync-in.durable=true
mp.messaging.incoming.schema-sync-in.container-id=horreum-broker
mp.messaging.incoming.schema-sync-in.link-name=schema-sync
# schema-sync outgoing
mp.messaging.outgoing.schema-sync-out.connector=smallrye-amqp
mp.messaging.outgoing.schema-sync-out.address=schema-sync
mp.messaging.outgoing.schema-sync-out.durable=true
mp.messaging.outgoing.schema-sync-out.container-id=horreum-broker
mp.messaging.outgoing.schema-sync-out.link-name=schema-sync

## Datasource updated by Liquibase - the same as app but always with superuser credentials

Expand Down Expand Up @@ -74,8 +86,9 @@ horreum.test-mode=false
#quarkus.native.additional-build-args=

# thread pool sizes
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=10
smallrye.messaging.worker.horreum.run.pool.max-concurrency=10
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=7
smallrye.messaging.worker.horreum.run.pool.max-concurrency=7
smallrye.messaging.worker.horreum.schema.pool.max-concurrency=7


hibernate.jdbc.time_zone=UTC
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a couple of tests, not sure if this is enough but at least it is a start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the tests at least cover the use cases that caused the issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on what you mean by "the issue", because the actual issue (i.e., the timeout) only happens with a large number of runs, which we don't have in the test environment.
Moreover, we are not actually solving the issue (the timeout) here; we are just making the run_schemas update asynchronous. The tests cover both the creation and the update of a Schema, checking that the run_schemas table is eventually updated.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.quarkus.test.oidc.server.OidcWiremockTestResource;
import io.restassured.common.mapper.TypeRef;
import jakarta.inject.Inject;
import jakarta.persistence.Tuple;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInfo;

Expand Down Expand Up @@ -261,7 +262,7 @@ private void testExportImport(boolean wipe) {
}

@org.junit.jupiter.api.Test
public void testFindUsages(TestInfo info) throws InterruptedException {
public void testFindUsages() throws InterruptedException {
Test test = createTest(createExampleTest("nofilter"));
createComparisonSchema();
uploadExampleRuns(test);
Expand All @@ -273,12 +274,145 @@ public void testFindUsages(TestInfo info) throws InterruptedException {

assertNotEquals(0, report.data.size());

List<SchemaService.LabelLocation> usages = jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);
List<SchemaService.LabelLocation> usages =
jsonRequest().get("/api/schema/findUsages?label=".concat("category"))
.then().statusCode(200).extract().body().as(List.class);

assertNotNull(usages);
}

/**
 * Uploads a run whose payload is an array of two objects that both point at a schema URI which is
 * not registered yet, then creates the schema and waits until run_schemas holds two rows for the run.
 */
@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithArrayData() throws InterruptedException {
    final String uri = "urn:unknown:schema";
    final String runSchemasQuery = "SELECT * FROM run_schemas WHERE runid = ?1";
    Test dummyTest = createTest(createExampleTest("dummy-test"));

    ArrayNode payload = JsonNodeFactory.instance.arrayNode();
    payload.addObject().put("$schema", uri).put("foo", "bar");
    payload.addObject().put("$schema", uri).put("foo", "bar");
    int runId = uploadRun(payload.toString(), dummyTest.name);
    assertTrue(runId > 0);

    // the upload must not have produced validation errors for runs or datasets
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

    // before the schema is created the run has no run_schemas rows
    List<?> linkedBefore = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
    assertEquals(0, linkedBefore.size());

    // register the schema only now, after the run has been uploaded
    Schema created = createSchema("Unknown schema", uri);
    assertNotNull(created);
    assertTrue(created.id > 0);

    // run_schemas is updated asynchronously, so the check goes through TestUtil.eventually
    TestUtil.eventually(() -> {
        Util.withTx(tm, () -> {
            em.clear();
            List<?> linkedAfter = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
            // one row per array element: both objects reference the same schema
            assertEquals(2, linkedAfter.size());
            return null;
        });
    });
}

/**
 * Uploads a run whose two array elements reference two different, not yet registered schema URIs,
 * creates only the first schema and waits until run_schemas holds exactly one row for the run,
 * pointing at that schema.
 */
@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithMultipleSchemas() throws InterruptedException {
    final String uriOne = "urn:unknown1:schema";
    final String uriTwo = "urn:unknown2:schema";
    final String runSchemasQuery = "SELECT * FROM run_schemas WHERE runid = ?1";
    Test dummyTest = createTest(createExampleTest("dummy-test"));

    ArrayNode payload = JsonNodeFactory.instance.arrayNode();
    payload.addObject().put("$schema", uriOne).put("foo", "bar");
    payload.addObject().put("$schema", uriTwo).put("foo", "zip");
    int runId = uploadRun(payload.toString(), dummyTest.name);
    assertTrue(runId > 0);

    // the upload must not have produced validation errors for runs or datasets
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

    // before any schema is created the run has no run_schemas rows
    List<?> linkedBefore = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
    assertEquals(0, linkedBefore.size());

    // register only the first of the two referenced schemas
    Schema firstSchema = createSchema("Unknown schema 1", uriOne);
    assertNotNull(firstSchema);
    assertTrue(firstSchema.id > 0);

    // run_schemas is updated asynchronously, so the check goes through TestUtil.eventually
    TestUtil.eventually(() -> {
        Util.withTx(tm, () -> {
            em.clear();
            List<Tuple> linkedAfter = em.createNativeQuery(runSchemasQuery, Tuple.class).setParameter(1, runId).getResultList();
            // only the element whose schema exists gets a row; the second URI is still unknown
            assertEquals(1, linkedAfter.size());
            assertEquals(firstSchema.id, (int)linkedAfter.get(0).get(3));
            return null;
        });
    });
}

/**
 * Uploads a run whose payload is a single object pointing at a schema URI that is not registered
 * yet, then creates the schema and waits until run_schemas holds one row for the run.
 */
@org.junit.jupiter.api.Test
public void testCreateSchemaAfterRunWithObjectData() throws InterruptedException {
    final String uri = "urn:unknown:schema";
    final String runSchemasQuery = "SELECT * FROM run_schemas WHERE runid = ?1";
    Test dummyTest = createTest(createExampleTest("dummy-test"));

    ObjectNode payload = JsonNodeFactory.instance.objectNode();
    payload.put("$schema", uri).put("foo", "bar");
    int runId = uploadRun(payload.toString(), dummyTest.name);
    assertTrue(runId > 0);

    // the upload must not have produced validation errors for runs or datasets
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

    // before the schema is created the run has no run_schemas rows
    List<?> linkedBefore = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
    assertEquals(0, linkedBefore.size());

    // register the schema only now, after the run has been uploaded
    Schema created = createSchema("Unknown schema", uri);
    assertNotNull(created);
    assertTrue(created.id > 0);

    // run_schemas is updated asynchronously, so the check goes through TestUtil.eventually
    TestUtil.eventually(() -> {
        Util.withTx(tm, () -> {
            em.clear();
            List<?> linkedAfter = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
            // a single-object payload references exactly one schema
            assertEquals(1, linkedAfter.size());
            return null;
        });
    });
}

/**
 * Creates a schema, uploads a run with two array elements referencing it, then changes the schema
 * URI and checks that the run no longer has any run_schemas rows.
 */
@org.junit.jupiter.api.Test
public void testChangeUriForReferencedSchema() throws InterruptedException {
    final String originalUri = "urn:dummy:schema";
    final String runSchemasQuery = "SELECT * FROM run_schemas WHERE runid = ?1";

    Schema dummySchema = createSchema("Dummy schema", originalUri);
    assertNotNull(dummySchema);
    assertTrue(dummySchema.id > 0);

    Test dummyTest = createTest(createExampleTest("dummy-test"));

    ArrayNode payload = JsonNodeFactory.instance.arrayNode();
    payload.addObject().put("$schema", originalUri).put("foo", "bar");
    payload.addObject().put("$schema", originalUri).put("foo", "bar");
    int runId = uploadRun(payload.toString(), dummyTest.name);
    assertTrue(runId > 0);

    // the upload must not have produced validation errors for runs or datasets
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM run_validationerrors").getSingleResult());
    assertEquals(0, em.createNativeQuery("SELECT COUNT(*)::::int FROM dataset_validationerrors").getSingleResult());

    // the schema already exists, so both array elements are linked right after the upload
    List<?> linkedBefore = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
    assertEquals(2, linkedBefore.size());

    // point the existing schema at a URI the uploaded run does not reference
    dummySchema.uri = "urn:new-dummy:schema";
    Schema updated = addOrUpdateSchema(dummySchema);
    assertNotNull(updated);
    assertEquals(dummySchema.id, updated.id);

    // checked right after the update returns, without polling
    List<?> linkedAfter = em.createNativeQuery(runSchemasQuery).setParameter(1, runId).getResultList();
    assertEquals(0, linkedAfter.size());
}
}
51 changes: 45 additions & 6 deletions horreum-web/src/Banner.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,58 @@ import { useEffect, useState } from "react"
import { Alert } from "@patternfly/react-core"
import {Banner as BannerData, bannerApi} from "./api"

// Renders a banner as an inline Alert: severity becomes the variant, title the alert title and
// message the alert body.
// NOTE(review): message is injected as raw HTML through dangerouslySetInnerHTML; this assumes
// banner.message is trusted or sanitized before it gets here - confirm.
function getAlertBanner(banner: BannerData) {
return (
<Alert variant={banner.severity as any} title={banner.title} isInline>
<div dangerouslySetInnerHTML={{ __html: banner.message || "" }}></div>
</Alert>
)
}

// 30 seconds
const DEFAULT_TIMEOUT = 30000
export type TimeoutBannerProps = {
bannerData: BannerData
timeout?: number
onTimeout?: () => void
}

/**
 * Shows the given banner as an inline alert and hides it again once the timeout has elapsed.
 *
 * @param bannerData banner to display; it only seeds the initial state
 * @param timeout delay in milliseconds before the banner is hidden, DEFAULT_TIMEOUT when omitted
 * @param onTimeout optional callback invoked right after the banner has been hidden
 */
export function TimeoutBanner({bannerData, timeout, onTimeout}: TimeoutBannerProps) {
    const delay = timeout ?? DEFAULT_TIMEOUT
    const [banner, setBanner] = useState<BannerData | undefined>(bannerData)

    // Empty dependency list: the timer is armed once on mount with the values captured on the
    // first render and is cancelled on unmount.
    useEffect(() => {
        if (!banner) {
            return undefined
        }

        const timer = setTimeout(() => {
            setBanner(undefined)
            if (onTimeout) {
                onTimeout()
            }
        }, delay)

        return () => clearTimeout(timer);
    }, [])

    return banner ? getAlertBanner(banner) : null
}

/**
 * Fetches the current banner from the backend and renders it as an inline alert.
 * The banner is fetched again every 60 seconds; nothing is rendered while no banner is set.
 */
export default function Banner() {
    const [banner, setBanner] = useState<BannerData>()
    const [updateCounter, setUpdateCounter] = useState(0)
    useEffect(() => {
        // Exactly one refresh timer per effect run: bumping the counter re-runs this effect,
        // which fetches again and arms the next timer.
        const timeoutId = setTimeout(() => setUpdateCounter(count => count + 1), 60000)
        bannerApi.get().then(setBanner)

        // Cancel the pending refresh when the effect re-runs or the component unmounts,
        // so no timer is left behind.
        return () => clearTimeout(timeoutId)
    }, [updateCounter])

    if (!banner) {
        return null
    }

    return getAlertBanner(banner)
}
Loading