[prism] Add dynamic channel & sub element splits. (apache#26484)
* Add channel split separations.

* Use channel leases for parallel processing.

* move data wait to chan + atomic

* Progress and split on execute goroutine.

* Convert metrics tests to short ids.

* Add Provision handler w/ capabilities.

* Add artifact validation to run_rc_validation (apache#26407)

* Add artifact validation to run_rc_validation

* config file for artifacts

* use java bom contents for validation

* add pip_pre = True

* Update tox.ini

* Remove out of date experimental warning on SDFs (apache#26450)

* [ToBF] Refinement 26.04.23 (apache#26428)

* content tree rebuilds on sdk change

* expandable parent node widget

* comment fix

* comment fix

---------

Co-authored-by: darkhan.nausharipov <[email protected]>

* Allow implicit flattening for yaml inputs. (apache#26423)

* Add field annotations for high-priority Syndeo schema transforms (apache#26384)

add changes

* [Go SDK] Timers with new datalayer (apache#26101)

* added timer package

* add timer changes and merged with rebo's pr

* timer fired in stateful

* error setting new timer in ontimer

* looping timers work

* send fv instead of bytes

* changes to coder/pardo

* works for all cases, only cleanup left

* remove comments and validate onTimer

* generic coder for user key

* fixes coder end to end

* remove logs

* add unit test and refactor

* add docs

* new example

* fix static lint

* support emitters

* allow input col of CoGBK as well

* unit tests, periodic impulse, minor refactor

* update PipelineTimer interface, minor refactor, doc comment for example

* add warn message

* single edge timer coder, rm kv coder check, cache encoder,decoder

* Update chromedriver-binary requirement in /sdks/python

Updates the requirements on [chromedriver-binary](https://github.com/danielkaiser/python-chromedriver-binary) to permit the latest version.
- [Release notes](https://github.com/danielkaiser/python-chromedriver-binary/releases)
- [Commits](danielkaiser/python-chromedriver-binary@v100.0.4896.20.0...v113.0.5672.24.0)

---
updated-dependencies:
- dependency-name: chromedriver-binary
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <[email protected]>

* Automation: Tour of Beam infrastructure deployment (apache#25793)

* Added Terraform scripts for TOB infra

* ToB Frontend related updates

* Update settings.gradle.kts

* Deleted redundant file and minor README change

* Addressing comments in the PR

* Added newline at the end of variables.tf file

* Update README.md

* Updates related to Tour of Beam infrastructure

* Update locals.tf

* Output.tf updates

* Update output.tf

* Updates

* Update main.tf

* Updates to cloudfunctions_bucket variable

* service_account_id changes

* Update main.tf

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Bulk update of terraform scripts

* Update README.md

* Update README.md

* Datastore_namespace updates

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* Update main.tf

* Update README.md

* Update README.md

* Update README.md

* Some minor TF updates

* Update README.md

* Modify batch IT to use count instead of hash (apache#26327)

* Modify batch IT to use count instead of hash

* remove unused variable

* run spotless, update pipeline state checking

* Update timeout to 45 minutes

* revert timeout, add additional counter to try and pinpoint missing records

* add a log to notify ranges used when workers restart

* change counts from metrics to combiners

* add a window to streaming test

* move the passert to the correct place

* Remove extra counter, apply spotless

* add additional metric to KafkaWriter

* Remove debugging metrics

* verify pipeline is not failed

* remove extra newline

* Revert "Modify batch IT to use count instead of hash (apache#26327)" (apache#26466)

This reverts commit 9903b2f.

* Bump Java Dataflow container images (apache#26459)

* keep retrying mass_comment until it has started all jobs (apache#26457)

* keep retrying mass_comment until it has started all jobs

* fix lookups

* Add driverJars parameter to JdbcIO. (apache#25824)

This change allows users to use driver jars saved in GCS. With this change, Dataflow templates will be able to migrate to JdbcIO instead of DynamicJdbcIO.

* [Roll Fwd PR] Rename _namespace to _get_display_data_namespace (apache#26470)

* Update parquetio and textio to work with -beam_strict (apache#26469)

* use wheel sdk location for PostCommit_Py_Examples (apache#26473)

* Move back the timeout of Python PostCommit to 4h

* Minor fix on Python PostCommit description strings

* Add recent postcommits to jenkins README

* More user-friendly providers.

* Add yaml preprocessing phases.

* Add flexible windowing syntax to yaml.

* Implement flatten in terms of preprocessor phase.

This composes better with windowing.

* Reword SQL note.

* Make linter happy.

* Survive errors in size estimation in MongoDbIO

* Fix jdbc xlang schema type mismatch (apache#26480)

* Fix jdbc xlang schema type mismatch

* Also fix fetch_size type mismatch

* Add new fields in the end

* [Python] Add saved_weights example to tf notebook (apache#26472)

* add saved_weights example to tf notebook

* add description

* updated text blocks

* Update examples/notebooks/beam-ml/run_inference_tensorflow.ipynb

Co-authored-by: Rebecca Szper <[email protected]>

* Update examples/notebooks/beam-ml/run_inference_tensorflow.ipynb

Co-authored-by: Rebecca Szper <[email protected]>

---------

Co-authored-by: Rebecca Szper <[email protected]>

* Revert "Revert "Modify batch IT to use count instead of hash (apache#26327)" (apache#26466)" (apache#26467)

This reverts commit 5c67668.

* Bump github.com/tetratelabs/wazero from 1.0.3 to 1.1.0 in /sdks (apache#26486)

Bumps [github.com/tetratelabs/wazero](https://github.com/tetratelabs/wazero) from 1.0.3 to 1.1.0.
- [Release notes](https://github.com/tetratelabs/wazero/releases)
- [Commits](tetratelabs/wazero@v1.0.3...v1.1.0)

---
updated-dependencies:
- dependency-name: github.com/tetratelabs/wazero
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* tour of beam integration tests (apache#25925)

* Integration test to load the default example of the default SDK and change the example (apache#24730) (apache#24729)

* Fix formatting and README (apache#24730)

* Support collection v1.17.0 (apache#24730)

* LoadingIndicator on changing examples, remove duplicating licenses (apache#24730)

* Add a missing license header (apache#24730)

* Integration test for changing SDK and running code (apache#24779) (apache#382)

* Integration test for changing SDK and running code (apache#24779)

* Rename an integration test (apache#24779)

* Use enum to switch SDK in integration test (apache#24779)

* Find SDK in a dropdown by key (apache#24779)

* Add a TODO (apache#24779)

* Fix exports (apache#24779)

* Issue24779 integration changing sdk from 24370 (apache#387)

* Integration test for changing SDK and running code (apache#24779)

* Rename an integration test (apache#24779)

* Use enum to switch SDK in integration test (apache#24779)

* Find SDK in a dropdown by key (apache#24779)

* Add a TODO (apache#24779)

* Fix exports (apache#24779)

* Integration tests miscellaneous UI (apache#383)

* miscellaneous ui integration tests

* reverted pubspec.lock

* gradle tasks ordered alphabetically

* integration tests refactoring

* clean code

* integration tests miscellaneous ui fix pr

* rename method

* added layout adaptivity

* A minor cleanup (apache#24779)

Co-authored-by: Dmitry Repin <[email protected]>

* integration tests run and editing

* example selector test

* minor fixes

* rat

* fix pr

* minor

* minor

* rat

* integration test finder written

* integration test minor fixes

* minor fixes

* removed comment

* minor fixes

* playground integration tests minor fixes

* integration test pumpAndSettleNoException

* integration test shortcut refactor

* integration test another changing shortcuts running

* upgrade to flutter 3.7.1

* workaround comment

* playground frontend updated major versions

* issues 25329 25331 25336

* 25329 extract connectivity extension to separate file

* Upgrade Flutter to 3.7.3 in integration tests (apache#24730)

* Fix integration test (apache#24730)

* fix cors issue and added mouse scroll to tags

* Upgrade Flutter in Dockerfile (apache#24720)

* sorting moved to model

* sorting moved to model

* sorting moved to model

* bugs fix

* issue 25278

* fix pr

* quotes fix in en.yaml

* Fix not loading default example (apache#25528)

* fix compile error

* Refactor output tabs, test embedded playground (apache#25136) (apache#439)

* Refactor output tabs, test embedded playground (apache#25136)

* Clean up (apache#25136)

* Change example paths to IDs in integration tests

* issue25640 tob ci

* fix tob ci

* rename ci process

* test add new line to main

* test add new line to main

* commented unit test run

* issue25640 changed server path

* issue25640 tests on welcome page

* deleted config.g.dart

* issue25640 pr fixes

* Update .github/workflows/tour_of_beam_frontend_test.yml

Co-authored-by: alexeyinkin <[email protected]>

* Update learning/tour-of-beam/frontend/integration_test/welcome_page_test.dart

Co-authored-by: alexeyinkin <[email protected]>

* Improve tests (apache#25640)

* issue25640 tour page tests

* pr fix

* removed import

* pr fix

* fix test

* 25640 fixed pubspec.lock

* issue25640 fix readme

* updated readme

* issue25640 fixed after master merge

* issue25483 ToB pipeline options

* removed unnecessary variable

* pr fix

* Update learning/tour-of-beam/frontend/assets/translations/en.yaml

Co-authored-by: alexeyinkin <[email protected]>

* playground hides when snippet does not exist

* pipeline options extracted to playground components

* issue25483 pipeline options

* added errors handling, fix pr

* refactoring

* Revert "refactoring"

This reverts commit 1540961.

* removed unnecessary constants

* playground controller in tour notifier becomes nullable

* playground controller returned to non nullable in tour notifier

* playground controller actions

* removed unnecessary code

* tob scaffold wrapped with animated builder

* minor fixes

* partially fixed tests

* Upgrade flutter_code_editor to v0.2.19 (apache#25640)

* Replace output SelectableText with a CodeField instance (apache#25640)

* Trigger ToB integration tests (apache#25640)

* Clean up (apache#25640)

* Enable manual workflow runs for Playground and ToB integration tests (apache#25640)

---------

Co-authored-by: Alexey Inkin <[email protected]>
Co-authored-by: alexeyinkin <[email protected]>

* Eliminate nullness errors from MongoDbIO

* Move provisioned options outside of harness.Main (apache#26476)

Co-authored-by: lostluck <[email protected]>

* Add SDF adjacent element test

* Consolidate residual processing

* fix race condition on split boundaries

* deflake fixed window combine tests

* Add comments to job functions.

* Make int64check per window.

* Update sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go

Co-authored-by: Ritesh Ghorse <[email protected]>

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: lostluck <[email protected]>
Co-authored-by: Danny McCormick <[email protected]>
Co-authored-by: Anand Inguva <[email protected]>
Co-authored-by: Anand Inguva <[email protected]>
Co-authored-by: Darkhan Nausharipov <[email protected]>
Co-authored-by: darkhan.nausharipov <[email protected]>
Co-authored-by: Robert Bradshaw <[email protected]>
Co-authored-by: Andrei Gurau <[email protected]>
Co-authored-by: Ritesh Ghorse <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: ruslan-ikhsan <[email protected]>
Co-authored-by: johnjcasey <[email protected]>
Co-authored-by: Yi Hu <[email protected]>
Co-authored-by: Pranav Bhandari <[email protected]>
Co-authored-by: Svetak Sundhar <[email protected]>
Co-authored-by: Jeremy Edwards <[email protected]>
Co-authored-by: Kenneth Knowles <[email protected]>
Co-authored-by: Rebecca Szper <[email protected]>
Co-authored-by: Dmitry Repin <[email protected]>
Co-authored-by: Alexey Inkin <[email protected]>
Co-authored-by: alexeyinkin <[email protected]>
22 people authored and cushon committed May 24, 2024
1 parent 948eeb6 commit f9fe489
Showing 15 changed files with 552 additions and 140 deletions.
86 changes: 64 additions & 22 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -276,6 +276,34 @@ func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte {
return es.ToData(info)
}

// reElementResiduals extracts the windowed value header from residual bytes, and explodes them
// back out to their windows.
func reElementResiduals(residuals [][]byte, inputInfo PColInfo, rb RunBundle) []element {
var unprocessedElements []element
for _, residual := range residuals {
buf := bytes.NewBuffer(residual)
ws, et, pn, err := exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
if err != nil {
if err == io.EOF {
break
}
slog.Error("reElementResiduals: error decoding residual header", err, "bundle", rb)
panic("error decoding residual header")
}

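// A residual may span several windows; emit one pending element per window
// so each can be scheduled independently.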
for _, w := range ws {
unprocessedElements = append(unprocessedElements,
element{
window: w,
timestamp: et,
pane: pn,
elmBytes: buf.Bytes(),
})
}
}
return unprocessedElements
}

// PersistBundle uses the tentative bundle output to update the watermarks for the stage.
// Each stage has two monotonically increasing watermarks, the input watermark, and the output
// watermark.
@@ -330,28 +358,7 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
}

// Return unprocessed to this stage's pending
-var unprocessedElements []element
-for _, residual := range residuals {
-buf := bytes.NewBuffer(residual)
-ws, et, pn, err := exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
-if err != nil {
-if err == io.EOF {
-break
-}
-slog.Error("PersistBundle: error decoding residual header", err, "bundle", rb)
-panic("error decoding residual header")
-}
-
-for _, w := range ws {
-unprocessedElements = append(unprocessedElements,
-element{
-window: w,
-timestamp: et,
-pane: pn,
-elmBytes: buf.Bytes(),
-})
-}
-}
+unprocessedElements := reElementResiduals(residuals, inputInfo, rb)
// Add unprocessed back to the pending stack.
if len(unprocessedElements) > 0 {
em.pendingElements.Add(len(unprocessedElements))
@@ -379,6 +386,21 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
em.addRefreshAndClearBundle(stage.ID, rb.BundleID)
}

// ReturnResiduals is called after a successful split, so the remaining work
// can be re-assigned to a new bundle.
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals [][]byte) {
stage := em.stages[rb.StageID]

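// Shrink the in-progress bundle down to its primary elements first, then
// re-element the residuals and queue them as pending work.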
stage.splitBundle(rb, firstRsIndex)
unprocessedElements := reElementResiduals(residuals, inputInfo, rb)
if len(unprocessedElements) > 0 {
slog.Debug("ReturnResiduals: unprocessed elements", "bundle", rb, "count", len(unprocessedElements))
em.pendingElements.Add(len(unprocessedElements))
stage.AddPending(unprocessedElements)
}
em.addRefreshes(singleSet(rb.StageID))
}

func (em *ElementManager) addRefreshes(stages set[string]) {
em.refreshCond.L.Lock()
defer em.refreshCond.L.Unlock()
@@ -439,6 +461,10 @@ func (s set[K]) merge(o set[K]) {
}
}

func singleSet[T comparable](v T) set[T] {
return set[T]{v: struct{}{}}
}

// stageState is the internal watermark and input tracking for a stage.
type stageState struct {
ID string
@@ -569,6 +595,22 @@ func (ss *stageState) startBundle(watermark mtime.Time, genBundID func() string)
return bundID, true
}

func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) {
ss.mu.Lock()
defer ss.mu.Unlock()

es := ss.inprogress[rb.BundleID]
slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual)

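// Elements before firstResidual remain with the in-progress bundle as the
// primary; the rest become residuals returned to the pending heap.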
prim := es.es[:firstResidual]
res := es.es[firstResidual:]

es.es = prim
ss.pending = append(ss.pending, res...)
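// The bulk append breaks the heap invariant; re-establish it.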
heap.Init(&ss.pending)
ss.inprogress[rb.BundleID] = es
}

// minimumPendingTimestamp returns the minimum pending timestamp from all pending elements,
// including in progress ones.
//
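A toy sketch of the primary/residual partition that splitBundle performs above, with ints standing in for prism's element type (names here are illustrative, not from the Beam codebase):

package main

import "fmt"

// split keeps the first firstResidual elements with the running (primary)
// bundle and returns the rest as residuals for rescheduling.
func split(inflight []int, firstResidual int) (primary, residuals []int) {
	return inflight[:firstResidual], inflight[firstResidual:]
}

func main() {
	primary, residuals := split([]int{10, 11, 12, 13, 14}, 2)
	fmt.Println(primary)   // [10 11] stays with the in-progress bundle
	fmt.Println(residuals) // [12 13 14] goes back to the pending heap
}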
11 changes: 8 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -102,7 +102,6 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
endpoint := &pipepb.ApiServiceDescriptor{
Url: wk.Endpoint(),
}

pool.StartWorker(ctx, &fnpb.StartWorkerRequest{
WorkerId: wk.ID,
ControlEndpoint: endpoint,
@@ -274,10 +273,16 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
em.Impulse(id)
}

+// Use a channel to limit max parallelism for the pipeline.
+maxParallelism := make(chan struct{}, 8)
// Execute stages here
for rb := range em.Bundles(ctx, wk.NextInst) {
-s := stages[rb.StageID]
-s.Execute(j, wk, comps, em, rb)
+maxParallelism <- struct{}{}
+go func(rb engine.RunBundle) {
+defer func() { <-maxParallelism }()
+s := stages[rb.StageID]
+s.Execute(j, wk, comps, em, rb)
+}(rb)
}
slog.Info("pipeline done!", slog.String("job", j.String()))
}
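The loop above is Go's counting-semaphore idiom: a buffered channel whose eight slots act as bundle leases. A minimal self-contained sketch of the same pattern (the WaitGroup and loop bound are illustrative additions so the example runs standalone; prism instead ranges over em.Bundles):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const maxParallelism = 8
	sem := make(chan struct{}, maxParallelism) // one buffered slot per lease
	var wg sync.WaitGroup
	for i := 0; i < 32; i++ {
		sem <- struct{}{} // acquire a lease; blocks once 8 are in flight
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the lease on completion
			fmt.Println("executing bundle", i)
		}(i)
	}
	wg.Wait()
}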
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -128,6 +128,9 @@ func TestRunner_Pipelines(t *testing.T) {
qr := pr.Metrics().Query(func(sr metrics.SingleResult) bool {
return sr.Name() == "sunk"
})
if len(qr.Counters()) == 0 {
t.Fatal("no metrics, expected one.")
}
if got, want := qr.Counters()[0].Committed, int64(73); got != want {
t.Errorf("pr.Metrics.Query(Name = \"sunk\")).Committed = %v, want %v", got, want)
}
21 changes: 16 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -39,7 +39,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"
)

-var capabilities = map[string]struct{}{
+var supportedRequirements = map[string]struct{}{
urns.RequirementSplittableDoFn: {},
}

@@ -48,13 +48,13 @@ var capabilities = map[string]struct{}{
func isSupported(requirements []string) error {
var unsupported []string
for _, req := range requirements {
-if _, ok := capabilities[req]; !ok {
+if _, ok := supportedRequirements[req]; !ok {
unsupported = append(unsupported, req)
}
}
if len(unsupported) > 0 {
sort.Strings(unsupported)
return fmt.Errorf("local runner doesn't support the following required features: %v", strings.Join(unsupported, ","))
return fmt.Errorf("prism runner doesn't support the following required features: %v", strings.Join(unsupported, ","))
}
return nil
}
@@ -81,8 +81,19 @@ type Job struct {
metrics metricsStore
}

-func (j *Job) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
-j.metrics.ContributeMetrics(payloads)
+// ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids.
+func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
+return j.metrics.ContributeTentativeMetrics(payloads)
}

// ContributeFinalMetrics returns any unknown monitoring short ids.
func (j *Job) ContributeFinalMetrics(payloads *fnpb.ProcessBundleResponse) []string {
return j.metrics.ContributeFinalMetrics(payloads)
}

// AddMetricShortIDs populates metric short IDs with their metadata.
func (j *Job) AddMetricShortIDs(ids *fnpb.MonitoringInfosMetadataResponse) {
j.metrics.AddShortIDs(ids)
}

func (j *Job) String() string {
88 changes: 67 additions & 21 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -449,46 +449,92 @@ func (k apiRequestLatenciesKey) Labels() map[string]string {

type metricsStore struct {
mu sync.Mutex
-accums map[metricKey]metricAccumulator
+accums [2]map[metricKey]metricAccumulator
+
+shortIDsToKeys map[string]metricKey
+unprocessedPayloads [2]map[string][]byte
}

-func (m *metricsStore) ContributeMetrics(payloads *fnpb.ProcessBundleResponse) {
+func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) {
m.mu.Lock()
defer m.mu.Unlock()
-if m.accums == nil {
-m.accums = map[metricKey]metricAccumulator{}
+
+if m.shortIDsToKeys == nil {
+m.shortIDsToKeys = map[string]metricKey{}
}
-// Old and busted.
-mons := payloads.GetMonitoringInfos()
-for _, mon := range mons {
-urn := mon.GetUrn()
+
+mis := resp.GetMonitoringInfo()
+for short, mi := range mis {
+urn := mi.GetUrn()
ops, ok := mUrn2Ops[urn]
if !ok {
slog.Debug("unknown metrics urn", slog.String("urn", urn))
continue
}
-key := ops.keyFn(urn, mon.GetLabels())
-a, ok := m.accums[key]
+key := ops.keyFn(urn, mi.GetLabels())
+m.shortIDsToKeys[short] = key
}
for d, payloads := range m.unprocessedPayloads {
m.contributeMetrics(durability(d), payloads)
m.unprocessedPayloads[d] = nil
}
}

func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (int64, []string) {
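// readIndex is the SDK-reported data channel read index (or -1 if absent),
// returned to callers alongside any short IDs we lack metadata for.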
readIndex := int64(-1)
if m.accums[d] == nil {
m.accums[d] = map[metricKey]metricAccumulator{}
}
if m.unprocessedPayloads[d] == nil {
m.unprocessedPayloads[d] = map[string][]byte{}
}
accums := m.accums[d]
var missingShortIDs []string
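// Payloads are keyed by short ID. If we haven't seen the metadata mapping
// a short ID to its metric key yet, park the payload; AddShortIDs replays
// it once the mapping arrives.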
for short, payload := range mdata {
key, ok := m.shortIDsToKeys[short]
if !ok {
missingShortIDs = append(missingShortIDs, short)
m.unprocessedPayloads[d][short] = payload
continue
}
a, ok := accums[key]
if !ok || d == tentative {
ops, ok := mUrn2Ops[key.Urn()]
if !ok {
slog.Debug("unknown metrics urn", slog.String("urn", key.Urn()))
continue
}
a = ops.newAccum()
}
-if err := a.accumulate(mon.GetPayload()); err != nil {
-panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", urn, key, a))
+if err := a.accumulate(payload); err != nil {
+panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a))
}
+accums[key] = a
+if key.Urn() == "beam:metric:data_channel:read_index:v1" {
+readIndex = a.(*sumInt64).sum
+}
-m.accums[key] = a
}
-// New hotness.
-mdata := payloads.GetMonitoringData()
-_ = mdata
+return readIndex, missingShortIDs
}

func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
m.mu.Lock()
defer m.mu.Unlock()
return m.contributeMetrics(tentative, payloads.GetMonitoringData())
}

func (m *metricsStore) ContributeFinalMetrics(payloads *fnpb.ProcessBundleResponse) []string {
m.mu.Lock()
defer m.mu.Unlock()
_, unknownIDs := m.contributeMetrics(committed, payloads.GetMonitoringData())
return unknownIDs
}

func (m *metricsStore) Results(d durability) []*pipepb.MonitoringInfo {
-// We don't gather tentative metrics yet.
-if d == tentative {
-return nil
-}
m.mu.Lock()
defer m.mu.Unlock()
infos := make([]*pipepb.MonitoringInfo, 0, len(m.accums))
-for key, accum := range m.accums {
+for key, accum := range m.accums[d] {
infos = append(infos, accum.toProto(key))
}
return infos
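The metrics flow above decouples payloads from their metadata: progress and final responses carry payloads keyed by SDK-assigned short IDs, and payloads for not-yet-described IDs are parked until a MonitoringInfosMetadataResponse arrives. A stripped-down sketch of that buffering handshake (type, field, and metric names here are illustrative, not the prism ones):

package main

import "fmt"

// store buffers payloads that arrive before their key metadata.
type store struct {
	keys    map[string]string // short ID -> metric key
	sums    map[string]int64  // metric key -> accumulated value
	pending map[string][]byte // payloads waiting on unknown short IDs
}

func newStore() *store {
	return &store{
		keys:    map[string]string{},
		sums:    map[string]int64{},
		pending: map[string][]byte{},
	}
}

// contribute applies payloads whose short IDs are known; unknown ones wait.
func (s *store) contribute(data map[string][]byte) (missing []string) {
	for short, payload := range data {
		key, ok := s.keys[short]
		if !ok {
			missing = append(missing, short)
			s.pending[short] = payload
			continue
		}
		s.sums[key] += int64(payload[0]) // stand-in for real payload decoding
	}
	return missing
}

// addShortIDs records metadata, then replays any buffered payloads.
func (s *store) addShortIDs(meta map[string]string) {
	for short, key := range meta {
		s.keys[short] = key
	}
	replay := s.pending
	s.pending = map[string][]byte{}
	s.contribute(replay)
}

func main() {
	s := newStore()
	fmt.Println(s.contribute(map[string][]byte{"1": {5}})) // [1]: metadata not seen yet
	s.addShortIDs(map[string]string{"1": "elementCount"})
	fmt.Println(s.sums["elementCount"]) // 5: buffered payload applied after metadata
}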