Skip to content

Commit

Permalink
fix: stop reusing the same RecordMetadata
Browse files Browse the repository at this point in the history
Every command will now create its own RecordMetadata. This ensures the thread-safety of this class.
  • Loading branch information
remcowesterhoud committed May 10, 2022
1 parent be93bc5 commit dc60102
Showing 1 changed file with 89 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
*/
package io.camunda.zeebe.process.test.engine;

import com.google.protobuf.GeneratedMessageV3;
import com.google.rpc.Status;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
Expand Down Expand Up @@ -43,7 +41,6 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.msgpack.value.ValueArray;
import io.camunda.zeebe.process.test.engine.GrpcResponseWriter.GrpcResponseMapper;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
Expand All @@ -69,16 +66,11 @@
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase {

private final LogStreamRecordWriter writer;
private final RecordMetadata recordMetadata = new RecordMetadata();
private final int partitionId;
private final int partitionCount;
private final int port;
Expand All @@ -98,17 +90,19 @@ public GrpcToLogStreamGateway(
}

private void writeCommandWithKey(
final Long key, final RecordMetadata metadata, final BufferWriter bufferWriter) {
writer.reset();

writer.key(key).metadataWriter(metadata).valueWriter(bufferWriter).tryWrite();
final Long key, final BufferWriter bufferWriter, final RecordMetadata recordMetadata) {
synchronized (writer) {
writer.reset();
writer.key(key).metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite();
}
}

private void writeCommandWithoutKey(
final RecordMetadata metadata, final BufferWriter bufferWriter) {
writer.reset();

writer.keyNull().metadataWriter(metadata).valueWriter(bufferWriter).tryWrite();
final BufferWriter bufferWriter, final RecordMetadata recordMetadata) {
synchronized (writer) {
writer.reset();
writer.keyNull().metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite();
}
}

@Override
Expand All @@ -118,10 +112,11 @@ public void activateJobs(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB_BATCH)
.intent(JobBatchIntent.ACTIVATE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB_BATCH)
.intent(JobBatchIntent.ACTIVATE);

final JobBatchRecord jobBatchRecord = new JobBatchRecord();

Expand All @@ -130,7 +125,7 @@ public void activateJobs(
jobBatchRecord.setTimeout(request.getTimeout());
jobBatchRecord.setMaxJobsToActivate(request.getMaxJobsToActivate());

writeCommandWithoutKey(recordMetadata, jobBatchRecord);
writeCommandWithoutKey(jobBatchRecord, recordMetadata);
}

@Override
Expand All @@ -140,15 +135,16 @@ public void cancelProcessInstance(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE)
.intent(ProcessInstanceIntent.CANCEL);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE)
.intent(ProcessInstanceIntent.CANCEL);

final ProcessInstanceRecord processInstanceRecord = new ProcessInstanceRecord();
processInstanceRecord.setProcessInstanceKey(request.getProcessInstanceKey());

writeCommandWithKey(request.getProcessInstanceKey(), recordMetadata, processInstanceRecord);
writeCommandWithKey(request.getProcessInstanceKey(), processInstanceRecord, recordMetadata);
}

@Override
Expand All @@ -158,10 +154,11 @@ public void completeJob(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.COMPLETE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.COMPLETE);

final JobRecord jobRecord = new JobRecord();

Expand All @@ -170,7 +167,7 @@ public void completeJob(
jobRecord.setVariables(BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables)));
}

writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord);
writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

@Override
Expand All @@ -180,14 +177,15 @@ public void createProcessInstance(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_CREATION)
.intent(ProcessInstanceCreationIntent.CREATE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_CREATION)
.intent(ProcessInstanceCreationIntent.CREATE);

final ProcessInstanceCreationRecord processInstanceCreationRecord =
createProcessInstanceCreationRecord(request);
writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord);
writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata);
}

@Override
Expand All @@ -197,16 +195,17 @@ public void createProcessInstanceWithResult(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_CREATION)
.intent(ProcessInstanceCreationIntent.CREATE_WITH_AWAITING_RESULT);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_CREATION)
.intent(ProcessInstanceCreationIntent.CREATE_WITH_AWAITING_RESULT);

final ProcessInstanceCreationRecord processInstanceCreationRecord =
createProcessInstanceCreationRecord(request.getRequest());
processInstanceCreationRecord.setFetchVariables(request.getFetchVariablesList());

writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord);
writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata);
}

