Skip to content

Commit

Permalink
Fix AsTime() for nil proto timestamp (#1847)
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal authored Feb 25, 2025
1 parent 2449502 commit 61c10ce
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
35 changes: 23 additions & 12 deletions internal/internal_worker_deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"errors"
"fmt"
"strings"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/deployment/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
"google.golang.org/protobuf/types/known/timestamppb"
)

// A reserved identifier of unversioned workers.
Expand All @@ -40,6 +42,15 @@ const WorkerDeploymentUnversioned = "__unversioned__"
// A reserved separator for Worker Deployment Versions.
const WorkerDeploymentVersionSeparator = "."

// safeAsTime ensures that a nil proto timestamp makes `IsZero()` true.
func safeAsTime(timestamp *timestamppb.Timestamp) time.Time {
if timestamp == nil {
return time.Time{}
} else {
return timestamp.AsTime()
}
}

type (
// WorkerDeploymentClient is the client for managing worker deployments.
workerDeploymentClient struct {
Expand Down Expand Up @@ -100,16 +111,16 @@ func workerDeploymentRoutingConfigFromProto(routingConfig *deployment.RoutingCon
CurrentVersion: routingConfig.GetCurrentVersion(),
RampingVersion: routingConfig.GetRampingVersion(),
RampingVersionPercentage: routingConfig.GetRampingVersionPercentage(),
CurrentVersionChangedTime: routingConfig.GetCurrentVersionChangedTime().AsTime(),
RampingVersionChangedTime: routingConfig.GetRampingVersionChangedTime().AsTime(),
RampingVersionPercentageChangedTime: routingConfig.GetRampingVersionPercentageChangedTime().AsTime(),
CurrentVersionChangedTime: safeAsTime(routingConfig.GetCurrentVersionChangedTime()),
RampingVersionChangedTime: safeAsTime(routingConfig.GetRampingVersionChangedTime()),
RampingVersionPercentageChangedTime: safeAsTime(routingConfig.GetRampingVersionPercentageChangedTime()),
}
}

func workerDeploymentListEntryFromProto(summary *workflowservice.ListWorkerDeploymentsResponse_WorkerDeploymentSummary) *WorkerDeploymentListEntry {
return &WorkerDeploymentListEntry{
Name: summary.GetName(),
CreateTime: summary.GetCreateTime().AsTime(),
CreateTime: safeAsTime(summary.GetCreateTime()),
RoutingConfig: workerDeploymentRoutingConfigFromProto(summary.GetRoutingConfig()),
}
}
Expand All @@ -119,7 +130,7 @@ func workerDeploymentVersionSummariesFromProto(summaries []*deployment.WorkerDep
for _, summary := range summaries {
result = append(result, WorkerDeploymentVersionSummary{
Version: summary.GetVersion(),
CreateTime: summary.CreateTime.AsTime(),
CreateTime: safeAsTime(summary.CreateTime),
DrainageStatus: WorkerDeploymentVersionDrainageStatus(summary.GetDrainageStatus()),
})
}
Expand All @@ -133,7 +144,7 @@ func workerDeploymentInfoFromProto(info *deployment.WorkerDeploymentInfo) Worker

return WorkerDeploymentInfo{
Name: info.Name,
CreateTime: info.CreateTime.AsTime(),
CreateTime: safeAsTime(info.CreateTime),
VersionSummaries: workerDeploymentVersionSummariesFromProto(info.VersionSummaries),
RoutingConfig: workerDeploymentRoutingConfigFromProto(info.RoutingConfig),
LastModifierIdentity: info.LastModifierIdentity,
Expand Down Expand Up @@ -296,8 +307,8 @@ func workerDeploymentDrainageInfoFromProto(drainageInfo *deployment.VersionDrain
}
return &WorkerDeploymentVersionDrainageInfo{
DrainageStatus: WorkerDeploymentVersionDrainageStatus(drainageInfo.Status),
LastChangedTime: drainageInfo.LastChangedTime.AsTime(),
LastCheckedTime: drainageInfo.LastCheckedTime.AsTime(),
LastChangedTime: safeAsTime(drainageInfo.LastChangedTime),
LastCheckedTime: safeAsTime(drainageInfo.LastCheckedTime),
}
}

Expand All @@ -307,10 +318,10 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi
}
return WorkerDeploymentVersionInfo{
Version: info.Version,
CreateTime: info.CreateTime.AsTime(),
RoutingChangedTime: info.RoutingChangedTime.AsTime(),
CurrentSinceTime: info.CurrentSinceTime.AsTime(),
RampingSinceTime: info.RampingSinceTime.AsTime(),
CreateTime: safeAsTime(info.CreateTime),
RoutingChangedTime: safeAsTime(info.RoutingChangedTime),
CurrentSinceTime: safeAsTime(info.CurrentSinceTime),
RampingSinceTime: safeAsTime(info.RampingSinceTime),
RampPercentage: info.RampPercentage,
TaskQueuesInfos: workerDeploymentTaskQueuesInfosFromProto(info.TaskQueueInfos),
DrainageInfo: workerDeploymentDrainageInfoFromProto(info.DrainageInfo),
Expand Down
23 changes: 23 additions & 0 deletions internal/internal_worker_deployment_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
"go.temporal.io/api/deployment/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/api/workflowservicemock/v1"
Expand Down Expand Up @@ -147,3 +148,25 @@ func (d *workerDeploymentClientTestSuite) TestWorkerDeploymentIteratorError() {
d.Nil(event)
d.NotNil(err)
}

// nil timestamps pass IsZero()
func (d *workerDeploymentClientTestSuite) TestWorkerDeploymenNilTimestamp() {
request := &workflowservice.DescribeWorkerDeploymentRequest{
Namespace: DefaultNamespace,
DeploymentName: "foo",
}

response := &workflowservice.DescribeWorkerDeploymentResponse{
ConflictToken: []byte{1, 2, 1, 2, 1, 1, 8},
WorkerDeploymentInfo: &deployment.WorkerDeploymentInfo{
Name: "foo",
CreateTime: nil,
},
}

d.service.EXPECT().DescribeWorkerDeployment(gomock.Any(), request, gomock.Any()).Return(response, nil).Times(1)

dHandle := d.client.WorkerDeploymentClient().GetHandle("foo")
deployment, _ := dHandle.Describe(context.Background(), WorkerDeploymentDescribeOptions{})
d.True(deployment.Info.CreateTime.IsZero())
}

0 comments on commit 61c10ce

Please sign in to comment.