Skip to content

Commit

Permalink
Update Zeebe to version 1.0 (#876)
Browse files Browse the repository at this point in the history
* Update Zeebe to version 1.0

* Fix lint error regarding import

* Don't panic on error

* Log error as string

* Pass reason msg to process engine on fail job command

* Pass job specific variables as headers to a worker

* Fix formatting issue

Co-authored-by: Phil Kedy <[email protected]>
Co-authored-by: Artur Souza <[email protected]>
  • Loading branch information
3 people authored Jun 11, 2021
1 parent 9cd7de9 commit 3689020
Show file tree
Hide file tree
Showing 26 changed files with 217 additions and 270 deletions.
3 changes: 1 addition & 2 deletions bindings/zeebe/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"encoding/json"
"errors"

"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"

"github.com/camunda-cloud/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
Expand Down
3 changes: 1 addition & 2 deletions bindings/zeebe/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
)

func TestParseMetadata(t *testing.T) {
Expand Down
9 changes: 4 additions & 5 deletions bindings/zeebe/command/activate_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/entities"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"

"github.com/camunda-cloud/zeebe/clients/go/pkg/commands"
"github.com/camunda-cloud/zeebe/clients/go/pkg/entities"
"github.com/camunda-cloud/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
)

type mockActivateJobsClient struct {
Expand Down
12 changes: 6 additions & 6 deletions bindings/zeebe/command/cancel_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
"github.com/dapr/components-contrib/bindings"
)

var ErrMissingWorkflowInstanceKey = errors.New("workflowInstanceKey is a required attribute")
var ErrMissingProcessInstanceKey = errors.New("processInstanceKey is a required attribute")

type cancelInstancePayload struct {
WorkflowInstanceKey *int64 `json:"workflowInstanceKey"`
ProcessInstanceKey *int64 `json:"processInstanceKey"`
}

func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
Expand All @@ -27,15 +27,15 @@ func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.In
return nil, err
}

if payload.WorkflowInstanceKey == nil {
return nil, ErrMissingWorkflowInstanceKey
if payload.ProcessInstanceKey == nil {
return nil, ErrMissingProcessInstanceKey
}

_, err = z.client.NewCancelInstanceCommand().
WorkflowInstanceKey(*payload.WorkflowInstanceKey).
ProcessInstanceKey(*payload.ProcessInstanceKey).
Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot cancel instance for workflow instance key %d: %w", payload.WorkflowInstanceKey, err)
return nil, fmt.Errorf("cannot cancel instance for process instance key %d: %w", payload.ProcessInstanceKey, err)
}

return &bindings.InvokeResponse{}, nil
Expand Down
35 changes: 17 additions & 18 deletions bindings/zeebe/command/cancel_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"

"github.com/camunda-cloud/zeebe/clients/go/pkg/commands"
"github.com/camunda-cloud/zeebe/clients/go/pkg/pb"
"github.com/camunda-cloud/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
)

