Skip to content

feat: add grpcds support #8734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/guseggert/go-ds-grpc v0.0.0-20230402190854-00fd80d37780 // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -188,8 +189,8 @@ require (
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
Expand Down
1,012 changes: 1,008 additions & 4 deletions docs/examples/kubo-as-a-library/go.sum

Large diffs are not rendered by default.

12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ require (
golang.org/x/mod v0.10.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.7.0
google.golang.org/grpc v1.54.0
)

require github.com/guseggert/go-ds-grpc v0.0.0-20230402190854-00fd80d37780

require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/Kubuxu/go-os-helper v0.0.1 // indirect
Expand Down Expand Up @@ -121,12 +124,12 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
Expand Down Expand Up @@ -212,14 +215,13 @@ require (
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
1,005 changes: 1,000 additions & 5 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions plugin/loader/preload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
pluginflatfs "github.com/ipfs/kubo/plugin/plugins/flatfs"
pluginfxtest "github.com/ipfs/kubo/plugin/plugins/fxtest"
pluginipldgit "github.com/ipfs/kubo/plugin/plugins/git"
plugingrpcds "github.com/ipfs/kubo/plugin/plugins/grpcds"
pluginlevelds "github.com/ipfs/kubo/plugin/plugins/levelds"
pluginpeerlog "github.com/ipfs/kubo/plugin/plugins/peerlog"
)
Expand All @@ -22,4 +23,5 @@ func init() {
Preload(pluginlevelds.Plugins...)
Preload(pluginpeerlog.Plugins...)
Preload(pluginfxtest.Plugins...)
Preload(plugingrpcds.Plugins...)
}
1 change: 1 addition & 0 deletions plugin/loader/preload_list
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ flatfs github.com/ipfs/kubo/plugin/plugins/flatfs *
levelds github.com/ipfs/kubo/plugin/plugins/levelds *
peerlog github.com/ipfs/kubo/plugin/plugins/peerlog *
fxtest github.com/ipfs/kubo/plugin/plugins/fxtest *
grpcds github.com/ipfs/kubo/plugin/plugins/grpcds *
160 changes: 160 additions & 0 deletions plugin/plugins/grpcds/grpcds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package grpcds

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

grpcds "github.com/guseggert/go-ds-grpc"
pb "github.com/guseggert/go-ds-grpc/proto"
"github.com/ipfs/kubo/plugin"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/repo/fsrepo"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
)

// Plugins is exported list of plugins that will be loaded
var Plugins = []plugin.Plugin{
&grpcdsPlugin{},
}

type grpcdsPlugin struct{}

var _ plugin.PluginDatastore = (*grpcdsPlugin)(nil)

func (*grpcdsPlugin) Name() string {
return "ds-grpc"
}

func (*grpcdsPlugin) Version() string {
return "0.1.0"
}

func (*grpcdsPlugin) Init(_ *plugin.Environment) error {
return nil
}

func (*grpcdsPlugin) DatastoreTypeName() string {
return "grpcds"
}

type datastoreConfig struct {
Name string `json:"name"`
Target string `json:"target"`
AllowInsecure bool `json:"allowInsecure"`
ConnectParams *connectParams `json:"connectParams"`
Compressor string `json:"compressor"`
DefaultServiceConfig json.RawMessage `json:"defaultServiceConfig"`
UserAgent string `json:"userAgent"`
}

type connectParams struct {
Backoff *backoffConfig `json:"backoff"`
MinConnectTimeoutMillis int64 `json:"minConnectTimeoutMillis"`
}

type backoffConfig struct {
BaseDelayMillis int64 `json:"baseDelayMillis"`
Multiplier float64 `json:"multiplier"`
Jitter float64 `json:"jitter"`
MaxDelayMillis int64 `json:"maxDelayMillis"`
}

func (b *backoffConfig) ToGRPCConfig() backoff.Config {
return backoff.Config{
BaseDelay: time.Duration(b.BaseDelayMillis) * time.Millisecond,
Multiplier: b.Multiplier,
Jitter: b.Jitter,
MaxDelay: time.Duration(b.MaxDelayMillis) * time.Millisecond,
}
}

