From f2260341abadf5da790e9fe9789a73a9d587a466 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Fri, 10 Jan 2025 16:01:33 +0100 Subject: [PATCH] feat(tools): add temporal tool to delete schedules and workflows (#256) * add tool to delete schedules * add tool to delete workflows * minor fixes * remove defaults * fix flag * fix useless comments --- cmd/worker.go | 2 - .../main.go | 76 ++++++++++++ .../main.go | 108 ++++++++++++++++++ 3 files changed, 184 insertions(+), 2 deletions(-) create mode 100644 tools/list-and-delete-temporal-schedules/main.go create mode 100644 tools/list-and-delete-temporal-workflows/main.go diff --git a/cmd/worker.go b/cmd/worker.go index 4cb48b78..b7614486 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -60,8 +60,6 @@ func workerOptions(cmd *cobra.Command) (fx.Option, error) { temporalRateLimitingRetryDelay, _ := cmd.Flags().GetDuration(temporalRateLimitingRetryDelay) temporalMaxConcurrentWorkflowTaskPollers, _ := cmd.Flags().GetInt(temporalMaxConcurrentWorkflowTaskPollersFlag) temporalMaxConcurrentActivityTaskPollers, _ := cmd.Flags().GetInt(temporalMaxConcurrentActivityTaskPollersFlag) - fmt.Println("temporalMaxConcurrentWorkflowTaskPollers", temporalMaxConcurrentWorkflowTaskPollers) - fmt.Println("temporalMaxConcurrentActivityTaskPollers", temporalMaxConcurrentActivityTaskPollers) return fx.Options( worker.NewHealthCheckModule(listen, service.IsDebug(cmd)), worker.NewModule( diff --git a/tools/list-and-delete-temporal-schedules/main.go b/tools/list-and-delete-temporal-schedules/main.go new file mode 100644 index 00000000..853e9b50 --- /dev/null +++ b/tools/list-and-delete-temporal-schedules/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "crypto/tls" + "flag" + "fmt" + "log" + + "go.temporal.io/sdk/client" +) + +var ( + temporalAddress = flag.String("temporal-address", "", "Temporal server address") + temporalNamespace = flag.String("namespace", "", "Temporal namespace") + temporalKey = flag.String("key", "", "TLS key") + temporalCertStr = flag.String("cert", "", "TLS cert") + temporalStack = flag.String("stack", "", "Stack") +) + +func main() { + flag.Parse() + + ctx := context.Background() + + var cert *tls.Certificate + if temporalKey != nil && *temporalKey != "" && temporalCertStr != nil && *temporalCertStr != "" { + clientCert, err := tls.X509KeyPair([]byte(*temporalCertStr), []byte(*temporalKey)) + if err != nil { + panic(err) + } + cert = &clientCert + } + + if temporalStack == nil || *temporalStack == "" { + log.Fatalln("Stack is required") + } + + options := client.Options{ + HostPort: *temporalAddress, + Namespace: *temporalNamespace, + } + if cert != nil { + options.ConnectionOptions = client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{*cert}}, + } + } + temporalClient, err := client.Dial(options) + if err != nil { + log.Fatalln("Unable to create Temporal Client", err) + } + defer temporalClient.Close() + + // list schedules + listView, _ := temporalClient.ScheduleClient().List(ctx, client.ScheduleListOptions{ + PageSize: 1, + Query: fmt.Sprintf("Stack=\"%s\"", *temporalStack), + }) + + for listView.HasNext() { + s, err := listView.Next() + if err != nil { + log.Fatalln("Unable to list schedules", err) + } + + // get handle + handle := temporalClient.ScheduleClient().GetHandle(ctx, s.ID) + + // delete schedule + if err := handle.Delete(ctx); err != nil { + log.Fatalln("Unable to delete schedule", err) + } + + log.Println("Deleted schedule", s.ID) + } +} diff --git a/tools/list-and-delete-temporal-workflows/main.go b/tools/list-and-delete-temporal-workflows/main.go new file mode 100644 index 00000000..f2db38ed --- /dev/null +++ b/tools/list-and-delete-temporal-workflows/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "context" + "crypto/tls" + "flag" + "fmt" + "log" + "sync" + + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" +) + +var ( + temporalAddress = flag.String("temporal-address", "", "Temporal server address") + temporalNamespace = flag.String("namespace", "", "Temporal namespace") + temporalKey = flag.String("key", "", "TLS key") + temporalCertStr = flag.String("cert", "", "TLS cert") + temporalStack = flag.String("stack", "", "Stack") +) + +func main() { + flag.Parse() + + ctx := context.Background() + + var cert *tls.Certificate + if temporalKey != nil && *temporalKey != "" && temporalCertStr != nil && *temporalCertStr != "" { + clientCert, err := tls.X509KeyPair([]byte(*temporalCertStr), []byte(*temporalKey)) + if err != nil { + panic(err) + } + cert = &clientCert + } + + if temporalStack == nil || *temporalStack == "" { + log.Fatalln("Stack is required") + } + + options := client.Options{ + HostPort: *temporalAddress, + Namespace: *temporalNamespace, + } + if cert != nil { + options.ConnectionOptions = client.ConnectionOptions{ + TLS: &tls.Config{Certificates: []tls.Certificate{*cert}}, + } + } + temporalClient, err := client.Dial(options) + if err != nil { + log.Fatalln("Unable to create Temporal Client", err) + } + defer temporalClient.Close() + + var nextPageToken []byte + wg := sync.WaitGroup{} + for { + resp, err := temporalClient.WorkflowService().ListWorkflowExecutions( + ctx, + &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: "local-operator.sihc8", + PageSize: 100, + NextPageToken: nextPageToken, + Query: fmt.Sprintf("Stack=\"%s\"", *temporalStack), + }, + ) + if err != nil { + log.Fatalln("Unable to list workflows", err) + } + + for _, e := range resp.Executions { + if e.Status != enums.WORKFLOW_EXECUTION_STATUS_RUNNING { + continue + } + + wg.Add(1) + + go func() { + defer wg.Done() + + // close workflow + _, err := temporalClient.WorkflowService().TerminateWorkflowExecution( + ctx, + &workflowservice.TerminateWorkflowExecutionRequest{ + Namespace: *temporalNamespace, + WorkflowExecution: e.Execution, + Reason: "done", + }, + ) + if err != nil { + return + } + + fmt.Println("workflow terminated: ", e.Execution.GetWorkflowId(), e.Execution.GetRunId()) + }() + } + + wg.Wait() + + if resp.NextPageToken == nil { + break + } + + nextPageToken = resp.NextPageToken + } +}