Skip to content

Commit

Permalink
MPL-68: simplify run schema event
Browse files Browse the repository at this point in the history
  • Loading branch information
lampajr committed Apr 9, 2024
1 parent b472e90 commit d9296e5
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,4 @@ public String toString() {
'}';
}
}

public static class CreateOrUpdateEvent {
public int id;
public String uri;

public CreateOrUpdateEvent() {}

public CreateOrUpdateEvent(int id, String uri) {
this.id = id;
this.uri = uri;
}
@Override
public String toString() {
return "CreateOrUpdateEvent{" +
"id=" + id +
", uri=" + uri +
'}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ public Integer add(Schema schemaDTO){
}

private void newOrUpdatedSchema(SchemaDAO schema) {
Schema.CreateOrUpdateEvent event = new Schema.CreateOrUpdateEvent(schema.id, schema.uri);
log.debugf("Push schema event for async schema synchronization: %s", event);
Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(event));
log.debugf("Push schema event for async run schemas update: %d (%s)", schema.id, schema.uri);
Util.registerTxSynchronization(tm, txStatus -> mediator.queueSchemaSync(schema.id));
}

private void verifyNewSchema(Schema schemaDTO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class ServiceMediator {

@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 10000)
@Channel("schema-sync-out")
Emitter<Schema.CreateOrUpdateEvent> schemaEmitter;
Emitter<Integer> schemaEmitter;

private Map<AsyncEventChannels, Map<Integer, BlockingQueue<Object>>> events = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -168,15 +168,15 @@ void queueRunRecalculation(int runId) {
}

@Incoming("schema-sync-in")
@Blocking(ordered = false, value = "horreum.run.pool")
@Blocking(ordered = false, value = "horreum.schema.pool")
@ActivateRequestContext
public void processSchemaSync(Schema.CreateOrUpdateEvent event) {
runService.onNewOrUpdatedSchema(event.id);
public void processSchemaSync(int schemaId) {
runService.onNewOrUpdatedSchema(schemaId);
}

@Transactional(Transactional.TxType.NOT_SUPPORTED)
void queueSchemaSync(Schema.CreateOrUpdateEvent event) {
schemaEmitter.send(event);
void queueSchemaSync(int schemaId) {
schemaEmitter.send(schemaId);
}

void dataPointsProcessed(DataPoint.DatasetProcessedEvent event) {
Expand Down
1 change: 1 addition & 0 deletions horreum-backend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ horreum.test-mode=false
# thread pool sizes
smallrye.messaging.worker.horreum.dataset.pool.max-concurrency=10
smallrye.messaging.worker.horreum.run.pool.max-concurrency=10
smallrye.messaging.worker.horreum.schema.pool.max-concurrency=10


hibernate.jdbc.time_zone=UTC
Expand Down

0 comments on commit d9296e5

Please sign in to comment.