From e994c05d2c5724dab5cb60d939cadb23869399c7 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 17 Dec 2018 16:12:54 -0800 Subject: [PATCH 1/3] query: fix goroutine leak Closing a query early could leak the query goroutine if the buffer isn't large enough to fit the rest of the query. --- flatfs.go | 43 ++++++++++++++++++++++++++++++------------- package.json | 6 ++++++ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/flatfs.go b/flatfs.go index a608e21..4ee0dcf 100644 --- a/flatfs.go +++ b/flatfs.go @@ -19,6 +19,7 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" + "github.com/jbenet/goprocess" logging "github.com/ipfs/go-log" ) @@ -631,18 +632,25 @@ func (fs *Datastore) Query(q query.Query) (query.Results, error) { return nil, errors.New("flatfs only supports listing all keys in random order") } - reschan := make(chan query.Result, query.KeysOnlyBufSize) - go func() { - defer close(reschan) - err := fs.walkTopLevel(fs.path, reschan) - if err != nil { - reschan <- query.Result{Error: errors.New("walk failed: " + err.Error())} + // Replicates the logic in ResultsWithChan but actually respects calls + // to `Close`. + b := query.NewResultBuilder(q) + b.Process.Go(func(p goprocess.Process) { + err := fs.walkTopLevel(fs.path, b) + if err == nil { + return } - }() - return query.ResultsWithChan(q, reschan), nil + select { + case b.Output <- query.Result{Error: errors.New("walk failed: " + err.Error())}: + case <-p.Closing(): + } + }) + go b.Process.CloseAfterChildren() + + return b.Results(), nil } -func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error { +func (fs *Datastore) walkTopLevel(path string, result *query.ResultBuilder) error { dir, err := os.Open(path) if err != nil { return err @@ -653,16 +661,21 @@ func (fs *Datastore) walkTopLevel(path string, reschan chan query.Result) error return err } for _, dir := range names { - if len(dir) == 0 || dir[0] == '.' { continue } - err = fs.walk(filepath.Join(path, dir), reschan) + err = fs.walk(filepath.Join(path, dir), result) if err != nil { return err } + // Are we closing? + select { + case <-result.Process.Closing(): + return nil + default: + } } return nil } @@ -957,7 +970,7 @@ func (fs *Datastore) Accuracy() string { return string(fs.storedValue.Accuracy) } -func (fs *Datastore) walk(path string, reschan chan query.Result) error { +func (fs *Datastore) walk(path string, result *query.ResultBuilder) error { dir, err := os.Open(path) if err != nil { if os.IsNotExist(err) { @@ -993,10 +1006,14 @@ func (fs *Datastore) walk(path string, reschan chan query.Result) error { continue } - reschan <- query.Result{ + select { + case result.Output <- query.Result{ Entry: query.Entry{ Key: key.String(), }, + }: + case <-result.Process.Closing(): + return nil } } return nil diff --git a/package.json b/package.json index c7d9cd9..9eedd0c 100644 --- a/package.json +++ b/package.json @@ -17,6 +17,12 @@ "hash": "QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D", "name": "go-datastore", "version": "3.4.0" + }, + { + "author": "whyrusleeping", + "hash": "QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP", + "name": "goprocess", + "version": "1.0.0" } ], "gxVersion": "0.8.0", From bba93ebde2bd675f5c07c28894b26be4d9115a26 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 17 Dec 2018 16:15:46 -0800 Subject: [PATCH 2/3] query: test for goroutine leaks --- flatfs_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/flatfs_test.go b/flatfs_test.go index f5d1168..7ec035b 100644 --- a/flatfs_test.go +++ b/flatfs_test.go @@ -988,3 +988,34 @@ func BenchmarkBatchedPut(b *testing.B) { } b.StopTimer() // avoid counting cleanup } + +func TestQueryLeak(t *testing.T) { + temp, cleanup := tempdir(t) + defer cleanup() + + fs, err := flatfs.CreateOrOpen(temp, flatfs.Prefix(2), false) + if err != nil { + t.Fatalf("New fail: %v\n", err) + } + defer fs.Close() + + for i := 0; i < 1000; i++ { + err = fs.Put(datastore.NewKey(fmt.Sprint(i)), []byte("foobar")) + if err != nil { + t.Fatalf("Put fail: %v\n", err) + } + } + + before := runtime.NumGoroutine() + for i := 0; i < 200; i++ { + res, err := fs.Query(query.Query{KeysOnly: true}) + if err != nil { + t.Errorf("Query fail: %v\n", err) + } + res.Close() + } + after := runtime.NumGoroutine() + if after-before > 100 { + t.Errorf("leaked %d goroutines", after-before) + } +} From 3c828de209d5b46d8fe21ef4bc8d333b688fc3ae Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 17 Dec 2018 16:20:53 -0800 Subject: [PATCH 3/3] ci: update go --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index bc156b0..6ef8a02 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,7 +7,7 @@ sudo: false language: go go: - - 1.9.x + - 1.11.x install: - make deps