-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathworker.go
104 lines (88 loc) · 3.99 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Package worker implements a Cadence worker with basic configurations.
package worker
import (
"github.com/uber-common/cadence-samples/new_samples/workflows"
"github.com/uber-go/tally"
apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/activity"
"go.uber.org/cadence/compatibility"
"go.uber.org/cadence/worker"
"go.uber.org/cadence/workflow"
"go.uber.org/yarpc"
"go.uber.org/yarpc/transport/grpc"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
const (
HostPort = "127.0.0.1:7833"
Domain = "cadence-samples"
// TaskListName identifies set of client workflows, activities, and workers.
// It could be your group or client or application name.
TaskListName = "cadence-samples-worker"
ClientName = "cadence-samples-worker"
CadenceService = "cadence-frontend"
)
// StartWorker creates and starts a basic Cadence worker.
func StartWorker() {
logger, cadenceClient := BuildLogger(), BuildCadenceClient()
workerOptions := worker.Options{
Logger: logger,
MetricsScope: tally.NewTestScope(TaskListName, nil),
}
w := worker.New(
cadenceClient,
Domain,
TaskListName,
workerOptions)
// HelloWorld workflow registration
w.RegisterWorkflowWithOptions(workflows.HelloWorldWorkflow, workflow.RegisterOptions{Name: "cadence_samples.HelloWorldWorkflow"})
w.RegisterActivityWithOptions(workflows.HelloWorldActivity, activity.RegisterOptions{Name: "cadence_samples.HelloWorldActivity"})
// Signal workflow registration
w.RegisterWorkflowWithOptions(workflows.SignalGreeterMultiLanguageWorkflow, workflow.RegisterOptions{Name: "cadence_samples.SignalGreeterMultiLanguageWorkflow"})
w.RegisterActivityWithOptions(workflows.GenerateGreetingMessage, activity.RegisterOptions{Name: "cadence_samples.GenerateGreetingMessage"})
// Dynamic workflow registration
w.RegisterWorkflowWithOptions(workflows.DynamicWorkflow, workflow.RegisterOptions{Name: "cadence_samples.DynamicWorkflow"})
w.RegisterActivityWithOptions(workflows.DynamicGreetingActivity, activity.RegisterOptions{Name: "cadence_samples.DynamicGreetingActivity"})
// ParallelPickFirst workflow registration
w.RegisterWorkflowWithOptions(workflows.ParallelBranchPickFirstWorkflow, workflow.RegisterOptions{Name: "cadence_samples.ParallelBranchPickFirstWorkflow"})
w.RegisterActivityWithOptions(workflows.ParallelActivity, activity.RegisterOptions{Name: "cadence_samples.ParallelActivity"})
// Cancel workflow registration
w.RegisterWorkflowWithOptions(workflows.CancelWorkflow, workflow.RegisterOptions{Name: "cadence_samples.CancelWorkflow"})
w.RegisterActivityWithOptions(workflows.ActivityToBeSkipped, activity.RegisterOptions{Name: "cadence_samples.ActivityToBeSkipped"})
w.RegisterActivityWithOptions(workflows.ActivityToBeCanceled, activity.RegisterOptions{Name: "cadence_samples.ActivityToBeCanceled"})
w.RegisterActivityWithOptions(workflows.CleanupActivity, activity.RegisterOptions{Name: "cadence_samples.CleanupActivity"})
err := w.Start()
if err != nil {
panic("Failed to start worker: " + err.Error())
}
logger.Info("Started Worker.", zap.String("worker", TaskListName))
}
func BuildCadenceClient() workflowserviceclient.Interface {
dispatcher := yarpc.NewDispatcher(yarpc.Config{
Name: ClientName,
Outbounds: yarpc.Outbounds{
CadenceService: {Unary: grpc.NewTransport().NewSingleOutbound(HostPort)},
},
})
if err := dispatcher.Start(); err != nil {
panic("Failed to start dispatcher: " + err.Error())
}
clientConfig := dispatcher.ClientConfig(CadenceService)
return compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(clientConfig),
apiv1.NewWorkflowAPIYARPCClient(clientConfig),
apiv1.NewWorkerAPIYARPCClient(clientConfig),
apiv1.NewVisibilityAPIYARPCClient(clientConfig),
)
}
func BuildLogger() *zap.Logger {
config := zap.NewDevelopmentConfig()
config.Level.SetLevel(zapcore.InfoLevel)
var err error
logger, err := config.Build()
if err != nil {
panic("Failed to setup logger: " + err.Error())
}
return logger
}