@Override
Expand All @@ -216,10 +215,11 @@ public void deployProcess(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.DEPLOYMENT)
.intent(DeploymentIntent.CREATE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.DEPLOYMENT)
.intent(DeploymentIntent.CREATE);

final DeploymentRecord deploymentRecord = new DeploymentRecord();
final ValueArray<DeploymentResource> resources = deploymentRecord.resources();
Expand All @@ -234,7 +234,7 @@ public void deployProcess(
.setResource(processRequestObject.getDefinition().toByteArray());
}));

writeCommandWithoutKey(recordMetadata, deploymentRecord);
writeCommandWithoutKey(deploymentRecord, recordMetadata);
}

@Override
Expand All @@ -244,10 +244,11 @@ public void deployResource(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.DEPLOYMENT)
.intent(DeploymentIntent.CREATE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.DEPLOYMENT)
.intent(DeploymentIntent.CREATE);

final DeploymentRecord deploymentRecord = new DeploymentRecord();
final ValueArray<DeploymentResource> resources = deploymentRecord.resources();
Expand All @@ -261,7 +262,7 @@ public void deployResource(
.setResourceName(resource.getName())
.setResource(resource.getContent().toByteArray())));

writeCommandWithoutKey(recordMetadata, deploymentRecord);
writeCommandWithoutKey(deploymentRecord, recordMetadata);
}

@Override
Expand All @@ -270,14 +271,18 @@ public void failJob(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata().requestId(requestId).valueType(ValueType.JOB).intent(JobIntent.FAIL);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.FAIL);

final JobRecord jobRecord = new JobRecord();

jobRecord.setRetries(request.getRetries());
jobRecord.setErrorMessage(request.getErrorMessage());

writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord);
writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

@Override
Expand All @@ -286,17 +291,18 @@ public void throwError(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.THROW_ERROR);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.THROW_ERROR);

final JobRecord jobRecord = new JobRecord();

jobRecord.setErrorCode(BufferUtil.wrapString(request.getErrorCode()));
jobRecord.setErrorMessage(request.getErrorMessage());

writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord);
writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

@Override
Expand All @@ -306,10 +312,11 @@ public void publishMessage(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.MESSAGE)
.intent(MessageIntent.PUBLISH);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.MESSAGE)
.intent(MessageIntent.PUBLISH);

final MessageRecord messageRecord = new MessageRecord();

Expand All @@ -323,7 +330,7 @@ public void publishMessage(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables)));
}

writeCommandWithoutKey(recordMetadata, messageRecord);
writeCommandWithoutKey(messageRecord, recordMetadata);
}

@Override
Expand All @@ -333,14 +340,15 @@ public void resolveIncident(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.INCIDENT)
.intent(IncidentIntent.RESOLVE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.INCIDENT)
.intent(IncidentIntent.RESOLVE);

final IncidentRecord incidentRecord = new IncidentRecord();

writeCommandWithKey(request.getIncidentKey(), recordMetadata, incidentRecord);
writeCommandWithKey(request.getIncidentKey(), incidentRecord, recordMetadata);
}

@Override
Expand All @@ -350,10 +358,11 @@ public void setVariables(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.VARIABLE_DOCUMENT)
.intent(VariableDocumentIntent.UPDATE);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.VARIABLE_DOCUMENT)
.intent(VariableDocumentIntent.UPDATE);

final VariableDocumentRecord variableDocumentRecord = new VariableDocumentRecord();

Expand All @@ -369,7 +378,7 @@ public void setVariables(
? VariableDocumentUpdateSemantic.LOCAL
: VariableDocumentUpdateSemantic.PROPAGATE);

writeCommandWithoutKey(recordMetadata, variableDocumentRecord);
writeCommandWithoutKey(variableDocumentRecord, recordMetadata);
}

@Override
Expand Down Expand Up @@ -410,19 +419,20 @@ public void updateJobRetries(
final Long requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.UPDATE_RETRIES);
final RecordMetadata recordMetadata =
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.JOB)
.intent(JobIntent.UPDATE_RETRIES);

final JobRecord jobRecord = new JobRecord();
jobRecord.setRetries(request.getRetries());

writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord);
writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

private RecordMetadata prepareRecordMetadata() {
return recordMetadata.reset().recordType(RecordType.COMMAND).requestStreamId(partitionId);
return new RecordMetadata().recordType(RecordType.COMMAND).requestStreamId(partitionId);
}

private ProcessInstanceCreationRecord createProcessInstanceCreationRecord(
Expand Down

0 comments on commit dc60102

Please sign in to comment.