Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
roneli committed Apr 28, 2022
1 parent 92129a6 commit e904e0d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
5 changes: 3 additions & 2 deletions cqproto/internal/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cqproto/internal/plugin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ func (e TableExecutor) Resolve(ctx context.Context, meta schema.ClientMeta) (uin
}

// withTable allows to create a new TableExecutor for received *schema.Table
func (e TableExecutor) withTable(t *schema.Table) *TableExecutor {
func (e TableExecutor) withTable(t *schema.Table, kv ...interface{}) *TableExecutor {
var c [2]schema.ColumnList
c[0], c[1] = e.Db.Dialect().Columns(t).Sift()
return &TableExecutor{
ResourceName: e.ResourceName,
Table: t,
Db: e.Db,
Logger: e.Logger.With("table", t.Name),
Logger: e.Logger.With(kv...),
classifiers: e.classifiers,
extraFields: e.extraFields,
executionStart: e.executionStart,
Expand Down Expand Up @@ -132,8 +132,8 @@ func (e TableExecutor) doMultiplexResolve(ctx context.Context, clients []schema.
doneClients = 0
numberOfClients = 0
)
logger := clients[0].Logger()
logger.Debug("multiplexing client", "count", len(clients))
// initially use client logger here
e.Logger.Debug("multiplexing client", "count", len(clients))

done := make(chan struct{})
go func() {
Expand All @@ -142,7 +142,7 @@ func (e TableExecutor) doMultiplexResolve(ctx context.Context, clients []schema.
allDiags = allDiags.Add(dd)
doneClients++
}
logger.Debug("multiplexed client finished", "done", doneClients, "total", numberOfClients)
e.Logger.Debug("multiplexed client finished", "done", doneClients, "total", numberOfClients)
}()

wg := &sync.WaitGroup{}
Expand All @@ -154,7 +154,7 @@ func (e TableExecutor) doMultiplexResolve(ctx context.Context, clients []schema.
clientID = e.Table.Name + ":" + clientID

// we can only limit on a granularity of a top table otherwise we can get deadlock
logger.Debug("trying acquire for new client", "next_id", clientID)
e.Logger.Debug("trying acquire for new client", "next_id", clientID)
if err := e.goroutinesSem.Acquire(ctx, 1); err != nil {
diagsChan <- ClassifyError(err, diag.WithResourceName(e.ResourceName))
break
Expand All @@ -171,7 +171,8 @@ func (e TableExecutor) doMultiplexResolve(ctx context.Context, clients []schema.
defer cancel()
}
defer e.Logger.Debug("releasing multiplex client", "ctx_err", ctx.Err())

// create client execution add all Client's implied Args to execution logger + add its unique client id, so all its execution can be
// identified.
count, resolveDiags := e.withLogger(append(c.Logger().ImpliedArgs(), "client_id", id)).callTableResolve(ctx, c, nil)
atomic.AddUint64(&totalResources, count)
diags <- resolveDiags
Expand All @@ -181,7 +182,7 @@ func (e TableExecutor) doMultiplexResolve(ctx context.Context, clients []schema.
close(diagsChan)
<-done

logger.Debug("table multiplex resolve completed")
e.Logger.Debug("table multiplex resolve completed")
return totalResources, allDiags
}

Expand Down
2 changes: 1 addition & 1 deletion provider/execution/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ func TestTableExecutor_resolveResourceValues(t *testing.T) {
r := schema.NewResourceData(storage.Dialect(), tc.Table, nil, tc.ResourceData, tc.MetaData, exec.executionStart)
// columns should be resolved from ColumnResolver functions or default functions
cl := executionClient{testlog.New(t)}
diags := exec.resolveResourceValues(context.Background(), cl.Logger(), cl, r)
diags := exec.resolveResourceValues(context.Background(), cl, r)
if tc.ExpectedDiags != nil {
require.True(t, diags.HasDiags())
if tc.ExpectedDiags != nil {
Expand Down

0 comments on commit e904e0d

Please sign in to comment.