type mockCancelInstanceClient struct {
Expand All @@ -26,45 +25,45 @@ type mockCancelInstanceClient struct {

type mockCancelInstanceStep1 struct {
commands.CancelInstanceStep1
cmd2 *mockDispatchCancelWorkflowInstanceCommand
cmd2 *mockDispatchCancelProcessInstanceCommand
}

type mockDispatchCancelWorkflowInstanceCommand struct {
commands.DispatchCancelWorkflowInstanceCommand
workflowInstanceKey int64
type mockDispatchCancelProcessInstanceCommand struct {
commands.DispatchCancelProcessInstanceCommand
processInstanceKey int64
}

func (mc *mockCancelInstanceClient) NewCancelInstanceCommand() commands.CancelInstanceStep1 {
mc.cmd1 = &mockCancelInstanceStep1{
cmd2: &mockDispatchCancelWorkflowInstanceCommand{},
cmd2: &mockDispatchCancelProcessInstanceCommand{},
}

return mc.cmd1
}

func (cmd1 *mockCancelInstanceStep1) WorkflowInstanceKey(workflowInstanceKey int64) commands.DispatchCancelWorkflowInstanceCommand {
cmd1.cmd2.workflowInstanceKey = workflowInstanceKey
func (cmd1 *mockCancelInstanceStep1) ProcessInstanceKey(processInstanceKey int64) commands.DispatchCancelProcessInstanceCommand {
cmd1.cmd2.processInstanceKey = processInstanceKey

return cmd1.cmd2
}

func (cmd2 *mockDispatchCancelWorkflowInstanceCommand) Send(context.Context) (*pb.CancelWorkflowInstanceResponse, error) {
return &pb.CancelWorkflowInstanceResponse{}, nil
func (cmd2 *mockDispatchCancelProcessInstanceCommand) Send(context.Context) (*pb.CancelProcessInstanceResponse, error) {
return &pb.CancelProcessInstanceResponse{}, nil
}

func TestCancelInstance(t *testing.T) {
testLogger := logger.NewLogger("test")

t.Run("workflowInstanceKey is mandatory", func(t *testing.T) {
t.Run("processInstanceKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: cancelInstanceOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingWorkflowInstanceKey)
assert.Error(t, err, ErrMissingProcessInstanceKey)
})

t.Run("cancel a command", func(t *testing.T) {
payload := cancelInstancePayload{
WorkflowInstanceKey: new(int64),
ProcessInstanceKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
Expand All @@ -77,6 +76,6 @@ func TestCancelInstance(t *testing.T) {
_, err = message.Invoke(req)
assert.NoError(t, err)

assert.Equal(t, *payload.WorkflowInstanceKey, mc.cmd1.cmd2.workflowInstanceKey)
assert.Equal(t, *payload.ProcessInstanceKey, mc.cmd1.cmd2.processInstanceKey)
})
}
11 changes: 5 additions & 6 deletions bindings/zeebe/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ import (
"errors"
"fmt"

"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"

"github.com/camunda-cloud/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/bindings/zeebe"
"github.com/dapr/kit/logger"
Expand All @@ -19,7 +18,7 @@ import (
const (
// operations
topologyOperation bindings.OperationKind = "topology"
deployWorkflowOperation bindings.OperationKind = "deploy-workflow"
deployProcessOperation bindings.OperationKind = "deploy-process"
createInstanceOperation bindings.OperationKind = "create-instance"
cancelInstanceOperation bindings.OperationKind = "cancel-instance"
setVariablesOperation bindings.OperationKind = "set-variables"
Expand Down Expand Up @@ -66,7 +65,7 @@ func (z *ZeebeCommand) Init(metadata bindings.Metadata) error {
func (z *ZeebeCommand) Operations() []bindings.OperationKind {
return []bindings.OperationKind{
topologyOperation,
deployWorkflowOperation,
deployProcessOperation,
createInstanceOperation,
cancelInstanceOperation,
setVariablesOperation,
Expand All @@ -84,8 +83,8 @@ func (z *ZeebeCommand) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResp
switch req.Operation {
case topologyOperation:
return z.topology()
case deployWorkflowOperation:
return z.deployWorkflow(req)
case deployProcessOperation:
return z.deployProcess(req)
case createInstanceOperation:
return z.createInstance(req)
case cancelInstanceOperation:
Expand Down
9 changes: 4 additions & 5 deletions bindings/zeebe/command/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"

"github.com/camunda-cloud/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/bindings/zeebe"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type mockClientFactory struct {
Expand Down Expand Up @@ -86,7 +85,7 @@ func TestOperations(t *testing.T) {
operations := testBinding.Operations()
require.Equal(t, 12, len(operations))
assert.Equal(t, topologyOperation, operations[0])
assert.Equal(t, deployWorkflowOperation, operations[1])
assert.Equal(t, deployProcessOperation, operations[1])
assert.Equal(t, createInstanceOperation, operations[2])
assert.Equal(t, cancelInstanceOperation, operations[3])
assert.Equal(t, setVariablesOperation, operations[4])
Expand Down
3 changes: 1 addition & 2 deletions bindings/zeebe/command/complete_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import (
"encoding/json"
"fmt"

"github.com/zeebe-io/zeebe/clients/go/pkg/commands"

"github.com/camunda-cloud/zeebe/clients/go/pkg/commands"
"github.com/dapr/components-contrib/bindings"
)

Expand Down
9 changes: 4 additions & 5 deletions bindings/zeebe/command/complete_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"

"github.com/camunda-cloud/zeebe/clients/go/pkg/commands"
"github.com/camunda-cloud/zeebe/clients/go/pkg/pb"
"github.com/camunda-cloud/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
)

type mockCompleteJobClient struct {
Expand Down
23 changes: 11 additions & 12 deletions bindings/zeebe/command/create_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,20 @@ import (
"errors"
"fmt"

"github.com/zeebe-io/zeebe/clients/go/pkg/commands"

"github.com/camunda-cloud/zeebe/clients/go/pkg/commands"
"github.com/dapr/components-contrib/bindings"
)

var (
ErrAmbiguousCreationVars = errors.New("either 'bpmnProcessId' or 'workflowKey' must be passed, not both at the same time")
ErrMissingCreationVars = errors.New("either 'bpmnProcessId' or 'workflowKey' must be passed")
ErrAmbiguousCreationVars = errors.New("either 'bpmnProcessId' or 'processDefinitionKey' must be passed, not both at the same time")
ErrMissingCreationVars = errors.New("either 'bpmnProcessId' or 'processDefinitionKey' must be passed")
)

type createInstancePayload struct {
BpmnProcessID string `json:"bpmnProcessId"`
WorkflowKey *int64 `json:"workflowKey"`
Version *int32 `json:"version"`
Variables interface{} `json:"variables"`
BpmnProcessID string `json:"bpmnProcessId"`
ProcessDefinitionKey *int64 `json:"processDefinitionKey"`
Version *int32 `json:"version"`
Variables interface{} `json:"variables"`
}

func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
Expand All @@ -41,7 +40,7 @@ func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.In
var errorDetail string

if payload.BpmnProcessID != "" { //nolint:nestif
if payload.WorkflowKey != nil {
if payload.ProcessDefinitionKey != nil {
return nil, ErrAmbiguousCreationVars
}

Expand All @@ -53,9 +52,9 @@ func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.In
cmd3 = cmd2.LatestVersion()
errorDetail = fmt.Sprintf("bpmnProcessId %s and lates version", payload.BpmnProcessID)
}
} else if payload.WorkflowKey != nil {
cmd3 = cmd1.WorkflowKey(*payload.WorkflowKey)
errorDetail = fmt.Sprintf("workflowKey %d", payload.WorkflowKey)
} else if payload.ProcessDefinitionKey != nil {
cmd3 = cmd1.ProcessDefinitionKey(*payload.ProcessDefinitionKey)
errorDetail = fmt.Sprintf("processDefinitionKey %d", payload.ProcessDefinitionKey)
} else {
return nil, ErrMissingCreationVars
}
Expand Down
Loading

0 comments on commit 3689020

Please sign in to comment.