diff --git a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java index 6df349eee..32d60b575 100644 --- a/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java +++ b/engine/src/main/java/io/camunda/zeebe/process/test/engine/GrpcToLogStreamGateway.java @@ -7,8 +7,6 @@ */ package io.camunda.zeebe.process.test.engine; -import com.google.protobuf.GeneratedMessageV3; -import com.google.rpc.Status; import io.camunda.zeebe.gateway.protocol.GatewayGrpc; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass; import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest; @@ -43,7 +41,6 @@ import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse; import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter; import io.camunda.zeebe.msgpack.value.ValueArray; -import io.camunda.zeebe.process.test.engine.GrpcResponseWriter.GrpcResponseMapper; import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter; import io.camunda.zeebe.protocol.impl.record.RecordMetadata; import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord; @@ -69,16 +66,11 @@ import io.camunda.zeebe.util.VersionUtil; import io.camunda.zeebe.util.buffer.BufferUtil; import io.camunda.zeebe.util.buffer.BufferWriter; -import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase { private final LogStreamRecordWriter writer; - private final RecordMetadata recordMetadata = new RecordMetadata(); private final int partitionId; private final int partitionCount; private final int port; @@ -98,17 +90,19 @@ public GrpcToLogStreamGateway( } private void writeCommandWithKey( - final Long key, final RecordMetadata metadata, final BufferWriter bufferWriter) { - writer.reset(); - - writer.key(key).metadataWriter(metadata).valueWriter(bufferWriter).tryWrite(); + final Long key, final BufferWriter bufferWriter, final RecordMetadata recordMetadata) { + synchronized (writer) { + writer.reset(); + writer.key(key).metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite(); + } } private void writeCommandWithoutKey( - final RecordMetadata metadata, final BufferWriter bufferWriter) { - writer.reset(); - - writer.keyNull().metadataWriter(metadata).valueWriter(bufferWriter).tryWrite(); + final BufferWriter bufferWriter, final RecordMetadata recordMetadata) { + synchronized (writer) { + writer.reset(); + writer.keyNull().metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite(); + } } @Override @@ -118,10 +112,11 @@ public void activateJobs( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB_BATCH) - .intent(JobBatchIntent.ACTIVATE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB_BATCH) + .intent(JobBatchIntent.ACTIVATE); final JobBatchRecord jobBatchRecord = new JobBatchRecord(); @@ -130,7 +125,7 @@ public void activateJobs( jobBatchRecord.setTimeout(request.getTimeout()); jobBatchRecord.setMaxJobsToActivate(request.getMaxJobsToActivate()); - writeCommandWithoutKey(recordMetadata, jobBatchRecord); + writeCommandWithoutKey(jobBatchRecord, recordMetadata); } @Override @@ -140,15 +135,16 @@ public void cancelProcessInstance( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.PROCESS_INSTANCE) - .intent(ProcessInstanceIntent.CANCEL); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE) + .intent(ProcessInstanceIntent.CANCEL); final ProcessInstanceRecord processInstanceRecord = new ProcessInstanceRecord(); processInstanceRecord.setProcessInstanceKey(request.getProcessInstanceKey()); - writeCommandWithKey(request.getProcessInstanceKey(), recordMetadata, processInstanceRecord); + writeCommandWithKey(request.getProcessInstanceKey(), processInstanceRecord, recordMetadata); } @Override @@ -158,10 +154,11 @@ public void completeJob( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.COMPLETE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.COMPLETE); final JobRecord jobRecord = new JobRecord(); @@ -170,7 +167,7 @@ public void completeJob( jobRecord.setVariables(BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); } - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); + writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata); } @Override @@ -180,14 +177,15 @@ public void createProcessInstance( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.PROCESS_INSTANCE_CREATION) - .intent(ProcessInstanceCreationIntent.CREATE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE_CREATION) + .intent(ProcessInstanceCreationIntent.CREATE); final ProcessInstanceCreationRecord processInstanceCreationRecord = createProcessInstanceCreationRecord(request); - writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord); + writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata); } @Override @@ -197,16 +195,17 @@ public void createProcessInstanceWithResult( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.PROCESS_INSTANCE_CREATION) - .intent(ProcessInstanceCreationIntent.CREATE_WITH_AWAITING_RESULT); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.PROCESS_INSTANCE_CREATION) + .intent(ProcessInstanceCreationIntent.CREATE_WITH_AWAITING_RESULT); final ProcessInstanceCreationRecord processInstanceCreationRecord = createProcessInstanceCreationRecord(request.getRequest()); processInstanceCreationRecord.setFetchVariables(request.getFetchVariablesList()); - writeCommandWithoutKey(recordMetadata, processInstanceCreationRecord); + writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata); } @Override @@ -216,10 +215,11 @@ public void deployProcess( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.DEPLOYMENT) - .intent(DeploymentIntent.CREATE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.DEPLOYMENT) + .intent(DeploymentIntent.CREATE); final DeploymentRecord deploymentRecord = new DeploymentRecord(); final ValueArray resources = deploymentRecord.resources(); @@ -234,7 +234,7 @@ public void deployProcess( .setResource(processRequestObject.getDefinition().toByteArray()); })); - writeCommandWithoutKey(recordMetadata, deploymentRecord); + writeCommandWithoutKey(deploymentRecord, recordMetadata); } @Override @@ -244,10 +244,11 @@ public void deployResource( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.DEPLOYMENT) - .intent(DeploymentIntent.CREATE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.DEPLOYMENT) + .intent(DeploymentIntent.CREATE); final DeploymentRecord deploymentRecord = new DeploymentRecord(); final ValueArray resources = deploymentRecord.resources(); @@ -261,7 +262,7 @@ public void deployResource( .setResourceName(resource.getName()) .setResource(resource.getContent().toByteArray()))); - writeCommandWithoutKey(recordMetadata, deploymentRecord); + writeCommandWithoutKey(deploymentRecord, recordMetadata); } @Override @@ -270,14 +271,18 @@ public void failJob( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata().requestId(requestId).valueType(ValueType.JOB).intent(JobIntent.FAIL); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.FAIL); final JobRecord jobRecord = new JobRecord(); jobRecord.setRetries(request.getRetries()); jobRecord.setErrorMessage(request.getErrorMessage()); - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); + writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata); } @Override @@ -286,17 +291,18 @@ public void throwError( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.THROW_ERROR); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.THROW_ERROR); final JobRecord jobRecord = new JobRecord(); jobRecord.setErrorCode(BufferUtil.wrapString(request.getErrorCode())); jobRecord.setErrorMessage(request.getErrorMessage()); - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); + writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata); } @Override @@ -306,10 +312,11 @@ public void publishMessage( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.MESSAGE) - .intent(MessageIntent.PUBLISH); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.MESSAGE) + .intent(MessageIntent.PUBLISH); final MessageRecord messageRecord = new MessageRecord(); @@ -323,7 +330,7 @@ public void publishMessage( BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables))); } - writeCommandWithoutKey(recordMetadata, messageRecord); + writeCommandWithoutKey(messageRecord, recordMetadata); } @Override @@ -333,14 +340,15 @@ public void resolveIncident( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.INCIDENT) - .intent(IncidentIntent.RESOLVE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.INCIDENT) + .intent(IncidentIntent.RESOLVE); final IncidentRecord incidentRecord = new IncidentRecord(); - writeCommandWithKey(request.getIncidentKey(), recordMetadata, incidentRecord); + writeCommandWithKey(request.getIncidentKey(), incidentRecord, recordMetadata); } @Override @@ -350,10 +358,11 @@ public void setVariables( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.VARIABLE_DOCUMENT) - .intent(VariableDocumentIntent.UPDATE); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.VARIABLE_DOCUMENT) + .intent(VariableDocumentIntent.UPDATE); final VariableDocumentRecord variableDocumentRecord = new VariableDocumentRecord(); @@ -369,7 +378,7 @@ public void setVariables( ? VariableDocumentUpdateSemantic.LOCAL : VariableDocumentUpdateSemantic.PROPAGATE); - writeCommandWithoutKey(recordMetadata, variableDocumentRecord); + writeCommandWithoutKey(variableDocumentRecord, recordMetadata); } @Override @@ -410,19 +419,20 @@ public void updateJobRetries( final Long requestId = gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver); - prepareRecordMetadata() - .requestId(requestId) - .valueType(ValueType.JOB) - .intent(JobIntent.UPDATE_RETRIES); + final RecordMetadata recordMetadata = + prepareRecordMetadata() + .requestId(requestId) + .valueType(ValueType.JOB) + .intent(JobIntent.UPDATE_RETRIES); final JobRecord jobRecord = new JobRecord(); jobRecord.setRetries(request.getRetries()); - writeCommandWithKey(request.getJobKey(), recordMetadata, jobRecord); + writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata); } private RecordMetadata prepareRecordMetadata() { - return recordMetadata.reset().recordType(RecordType.COMMAND).requestStreamId(partitionId); + return new RecordMetadata().recordType(RecordType.COMMAND).requestStreamId(partitionId); } private ProcessInstanceCreationRecord createProcessInstanceCreationRecord(