Skip to content

Commit

Permalink
feat(tools): add temporal tool to delete schedules and workflows (#256)
Browse files Browse the repository at this point in the history
* add tool to delete schedules

* add tool to delete workflows

* minor fixes

* remove defaults

* fix flag

* fix useless comments
  • Loading branch information
paul-nicolas authored Jan 10, 2025
1 parent 5b82807 commit f226034
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 2 deletions.
2 changes: 0 additions & 2 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ func workerOptions(cmd *cobra.Command) (fx.Option, error) {
temporalRateLimitingRetryDelay, _ := cmd.Flags().GetDuration(temporalRateLimitingRetryDelay)
temporalMaxConcurrentWorkflowTaskPollers, _ := cmd.Flags().GetInt(temporalMaxConcurrentWorkflowTaskPollersFlag)
temporalMaxConcurrentActivityTaskPollers, _ := cmd.Flags().GetInt(temporalMaxConcurrentActivityTaskPollersFlag)
fmt.Println("temporalMaxConcurrentWorkflowTaskPollers", temporalMaxConcurrentWorkflowTaskPollers)
fmt.Println("temporalMaxConcurrentActivityTaskPollers", temporalMaxConcurrentActivityTaskPollers)
return fx.Options(
worker.NewHealthCheckModule(listen, service.IsDebug(cmd)),
worker.NewModule(
Expand Down
76 changes: 76 additions & 0 deletions tools/list-and-delete-temporal-schedules/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"log"

"go.temporal.io/sdk/client"
)

var (
temporalAddress = flag.String("temporal-address", "", "Temporal server address")
temporalNamespace = flag.String("namespace", "", "Temporal namespace")
temporalKey = flag.String("key", "", "TLS key")
temporalCertStr = flag.String("cert", "", "TLS cert")
temporalStack = flag.String("stack", "", "Stack")
)

func main() {
flag.Parse()

ctx := context.Background()

var cert *tls.Certificate
if temporalKey != nil && *temporalKey != "" && temporalCertStr != nil && *temporalCertStr != "" {
clientCert, err := tls.X509KeyPair([]byte(*temporalCertStr), []byte(*temporalKey))
if err != nil {
panic(err)
}
cert = &clientCert
}

if temporalStack == nil || *temporalStack == "" {
log.Fatalln("Stack is required")
}

options := client.Options{
HostPort: *temporalAddress,
Namespace: *temporalNamespace,
}
if cert != nil {
options.ConnectionOptions = client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{*cert}},
}
}
temporalClient, err := client.Dial(options)
if err != nil {
log.Fatalln("Unable to create Temporal Client", err)
}
defer temporalClient.Close()

// list schedules
listView, _ := temporalClient.ScheduleClient().List(ctx, client.ScheduleListOptions{
PageSize: 1,
Query: fmt.Sprintf("Stack=\"%s\"", *temporalStack),
})

for listView.HasNext() {
s, err := listView.Next()
if err != nil {
log.Fatalln("Unable to list schedules", err)
}

// get handle
handle := temporalClient.ScheduleClient().GetHandle(ctx, s.ID)

// delete schedule
if err := handle.Delete(ctx); err != nil {
log.Fatalln("Unable to delete schedule", err)
}

log.Println("Deleted schedule", s.ID)
}
}
108 changes: 108 additions & 0 deletions tools/list-and-delete-temporal-workflows/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"log"
"sync"

"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)

var (
temporalAddress = flag.String("temporal-address", "", "Temporal server address")
temporalNamespace = flag.String("namespace", "", "Temporal namespace")
temporalKey = flag.String("key", "", "TLS key")
temporalCertStr = flag.String("cert", "", "TLS cert")
temporalStack = flag.String("stack", "", "Stack")
)

func main() {
flag.Parse()

ctx := context.Background()

var cert *tls.Certificate
if temporalKey != nil && *temporalKey != "" && temporalCertStr != nil && *temporalCertStr != "" {
clientCert, err := tls.X509KeyPair([]byte(*temporalCertStr), []byte(*temporalKey))
if err != nil {
panic(err)
}
cert = &clientCert
}

if temporalStack == nil || *temporalStack == "" {
log.Fatalln("Stack is required")
}

options := client.Options{
HostPort: *temporalAddress,
Namespace: *temporalNamespace,
}
if cert != nil {
options.ConnectionOptions = client.ConnectionOptions{
TLS: &tls.Config{Certificates: []tls.Certificate{*cert}},
}
}
temporalClient, err := client.Dial(options)
if err != nil {
log.Fatalln("Unable to create Temporal Client", err)
}
defer temporalClient.Close()

var nextPageToken []byte
wg := sync.WaitGroup{}
for {
resp, err := temporalClient.WorkflowService().ListWorkflowExecutions(
ctx,
&workflowservice.ListWorkflowExecutionsRequest{
Namespace: "local-operator.sihc8",
PageSize: 100,
NextPageToken: nextPageToken,
Query: fmt.Sprintf("Stack=\"%s\"", *temporalStack),
},
)
if err != nil {
log.Fatalln("Unable to list workflows", err)
}

for _, e := range resp.Executions {
if e.Status != enums.WORKFLOW_EXECUTION_STATUS_RUNNING {
continue
}

wg.Add(1)

go func() {
defer wg.Done()

// close workflow
_, err := temporalClient.WorkflowService().TerminateWorkflowExecution(
ctx,
&workflowservice.TerminateWorkflowExecutionRequest{
Namespace: *temporalNamespace,
WorkflowExecution: e.Execution,
Reason: "done",
},
)
if err != nil {
return
}

fmt.Println("workflow terminated: ", e.Execution.GetWorkflowId(), e.Execution.GetRunId())
}()
}

wg.Wait()

if resp.NextPageToken == nil {
break
}

nextPageToken = resp.NextPageToken
}
}

0 comments on commit f226034

Please sign in to comment.