func (*grpcdsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {

var c datastoreConfig
b, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshaling grpcds config: %w", err)
}

err = json.Unmarshal(b, &c)
if err != nil {
return nil, fmt.Errorf("unmarshaling grpcds config: %w", err)
}

if c.Name == "" {
return nil, errors.New("'name' must be specified")
}

if c.Target == "" {
return nil, errors.New("'target' must be specified")
}

return &c, nil
}
}

func (c *datastoreConfig) DiskSpec() fsrepo.DiskSpec {
return map[string]interface{}{
"type": "grpcds",
"name": c.Name,
}
}

func (c *datastoreConfig) Create(path string) (repo.Datastore, error) {
dialOpts := []grpc.DialOption{}
callOpts := []grpc.CallOption{}

if c.AllowInsecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

if c.ConnectParams != nil {
backoffConfig := backoff.DefaultConfig
if c.ConnectParams.Backoff != nil {
backoffConfig = c.ConnectParams.Backoff.ToGRPCConfig()
}
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfig,
MinConnectTimeout: time.Duration(c.ConnectParams.MinConnectTimeoutMillis),
}))
}

if c.DefaultServiceConfig != nil {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(string(c.DefaultServiceConfig)))
}

if c.UserAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(c.UserAgent))
}

if c.Compressor != "" {
callOpts = append(callOpts, grpc.UseCompressor(c.Compressor))
}

dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(callOpts...))

conn, err := grpc.Dial(c.Target, dialOpts...)
if err != nil {
return nil, fmt.Errorf("initial dialing of grpcds target '%s': %w", c.Target, err)
}
client := pb.NewDatastoreClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ds, err := grpcds.New(ctx, client)
if err != nil {
return nil, fmt.Errorf("building grpcds: %w", err)
}
repoDS, ok := ds.(repo.Datastore)
if !ok {
return nil, fmt.Errorf("remote gRPC datastore must be a repo datastore")
}

return repoDS, nil
}
86 changes: 86 additions & 0 deletions test/cli/grpc_ds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package cli

import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"testing"

pb "github.com/guseggert/go-ds-grpc/proto"
"github.com/guseggert/go-ds-grpc/server"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/ipfs/kubo/test/cli/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

const grpcDatastoreSpec = `
{
"type": "grpcds",
"name": "grpc-datastore",
"target": "%s",
"allowInsecure": true
}
`

func TestGRPCDatastore(t *testing.T) {
// we init the node to get the default config, then modify it, then re-init the node
node := harness.NewT(t).NewNode().Init()

// run grpc datastore server
ds := dssync.MutexWrap(datastore.NewMapDatastore())
dsServer := server.New(ds)
grpcServer := grpc.NewServer()
pb.RegisterDatastoreServer(grpcServer, dsServer)

l, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(t, err)
go func() {
if err := grpcServer.Serve(l); err != nil {
t.Logf("grpc server error: %s", err)
}
}()
defer grpcServer.Stop()

// update the config
spec := fmt.Sprintf(grpcDatastoreSpec, l.Addr().String())
fmt.Printf("using spec: \n%s\n", spec)
specMap := map[string]interface{}{}
err = json.Unmarshal([]byte(spec), &specMap)
require.NoError(t, err)
node.UpdateConfig(func(cfg *config.Config) {
cfg.Datastore.Spec = specMap
})

// copy config to a new file and re-init the repo to initialize the datastore
config := node.ReadFile(node.ConfigFile())
require.NoError(t, os.RemoveAll(node.Dir))
require.NoError(t, os.Mkdir(node.Dir, 0777))
node.WriteBytes("config-backup", []byte(config))
node.IPFS("init", filepath.Join(node.Dir, "config-backup"))

node.StartDaemon()

randStr := string(testutils.RandomBytes(100))
node.IPFSAddStr(randStr)

// verify the datastore has stuff in it
keys := map[string]bool{}
results, err := ds.Query(context.Background(), query.Query{})
require.NoError(t, err)
for res := range results.Next() {
keys[res.Entry.Key] = true
}
assert.True(t, keys["/pins/state/dirty"])

// TODO ensure daemon won't launch when grpc server is not running
}