Skip to content

Commit

Permalink
Fix some minor warnings in the matching package (#4608)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
I went through all the GoLand IDE inspection errors in the matching
package, and I fixed the ones that seemed relevant. Most of them are
little typos.

<!-- Tell your future self why have you made these changes -->
**Why?**
To clean things up a bit, make it easier to find actual errors from
inspection, make sure these don't have to get included in other actual
behavioral changes.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
I made sure not to include any behavioral changes, even small things
like replacing `err ==` with `errors.Is`.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
MichaelSnowden authored Jul 14, 2023
1 parent cc38a41 commit 7e2de69
Show file tree
Hide file tree
Showing 18 changed files with 137 additions and 124 deletions.
10 changes: 5 additions & 5 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ linters-settings:
# Disabled rules
- name: add-constant
disabled: true
- name: argument-limit
disabled: true
- name: bare-return
disabled: true
- name: banned-characters
Expand Down Expand Up @@ -78,9 +80,6 @@ linters-settings:
disabled: true

# Rule tuning
- name: argument-limit
arguments:
- 10
- name: cognitive-complexity
arguments:
- 25
Expand All @@ -92,8 +91,9 @@ linters-settings:
- 3
- name: unhandled-error
arguments:
- "fmt.Printf"
- "fmt.Println"
- "fmt.*"
- "bytes.Buffer.*"
- "strings.Builder.*"
issues:
# Exclude cyclomatic and cognitive complexity rules for functional tests in the `tests` root directory.
exclude-rules:
Expand Down
2 changes: 1 addition & 1 deletion service/matching/ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (m *ackManager) completeTask(taskID int64) (ackLevel int64) {
m.backlogCounter.Dec()
}

// TODO the ack level management shuld be done by a dedicated coroutine
// TODO the ack level management should be done by a dedicated coroutine
// this is only a temporarily solution

taskIDs := maps.Keys(m.outstandingTasks)
Expand Down
2 changes: 1 addition & 1 deletion service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type (
AdminNamespaceTaskQueueToPartitionDispatchRate func() float64

// If set to false, matching does not load user data from DB for root partitions or fetch it via RPC from the
// root. When disbled, features that rely on user data (e.g. worker versioning) will essentially be disabled.
// root. When disabled, features that rely on user data (e.g. worker versioning) will essentially be disabled.
// See the documentation for constants.MatchingLoadUserData for the implications on versioning.
LoadUserData func() bool

Expand Down
6 changes: 3 additions & 3 deletions service/matching/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,16 @@ func (db *taskQueueDB) CompleteTasksLessThan(
return n, err
}

// Returns true if we are storing user data in the db. We need to be the root partition,
// workflow type, unversioned, and also a normal queue.
// DbStoresUserData returns true if we are storing user data in the db. We need to be the root partition, workflow type,
// unversioned, and also a normal queue.
func (db *taskQueueDB) DbStoresUserData() bool {
return db.taskQueue.OwnsUserData() && db.taskQueueKind == enumspb.TASK_QUEUE_KIND_NORMAL
}

// GetUserData returns the versioning data for this task queue. Do not mutate the returned pointer, as doing so
// will cause cache inconsistency.
func (db *taskQueueDB) GetUserData(
ctx context.Context,
context.Context,
) (*persistencespb.VersionedTaskQueueUserData, chan struct{}, error) {
db.Lock()
defer db.Unlock()
Expand Down
9 changes: 3 additions & 6 deletions service/matching/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func newForwarder(
return fwdr
}

// ForwardTask forwards an activity or workflow task to the parent task queue partition if it exist
// ForwardTask forwards an activity or workflow task to the parent task queue partition if it exists
func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) error {
if fwdr.taskQueueKind == enumspb.TASK_QUEUE_KIND_STICKY {
return errTaskQueueKind
Expand All @@ -131,15 +131,12 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro

var expirationDuration time.Duration
expirationTime := timestamp.TimeValue(task.event.Data.ExpiryTime)
if expirationTime.IsZero() {
// noop
} else {
if !expirationTime.IsZero() {
expirationDuration = time.Until(expirationTime)
if expirationDuration <= 0 {
return nil
}
}

switch fwdr.taskQueueID.taskType {
case enumspb.TASK_QUEUE_TYPE_WORKFLOW:
_, err = fwdr.client.AddWorkflowTask(ctx, &matchingservice.AddWorkflowTaskRequest{
Expand Down Expand Up @@ -178,7 +175,7 @@ func (fwdr *Forwarder) ForwardTask(ctx context.Context, task *internalTask) erro
return fwdr.handleErr(err)
}

// ForwardQueryTask forwards a query task to parent task queue partition, if it exist
// ForwardQueryTask forwards a query task to parent task queue partition, if it exists
func (fwdr *Forwarder) ForwardQueryTask(
ctx context.Context,
task *internalTask,
Expand Down
10 changes: 5 additions & 5 deletions service/matching/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (t *ForwarderTestSuite) TestForwardWorkflowTask() {
t.NoError(t.fwdr.ForwardTask(context.Background(), task))
t.NotNil(request)
t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.TaskQueue.GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.TaskQueue.GetKind())
t.Equal(t.fwdr.taskQueueKind, request.TaskQueue.GetKind())
t.Equal(taskInfo.Data.GetNamespaceId(), request.GetNamespaceId())
t.Equal(taskInfo.Data.GetWorkflowId(), request.GetExecution().GetWorkflowId())
t.Equal(taskInfo.Data.GetRunId(), request.GetExecution().GetRunId())
Expand Down Expand Up @@ -175,7 +175,7 @@ func (t *ForwarderTestSuite) TestForwardQueryTask() {
gotResp, err := t.fwdr.ForwardQueryTask(context.Background(), task)
t.NoError(err)
t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.TaskQueue.GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.TaskQueue.GetKind())
t.Equal(t.fwdr.taskQueueKind, request.TaskQueue.GetKind())
t.Equal(task.query.request.QueryRequest, request.QueryRequest)
t.Equal(resp, gotResp)
}
Expand All @@ -191,7 +191,7 @@ func (t *ForwarderTestSuite) TestForwardQueryTaskRateNotEnforced() {
t.NoError(err)
}
_, err := t.fwdr.ForwardQueryTask(context.Background(), task)
t.NoError(err) // no rateliming should be enforced for query task
t.NoError(err) // no rate limiting should be enforced for query task
}

func (t *ForwarderTestSuite) TestForwardPollError() {
Expand Down Expand Up @@ -228,7 +228,7 @@ func (t *ForwarderTestSuite) TestForwardPollWorkflowTaskQueue() {
t.Equal(t.taskQueue.namespaceID, namespace.ID(request.GetNamespaceId()))
t.Equal("id1", request.GetPollRequest().GetIdentity())
t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.GetPollRequest().GetTaskQueue().GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.GetPollRequest().GetTaskQueue().GetKind())
t.Equal(t.fwdr.taskQueueKind, request.GetPollRequest().GetTaskQueue().GetKind())
t.Equal(resp, task.pollWorkflowTaskQueueResponse())
t.Nil(task.pollActivityTaskQueueResponse())
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (t *ForwarderTestSuite) TestForwardPollForActivity() {
t.Equal(t.taskQueue.namespaceID, namespace.ID(request.GetNamespaceId()))
t.Equal("id1", request.GetPollRequest().GetIdentity())
t.Equal(mustParent(t.taskQueue.Name, 20).FullName(), request.GetPollRequest().GetTaskQueue().GetName())
t.Equal(enumspb.TaskQueueKind(t.fwdr.taskQueueKind), request.GetPollRequest().GetTaskQueue().GetKind())
t.Equal(t.fwdr.taskQueueKind, request.GetPollRequest().GetTaskQueue().GetKind())
t.Equal(resp, task.pollActivityTaskQueueResponse())
t.Nil(task.pollWorkflowTaskQueueResponse())
}
Expand Down
6 changes: 3 additions & 3 deletions service/matching/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func RateLimitInterceptorProvider(
)
}

// This function is the same between services but uses different config sources.
// PersistenceRateLimitingParamsProvider is the same between services but uses different config sources.
// if-case comes from resourceImpl.New.
func PersistenceRateLimitingParamsProvider(
serviceConfig *Config,
Expand All @@ -129,8 +129,8 @@ func ServiceResolverProvider(membershipMonitor membership.Monitor) (membership.S
return membershipMonitor.GetResolver(primitives.MatchingService)
}

// This type is used to ensure the replicator only gets set if global namespaces are enabled on this cluster.
// See NamespaceReplicationQueueProvider below.
// TaskQueueReplicatorNamespaceReplicationQueue is used to ensure the replicator only gets set if global namespaces are
// enabled on this cluster. See NamespaceReplicationQueueProvider below.
type TaskQueueReplicatorNamespaceReplicationQueue persistence.NamespaceReplicationQueue

func NamespaceReplicationQueueProvider(
Expand Down
15 changes: 7 additions & 8 deletions service/matching/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ type TaskMatcher struct {

// synchronous task channel to match producer/consumer
taskC chan *internalTask
// synchronous task channel to match query task - the reason to have
// separate channel for this is because there are cases when consumers
// are interested in queryTasks but not others. Example is when namespace is
// not active in a cluster
// synchronous task channel to match query task - the reason to have a
// separate channel for this is that there are cases where consumers
// are interested in queryTasks but not others. One example is when a
// namespace is not active in a cluster.
queryTaskC chan *internalTask

// dynamicRate is the dynamic rate & burst for rate limiter
Expand All @@ -75,9 +75,8 @@ var (
errInterrupted = errors.New("interrupted offer")
)

// newTaskMatcher returns an task matcher instance. The returned instance can be
// used by task producers and consumers to find a match. Both sync matches and non-sync
// matches should use this implementation
// newTaskMatcher returns a task matcher instance. The returned instance can be used by task producers and consumers to
// find a match. Both sync matches and non-sync matches should use this implementation
func newTaskMatcher(config *taskQueueConfig, fwdr *Forwarder, metricsHandler metrics.Handler) *TaskMatcher {
dynamicRateBurst := quotas.NewMutableRateBurst(
defaultTaskDispatchRPS,
Expand Down Expand Up @@ -383,7 +382,7 @@ func (tm *TaskMatcher) poll(ctx context.Context, pollMetadata *pollMetadata, que
default:
}

// 3. forwarding (and all other clauses repeated again)
// 3. forwarding (and all other clauses repeated)
select {
case <-ctx.Done():
tm.metricsHandler.Counter(metrics.PollTimeoutPerTaskQueueCounter.GetMetricName()).Record(1)
Expand Down
40 changes: 18 additions & 22 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (e *matchingEngineImpl) String() string {
//
// Note that stickyInfo is not used as part of the task queue identity. That means that if
// getTaskQueueManager is called twice with the same taskQueue but different stickyInfo, the
// properties of the taskQueueManager will depend on which call came first. In general we can
// properties of the taskQueueManager will depend on which call came first. In general, we can
// rely on kind being the same for all calls now, but normalName was a later addition to the
// protocol and is not always set consistently. normalName is only required when using
// versioning, and SDKs that support versioning will always set it. The current server version
Expand Down Expand Up @@ -347,9 +347,7 @@ func (e *matchingEngineImpl) AddWorkflowTask(
var expirationTime *time.Time
now := timestamp.TimePtr(time.Now().UTC())
expirationDuration := timestamp.DurationValue(addRequest.GetScheduleToStartTimeout())
if expirationDuration == 0 {
// noop
} else {
if expirationDuration != 0 {
expirationTime = timestamp.TimePtr(now.Add(expirationDuration))
}
taskInfo := &persistencespb.TaskInfo{
Expand Down Expand Up @@ -412,9 +410,7 @@ func (e *matchingEngineImpl) AddActivityTask(
var expirationTime *time.Time
now := timestamp.TimePtr(time.Now().UTC())
expirationDuration := timestamp.DurationValue(addRequest.GetScheduleToStartTimeout())
if expirationDuration == 0 {
// noop
} else {
if expirationDuration != 0 {
expirationTime = timestamp.TimePtr(now.Add(expirationDuration))
}
taskInfo := &persistencespb.TaskInfo{
Expand Down Expand Up @@ -714,13 +710,13 @@ func (e *matchingEngineImpl) QueryWorkflow(
taskID := uuid.New()
resp, err := tqm.DispatchQueryTask(ctx, taskID, queryRequest)

// if get response or error it means that query task was handled by forwarding to another matching host
// if we get a response or error it means that query task was handled by forwarding to another matching host
// this remote host's result can be returned directly
if resp != nil || err != nil {
return resp, err
}

// if get here it means that dispatch of query task has occurred locally
// if we get here it means that dispatch of query task has occurred locally
// must wait on result channel to get query result
queryResultCh := make(chan *queryResult, 1)
e.lockableQueryTaskMap.put(taskID, queryResultCh)
Expand All @@ -747,7 +743,7 @@ func (e *matchingEngineImpl) QueryWorkflow(
}

func (e *matchingEngineImpl) RespondQueryTaskCompleted(
ctx context.Context,
_ context.Context,
request *matchingservice.RespondQueryTaskCompletedRequest,
opMetrics metrics.Handler,
) error {
Expand All @@ -768,7 +764,7 @@ func (e *matchingEngineImpl) deliverQueryResult(taskID string, queryResult *quer
}

func (e *matchingEngineImpl) CancelOutstandingPoll(
ctx context.Context,
_ context.Context,
request *matchingservice.CancelOutstandingPollRequest,
) error {
e.pollMap.cancel(request.PollerId)
Expand Down Expand Up @@ -796,7 +792,7 @@ func (e *matchingEngineImpl) DescribeTaskQueue(
}

func (e *matchingEngineImpl) ListTaskQueuePartitions(
ctx context.Context,
_ context.Context,
request *matchingservice.ListTaskQueuePartitionsRequest,
) (*matchingservice.ListTaskQueuePartitionsResponse, error) {
activityTaskQueueInfo, err := e.listTaskQueuePartitions(request, enumspb.TASK_QUEUE_TYPE_ACTIVITY)
Expand Down Expand Up @@ -868,12 +864,12 @@ func (e *matchingEngineImpl) UpdateWorkerBuildIdCompatibility(
}

err = tqMgr.UpdateUserData(ctx, updateOptions, func(data *persistencespb.TaskQueueUserData) (*persistencespb.TaskQueueUserData, bool, error) {
clock := data.GetClock()
if clock == nil {
clk := data.GetClock()
if clk == nil {
tmp := hlc.Zero(e.clusterMeta.GetClusterID())
clock = &tmp
clk = &tmp
}
updatedClock := hlc.Next(*clock, e.timeSource)
updatedClock := hlc.Next(*clk, e.timeSource)
var versioningData *persistencespb.VersioningData
switch req.GetOperation().(type) {
case *matchingservice.UpdateWorkerBuildIdCompatibilityRequest_ApplyPublicRequest_:
Expand Down Expand Up @@ -1172,12 +1168,12 @@ func (e *matchingEngineImpl) getHostInfo(partitionKey string) (string, error) {
}

func (e *matchingEngineImpl) getAllPartitions(
namespace namespace.Name,
ns namespace.Name,
taskQueue taskqueuepb.TaskQueue,
taskQueueType enumspb.TaskQueueType,
) ([]string, error) {
var partitionKeys []string
namespaceID, err := e.namespaceRegistry.GetNamespaceID(namespace)
namespaceID, err := e.namespaceRegistry.GetNamespaceID(ns)
if err != nil {
return partitionKeys, err
}
Expand All @@ -1186,7 +1182,7 @@ func (e *matchingEngineImpl) getAllPartitions(
return partitionKeys, err
}

n := e.config.NumTaskqueueWritePartitions(namespace.String(), taskQueueID.BaseNameString(), taskQueueType)
n := e.config.NumTaskqueueWritePartitions(ns.String(), taskQueueID.BaseNameString(), taskQueueType)
for i := 0; i < n; i++ {
partitionKeys = append(partitionKeys, taskQueueID.WithPartition(i).FullName())
}
Expand Down Expand Up @@ -1265,14 +1261,14 @@ func (e *matchingEngineImpl) unloadTaskQueue(unloadTQM taskQueueManager) {

func (e *matchingEngineImpl) updateTaskQueueGauge(countKey taskQueueCounterKey, taskQueueCount int) {
nsEntry, err := e.namespaceRegistry.GetNamespaceByID(countKey.namespaceID)
namespace := namespace.Name("unknown")
ns := namespace.Name("unknown")
if err == nil {
namespace = nsEntry.Name()
ns = nsEntry.Name()
}

e.metricsHandler.Gauge(metrics.LoadedTaskQueueGauge.GetMetricName()).Record(
float64(taskQueueCount),
metrics.NamespaceTag(namespace.String()),
metrics.NamespaceTag(ns.String()),
metrics.TaskTypeTag(countKey.taskType.String()),
metrics.QueueTypeTag(countKey.kind.String()),
)
Expand Down
Loading

0 comments on commit 7e2de69

Please sign in to comment.