From af5b3b6182447c6a2e0fb0c8e7c42b1829a046df Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Mon, 31 Oct 2022 21:59:13 -0400 Subject: [PATCH 01/16] add SSE support to http api --- api/http/broker.go | 54 +++++++++++++++++++++++++++++++++++ api/http/handler.go | 26 +++++++++++++---- api/http/handler_test.go | 2 +- api/http/handlerfuncs.go | 43 ++++++++++++++++++++++++++++ api/http/handlerfuncs_test.go | 2 +- api/http/logger.go | 16 +++++------ api/http/logger_test.go | 2 +- api/http/server.go | 24 ++++++++++++---- 8 files changed, 146 insertions(+), 23 deletions(-) create mode 100644 api/http/broker.go diff --git a/api/http/broker.go b/api/http/broker.go new file mode 100644 index 0000000000..6b3c9d7d22 --- /dev/null +++ b/api/http/broker.go @@ -0,0 +1,54 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "context" + + "github.com/sourcenetwork/defradb/logging" +) + +type broker struct { + notifier chan []byte + subscribe chan chan []byte + unsubscribe chan chan []byte +} + +func newBroker() *broker { + return &broker{ + notifier: make(chan []byte, 1), + subscribe: make(chan chan []byte), + unsubscribe: make(chan chan []byte), + } +} + +func (b *broker) listen(ctx context.Context) { + clients := make(map[chan []byte]struct{}) + + for { + select { + case subCh := <-b.subscribe: + clients[subCh] = struct{}{} + log.Info(ctx, "GraphQL client added to broker", logging.NewKV("clients", len(clients))) + case unsubCh := <-b.unsubscribe: + delete(clients, unsubCh) + log.Info(ctx, "GraphQL client removed from broker", logging.NewKV("clients", len(clients))) + case msg := <-b.notifier: + for sub := range clients { + // To protect against unresponsive clients, we use a non-blocking send. + select { + case sub <- msg: + default: + } + } + } + } +} diff --git a/api/http/handler.go b/api/http/handler.go index a91e63a413..34aa176030 100644 --- a/api/http/handler.go +++ b/api/http/handler.go @@ -24,16 +24,20 @@ import ( ) type handler struct { - db client.DB + db client.DB + broker *broker *chi.Mux // user configurable options options serverOptions } -type ctxDB struct{} - -type ctxPeerID struct{} +// context variables +type ( + ctxBroker struct{} + ctxDB struct{} + ctxPeerID struct{} +) // DataResponse is the GQL top level object holding data for the response payload. type DataResponse struct { @@ -65,9 +69,10 @@ func simpleDataResponse(args ...any) DataResponse { } // newHandler returns a handler with the router instantiated. -func newHandler(db client.DB, opts serverOptions) *handler { +func newHandler(db client.DB, b *broker, opts serverOptions) *handler { return setRoutes(&handler{ db: db, + broker: b, options: opts, }) } @@ -77,7 +82,8 @@ func (h *handler) handle(f http.HandlerFunc) http.HandlerFunc { if h.options.tls.HasValue() { rw.Header().Add("Strict-Transport-Security", "max-age=63072000; includeSubDomains") } - ctx := context.WithValue(req.Context(), ctxDB{}, h.db) + ctx := context.WithValue(req.Context(), ctxBroker{}, h.broker) + ctx = context.WithValue(ctx, ctxDB{}, h.db) if h.options.peerID != "" { ctx = context.WithValue(ctx, ctxPeerID{}, h.options.peerID) } @@ -113,6 +119,14 @@ func sendJSON(ctx context.Context, rw http.ResponseWriter, v any, code int) { } } +func brokerFromContext(ctx context.Context) (*broker, error) { + brk, ok := ctx.Value(ctxBroker{}).(*broker) + if !ok { + return nil, errors.New("no broker available") + } + return brk, nil +} + func dbFromContext(ctx context.Context) (client.DB, error) { db, ok := ctx.Value(ctxDB{}).(client.DB) if !ok { diff --git a/api/http/handler_test.go b/api/http/handler_test.go index 5bbd315f10..d7113c0348 100644 --- a/api/http/handler_test.go +++ b/api/http/handler_test.go @@ -59,7 +59,7 @@ func TestSimpleDataResponse(t *testing.T) { } func TestNewHandlerWithLogger(t *testing.T) { - h := newHandler(nil, serverOptions{}) + h := newHandler(nil, nil, serverOptions{}) dir := t.TempDir() diff --git a/api/http/handlerfuncs.go b/api/http/handlerfuncs.go index 40666d2ff7..f4933e7042 100644 --- a/api/http/handlerfuncs.go +++ b/api/http/handlerfuncs.go @@ -11,9 +11,11 @@ package http import ( + "fmt" "io" "mime" "net/http" + "strings" "github.com/go-chi/chi/v5" "github.com/ipfs/go-cid" @@ -136,6 +138,11 @@ func execGQLHandler(rw http.ResponseWriter, req *http.Request) { return } + if strings.HasPrefix(query, "subscription") { + subscriptionHandler(rw, req) + return + } + db, err := dbFromContext(req.Context()) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) @@ -256,3 +263,39 @@ func peerIDHandler(rw http.ResponseWriter, req *http.Request) { http.StatusOK, ) } + +func subscriptionHandler(rw http.ResponseWriter, req *http.Request) { + flusher, ok := rw.(http.Flusher) + if !ok { + handleErr(req.Context(), rw, errors.New("streaming unsupported"), http.StatusInternalServerError) + return + } + + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + + brk, err := brokerFromContext(req.Context()) + if err != nil { + handleErr(req.Context(), rw, err, http.StatusInternalServerError) + return + } + + messageCh := make(chan []byte, 5) + brk.subscribe <- messageCh + + defer func() { + brk.unsubscribe <- messageCh + }() + + for { + select { + case <-req.Context().Done(): + return + default: + fmt.Fprintf(rw, "data: %s\n\n", <-messageCh) + flusher.Flush() + } + + } +} diff --git a/api/http/handlerfuncs_test.go b/api/http/handlerfuncs_test.go index f3bfe07fdb..67083047a1 100644 --- a/api/http/handlerfuncs_test.go +++ b/api/http/handlerfuncs_test.go @@ -832,7 +832,7 @@ func testRequest(opt testOptions) { req.Header.Set(k, v) } - h := newHandler(opt.DB, opt.ServerOptions) + h := newHandler(opt.DB, nil, opt.ServerOptions) rec := httptest.NewRecorder() h.ServeHTTP(rec, req) assert.Equal(opt.Testing, opt.ExpectedStatus, rec.Result().StatusCode) diff --git a/api/http/logger.go b/api/http/logger.go index f2c745f479..2a91a271c2 100644 --- a/api/http/logger.go +++ b/api/http/logger.go @@ -12,7 +12,6 @@ package http import ( "net/http" - "strconv" "time" "github.com/sourcenetwork/defradb/logging" @@ -33,20 +32,21 @@ func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter { } } +func (lrw *loggingResponseWriter) Flush() { + lrw.ResponseWriter.(http.Flusher).Flush() +} + +func (lrw *loggingResponseWriter) Header() http.Header { + return lrw.ResponseWriter.Header() +} + func (lrw *loggingResponseWriter) WriteHeader(code int) { lrw.statusCode = code lrw.ResponseWriter.WriteHeader(code) } func (lrw *loggingResponseWriter) Write(b []byte) (int, error) { - // used for chucked payloads. Content-Length should not be set - // for each chunk. - if lrw.ResponseWriter.Header().Get("Content-Length") != "" { - return lrw.ResponseWriter.Write(b) - } - lrw.contentLength = len(b) - lrw.ResponseWriter.Header().Set("Content-Length", strconv.Itoa(lrw.contentLength)) return lrw.ResponseWriter.Write(b) } diff --git a/api/http/logger_test.go b/api/http/logger_test.go index 6edc534ddf..40a07fb3cf 100644 --- a/api/http/logger_test.go +++ b/api/http/logger_test.go @@ -79,7 +79,7 @@ func TestLoggerKeyValueOutput(t *testing.T) { rec2 := httptest.NewRecorder() - h := newHandler(nil, serverOptions{}) + h := newHandler(nil, nil, serverOptions{}) log.ApplyConfig(logging.Config{ EncoderFormat: logging.NewEncoderFormatOption(logging.JSON), OutputPaths: []string{logFile}, diff --git a/api/http/server.go b/api/http/server.go index 7be0568693..485e00aa55 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -18,7 +18,6 @@ import ( "net/http" "path" "strings" - "time" "golang.org/x/crypto/acme/autocert" @@ -28,17 +27,24 @@ import ( ) const ( - // these constants are best effort durations that fit our current API + // These constants are best effort durations that fit our current API // and possibly prevent from running out of file descriptors. - readTimeout = 5 * time.Second - writeTimeout = 10 * time.Second - idleTimeout = 120 * time.Second + // readTimeout = 5 * time.Second + // writeTimeout = 10 * time.Second + // idleTimeout = 120 * time.Second + + // Temparily disabling timeouts until [this proposal](https://github.com/golang/go/issues/54136) is merged. + // https://github.com/sourcenetwork/defradb/issues/927 + readTimeout = 0 + writeTimeout = 0 + idleTimeout = 0 ) // Server struct holds the Handler for the HTTP API. type Server struct { options serverOptions listener net.Listener + broker *broker certManager *autocert.Manager http.Server @@ -70,19 +76,22 @@ type tlsOptions struct { // NewServer instantiates a new server with the given http.Handler. func NewServer(db client.DB, options ...func(*Server)) *Server { + brk := newBroker() + srv := &Server{ Server: http.Server{ ReadTimeout: readTimeout, WriteTimeout: writeTimeout, IdleTimeout: idleTimeout, }, + broker: brk, } for _, opt := range append(options, DefaultOpts()) { opt(srv) } - srv.Handler = newHandler(db, srv.options) + srv.Handler = newHandler(db, brk, srv.options) return srv } @@ -256,6 +265,9 @@ func (s *Server) Run(ctx context.Context) error { if s.listener == nil { return errNoListener } + + go s.broker.listen(ctx) + if s.certManager != nil { // When using TLS it's important to redirect http requests to https go func() { From 8609f9164eb3442dea58aa1ba189ad8fdc7c1071 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Tue, 8 Nov 2022 21:32:36 -0500 Subject: [PATCH 02/16] add event publisher --- api/http/broker.go | 76 +++++++++++----------- api/http/handler.go | 17 +---- api/http/handler_test.go | 2 +- api/http/handlerfuncs.go | 42 ++++++------- api/http/handlerfuncs_test.go | 2 +- api/http/logger_test.go | 2 +- api/http/server.go | 8 +-- cli/start.go | 8 +-- client/db.go | 8 ++- client/request/request.go | 11 +++- client/request/subscription.go | 50 +++++++++++++++ db/db.go | 19 ++++++ db/query.go | 28 ++++++--- db/subscription.go | 94 ++++++++++++++++++++++++++++ events/publisher.go | 80 +++++++++++++++++++++++ planner/planner.go | 38 +++++++++++ query/graphql/parser.go | 8 ++- query/graphql/parser/query.go | 19 ++++-- query/graphql/parser/subscription.go | 73 +++++++++++++++++++++ tests/bench/query/simple/utils.go | 4 +- tests/integration/schema/utils.go | 4 +- tests/integration/utils.go | 6 +- 22 files changed, 483 insertions(+), 116 deletions(-) create mode 100644 client/request/subscription.go create mode 100644 db/subscription.go create mode 100644 events/publisher.go create mode 100644 query/graphql/parser/subscription.go diff --git a/api/http/broker.go b/api/http/broker.go index 6b3c9d7d22..4ffa248849 100644 --- a/api/http/broker.go +++ b/api/http/broker.go @@ -10,45 +10,47 @@ package http -import ( - "context" +// import ( +// "context" - "github.com/sourcenetwork/defradb/logging" -) +// "github.com/sourcenetwork/defradb/client" +// "github.com/sourcenetwork/defradb/logging" +// ) -type broker struct { - notifier chan []byte - subscribe chan chan []byte - unsubscribe chan chan []byte -} +// type broker struct { +// notifier chan client.GQLResult +// subscribe chan chan client.GQLResult +// unsubscribe chan chan client.GQLResult +// } -func newBroker() *broker { - return &broker{ - notifier: make(chan []byte, 1), - subscribe: make(chan chan []byte), - unsubscribe: make(chan chan []byte), - } -} +// func newBroker() *broker { +// return &broker{ +// notifier: make(chan client.GQLResult, 1), +// subscribe: make(chan chan client.GQLResult), +// unsubscribe: make(chan chan client.GQLResult), +// } +// } -func (b *broker) listen(ctx context.Context) { - clients := make(map[chan []byte]struct{}) +// func (b *broker) listen(ctx context.Context) { +// clients := make(map[chan client.GQLResult]struct{}) - for { - select { - case subCh := <-b.subscribe: - clients[subCh] = struct{}{} - log.Info(ctx, "GraphQL client added to broker", logging.NewKV("clients", len(clients))) - case unsubCh := <-b.unsubscribe: - delete(clients, unsubCh) - log.Info(ctx, "GraphQL client removed from broker", logging.NewKV("clients", len(clients))) - case msg := <-b.notifier: - for sub := range clients { - // To protect against unresponsive clients, we use a non-blocking send. - select { - case sub <- msg: - default: - } - } - } - } -} +// for { +// select { +// case subCh := <-b.subscribe: +// clients[subCh] = struct{}{} +// log.Info(ctx, "GraphQL client added to broker", logging.NewKV("clients", len(clients))) +// case unsubCh := <-b.unsubscribe: +// delete(clients, unsubCh) +// unsubCh = nil +// log.Info(ctx, "GraphQL client removed from broker", logging.NewKV("clients", len(clients))) +// case msg := <-b.notifier: +// for sub := range clients { +// // To protect against unresponsive clients, we use a non-blocking send. +// select { +// case sub <- msg: +// default: +// } +// } +// } +// } +// } diff --git a/api/http/handler.go b/api/http/handler.go index 34aa176030..5efb33066f 100644 --- a/api/http/handler.go +++ b/api/http/handler.go @@ -24,8 +24,7 @@ import ( ) type handler struct { - db client.DB - broker *broker + db client.DB *chi.Mux // user configurable options @@ -69,10 +68,9 @@ func simpleDataResponse(args ...any) DataResponse { } // newHandler returns a handler with the router instantiated. -func newHandler(db client.DB, b *broker, opts serverOptions) *handler { +func newHandler(db client.DB, opts serverOptions) *handler { return setRoutes(&handler{ db: db, - broker: b, options: opts, }) } @@ -82,8 +80,7 @@ func (h *handler) handle(f http.HandlerFunc) http.HandlerFunc { if h.options.tls.HasValue() { rw.Header().Add("Strict-Transport-Security", "max-age=63072000; includeSubDomains") } - ctx := context.WithValue(req.Context(), ctxBroker{}, h.broker) - ctx = context.WithValue(ctx, ctxDB{}, h.db) + ctx := context.WithValue(req.Context(), ctxDB{}, h.db) if h.options.peerID != "" { ctx = context.WithValue(ctx, ctxPeerID{}, h.options.peerID) } @@ -119,14 +116,6 @@ func sendJSON(ctx context.Context, rw http.ResponseWriter, v any, code int) { } } -func brokerFromContext(ctx context.Context) (*broker, error) { - brk, ok := ctx.Value(ctxBroker{}).(*broker) - if !ok { - return nil, errors.New("no broker available") - } - return brk, nil -} - func dbFromContext(ctx context.Context) (client.DB, error) { db, ok := ctx.Value(ctxDB{}).(client.DB) if !ok { diff --git a/api/http/handler_test.go b/api/http/handler_test.go index d7113c0348..5bbd315f10 100644 --- a/api/http/handler_test.go +++ b/api/http/handler_test.go @@ -59,7 +59,7 @@ func TestSimpleDataResponse(t *testing.T) { } func TestNewHandlerWithLogger(t *testing.T) { - h := newHandler(nil, nil, serverOptions{}) + h := newHandler(nil, serverOptions{}) dir := t.TempDir() diff --git a/api/http/handlerfuncs.go b/api/http/handlerfuncs.go index f4933e7042..1c3a919e49 100644 --- a/api/http/handlerfuncs.go +++ b/api/http/handlerfuncs.go @@ -11,11 +11,11 @@ package http import ( + "encoding/json" "fmt" "io" "mime" "net/http" - "strings" "github.com/go-chi/chi/v5" "github.com/ipfs/go-cid" @@ -25,7 +25,9 @@ import ( "github.com/multiformats/go-multihash" "github.com/pkg/errors" + "github.com/sourcenetwork/defradb/client" corecrdt "github.com/sourcenetwork/defradb/core/crdt" + "github.com/sourcenetwork/defradb/events" ) const ( @@ -138,11 +140,6 @@ func execGQLHandler(rw http.ResponseWriter, req *http.Request) { return } - if strings.HasPrefix(query, "subscription") { - subscriptionHandler(rw, req) - return - } - db, err := dbFromContext(req.Context()) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) @@ -150,7 +147,12 @@ func execGQLHandler(rw http.ResponseWriter, req *http.Request) { } result := db.ExecQuery(req.Context(), query) - sendJSON(req.Context(), rw, result, http.StatusOK) + if result.Stream != nil { + subscriptionHandler(result.Stream, rw, req) + return + } + + sendJSON(req.Context(), rw, result.GQL, http.StatusOK) } func loadSchemaHandler(rw http.ResponseWriter, req *http.Request) { @@ -264,7 +266,7 @@ func peerIDHandler(rw http.ResponseWriter, req *http.Request) { ) } -func subscriptionHandler(rw http.ResponseWriter, req *http.Request) { +func subscriptionHandler(stream *events.Publisher[client.GQLResult], rw http.ResponseWriter, req *http.Request) { flusher, ok := rw.(http.Flusher) if !ok { handleErr(req.Context(), rw, errors.New("streaming unsupported"), http.StatusInternalServerError) @@ -275,27 +277,19 @@ func subscriptionHandler(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("Cache-Control", "no-cache") rw.Header().Set("Connection", "keep-alive") - brk, err := brokerFromContext(req.Context()) - if err != nil { - handleErr(req.Context(), rw, err, http.StatusInternalServerError) - return - } - - messageCh := make(chan []byte, 5) - brk.subscribe <- messageCh - - defer func() { - brk.unsubscribe <- messageCh - }() - for { select { case <-req.Context().Done(): + stream.Close() return - default: - fmt.Fprintf(rw, "data: %s\n\n", <-messageCh) + case s := <-stream.Read(): + b, err := json.Marshal(s) + if err != nil { + handleErr(req.Context(), rw, err, http.StatusInternalServerError) + return + } + fmt.Fprintf(rw, "data: %s\n\n", b) flusher.Flush() } - } } diff --git a/api/http/handlerfuncs_test.go b/api/http/handlerfuncs_test.go index 67083047a1..f3bfe07fdb 100644 --- a/api/http/handlerfuncs_test.go +++ b/api/http/handlerfuncs_test.go @@ -832,7 +832,7 @@ func testRequest(opt testOptions) { req.Header.Set(k, v) } - h := newHandler(opt.DB, nil, opt.ServerOptions) + h := newHandler(opt.DB, opt.ServerOptions) rec := httptest.NewRecorder() h.ServeHTTP(rec, req) assert.Equal(opt.Testing, opt.ExpectedStatus, rec.Result().StatusCode) diff --git a/api/http/logger_test.go b/api/http/logger_test.go index 40a07fb3cf..6edc534ddf 100644 --- a/api/http/logger_test.go +++ b/api/http/logger_test.go @@ -79,7 +79,7 @@ func TestLoggerKeyValueOutput(t *testing.T) { rec2 := httptest.NewRecorder() - h := newHandler(nil, nil, serverOptions{}) + h := newHandler(nil, serverOptions{}) log.ApplyConfig(logging.Config{ EncoderFormat: logging.NewEncoderFormatOption(logging.JSON), OutputPaths: []string{logFile}, diff --git a/api/http/server.go b/api/http/server.go index 485e00aa55..3ab61182f2 100644 --- a/api/http/server.go +++ b/api/http/server.go @@ -44,7 +44,6 @@ const ( type Server struct { options serverOptions listener net.Listener - broker *broker certManager *autocert.Manager http.Server @@ -76,22 +75,19 @@ type tlsOptions struct { // NewServer instantiates a new server with the given http.Handler. func NewServer(db client.DB, options ...func(*Server)) *Server { - brk := newBroker() - srv := &Server{ Server: http.Server{ ReadTimeout: readTimeout, WriteTimeout: writeTimeout, IdleTimeout: idleTimeout, }, - broker: brk, } for _, opt := range append(options, DefaultOpts()) { opt(srv) } - srv.Handler = newHandler(db, brk, srv.options) + srv.Handler = newHandler(db, srv.options) return srv } @@ -266,8 +262,6 @@ func (s *Server) Run(ctx context.Context) error { return errNoListener } - go s.broker.listen(ctx) - if s.certManager != nil { // When using TLS it's important to redirect http requests to https go func() { diff --git a/cli/start.go b/cli/start.go index 81f9d92b70..294c724e11 100644 --- a/cli/start.go +++ b/cli/start.go @@ -235,11 +235,9 @@ func start(ctx context.Context) (*defraInstance, error) { return nil, errors.Wrap("failed to open datastore", err) } - var options []db.Option - - // check for p2p - if !cfg.Net.P2PDisabled { - options = append(options, db.WithUpdateEvents()) + options := []db.Option{ + db.WithUpdateEvents(), + db.WithSubscriptionRunner(ctx), } db, err := db.NewDB(ctx, rootstore, options...) diff --git a/client/db.go b/client/db.go index 9c26ace3c9..3db3f9290c 100644 --- a/client/db.go +++ b/client/db.go @@ -17,6 +17,7 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/events" ) type DB interface { @@ -40,7 +41,12 @@ type DB interface { PrintDump(ctx context.Context) error } -type QueryResult struct { +type GQLResult struct { Errors []any `json:"errors,omitempty"` Data any `json:"data"` } + +type QueryResult struct { + GQL GQLResult + Stream *events.Publisher[GQLResult] +} diff --git a/client/request/request.go b/client/request/request.go index 372d2cc40a..fab4bee626 100644 --- a/client/request/request.go +++ b/client/request/request.go @@ -10,9 +10,16 @@ package request +import ( + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/events" +) + type Request struct { - Queries []*OperationDefinition - Mutations []*OperationDefinition + Queries []*OperationDefinition + Mutations []*OperationDefinition + Subscription []*OperationDefinition + UpdateEvent client.Option[events.Subscription[client.UpdateEvent]] } type Selection any diff --git a/client/request/subscription.go b/client/request/subscription.go new file mode 100644 index 0000000000..81bccbd4db --- /dev/null +++ b/client/request/subscription.go @@ -0,0 +1,50 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package request + +import ( + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/events" +) + +// ObjectSubscription is a field on the SubscriptionType +// of a graphql query. It includes all the possible +// arguments +type ObjectSubscription[T any] struct { + Field + + DocKeys client.Option[[]string] + CID client.Option[string] + + // Schema is the target schema/collection + Schema string + + Filter client.Option[Filter] + + Fields []Selection + + Stream *events.Publisher[T] +} + +// ToSelect returns a basic Select object, with the same Name, Alias, and Fields as +// the Subscription object. Used to create a Select planNode for the event stream return objects. +func (m ObjectSubscription[T]) ToSelect() *Select { + return &Select{ + Field: Field{ + Name: m.Schema, + Alias: m.Alias, + }, + DocKeys: m.DocKeys, + CID: m.CID, + Fields: m.Fields, + Filter: m.Filter, + } +} diff --git a/db/db.go b/db/db.go index d14670faff..c4275595aa 100644 --- a/db/db.go +++ b/db/db.go @@ -60,6 +60,8 @@ type db struct { events client.Events + streams subscription[client.GQLResult] + parser core.Parser // The options used to init the database @@ -79,6 +81,23 @@ func WithUpdateEvents() Option { } } +// WithSubscriptionRunner adds API relateded subscription capabilities. +// Must be called after WithUpdateEvents. +func WithSubscriptionRunner(ctx context.Context) Option { + return func(db *db) { + if db.events.Updates.HasValue() { + sub, err := db.events.Updates.Value().Subscribe() + if err != nil { + log.Error(ctx, err.Error()) + return + } + db.streams.updateEvt = sub + + go db.runSubscriptions(ctx) + } + } +} + // NewDB creates a new instance of the DB using the given options. func NewDB(ctx context.Context, rootstore ds.Batching, options ...Option) (client.DB, error) { return newDB(ctx, rootstore, options...) diff --git a/db/query.go b/db/query.go index 34a94dfc64..afb91e82da 100644 --- a/db/query.go +++ b/db/query.go @@ -28,7 +28,7 @@ func (db *db) ExecQuery(ctx context.Context, query string) *client.QueryResult { txn, err := db.NewTxn(ctx, false) if err != nil { - res.Errors = []any{err.Error()} + res.GQL.Errors = []any{err.Error()} return res } defer txn.Discard(ctx) @@ -39,23 +39,35 @@ func (db *db) ExecQuery(ctx context.Context, query string) *client.QueryResult { for i, err := range errors { errorStrings[i] = err.Error() } - res.Errors = errorStrings + res.GQL.Errors = errorStrings + return res + } + + stream, err := db.checkForSubsciptions(request) + if err != nil { + res.GQL.Errors = []any{err.Error()} + return res + } + + if stream != nil { + res.Stream = stream return res } planner := planner.New(ctx, db, txn) + results, err := planner.RunRequest(ctx, request) if err != nil { - res.Errors = []any{err.Error()} + res.GQL.Errors = []any{err.Error()} return res } if err := txn.Commit(ctx); err != nil { - res.Errors = []any{err.Error()} + res.GQL.Errors = []any{err.Error()} return res } - res.Data = results + res.GQL.Data = results return res } @@ -76,18 +88,18 @@ func (db *db) ExecTransactionalQuery( for i, err := range errors { errorStrings[i] = err.Error() } - res.Errors = errorStrings + res.GQL.Errors = errorStrings return res } planner := planner.New(ctx, db, txn) results, err := planner.RunRequest(ctx, request) if err != nil { - res.Errors = []any{err.Error()} + res.GQL.Errors = []any{err.Error()} return res } - res.Data = results + res.GQL.Data = results return res } diff --git a/db/subscription.go b/db/subscription.go new file mode 100644 index 0000000000..f0f1af7d4d --- /dev/null +++ b/db/subscription.go @@ -0,0 +1,94 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + "fmt" + "sync" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/client/request" + "github.com/sourcenetwork/defradb/errors" + "github.com/sourcenetwork/defradb/events" + "github.com/sourcenetwork/defradb/planner" +) + +type subscription[T any] struct { + updateEvt events.Subscription[client.UpdateEvent] + requests []*request.ObjectSubscription[T] + syncLock sync.Mutex +} + +func (db *db) runSubscriptions(ctx context.Context) { + log.Info(ctx, "Starting subscription runner") + for evt := range db.streams.updateEvt { + txn, err := db.NewTxn(ctx, false) + if err != nil { + log.Error(ctx, err.Error()) + continue + } + db.streams.syncLock.Lock() + + planner := planner.New(ctx, db, txn) + + col, err := db.GetCollectionBySchemaID(ctx, evt.SchemaID) + if err != nil { + log.Error(ctx, err.Error()) + continue + } + + // keeping track of the active requests + subs := db.streams.requests[:0] + for _, objSub := range db.streams.requests { + if objSub.Stream.IsClosed() { + continue + } + subs = append(subs, objSub) + if objSub.Schema == col.Name() { + objSub.CID = client.Some(evt.Cid.String()) + objSub.DocKeys = client.Some([]string{evt.DocKey}) + result := planner.RunSubscriptionRequest(ctx, objSub) + if result.Data == nil { + continue + } + objSub.Stream.Write(result) + } + } + + // helping the GC + for i := len(subs); i < len(db.streams.requests); i++ { + db.streams.requests[i] = nil + } + + db.streams.requests = subs + + txn.Discard(ctx) + db.streams.syncLock.Unlock() + } +} + +func (db *db) checkForSubsciptions(r *request.Request) (*events.Publisher[client.GQLResult], error) { + if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { + s := r.Subscription[0].Selections[0] + if subRequest, ok := s.(*request.ObjectSubscription[client.GQLResult]); ok { + stream := events.NewPublisher(make(chan client.GQLResult)) + db.streams.syncLock.Lock() + subRequest.Stream = stream + db.streams.requests = append(db.streams.requests, subRequest) + db.streams.syncLock.Unlock() + return stream, nil + } + + return nil, errors.New(fmt.Sprintf("expected ObjectSubscription[client.GQLResult] type but got %T", s)) + } + return nil, nil +} diff --git a/events/publisher.go b/events/publisher.go new file mode 100644 index 0000000000..b813194715 --- /dev/null +++ b/events/publisher.go @@ -0,0 +1,80 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package events + +import "sync" + +type Publisher[T any] struct { + ch chan T + + closingCh chan struct{} + isClosed bool + writersWG sync.WaitGroup + syncLock sync.Mutex +} + +// NewPublisher creates a Publisher +func NewPublisher[T any](ch chan T) *Publisher[T] { + return &Publisher[T]{ + ch: ch, + closingCh: make(chan struct{}), + } +} + +// Read returns the channel to write +func (p *Publisher[T]) Read() <-chan T { + return p.ch +} + +// Write into the channel in a different goroutine +func (p *Publisher[T]) Write(data T) { + go func(data T) { + p.syncLock.Lock() + p.writersWG.Add(1) + p.syncLock.Unlock() + defer p.writersWG.Done() + + select { + case <-p.closingCh: + return + default: + } + + select { + case <-p.closingCh: + case p.ch <- data: + } + }(data) +} + +// Closes channel, draining any blocked writes +func (p *Publisher[T]) Close() { + close(p.closingCh) + + go func() { + for range p.ch { + } + }() + + p.syncLock.Lock() + p.writersWG.Wait() + p.isClosed = true + close(p.ch) + p.syncLock.Unlock() +} + +// Closes channel, draining any blocked writes +func (p *Publisher[T]) IsClosed() bool { + p.syncLock.Lock() + defer p.syncLock.Unlock() + + return p.isClosed +} diff --git a/planner/planner.go b/planner/planner.go index be45b61f34..909dc63a6e 100644 --- a/planner/planner.go +++ b/planner/planner.go @@ -146,7 +146,15 @@ func (p *Planner) newPlan(stmt any) (planNode, error) { return nil, err } return p.newObjectMutationPlan(m) + + case *request.ObjectSubscription[client.GQLResult]: + m, err := mapper.ToSelect(p.ctx, p.txn, n.ToSelect()) + if err != nil { + return nil, err + } + return p.Select(m) } + return nil, errors.New(fmt.Sprintf("Unknown statement type %T", stmt)) } @@ -524,6 +532,36 @@ func (p *Planner) RunRequest( return p.executeRequest(ctx, plan) } +// RunSubscriptionRequest plans a request specific to a subscription and returns the result. +func (p *Planner) RunSubscriptionRequest( + ctx context.Context, + query *request.ObjectSubscription[client.GQLResult], +) client.GQLResult { + plan, err := p.makePlan(query) + if err != nil { + return client.GQLResult{ + Errors: []any{err.Error()}, + } + } + + data, err := p.executeRequest(ctx, plan) + if err != nil { + return client.GQLResult{ + Errors: []any{err.Error()}, + } + } + + if len(data) == 0 { + return client.GQLResult{ + Data: nil, + } + } + + return client.GQLResult{ + Data: data, + } +} + // MakePlan makes a plan from the parsed query. @TODO {defradb/issues/368}: Test this exported function. func (p *Planner) MakePlan(query *request.Request) (planNode, error) { return p.makePlan(query) diff --git a/query/graphql/parser.go b/query/graphql/parser.go index 6385661cf8..c2abb37646 100644 --- a/query/graphql/parser.go +++ b/query/graphql/parser.go @@ -55,12 +55,14 @@ func (p *parser) ExecuteIntrospection(request string) *client.QueryResult { r := gql.Do(params) res := &client.QueryResult{ - Data: r.Data, - Errors: make([]any, len(r.Errors)), + GQL: client.GQLResult{ + Data: r.Data, + Errors: make([]any, len(r.Errors)), + }, } for i, err := range r.Errors { - res.Errors[i] = err + res.GQL.Errors[i] = err } return res diff --git a/query/graphql/parser/query.go b/query/graphql/parser/query.go index 2c1d603f63..755063696e 100644 --- a/query/graphql/parser/query.go +++ b/query/graphql/parser/query.go @@ -28,28 +28,37 @@ func ParseQuery(doc *ast.Document) (*request.Request, []error) { return nil, []error{errors.New("parseQuery requires a non-nil ast.Document")} } r := &request.Request{ - Queries: make([]*request.OperationDefinition, 0), - Mutations: make([]*request.OperationDefinition, 0), + Queries: make([]*request.OperationDefinition, 0), + Mutations: make([]*request.OperationDefinition, 0), + Subscription: make([]*request.OperationDefinition, 0), } for _, def := range doc.Definitions { switch node := def.(type) { case *ast.OperationDefinition: - if node.Operation == "query" { + switch node.Operation { + case "query": // parse query operation definition. qdef, err := parseQueryOperationDefinition(node) if err != nil { return nil, err } r.Queries = append(r.Queries, qdef) - } else if node.Operation == "mutation" { + case "mutation": // parse mutation operation definition. mdef, err := parseMutationOperationDefinition(node) if err != nil { return nil, []error{err} } r.Mutations = append(r.Mutations, mdef) - } else { + case "subscription": + // parse subscription operation definition. + sdef, err := parseSubscriptionOperationDefinition(node) + if err != nil { + return nil, []error{err} + } + r.Subscription = append(r.Subscription, sdef) + default: return nil, []error{errors.New("unknown GraphQL operation type")} } } diff --git a/query/graphql/parser/subscription.go b/query/graphql/parser/subscription.go new file mode 100644 index 0000000000..a0ebb8f58f --- /dev/null +++ b/query/graphql/parser/subscription.go @@ -0,0 +1,73 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package parser + +import ( + "github.com/graphql-go/graphql/language/ast" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/client/request" +) + +// parseSubscriptionOperationDefinition parses the individual GraphQL +// 'subcription' operations, which there may be multiple of. +func parseSubscriptionOperationDefinition(def *ast.OperationDefinition) (*request.OperationDefinition, error) { + sdef := &request.OperationDefinition{ + Selections: make([]request.Selection, len(def.SelectionSet.Selections)), + } + + sdef.IsExplain = parseExplainDirective(def.Directives) + + for i, selection := range def.SelectionSet.Selections { + switch node := selection.(type) { + case *ast.Field: + sub, err := parseSubscription(node) + if err != nil { + return nil, err + } + + sdef.Selections[i] = sub + } + } + return sdef, nil +} + +// parseSubscription parses a typed subscription field +// which includes sub fields, and may include +// filters, IDs, etc. +func parseSubscription(field *ast.Field) (*request.ObjectSubscription[client.GQLResult], error) { + sub := &request.ObjectSubscription[client.GQLResult]{ + Field: request.Field{ + Name: field.Name.Value, + Alias: getFieldAlias(field), + }, + } + + sub.Schema = sub.Name + + // parse arguments + for _, argument := range field.Arguments { + prop := argument.Name.Value + if prop == request.FilterClause { // parse filter + obj := argument.Value.(*ast.ObjectValue) + filter, err := NewFilter(obj) + if err != nil { + return nil, err + } + + sub.Filter = filter + } + } + + var err error + sub.Fields, err = parseSelectFields(request.ObjectSelection, field.SelectionSet) + return sub, err +} diff --git a/tests/bench/query/simple/utils.go b/tests/bench/query/simple/utils.go index 55f54f16a3..95c32381f7 100644 --- a/tests/bench/query/simple/utils.go +++ b/tests/bench/query/simple/utils.go @@ -71,8 +71,8 @@ func runQueryBenchGetSync( b.ResetTimer() for i := 0; i < b.N; i++ { res := db.ExecQuery(ctx, query) - if len(res.Errors) > 0 { - return errors.New(fmt.Sprintf("Query error: %v", res.Errors)) + if len(res.GQL.Errors) > 0 { + return errors.New(fmt.Sprintf("Query error: %v", res.GQL.Errors)) } // leave comments for debug!! diff --git a/tests/integration/schema/utils.go b/tests/integration/schema/utils.go index 3027cbc693..e7dccdc097 100644 --- a/tests/integration/schema/utils.go +++ b/tests/integration/schema/utils.go @@ -94,10 +94,10 @@ func assertSchemaResults( result *client.QueryResult, testCase QueryTestCase, ) bool { - if assertErrors(t, result.Errors, testCase.ExpectedError) { + if assertErrors(t, result.GQL.Errors, testCase.ExpectedError) { return true } - resultantData := result.Data.(map[string]any) + resultantData := result.GQL.Data.(map[string]any) if len(testCase.ExpectedData) == 0 && len(testCase.ContainsData) == 0 { assert.Equal(t, testCase.ExpectedData, resultantData) diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 7824084458..f08c678140 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -647,14 +647,14 @@ func assertQueryResults( expectedResults []map[string]any, expectedError string, ) bool { - if assertErrors(t, description, result.Errors, expectedError) { + if assertErrors(t, description, result.GQL.Errors, expectedError) { return true } // Note: if result.Data == nil this panics (the panic seems useful while testing). - resultantData := result.Data.([]map[string]any) + resultantData := result.GQL.Data.([]map[string]any) - log.Info(ctx, "", logging.NewKV("QueryResults", result.Data)) + log.Info(ctx, "", logging.NewKV("QueryResults", result.GQL.Data)) // compare results assert.Equal(t, len(expectedResults), len(resultantData), description) From a5e4f9679fd484a60046e7ec07329c368675f3fa Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Wed, 9 Nov 2022 12:13:37 -0500 Subject: [PATCH 03/16] remove unused code and useless genercis --- api/http/broker.go | 56 ---------------------- api/http/handlerfuncs.go | 3 +- cli/start.go | 2 +- client/db.go | 2 +- client/request/request.go | 6 --- client/request/subscription.go | 6 +-- db/db.go | 12 +++-- db/query.go | 2 +- db/{subscription.go => subscriptions.go} | 61 ++++++++++++++++-------- events/publisher.go | 28 ++++++----- planner/planner.go | 27 ++--------- query/graphql/parser/subscription.go | 5 +- 12 files changed, 76 insertions(+), 134 deletions(-) delete mode 100644 api/http/broker.go rename db/{subscription.go => subscriptions.go} (51%) diff --git a/api/http/broker.go b/api/http/broker.go deleted file mode 100644 index 4ffa248849..0000000000 --- a/api/http/broker.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2022 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package http - -// import ( -// "context" - -// "github.com/sourcenetwork/defradb/client" -// "github.com/sourcenetwork/defradb/logging" -// ) - -// type broker struct { -// notifier chan client.GQLResult -// subscribe chan chan client.GQLResult -// unsubscribe chan chan client.GQLResult -// } - -// func newBroker() *broker { -// return &broker{ -// notifier: make(chan client.GQLResult, 1), -// subscribe: make(chan chan client.GQLResult), -// unsubscribe: make(chan chan client.GQLResult), -// } -// } - -// func (b *broker) listen(ctx context.Context) { -// clients := make(map[chan client.GQLResult]struct{}) - -// for { -// select { -// case subCh := <-b.subscribe: -// clients[subCh] = struct{}{} -// log.Info(ctx, "GraphQL client added to broker", logging.NewKV("clients", len(clients))) -// case unsubCh := <-b.unsubscribe: -// delete(clients, unsubCh) -// unsubCh = nil -// log.Info(ctx, "GraphQL client removed from broker", logging.NewKV("clients", len(clients))) -// case msg := <-b.notifier: -// for sub := range clients { -// // To protect against unresponsive clients, we use a non-blocking send. -// select { -// case sub <- msg: -// default: -// } -// } -// } -// } -// } diff --git a/api/http/handlerfuncs.go b/api/http/handlerfuncs.go index 1c3a919e49..e28488d5d2 100644 --- a/api/http/handlerfuncs.go +++ b/api/http/handlerfuncs.go @@ -25,7 +25,6 @@ import ( "github.com/multiformats/go-multihash" "github.com/pkg/errors" - "github.com/sourcenetwork/defradb/client" corecrdt "github.com/sourcenetwork/defradb/core/crdt" "github.com/sourcenetwork/defradb/events" ) @@ -266,7 +265,7 @@ func peerIDHandler(rw http.ResponseWriter, req *http.Request) { ) } -func subscriptionHandler(stream *events.Publisher[client.GQLResult], rw http.ResponseWriter, req *http.Request) { +func subscriptionHandler(stream *events.Publisher, rw http.ResponseWriter, req *http.Request) { flusher, ok := rw.(http.Flusher) if !ok { handleErr(req.Context(), rw, errors.New("streaming unsupported"), http.StatusInternalServerError) diff --git a/cli/start.go b/cli/start.go index 294c724e11..e7cd0d1f01 100644 --- a/cli/start.go +++ b/cli/start.go @@ -237,7 +237,7 @@ func start(ctx context.Context) (*defraInstance, error) { options := []db.Option{ db.WithUpdateEvents(), - db.WithSubscriptionRunner(ctx), + db.WithClientSubscriptions(ctx), } db, err := db.NewDB(ctx, rootstore, options...) diff --git a/client/db.go b/client/db.go index 3db3f9290c..091a3c7132 100644 --- a/client/db.go +++ b/client/db.go @@ -48,5 +48,5 @@ type GQLResult struct { type QueryResult struct { GQL GQLResult - Stream *events.Publisher[GQLResult] + Stream *events.Publisher } diff --git a/client/request/request.go b/client/request/request.go index fab4bee626..7ce42f05be 100644 --- a/client/request/request.go +++ b/client/request/request.go @@ -10,16 +10,10 @@ package request -import ( - "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/events" -) - type Request struct { Queries []*OperationDefinition Mutations []*OperationDefinition Subscription []*OperationDefinition - UpdateEvent client.Option[events.Subscription[client.UpdateEvent]] } type Selection any diff --git a/client/request/subscription.go b/client/request/subscription.go index 81bccbd4db..13a1695fc5 100644 --- a/client/request/subscription.go +++ b/client/request/subscription.go @@ -18,7 +18,7 @@ import ( // ObjectSubscription is a field on the SubscriptionType // of a graphql query. It includes all the possible // arguments -type ObjectSubscription[T any] struct { +type ObjectSubscription struct { Field DocKeys client.Option[[]string] @@ -31,12 +31,12 @@ type ObjectSubscription[T any] struct { Fields []Selection - Stream *events.Publisher[T] + Stream *events.Publisher } // ToSelect returns a basic Select object, with the same Name, Alias, and Fields as // the Subscription object. Used to create a Select planNode for the event stream return objects. -func (m ObjectSubscription[T]) ToSelect() *Select { +func (m ObjectSubscription) ToSelect() *Select { return &Select{ Field: Field{ Name: m.Schema, diff --git a/db/db.go b/db/db.go index c4275595aa..14db5c6be6 100644 --- a/db/db.go +++ b/db/db.go @@ -60,7 +60,7 @@ type db struct { events client.Events - streams subscription[client.GQLResult] + clientSubscriptions *subscriptions parser core.Parser @@ -81,9 +81,9 @@ func WithUpdateEvents() Option { } } -// WithSubscriptionRunner adds API relateded subscription capabilities. +// WithClientSubscriptions adds GraphQL API relateded subscription capabilities. // Must be called after WithUpdateEvents. -func WithSubscriptionRunner(ctx context.Context) Option { +func WithClientSubscriptions(ctx context.Context) Option { return func(db *db) { if db.events.Updates.HasValue() { sub, err := db.events.Updates.Value().Subscribe() @@ -91,9 +91,11 @@ func WithSubscriptionRunner(ctx context.Context) Option { log.Error(ctx, err.Error()) return } - db.streams.updateEvt = sub + db.clientSubscriptions = &subscriptions{ + updateEvt: sub, + } - go db.runSubscriptions(ctx) + go db.handleClientSubscriptions(ctx) } } } diff --git a/db/query.go b/db/query.go index afb91e82da..59a1bad733 100644 --- a/db/query.go +++ b/db/query.go @@ -43,7 +43,7 @@ func (db *db) ExecQuery(ctx context.Context, query string) *client.QueryResult { return res } - stream, err := db.checkForSubsciptions(request) + stream, err := db.checkForClientSubsciptions(request) if err != nil { res.GQL.Errors = []any{err.Error()} return res diff --git a/db/subscription.go b/db/subscriptions.go similarity index 51% rename from db/subscription.go rename to db/subscriptions.go index f0f1af7d4d..c6c22dee16 100644 --- a/db/subscription.go +++ b/db/subscriptions.go @@ -22,21 +22,26 @@ import ( "github.com/sourcenetwork/defradb/planner" ) -type subscription[T any] struct { +type subscriptions struct { updateEvt events.Subscription[client.UpdateEvent] - requests []*request.ObjectSubscription[T] + requests []*request.ObjectSubscription syncLock sync.Mutex } -func (db *db) runSubscriptions(ctx context.Context) { - log.Info(ctx, "Starting subscription runner") - for evt := range db.streams.updateEvt { +func (db *db) handleClientSubscriptions(ctx context.Context) { + if db.clientSubscriptions == nil { + log.Info(ctx, "can't run subscription without adding the option to the db") + return + } + + log.Info(ctx, "Starting client subscription handler") + for evt := range db.clientSubscriptions.updateEvt { txn, err := db.NewTxn(ctx, false) if err != nil { log.Error(ctx, err.Error()) continue } - db.streams.syncLock.Lock() + db.clientSubscriptions.syncLock.Lock() planner := planner.New(ctx, db, txn) @@ -47,8 +52,8 @@ func (db *db) runSubscriptions(ctx context.Context) { } // keeping track of the active requests - subs := db.streams.requests[:0] - for _, objSub := range db.streams.requests { + subs := db.clientSubscriptions.requests[:0] + for _, objSub := range db.clientSubscriptions.requests { if objSub.Stream.IsClosed() { continue } @@ -56,35 +61,49 @@ func (db *db) runSubscriptions(ctx context.Context) { if objSub.Schema == col.Name() { objSub.CID = client.Some(evt.Cid.String()) objSub.DocKeys = client.Some([]string{evt.DocKey}) - result := planner.RunSubscriptionRequest(ctx, objSub) - if result.Data == nil { + result, err := planner.RunSubscriptionRequest(ctx, objSub) + if err != nil { + objSub.Stream.Write(client.GQLResult{ + Errors: []any{err.Error()}, + }) + } + + // Don't send anything back to the client if the request yields an empty dataset. + if len(result) == 0 { continue } - objSub.Stream.Write(result) + + objSub.Stream.Write(client.GQLResult{ + Data: result, + }) } } // helping the GC - for i := len(subs); i < len(db.streams.requests); i++ { - db.streams.requests[i] = nil + for i := len(subs); i < len(db.clientSubscriptions.requests); i++ { + db.clientSubscriptions.requests[i] = nil } - db.streams.requests = subs + db.clientSubscriptions.requests = subs txn.Discard(ctx) - db.streams.syncLock.Unlock() + db.clientSubscriptions.syncLock.Unlock() } } -func (db *db) checkForSubsciptions(r *request.Request) (*events.Publisher[client.GQLResult], error) { +func (db *db) checkForClientSubsciptions(r *request.Request) (*events.Publisher, error) { + if db.clientSubscriptions == nil { + return nil, errors.New("server does not accept subscriptions") + } + if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { s := r.Subscription[0].Selections[0] - if subRequest, ok := s.(*request.ObjectSubscription[client.GQLResult]); ok { - stream := events.NewPublisher(make(chan client.GQLResult)) - db.streams.syncLock.Lock() + if subRequest, ok := s.(*request.ObjectSubscription); ok { + stream := events.NewPublisher(make(chan any)) + db.clientSubscriptions.syncLock.Lock() subRequest.Stream = stream - db.streams.requests = append(db.streams.requests, subRequest) - db.streams.syncLock.Unlock() + db.clientSubscriptions.requests = append(db.clientSubscriptions.requests, subRequest) + db.clientSubscriptions.syncLock.Unlock() return stream, nil } diff --git a/events/publisher.go b/events/publisher.go index b813194715..4336393237 100644 --- a/events/publisher.go +++ b/events/publisher.go @@ -12,8 +12,10 @@ package events import "sync" -type Publisher[T any] struct { - ch chan T +// Publisher holds a channel and sync mechanism that enable safe writes +// where the reader is expected to be the one closing the channel. +type Publisher struct { + ch chan any closingCh chan struct{} isClosed bool @@ -21,22 +23,22 @@ type Publisher[T any] struct { syncLock sync.Mutex } -// NewPublisher creates a Publisher -func NewPublisher[T any](ch chan T) *Publisher[T] { - return &Publisher[T]{ +// NewPublisher creates a Publisher with the given channel +func NewPublisher(ch chan any) *Publisher { + return &Publisher{ ch: ch, closingCh: make(chan struct{}), } } -// Read returns the channel to write -func (p *Publisher[T]) Read() <-chan T { +// Read returns the channel +func (p *Publisher) Read() <-chan any { return p.ch } -// Write into the channel in a different goroutine -func (p *Publisher[T]) Write(data T) { - go func(data T) { +// Write into the channel +func (p *Publisher) Write(data any) { + go func(data any) { p.syncLock.Lock() p.writersWG.Add(1) p.syncLock.Unlock() @@ -56,7 +58,7 @@ func (p *Publisher[T]) Write(data T) { } // Closes channel, draining any blocked writes -func (p *Publisher[T]) Close() { +func (p *Publisher) Close() { close(p.closingCh) go func() { @@ -71,8 +73,8 @@ func (p *Publisher[T]) Close() { p.syncLock.Unlock() } -// Closes channel, draining any blocked writes -func (p *Publisher[T]) IsClosed() bool { +// IsClosed returns true if the channel has been closed. +func (p *Publisher) IsClosed() bool { p.syncLock.Lock() defer p.syncLock.Unlock() diff --git a/planner/planner.go b/planner/planner.go index 909dc63a6e..8496ec2b5b 100644 --- a/planner/planner.go +++ b/planner/planner.go @@ -147,7 +147,7 @@ func (p *Planner) newPlan(stmt any) (planNode, error) { } return p.newObjectMutationPlan(m) - case *request.ObjectSubscription[client.GQLResult]: + case *request.ObjectSubscription: m, err := mapper.ToSelect(p.ctx, p.txn, n.ToSelect()) if err != nil { return nil, err @@ -535,31 +535,14 @@ func (p *Planner) RunRequest( // RunSubscriptionRequest plans a request specific to a subscription and returns the result. func (p *Planner) RunSubscriptionRequest( ctx context.Context, - query *request.ObjectSubscription[client.GQLResult], -) client.GQLResult { + query *request.ObjectSubscription, +) ([]map[string]any, error) { plan, err := p.makePlan(query) if err != nil { - return client.GQLResult{ - Errors: []any{err.Error()}, - } - } - - data, err := p.executeRequest(ctx, plan) - if err != nil { - return client.GQLResult{ - Errors: []any{err.Error()}, - } - } - - if len(data) == 0 { - return client.GQLResult{ - Data: nil, - } + return nil, err } - return client.GQLResult{ - Data: data, - } + return p.executeRequest(ctx, plan) } // MakePlan makes a plan from the parsed query. @TODO {defradb/issues/368}: Test this exported function. diff --git a/query/graphql/parser/subscription.go b/query/graphql/parser/subscription.go index a0ebb8f58f..f052212cf4 100644 --- a/query/graphql/parser/subscription.go +++ b/query/graphql/parser/subscription.go @@ -13,7 +13,6 @@ package parser import ( "github.com/graphql-go/graphql/language/ast" - "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/client/request" ) @@ -43,8 +42,8 @@ func parseSubscriptionOperationDefinition(def *ast.OperationDefinition) (*reques // parseSubscription parses a typed subscription field // which includes sub fields, and may include // filters, IDs, etc. -func parseSubscription(field *ast.Field) (*request.ObjectSubscription[client.GQLResult], error) { - sub := &request.ObjectSubscription[client.GQLResult]{ +func parseSubscription(field *ast.Field) (*request.ObjectSubscription, error) { + sub := &request.ObjectSubscription{ Field: request.Field{ Name: field.Name.Value, Alias: getFieldAlias(field), From a47ec446727298a4db194a236eb3831d24b16dc2 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Wed, 9 Nov 2022 13:31:36 -0500 Subject: [PATCH 04/16] fix checking for clientSubscriptions too early --- db/subscriptions.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/db/subscriptions.go b/db/subscriptions.go index c6c22dee16..c62b5acbfb 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -92,13 +92,13 @@ func (db *db) handleClientSubscriptions(ctx context.Context) { } func (db *db) checkForClientSubsciptions(r *request.Request) (*events.Publisher, error) { - if db.clientSubscriptions == nil { - return nil, errors.New("server does not accept subscriptions") - } - if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { s := r.Subscription[0].Selections[0] if subRequest, ok := s.(*request.ObjectSubscription); ok { + if db.clientSubscriptions == nil { + return nil, errors.New("server does not accept subscriptions") + } + stream := events.NewPublisher(make(chan any)) db.clientSubscriptions.syncLock.Lock() subRequest.Stream = stream From 32d3a09a896d9892137744d62d29af30d45c3386 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Thu, 10 Nov 2022 18:32:12 -0500 Subject: [PATCH 05/16] add tests --- api/http/handlerfuncs_test.go | 108 ++++++++++- db/subscriptions.go | 9 +- .../subscription/subscription_test.go | 173 ++++++++++++++++++ tests/integration/subscription/utils.go | 32 ++++ tests/integration/utils.go | 83 +++++++-- 5 files changed, 384 insertions(+), 21 deletions(-) create mode 100644 tests/integration/subscription/subscription_test.go create mode 100644 tests/integration/subscription/utils.go diff --git a/api/http/handlerfuncs_test.go b/api/http/handlerfuncs_test.go index f3bfe07fdb..e2879af4a9 100644 --- a/api/http/handlerfuncs_test.go +++ b/api/http/handlerfuncs_test.go @@ -19,7 +19,9 @@ import ( "net/http" "net/http/httptest" "strings" + "sync" "testing" + "time" badger "github.com/dgraph-io/badger/v3" "github.com/ipfs/go-cid" @@ -475,6 +477,78 @@ mutation { assert.Contains(t, users[0].Key, "bae-") } +func TestExecGQLHandlerWithSubsctiption(t *testing.T) { + ctx := context.Background() + defra := testNewInMemoryDB(t, ctx) + + // load schema + testLoadSchema(t, ctx, defra) + + stmt := ` +subscription { + user { + _key + age + name + } +}` + + buf := bytes.NewBuffer([]byte(stmt)) + + ch := make(chan []byte) + errCh := make(chan error) + + wg := &sync.WaitGroup{} + wg.Add(1) + + // We need to set a timeout otherwise the testSubscriptionRequest function will block until the + // http.ServeHTTP call returns, which in this case will only happen with a timeout. + ctxTimeout, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + go testSubscriptionRequest(ctxTimeout, testOptions{ + Testing: t, + DB: defra, + Method: "POST", + Path: GraphQLPath, + Body: buf, + Headers: map[string]string{"Content-Type": contentTypeGraphQL}, + ExpectedStatus: 200, + }, wg, ch, errCh) + + // We wait as long as possible before sending the mutation request. + wg.Wait() + + // add document + stmt2 := ` +mutation { + create_user(data: "{\"age\": 31, \"verified\": true, \"points\": 90, \"name\": \"Bob\"}") { + _key + } +}` + + buf2 := bytes.NewBuffer([]byte(stmt2)) + users := []testUser{} + resp := DataResponse{ + Data: &users, + } + testRequest(testOptions{ + Testing: t, + DB: defra, + Method: "POST", + Path: GraphQLPath, + Body: buf2, + ExpectedStatus: 200, + ResponseData: &resp, + }) + select { + case data := <-ch: + assert.Contains(t, string(data), "bae-91171025-ed21-50e3-b0dc-e31bccdfa1ab") + case err := <-errCh: + t.Fatal(err) + } +} + func TestLoadSchemaHandlerWithReadBodyError(t *testing.T) { t.Cleanup(CleanupEnv) env = "dev" @@ -848,6 +922,34 @@ func testRequest(opt testOptions) { } } +func testSubscriptionRequest(ctx context.Context, opt testOptions, wg *sync.WaitGroup, ch chan []byte, errCh chan error) { + req, err := http.NewRequest(opt.Method, opt.Path, opt.Body) + if err != nil { + errCh <- err + return + } + + req = req.WithContext(ctx) + + for k, v := range opt.Headers { + req.Header.Set(k, v) + } + + h := newHandler(opt.DB, opt.ServerOptions) + rec := httptest.NewRecorder() + wg.Done() + h.ServeHTTP(rec, req) + assert.Equal(opt.Testing, opt.ExpectedStatus, rec.Result().StatusCode) + + respBody, err := io.ReadAll(rec.Result().Body) + if err != nil { + errCh <- err + return + } + + ch <- respBody +} + func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB { // init in memory DB opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} @@ -856,12 +958,16 @@ func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB { t.Fatal(err) } - var options []db.Option + options := []db.Option{ + db.WithUpdateEvents(), + db.WithClientSubscriptions(ctx), + } defra, err := db.NewDB(ctx, rootstore, options...) if err != nil { t.Fatal(err) } + return defra } diff --git a/db/subscriptions.go b/db/subscriptions.go index c62b5acbfb..f5e37ff5b9 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -36,18 +36,25 @@ func (db *db) handleClientSubscriptions(ctx context.Context) { log.Info(ctx, "Starting client subscription handler") for evt := range db.clientSubscriptions.updateEvt { + db.clientSubscriptions.syncLock.Lock() + if len(db.clientSubscriptions.requests) == 0 { + db.clientSubscriptions.syncLock.Unlock() + continue + } + txn, err := db.NewTxn(ctx, false) if err != nil { log.Error(ctx, err.Error()) + db.clientSubscriptions.syncLock.Unlock() continue } - db.clientSubscriptions.syncLock.Lock() planner := planner.New(ctx, db, txn) col, err := db.GetCollectionBySchemaID(ctx, evt.SchemaID) if err != nil { log.Error(ctx, err.Error()) + db.clientSubscriptions.syncLock.Unlock() continue } diff --git a/tests/integration/subscription/subscription_test.go b/tests/integration/subscription/subscription_test.go new file mode 100644 index 0000000000..39c0120b73 --- /dev/null +++ b/tests/integration/subscription/subscription_test.go @@ -0,0 +1,173 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package subscription + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestSubscriptionWithCreateMutations(t *testing.T) { + test := testUtils.QueryTestCase{ + Description: "Subscription with user creations", + Query: `subscription { + user { + _key + name + age + } + }`, + PostSubscriptionQueries: []testUtils.SubscriptionQuery{ + { + Query: `mutation { + create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + _key + name + age + } + }`, + Results: []map[string]any{ + { + "_key": "bae-0a24cf29-b2c2-5861-9d00-abd6250c475d", + "age": uint64(27), + "name": "John", + }, + }, + }, + { + Query: `mutation { + create_user(data: "{\"name\": \"Addo\",\"age\": 31,\"points\": 42.1,\"verified\": true}") { + _key + name + age + } + }`, + Results: []map[string]any{ + { + "_key": "bae-18def051-7f0f-5dc9-8a69-2a5e423f6b55", + "age": uint64(31), + "name": "Addo", + }, + }, + }, + }, + DisableMapStore: true, + } + + executeTestCase(t, test) +} + +func TestSubscriptionWithFilterAndOneCreateMutation(t *testing.T) { + test := testUtils.QueryTestCase{ + Description: "Subscription with filter and one user creation", + Query: `subscription { + user(filter: {age: {_lt: 30}}) { + _key + name + age + } + }`, + PostSubscriptionQueries: []testUtils.SubscriptionQuery{ + { + Query: `mutation { + create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + _key + name + age + } + }`, + Results: []map[string]any{ + { + "_key": "bae-0a24cf29-b2c2-5861-9d00-abd6250c475d", + "age": uint64(27), + "name": "John", + }, + }, + }, + }, + DisableMapStore: true, + } + + executeTestCase(t, test) +} + +func TestSubscriptionWithFilterAndOneCreateMutationOutsideFilter(t *testing.T) { + test := testUtils.QueryTestCase{ + Description: "Subscription with filter and one user creation outside of the filter", + Query: `subscription { + user(filter: {age: {_gt: 30}}) { + _key + name + age + } + }`, + PostSubscriptionQueries: []testUtils.SubscriptionQuery{ + { + Query: `mutation { + create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + _key + name + age + } + }`, + ExpectedTimout: true, + }, + }, + DisableMapStore: true, + } + + executeTestCase(t, test) +} + +func TestSubscriptionWithFilterAndCreateMutations(t *testing.T) { + test := testUtils.QueryTestCase{ + Description: "Subscription with filter and user creation in and outside of the filter", + Query: `subscription { + user(filter: {age: {_lt: 30}}) { + _key + name + age + } + }`, + PostSubscriptionQueries: []testUtils.SubscriptionQuery{ + { + Query: `mutation { + create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + _key + name + age + } + }`, + Results: []map[string]any{ + { + "_key": "bae-0a24cf29-b2c2-5861-9d00-abd6250c475d", + "age": uint64(27), + "name": "John", + }, + }, + }, + { + Query: `mutation { + create_user(data: "{\"name\": \"Addo\",\"age\": 31,\"points\": 42.1,\"verified\": true}") { + _key + name + age + } + }`, + ExpectedTimout: true, + }, + }, + DisableMapStore: true, + } + + executeTestCase(t, test) +} diff --git a/tests/integration/subscription/utils.go b/tests/integration/subscription/utils.go new file mode 100644 index 0000000000..1c4e7033ba --- /dev/null +++ b/tests/integration/subscription/utils.go @@ -0,0 +1,32 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package subscription + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +type dataMap = map[string]any + +var userSchema = (` + type user { + name: String + age: Int + points: Float + verified: Boolean + } +`) + +func executeTestCase(t *testing.T, test testUtils.QueryTestCase) { + testUtils.ExecuteQueryTestCase(t, userSchema, []string{"user"}, test) +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index f08c678140..8e01108a39 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -21,6 +21,7 @@ import ( "strings" "syscall" "testing" + "time" badger "github.com/dgraph-io/badger/v3" ds "github.com/ipfs/go-datastore" @@ -66,6 +67,17 @@ var ( mapStore bool ) +// Represents a query assigned to a particular transaction. +type SubscriptionQuery struct { + Query string + // The expected (data) results of the query + Results []map[string]any + // The expected error resulting from the query. + ExpectedError string + // If set to true, the query should yield no results + ExpectedTimout bool +} + // Represents a query assigned to a particular transaction. type TransactionQuery struct { // Used to identify the transaction for this to run against (allows multiple @@ -82,6 +94,10 @@ type TransactionQuery struct { type QueryTestCase struct { Description string Query string + + // A collection of queries to exucute after the subscriber is listening on the stream + PostSubscriptionQueries []SubscriptionQuery + // A collection of queries tied to a specific transaction. // These will be executed before `Query` (if specified), in the order that they are listed here. TransactionalQueries []TransactionQuery @@ -218,6 +234,8 @@ func NewBadgerMemoryDB(ctx context.Context, dbopts ...db.Option) (databaseInfo, return databaseInfo{}, err } + dbopts = append(dbopts, db.WithUpdateEvents(), db.WithClientSubscriptions(ctx)) + db, err := db.NewDB(ctx, rootstore, dbopts...) if err != nil { return databaseInfo{}, err @@ -232,7 +250,7 @@ func NewBadgerMemoryDB(ctx context.Context, dbopts ...db.Option) (databaseInfo, func NewMapDB(ctx context.Context) (databaseInfo, error) { rootstore := ds.NewMapDatastore() - db, err := db.NewDB(ctx, rootstore) + db, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents(), db.WithClientSubscriptions(ctx)) if err != nil { return databaseInfo{}, err } @@ -262,7 +280,7 @@ func newBadgerFileDB(ctx context.Context, t testing.TB, path string) (databaseIn return databaseInfo{}, err } - db, err := db.NewDB(ctx, rootstore) + db, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents(), db.WithClientSubscriptions(ctx)) if err != nil { return databaseInfo{}, err } @@ -363,7 +381,7 @@ func ExecuteQueryTestCase( continue } result := dbi.db.ExecTransactionalQuery(ctx, tq.Query, transactions[tq.TransactionId]) - if assertQueryResults(ctx, t, test.Description, result, tq.Results, tq.ExpectedError) { + if assertQueryResults(ctx, t, test.Description, &result.GQL, tq.Results, tq.ExpectedError) { erroredQueries[i] = true } } @@ -394,19 +412,46 @@ func ExecuteQueryTestCase( // the commited result of the transactional queries if test.Query != "" { result := dbi.db.ExecQuery(ctx, test.Query) - if assertQueryResults( - ctx, - t, - test.Description, - result, - test.Results, - test.ExpectedError, - ) { - continue - } + if result.Stream != nil { + for _, q := range test.PostSubscriptionQueries { + dbi.db.ExecQuery(ctx, q.Query) + select { + case s := <-result.Stream.Read(): + data := s.(client.GQLResult) + if assertQueryResults( + ctx, + t, + test.Description, + &data, + q.Results, + q.ExpectedError, + ) { + continue + } + // a safety in case the stream hangs or no results are expected. + case <-time.After(1 * time.Second): + if q.ExpectedTimout { + continue + } + assert.Fail(t, "timeout occured while waiting for data stream", test.Description) + } + } + result.Stream.Close() + } else { + if assertQueryResults( + ctx, + t, + test.Description, + &result.GQL, + test.Results, + test.ExpectedError, + ) { + continue + } - if test.ExpectedError != "" { - assert.Fail(t, "Expected an error however none was raised.", test.Description) + if test.ExpectedError != "" { + assert.Fail(t, "Expected an error however none was raised.", test.Description) + } } } @@ -643,18 +688,18 @@ func assertQueryResults( ctx context.Context, t *testing.T, description string, - result *client.QueryResult, + result *client.GQLResult, expectedResults []map[string]any, expectedError string, ) bool { - if assertErrors(t, description, result.GQL.Errors, expectedError) { + if assertErrors(t, description, result.Errors, expectedError) { return true } // Note: if result.Data == nil this panics (the panic seems useful while testing). - resultantData := result.GQL.Data.([]map[string]any) + resultantData := result.Data.([]map[string]any) - log.Info(ctx, "", logging.NewKV("QueryResults", result.GQL.Data)) + log.Info(ctx, "", logging.NewKV("QueryResults", result.Data)) // compare results assert.Equal(t, len(expectedResults), len(resultantData), description) From 0799dde3c15755d32c0c90f68514e78cbf38698f Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Thu, 10 Nov 2022 18:49:11 -0500 Subject: [PATCH 06/16] add listener check --- db/subscriptions.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/db/subscriptions.go b/db/subscriptions.go index f5e37ff5b9..9d71190b97 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -28,6 +28,12 @@ type subscriptions struct { syncLock sync.Mutex } +func (s *subscriptions) hasListeners() bool { + s.syncLock.Lock() + defer s.syncLock.Unlock() + return len(s.requests) > 0 +} + func (db *db) handleClientSubscriptions(ctx context.Context) { if db.clientSubscriptions == nil { log.Info(ctx, "can't run subscription without adding the option to the db") @@ -36,16 +42,13 @@ func (db *db) handleClientSubscriptions(ctx context.Context) { log.Info(ctx, "Starting client subscription handler") for evt := range db.clientSubscriptions.updateEvt { - db.clientSubscriptions.syncLock.Lock() - if len(db.clientSubscriptions.requests) == 0 { - db.clientSubscriptions.syncLock.Unlock() + if !db.clientSubscriptions.hasListeners() { continue } txn, err := db.NewTxn(ctx, false) if err != nil { log.Error(ctx, err.Error()) - db.clientSubscriptions.syncLock.Unlock() continue } @@ -54,10 +57,11 @@ func (db *db) handleClientSubscriptions(ctx context.Context) { col, err := db.GetCollectionBySchemaID(ctx, evt.SchemaID) if err != nil { log.Error(ctx, err.Error()) - db.clientSubscriptions.syncLock.Unlock() continue } + db.clientSubscriptions.syncLock.Lock() + // keeping track of the active requests subs := db.clientSubscriptions.requests[:0] for _, objSub := range db.clientSubscriptions.requests { From cc8cc719e874a35bd8afa5b99c80a307e27e92cf Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Fri, 11 Nov 2022 10:04:09 -0500 Subject: [PATCH 07/16] remove dead code --- api/http/handler.go | 1 - tests/integration/subscription/utils.go | 2 -- 2 files changed, 3 deletions(-) diff --git a/api/http/handler.go b/api/http/handler.go index 5efb33066f..375f9b56a1 100644 --- a/api/http/handler.go +++ b/api/http/handler.go @@ -33,7 +33,6 @@ type handler struct { // context variables type ( - ctxBroker struct{} ctxDB struct{} ctxPeerID struct{} ) diff --git a/tests/integration/subscription/utils.go b/tests/integration/subscription/utils.go index 1c4e7033ba..315af1be6c 100644 --- a/tests/integration/subscription/utils.go +++ b/tests/integration/subscription/utils.go @@ -16,8 +16,6 @@ import ( testUtils "github.com/sourcenetwork/defradb/tests/integration" ) -type dataMap = map[string]any - var userSchema = (` type user { name: String From c343ddfdab71989973183b41ec5e565c555b793e Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Fri, 11 Nov 2022 16:19:59 -0500 Subject: [PATCH 08/16] refactor publisher --- api/http/handlerfuncs.go | 14 ++-- api/http/handlerfuncs_test.go | 1 - cli/start.go | 1 - client/db.go | 4 +- client/request/subscription.go | 12 +--- db/db.go | 21 ------ db/query.go | 7 +- db/subscriptions.go | 122 +++++++++++---------------------- events/publisher.go | 88 +++++++++--------------- planner/planner.go | 9 +-- tests/integration/utils.go | 12 ++-- 11 files changed, 97 insertions(+), 194 deletions(-) diff --git a/api/http/handlerfuncs.go b/api/http/handlerfuncs.go index e28488d5d2..e12fc59153 100644 --- a/api/http/handlerfuncs.go +++ b/api/http/handlerfuncs.go @@ -25,6 +25,7 @@ import ( "github.com/multiformats/go-multihash" "github.com/pkg/errors" + "github.com/sourcenetwork/defradb/client" corecrdt "github.com/sourcenetwork/defradb/core/crdt" "github.com/sourcenetwork/defradb/events" ) @@ -146,8 +147,8 @@ func execGQLHandler(rw http.ResponseWriter, req *http.Request) { } result := db.ExecQuery(req.Context(), query) - if result.Stream != nil { - subscriptionHandler(result.Stream, rw, req) + if result.Pub != nil { + subscriptionHandler(result.Pub, rw, req) return } @@ -265,7 +266,7 @@ func peerIDHandler(rw http.ResponseWriter, req *http.Request) { ) } -func subscriptionHandler(stream *events.Publisher, rw http.ResponseWriter, req *http.Request) { +func subscriptionHandler(pub *events.Publisher[client.UpdateEvent], rw http.ResponseWriter, req *http.Request) { flusher, ok := rw.(http.Flusher) if !ok { handleErr(req.Context(), rw, errors.New("streaming unsupported"), http.StatusInternalServerError) @@ -279,9 +280,12 @@ func subscriptionHandler(stream *events.Publisher, rw http.ResponseWriter, req * for { select { case <-req.Context().Done(): - stream.Close() + pub.Unsubscribe() return - case s := <-stream.Read(): + case s, open := <-pub.Stream(): + if !open { + return + } b, err := json.Marshal(s) if err != nil { handleErr(req.Context(), rw, err, http.StatusInternalServerError) diff --git a/api/http/handlerfuncs_test.go b/api/http/handlerfuncs_test.go index e2879af4a9..b99af70fa4 100644 --- a/api/http/handlerfuncs_test.go +++ b/api/http/handlerfuncs_test.go @@ -960,7 +960,6 @@ func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB { options := []db.Option{ db.WithUpdateEvents(), - db.WithClientSubscriptions(ctx), } defra, err := db.NewDB(ctx, rootstore, options...) diff --git a/cli/start.go b/cli/start.go index e7cd0d1f01..cefc8b924b 100644 --- a/cli/start.go +++ b/cli/start.go @@ -237,7 +237,6 @@ func start(ctx context.Context) (*defraInstance, error) { options := []db.Option{ db.WithUpdateEvents(), - db.WithClientSubscriptions(ctx), } db, err := db.NewDB(ctx, rootstore, options...) diff --git a/client/db.go b/client/db.go index 091a3c7132..a101320657 100644 --- a/client/db.go +++ b/client/db.go @@ -47,6 +47,6 @@ type GQLResult struct { } type QueryResult struct { - GQL GQLResult - Stream *events.Publisher + GQL GQLResult + Pub *events.Publisher[UpdateEvent] } diff --git a/client/request/subscription.go b/client/request/subscription.go index 13a1695fc5..e82a99acc9 100644 --- a/client/request/subscription.go +++ b/client/request/subscription.go @@ -12,7 +12,6 @@ package request import ( "github.com/sourcenetwork/defradb/client" - "github.com/sourcenetwork/defradb/events" ) // ObjectSubscription is a field on the SubscriptionType @@ -21,29 +20,24 @@ import ( type ObjectSubscription struct { Field - DocKeys client.Option[[]string] - CID client.Option[string] - // Schema is the target schema/collection Schema string Filter client.Option[Filter] Fields []Selection - - Stream *events.Publisher } // ToSelect returns a basic Select object, with the same Name, Alias, and Fields as // the Subscription object. Used to create a Select planNode for the event stream return objects. -func (m ObjectSubscription) ToSelect() *Select { +func (m ObjectSubscription) ToSelect(docKey, cid string) *Select { return &Select{ Field: Field{ Name: m.Schema, Alias: m.Alias, }, - DocKeys: m.DocKeys, - CID: m.CID, + DocKeys: client.Some([]string{docKey}), + CID: client.Some(cid), Fields: m.Fields, Filter: m.Filter, } diff --git a/db/db.go b/db/db.go index 14db5c6be6..d14670faff 100644 --- a/db/db.go +++ b/db/db.go @@ -60,8 +60,6 @@ type db struct { events client.Events - clientSubscriptions *subscriptions - parser core.Parser // The options used to init the database @@ -81,25 +79,6 @@ func WithUpdateEvents() Option { } } -// WithClientSubscriptions adds GraphQL API relateded subscription capabilities. -// Must be called after WithUpdateEvents. -func WithClientSubscriptions(ctx context.Context) Option { - return func(db *db) { - if db.events.Updates.HasValue() { - sub, err := db.events.Updates.Value().Subscribe() - if err != nil { - log.Error(ctx, err.Error()) - return - } - db.clientSubscriptions = &subscriptions{ - updateEvt: sub, - } - - go db.handleClientSubscriptions(ctx) - } - } -} - // NewDB creates a new instance of the DB using the given options. func NewDB(ctx context.Context, rootstore ds.Batching, options ...Option) (client.DB, error) { return newDB(ctx, rootstore, options...) diff --git a/db/query.go b/db/query.go index 59a1bad733..5a98df4f02 100644 --- a/db/query.go +++ b/db/query.go @@ -43,14 +43,15 @@ func (db *db) ExecQuery(ctx context.Context, query string) *client.QueryResult { return res } - stream, err := db.checkForClientSubsciptions(request) + pub, subRequest, err := db.checkForClientSubsciptions(request) if err != nil { res.GQL.Errors = []any{err.Error()} return res } - if stream != nil { - res.Stream = stream + if pub != nil { + res.Pub = pub + go db.handleSubscription(ctx, pub, subRequest) return res } diff --git a/db/subscriptions.go b/db/subscriptions.go index 9d71190b97..b1f9602c54 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -13,7 +13,6 @@ package db import ( "context" "fmt" - "sync" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/client/request" @@ -22,103 +21,62 @@ import ( "github.com/sourcenetwork/defradb/planner" ) -type subscriptions struct { - updateEvt events.Subscription[client.UpdateEvent] - requests []*request.ObjectSubscription - syncLock sync.Mutex -} - -func (s *subscriptions) hasListeners() bool { - s.syncLock.Lock() - defer s.syncLock.Unlock() - return len(s.requests) > 0 -} - -func (db *db) handleClientSubscriptions(ctx context.Context) { - if db.clientSubscriptions == nil { - log.Info(ctx, "can't run subscription without adding the option to the db") - return +func (db *db) checkForClientSubsciptions(r *request.Request) ( + *events.Publisher[client.UpdateEvent], + *request.ObjectSubscription, + error, +) { + if !db.events.Updates.HasValue() { + return nil, nil, errors.New("server does not accept subscriptions") } - log.Info(ctx, "Starting client subscription handler") - for evt := range db.clientSubscriptions.updateEvt { - if !db.clientSubscriptions.hasListeners() { - continue - } + if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { + s := r.Subscription[0].Selections[0] + if subRequest, ok := s.(*request.ObjectSubscription); ok { + pub, err := events.NewPublisher(db.events.Updates.Value()) + if err != nil { + return nil, nil, err + } - txn, err := db.NewTxn(ctx, false) - if err != nil { - log.Error(ctx, err.Error()) - continue + return pub, subRequest, nil } - planner := planner.New(ctx, db, txn) + return nil, nil, errors.New(fmt.Sprintf("expected ObjectSubscription[client.GQLResult] type but got %T", s)) + } + return nil, nil, nil +} - col, err := db.GetCollectionBySchemaID(ctx, evt.SchemaID) +func (db *db) handleSubscription( + ctx context.Context, + pub *events.Publisher[client.UpdateEvent], + r *request.ObjectSubscription, +) { + for evt := range pub.Event() { + txn, err := db.NewTxn(ctx, false) if err != nil { log.Error(ctx, err.Error()) continue } - db.clientSubscriptions.syncLock.Lock() + p := planner.New(ctx, db, txn) - // keeping track of the active requests - subs := db.clientSubscriptions.requests[:0] - for _, objSub := range db.clientSubscriptions.requests { - if objSub.Stream.IsClosed() { - continue - } - subs = append(subs, objSub) - if objSub.Schema == col.Name() { - objSub.CID = client.Some(evt.Cid.String()) - objSub.DocKeys = client.Some([]string{evt.DocKey}) - result, err := planner.RunSubscriptionRequest(ctx, objSub) - if err != nil { - objSub.Stream.Write(client.GQLResult{ - Errors: []any{err.Error()}, - }) - } - - // Don't send anything back to the client if the request yields an empty dataset. - if len(result) == 0 { - continue - } - - objSub.Stream.Write(client.GQLResult{ - Data: result, - }) - } - } + s := r.ToSelect(evt.DocKey, evt.Cid.String()) - // helping the GC - for i := len(subs); i < len(db.clientSubscriptions.requests); i++ { - db.clientSubscriptions.requests[i] = nil + result, err := p.RunSubscriptionRequest(ctx, s) + if err != nil { + pub.Publish(client.GQLResult{ + Errors: []any{err.Error()}, + }) + continue } - db.clientSubscriptions.requests = subs - - txn.Discard(ctx) - db.clientSubscriptions.syncLock.Unlock() - } -} - -func (db *db) checkForClientSubsciptions(r *request.Request) (*events.Publisher, error) { - if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { - s := r.Subscription[0].Selections[0] - if subRequest, ok := s.(*request.ObjectSubscription); ok { - if db.clientSubscriptions == nil { - return nil, errors.New("server does not accept subscriptions") - } - - stream := events.NewPublisher(make(chan any)) - db.clientSubscriptions.syncLock.Lock() - subRequest.Stream = stream - db.clientSubscriptions.requests = append(db.clientSubscriptions.requests, subRequest) - db.clientSubscriptions.syncLock.Unlock() - return stream, nil + // Don't send anything back to the client if the request yields an empty dataset. + if len(result) == 0 { + continue } - return nil, errors.New(fmt.Sprintf("expected ObjectSubscription[client.GQLResult] type but got %T", s)) + pub.Publish(client.GQLResult{ + Data: result, + }) } - return nil, nil } diff --git a/events/publisher.go b/events/publisher.go index 4336393237..69e2a32138 100644 --- a/events/publisher.go +++ b/events/publisher.go @@ -10,73 +10,49 @@ package events -import "sync" +import "time" -// Publisher holds a channel and sync mechanism that enable safe writes -// where the reader is expected to be the one closing the channel. -type Publisher struct { - ch chan any +// time limit we set for the client to read after publishing. +const clientTimeout = 60 * time.Second - closingCh chan struct{} - isClosed bool - writersWG sync.WaitGroup - syncLock sync.Mutex +type Publisher[T any] struct { + ch Channel[T] + event Subscription[T] + stream chan any } -// NewPublisher creates a Publisher with the given channel -func NewPublisher(ch chan any) *Publisher { - return &Publisher{ - ch: ch, - closingCh: make(chan struct{}), +func NewPublisher[T any](ch Channel[T]) (*Publisher[T], error) { + evtCh, err := ch.Subscribe() + if err != nil { + return nil, err } -} -// Read returns the channel -func (p *Publisher) Read() <-chan any { - return p.ch + return &Publisher[T]{ + ch: ch, + event: evtCh, + stream: make(chan any), + }, nil } -// Write into the channel -func (p *Publisher) Write(data any) { - go func(data any) { - p.syncLock.Lock() - p.writersWG.Add(1) - p.syncLock.Unlock() - defer p.writersWG.Done() - - select { - case <-p.closingCh: - return - default: - } - - select { - case <-p.closingCh: - case p.ch <- data: - } - }(data) +func (p *Publisher[T]) Event() Subscription[T] { + return p.event } -// Closes channel, draining any blocked writes -func (p *Publisher) Close() { - close(p.closingCh) - - go func() { - for range p.ch { - } - }() - - p.syncLock.Lock() - p.writersWG.Wait() - p.isClosed = true - close(p.ch) - p.syncLock.Unlock() +func (p *Publisher[T]) Stream() chan any { + return p.stream } -// IsClosed returns true if the channel has been closed. -func (p *Publisher) IsClosed() bool { - p.syncLock.Lock() - defer p.syncLock.Unlock() +func (p *Publisher[T]) Publish(data any) { + select { + case p.stream <- data: + case <-time.After(clientTimeout): + // if sending to the client times out, we assume an inactive or problematic client and + // unsubscribe them from the event stream + p.Unsubscribe() + } +} - return p.isClosed +func (p *Publisher[T]) Unsubscribe() { + p.ch.Unsubscribe(p.event) + close(p.stream) } diff --git a/planner/planner.go b/planner/planner.go index 8496ec2b5b..778e811bbf 100644 --- a/planner/planner.go +++ b/planner/planner.go @@ -146,13 +146,6 @@ func (p *Planner) newPlan(stmt any) (planNode, error) { return nil, err } return p.newObjectMutationPlan(m) - - case *request.ObjectSubscription: - m, err := mapper.ToSelect(p.ctx, p.txn, n.ToSelect()) - if err != nil { - return nil, err - } - return p.Select(m) } return nil, errors.New(fmt.Sprintf("Unknown statement type %T", stmt)) @@ -535,7 +528,7 @@ func (p *Planner) RunRequest( // RunSubscriptionRequest plans a request specific to a subscription and returns the result. func (p *Planner) RunSubscriptionRequest( ctx context.Context, - query *request.ObjectSubscription, + query *request.Select, ) ([]map[string]any, error) { plan, err := p.makePlan(query) if err != nil { diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 8e01108a39..e13d43b54f 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -234,7 +234,7 @@ func NewBadgerMemoryDB(ctx context.Context, dbopts ...db.Option) (databaseInfo, return databaseInfo{}, err } - dbopts = append(dbopts, db.WithUpdateEvents(), db.WithClientSubscriptions(ctx)) + dbopts = append(dbopts, db.WithUpdateEvents()) db, err := db.NewDB(ctx, rootstore, dbopts...) if err != nil { @@ -250,7 +250,7 @@ func NewBadgerMemoryDB(ctx context.Context, dbopts ...db.Option) (databaseInfo, func NewMapDB(ctx context.Context) (databaseInfo, error) { rootstore := ds.NewMapDatastore() - db, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents(), db.WithClientSubscriptions(ctx)) + db, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents()) if err != nil { return databaseInfo{}, err } @@ -280,7 +280,7 @@ func newBadgerFileDB(ctx context.Context, t testing.TB, path string) (databaseIn return databaseInfo{}, err } - db, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents(), db.WithClientSubscriptions(ctx)) + db, err := db.NewDB(ctx, rootstore, db.WithUpdateEvents()) if err != nil { return databaseInfo{}, err } @@ -412,11 +412,11 @@ func ExecuteQueryTestCase( // the commited result of the transactional queries if test.Query != "" { result := dbi.db.ExecQuery(ctx, test.Query) - if result.Stream != nil { + if result.Pub != nil { for _, q := range test.PostSubscriptionQueries { dbi.db.ExecQuery(ctx, q.Query) select { - case s := <-result.Stream.Read(): + case s := <-result.Pub.Stream(): data := s.(client.GQLResult) if assertQueryResults( ctx, @@ -436,7 +436,7 @@ func ExecuteQueryTestCase( assert.Fail(t, "timeout occured while waiting for data stream", test.Description) } } - result.Stream.Close() + result.Pub.Unsubscribe() } else { if assertQueryResults( ctx, From 0cad332f43ea4ae3896a41148fcf4c1c561c5ab1 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 10:52:34 -0500 Subject: [PATCH 09/16] change field Schema to Collection --- client/request/mutation.go | 6 +++--- client/request/subscription.go | 6 +++--- query/graphql/parser/mutation.go | 2 +- query/graphql/parser/subscription.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client/request/mutation.go b/client/request/mutation.go index f3c1441b5c..89450fa57e 100644 --- a/client/request/mutation.go +++ b/client/request/mutation.go @@ -31,9 +31,9 @@ type Mutation struct { Field Type MutationType - // Schema is the target schema/collection + // Collection is the target collection name // if this mutation is on an object. - Schema string + Collection string IDs client.Option[[]string] Filter client.Option[Filter] @@ -47,7 +47,7 @@ type Mutation struct { func (m Mutation) ToSelect() *Select { return &Select{ Field: Field{ - Name: m.Schema, + Name: m.Collection, Alias: m.Alias, }, Fields: m.Fields, diff --git a/client/request/subscription.go b/client/request/subscription.go index e82a99acc9..bdc2afbdc9 100644 --- a/client/request/subscription.go +++ b/client/request/subscription.go @@ -20,8 +20,8 @@ import ( type ObjectSubscription struct { Field - // Schema is the target schema/collection - Schema string + // Collection is the target collection name + Collection string Filter client.Option[Filter] @@ -33,7 +33,7 @@ type ObjectSubscription struct { func (m ObjectSubscription) ToSelect(docKey, cid string) *Select { return &Select{ Field: Field{ - Name: m.Schema, + Name: m.Collection, Alias: m.Alias, }, DocKeys: client.Some([]string{docKey}), diff --git a/query/graphql/parser/mutation.go b/query/graphql/parser/mutation.go index 17cf9e498f..f00cfa0720 100644 --- a/query/graphql/parser/mutation.go +++ b/query/graphql/parser/mutation.go @@ -92,7 +92,7 @@ func parseMutation(field *ast.Field) (*request.Mutation, error) { // then the mutation name would be create_my_book // so we need to recreate the string my_book, which // has been split by "_", so we just join by "_" - mut.Schema = strings.Join(mutNameParts[1:], "_") + mut.Collection = strings.Join(mutNameParts[1:], "_") } // parse arguments diff --git a/query/graphql/parser/subscription.go b/query/graphql/parser/subscription.go index f052212cf4..ae51175ecc 100644 --- a/query/graphql/parser/subscription.go +++ b/query/graphql/parser/subscription.go @@ -50,7 +50,7 @@ func parseSubscription(field *ast.Field) (*request.ObjectSubscription, error) { }, } - sub.Schema = sub.Name + sub.Collection = sub.Name // parse arguments for _, argument := range field.Arguments { From 3374c524da1873d9d784cdeedd63b43b4899daa1 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 11:15:37 -0500 Subject: [PATCH 10/16] add package level errors --- db/errors.go | 31 +++++++++++++++++++++++++++++++ db/errors_test.go | 23 +++++++++++++++++++++++ db/subscriptions.go | 6 ++---- 3 files changed, 56 insertions(+), 4 deletions(-) create mode 100644 db/errors.go create mode 100644 db/errors_test.go diff --git a/db/errors.go b/db/errors.go new file mode 100644 index 0000000000..a5c353eae1 --- /dev/null +++ b/db/errors.go @@ -0,0 +1,31 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "fmt" + + "github.com/sourcenetwork/defradb/errors" +) + +var ( + ErrSubscriptionsNotAllowed = errors.New("server does not accept subscriptions") + ErrUnexpectedType = errors.New("unexpected type") +) + +func NewErrUnexpectedType[T any](actual any) error { + var expected T + return errors.WithStack( + ErrUnexpectedType, + errors.NewKV("Expected", fmt.Sprintf("%T", expected)), + errors.NewKV("Actual", fmt.Sprintf("%T", actual)), + ) +} diff --git a/db/errors_test.go b/db/errors_test.go new file mode 100644 index 0000000000..0fd8e793d4 --- /dev/null +++ b/db/errors_test.go @@ -0,0 +1,23 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNewUnexpectedType(t *testing.T) { + someString := "defradb" + err := NewErrUnexpectedType[int](someString) + assert.Equal(t, err.Error(), "unexpected type. Expected: int, Actual: string") +} diff --git a/db/subscriptions.go b/db/subscriptions.go index b1f9602c54..205112b22d 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -12,11 +12,9 @@ package db import ( "context" - "fmt" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/client/request" - "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/planner" ) @@ -27,7 +25,7 @@ func (db *db) checkForClientSubsciptions(r *request.Request) ( error, ) { if !db.events.Updates.HasValue() { - return nil, nil, errors.New("server does not accept subscriptions") + return nil, nil, ErrSubscriptionsNotAllowed } if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { @@ -41,7 +39,7 @@ func (db *db) checkForClientSubsciptions(r *request.Request) ( return pub, subRequest, nil } - return nil, nil, errors.New(fmt.Sprintf("expected ObjectSubscription[client.GQLResult] type but got %T", s)) + return nil, nil, NewErrUnexpectedType[request.ObjectSubscription](s) } return nil, nil, nil } From dbe3e2b8a2a5b0138e87c8ca34422fd264507645 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 11:25:07 -0500 Subject: [PATCH 11/16] add timeout comment --- tests/integration/utils.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/integration/utils.go b/tests/integration/utils.go index e13d43b54f..790961e54c 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -67,6 +67,8 @@ var ( mapStore bool ) +const subsciptionTimeout = 1 * time.Second + // Represents a query assigned to a particular transaction. type SubscriptionQuery struct { Query string @@ -74,7 +76,8 @@ type SubscriptionQuery struct { Results []map[string]any // The expected error resulting from the query. ExpectedError string - // If set to true, the query should yield no results + // If set to true, the query should yield no results. + // The timeout is duration is that of subscriptionTimeout (1 second) ExpectedTimout bool } @@ -429,7 +432,7 @@ func ExecuteQueryTestCase( continue } // a safety in case the stream hangs or no results are expected. - case <-time.After(1 * time.Second): + case <-time.After(subsciptionTimeout): if q.ExpectedTimout { continue } From d447d989ae9a4557e1c7c36aaea2946b05b4acfa Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 12:16:33 -0500 Subject: [PATCH 12/16] add update test on subscription --- .../subscription/subscription_test.go | 110 ++++++++++++++++++ tests/integration/utils.go | 58 ++++++--- 2 files changed, 151 insertions(+), 17 deletions(-) diff --git a/tests/integration/subscription/subscription_test.go b/tests/integration/subscription/subscription_test.go index 39c0120b73..108d228c37 100644 --- a/tests/integration/subscription/subscription_test.go +++ b/tests/integration/subscription/subscription_test.go @@ -171,3 +171,113 @@ func TestSubscriptionWithFilterAndCreateMutations(t *testing.T) { executeTestCase(t, test) } + +func TestSubscriptionWithUpdateMutations(t *testing.T) { + test := testUtils.QueryTestCase{ + Description: "Subscription with user creations", + Query: `subscription { + user { + _key + name + age + points + } + }`, + Docs: map[int][]string{ + 0: { + `{ + "name": "John", + "age": 27, + "verified": true, + "points": 42.1 + }`, + `{ + "name": "Addo", + "age": 35, + "verified": true, + "points": 50 + }`, + }, + }, + PostSubscriptionQueries: []testUtils.SubscriptionQuery{ + { + Query: `mutation { + update_user(filter: {name: {_eq: "John"}}, data: "{\"points\": 45}") { + _key + name + age + } + }`, + Results: []map[string]any{ + { + "_key": "bae-0a24cf29-b2c2-5861-9d00-abd6250c475d", + "age": uint64(27), + "name": "John", + "points": float64(45), + }, + }, + }, + }, + DisableMapStore: true, + } + + executeTestCase(t, test) +} + +func TestSubscriptionWithUpdateAllMutations(t *testing.T) { + test := testUtils.QueryTestCase{ + Description: "Subscription with user creations", + Query: `subscription { + user { + _key + name + age + points + } + }`, + Docs: map[int][]string{ + 0: { + `{ + "name": "John", + "age": 27, + "verified": true, + "points": 42.1 + }`, + `{ + "name": "Addo", + "age": 31, + "verified": true, + "points": 50 + }`, + }, + }, + PostSubscriptionQueries: []testUtils.SubscriptionQuery{ + { + Query: `mutation { + update_user(data: "{\"points\": 55}") { + _key + name + age + } + }`, + Results: []map[string]any{ + { + "_key": "bae-0a24cf29-b2c2-5861-9d00-abd6250c475d", + "age": uint64(27), + "name": "John", + "points": float64(55), + }, + { + "_key": "bae-cf723876-5c6a-5dcf-a877-ab288eb30d57", + "age": uint64(31), + "name": "Addo", + "points": float64(55), + }, + }, + }, + }, + DisableMapStore: true, + } + + executeTestCase(t, test) +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index 790961e54c..5d3091c845 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -418,25 +418,49 @@ func ExecuteQueryTestCase( if result.Pub != nil { for _, q := range test.PostSubscriptionQueries { dbi.db.ExecQuery(ctx, q.Query) - select { - case s := <-result.Pub.Stream(): - data := s.(client.GQLResult) - if assertQueryResults( - ctx, - t, - test.Description, - &data, - q.Results, - q.ExpectedError, - ) { - continue + data := []map[string]any{} + errs := []any{} + if len(q.Results) > 1 { + for range q.Results { + select { + case s := <-result.Pub.Stream(): + sResult := s.(client.GQLResult) + sData := sResult.Data.([]map[string]any) + errs = append(errs, sResult.Errors...) + data = append(data, sData...) + // a safety in case the stream hangs. + case <-time.After(subsciptionTimeout): + assert.Fail(t, "timeout occured while waiting for data stream", test.Description) + } } - // a safety in case the stream hangs or no results are expected. - case <-time.After(subsciptionTimeout): - if q.ExpectedTimout { - continue + } else { + select { + case s := <-result.Pub.Stream(): + sResult := s.(client.GQLResult) + sData := sResult.Data.([]map[string]any) + errs = append(errs, sResult.Errors...) + data = append(data, sData...) + // a safety in case the stream hangs or no results are expected. + case <-time.After(subsciptionTimeout): + if q.ExpectedTimout { + continue + } + assert.Fail(t, "timeout occured while waiting for data stream", test.Description) } - assert.Fail(t, "timeout occured while waiting for data stream", test.Description) + } + gqlResult := &client.GQLResult{ + Data: data, + Errors: errs, + } + if assertQueryResults( + ctx, + t, + test.Description, + gqlResult, + q.Results, + q.ExpectedError, + ) { + continue } } result.Pub.Unsubscribe() From 4e5b820c79174790a4d188586ede1473d3e00953 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 12:18:08 -0500 Subject: [PATCH 13/16] remove unnecessary comments --- query/graphql/parser/subscription.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/query/graphql/parser/subscription.go b/query/graphql/parser/subscription.go index ae51175ecc..adf9857f9a 100644 --- a/query/graphql/parser/subscription.go +++ b/query/graphql/parser/subscription.go @@ -52,10 +52,9 @@ func parseSubscription(field *ast.Field) (*request.ObjectSubscription, error) { sub.Collection = sub.Name - // parse arguments for _, argument := range field.Arguments { prop := argument.Name.Value - if prop == request.FilterClause { // parse filter + if prop == request.FilterClause { obj := argument.Value.(*ast.ObjectValue) filter, err := NewFilter(obj) if err != nil { From eaa17e59bbb479af9cabd32ae5c49772d51438c4 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 12:21:58 -0500 Subject: [PATCH 14/16] change GraphQL type to have capital letter --- .../subscription/subscription_test.go | 28 +++++++++---------- tests/integration/subscription/utils.go | 4 +-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/integration/subscription/subscription_test.go b/tests/integration/subscription/subscription_test.go index 108d228c37..4df570d857 100644 --- a/tests/integration/subscription/subscription_test.go +++ b/tests/integration/subscription/subscription_test.go @@ -20,7 +20,7 @@ func TestSubscriptionWithCreateMutations(t *testing.T) { test := testUtils.QueryTestCase{ Description: "Subscription with user creations", Query: `subscription { - user { + User { _key name age @@ -29,7 +29,7 @@ func TestSubscriptionWithCreateMutations(t *testing.T) { PostSubscriptionQueries: []testUtils.SubscriptionQuery{ { Query: `mutation { - create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + create_User(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { _key name age @@ -45,7 +45,7 @@ func TestSubscriptionWithCreateMutations(t *testing.T) { }, { Query: `mutation { - create_user(data: "{\"name\": \"Addo\",\"age\": 31,\"points\": 42.1,\"verified\": true}") { + create_User(data: "{\"name\": \"Addo\",\"age\": 31,\"points\": 42.1,\"verified\": true}") { _key name age @@ -70,7 +70,7 @@ func TestSubscriptionWithFilterAndOneCreateMutation(t *testing.T) { test := testUtils.QueryTestCase{ Description: "Subscription with filter and one user creation", Query: `subscription { - user(filter: {age: {_lt: 30}}) { + User(filter: {age: {_lt: 30}}) { _key name age @@ -79,7 +79,7 @@ func TestSubscriptionWithFilterAndOneCreateMutation(t *testing.T) { PostSubscriptionQueries: []testUtils.SubscriptionQuery{ { Query: `mutation { - create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + create_User(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { _key name age @@ -104,7 +104,7 @@ func TestSubscriptionWithFilterAndOneCreateMutationOutsideFilter(t *testing.T) { test := testUtils.QueryTestCase{ Description: "Subscription with filter and one user creation outside of the filter", Query: `subscription { - user(filter: {age: {_gt: 30}}) { + User(filter: {age: {_gt: 30}}) { _key name age @@ -113,7 +113,7 @@ func TestSubscriptionWithFilterAndOneCreateMutationOutsideFilter(t *testing.T) { PostSubscriptionQueries: []testUtils.SubscriptionQuery{ { Query: `mutation { - create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + create_User(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { _key name age @@ -132,7 +132,7 @@ func TestSubscriptionWithFilterAndCreateMutations(t *testing.T) { test := testUtils.QueryTestCase{ Description: "Subscription with filter and user creation in and outside of the filter", Query: `subscription { - user(filter: {age: {_lt: 30}}) { + User(filter: {age: {_lt: 30}}) { _key name age @@ -141,7 +141,7 @@ func TestSubscriptionWithFilterAndCreateMutations(t *testing.T) { PostSubscriptionQueries: []testUtils.SubscriptionQuery{ { Query: `mutation { - create_user(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { + create_User(data: "{\"name\": \"John\",\"age\": 27,\"points\": 42.1,\"verified\": true}") { _key name age @@ -157,7 +157,7 @@ func TestSubscriptionWithFilterAndCreateMutations(t *testing.T) { }, { Query: `mutation { - create_user(data: "{\"name\": \"Addo\",\"age\": 31,\"points\": 42.1,\"verified\": true}") { + create_User(data: "{\"name\": \"Addo\",\"age\": 31,\"points\": 42.1,\"verified\": true}") { _key name age @@ -176,7 +176,7 @@ func TestSubscriptionWithUpdateMutations(t *testing.T) { test := testUtils.QueryTestCase{ Description: "Subscription with user creations", Query: `subscription { - user { + User { _key name age @@ -202,7 +202,7 @@ func TestSubscriptionWithUpdateMutations(t *testing.T) { PostSubscriptionQueries: []testUtils.SubscriptionQuery{ { Query: `mutation { - update_user(filter: {name: {_eq: "John"}}, data: "{\"points\": 45}") { + update_User(filter: {name: {_eq: "John"}}, data: "{\"points\": 45}") { _key name age @@ -228,7 +228,7 @@ func TestSubscriptionWithUpdateAllMutations(t *testing.T) { test := testUtils.QueryTestCase{ Description: "Subscription with user creations", Query: `subscription { - user { + User { _key name age @@ -254,7 +254,7 @@ func TestSubscriptionWithUpdateAllMutations(t *testing.T) { PostSubscriptionQueries: []testUtils.SubscriptionQuery{ { Query: `mutation { - update_user(data: "{\"points\": 55}") { + update_User(data: "{\"points\": 55}") { _key name age diff --git a/tests/integration/subscription/utils.go b/tests/integration/subscription/utils.go index 315af1be6c..ef66b15b20 100644 --- a/tests/integration/subscription/utils.go +++ b/tests/integration/subscription/utils.go @@ -17,7 +17,7 @@ import ( ) var userSchema = (` - type user { + type User { name: String age: Int points: Float @@ -26,5 +26,5 @@ var userSchema = (` `) func executeTestCase(t *testing.T, test testUtils.QueryTestCase) { - testUtils.ExecuteQueryTestCase(t, userSchema, []string{"user"}, test) + testUtils.ExecuteQueryTestCase(t, userSchema, []string{"User"}, test) } From 24d575bae6d5a6847b4260fd97216e3679b96816 Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 12:27:44 -0500 Subject: [PATCH 15/16] add comments for publisher --- events/publisher.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/events/publisher.go b/events/publisher.go index 69e2a32138..04354926be 100644 --- a/events/publisher.go +++ b/events/publisher.go @@ -15,12 +15,17 @@ import "time" // time limit we set for the client to read after publishing. const clientTimeout = 60 * time.Second +// Publisher hold a referance to the event channel, +// the associated subscription channel and the stream channel that +// returns data to the subscribed client type Publisher[T any] struct { ch Channel[T] event Subscription[T] stream chan any } +// NewPublisher creates a new Publisher with the given event Channel, subscribes to the +// event Channel and opens a new channel for the stream. func NewPublisher[T any](ch Channel[T]) (*Publisher[T], error) { evtCh, err := ch.Subscribe() if err != nil { @@ -34,14 +39,18 @@ func NewPublisher[T any](ch Channel[T]) (*Publisher[T], error) { }, nil } +// Event returns the subsciption channel func (p *Publisher[T]) Event() Subscription[T] { return p.event } +// Stream returns the streaming channel func (p *Publisher[T]) Stream() chan any { return p.stream } +// Publish sends data to the streaming channel and unsubscribes if +// the client hangs for too long. func (p *Publisher[T]) Publish(data any) { select { case p.stream <- data: @@ -52,6 +61,7 @@ func (p *Publisher[T]) Publish(data any) { } } +// Unsubscribe unsubscribes the client for the event channel and closes the stream. func (p *Publisher[T]) Unsubscribe() { p.ch.Unsubscribe(p.event) close(p.stream) From f608bee267cf22ac87d25b016d77a1d9f0b0abdf Mon Sep 17 00:00:00 2001 From: Fred Carle Date: Sat, 12 Nov 2022 12:50:58 -0500 Subject: [PATCH 16/16] add publisher test --- db/subscriptions.go | 2 +- events/publisher.go | 6 +-- events/publisher_test.go | 81 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 events/publisher_test.go diff --git a/db/subscriptions.go b/db/subscriptions.go index 205112b22d..5012fcc911 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -31,7 +31,7 @@ func (db *db) checkForClientSubsciptions(r *request.Request) ( if len(r.Subscription) > 0 && len(r.Subscription[0].Selections) > 0 { s := r.Subscription[0].Selections[0] if subRequest, ok := s.(*request.ObjectSubscription); ok { - pub, err := events.NewPublisher(db.events.Updates.Value()) + pub, err := events.NewPublisher(db.events.Updates.Value(), 5) if err != nil { return nil, nil, err } diff --git a/events/publisher.go b/events/publisher.go index 04354926be..a39b96773e 100644 --- a/events/publisher.go +++ b/events/publisher.go @@ -13,7 +13,7 @@ package events import "time" // time limit we set for the client to read after publishing. -const clientTimeout = 60 * time.Second +var clientTimeout = 60 * time.Second // Publisher hold a referance to the event channel, // the associated subscription channel and the stream channel that @@ -26,7 +26,7 @@ type Publisher[T any] struct { // NewPublisher creates a new Publisher with the given event Channel, subscribes to the // event Channel and opens a new channel for the stream. -func NewPublisher[T any](ch Channel[T]) (*Publisher[T], error) { +func NewPublisher[T any](ch Channel[T], streamBufferSize int) (*Publisher[T], error) { evtCh, err := ch.Subscribe() if err != nil { return nil, err @@ -35,7 +35,7 @@ func NewPublisher[T any](ch Channel[T]) (*Publisher[T], error) { return &Publisher[T]{ ch: ch, event: evtCh, - stream: make(chan any), + stream: make(chan any, streamBufferSize), }, nil } diff --git a/events/publisher_test.go b/events/publisher_test.go new file mode 100644 index 0000000000..97ff7b6255 --- /dev/null +++ b/events/publisher_test.go @@ -0,0 +1,81 @@ +// Copyright 2022 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package events + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewPublisher(t *testing.T) { + ch := startEventChanel() + + pub, err := NewPublisher(ch, 0) + if err != nil { + t.Fatal(err) + } + assert.NotNil(t, pub) +} + +func TestNewPublisherWithError(t *testing.T) { + ch := startEventChanel() + ch.Close() + _, err := NewPublisher(ch, 0) + assert.Error(t, err) +} + +func TestPublisherToStream(t *testing.T) { + ch := startEventChanel() + + pub, err := NewPublisher(ch, 1) + if err != nil { + t.Fatal(err) + } + assert.NotNil(t, pub) + + ch.Publish(10) + evt := <-pub.Event() + assert.Equal(t, 10, evt) + + pub.Publish(evt) + assert.Equal(t, 10, <-pub.Stream()) + + pub.Unsubscribe() + + _, open := <-pub.Stream() + assert.Equal(t, false, open) +} + +func TestPublisherToStreamWithTimeout(t *testing.T) { + clientTimeout = 1 * time.Second + ch := startEventChanel() + + pub, err := NewPublisher(ch, 0) + if err != nil { + t.Fatal(err) + } + assert.NotNil(t, pub) + + ch.Publish(10) + evt := <-pub.Event() + assert.Equal(t, 10, evt) + + pub.Publish(evt) + + _, open := <-pub.Stream() + assert.Equal(t, false, open) +} + +func startEventChanel() Channel[int] { + return New[int](0, 0) +}