Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add GraphQL subscriptions #934

Merged
merged 16 commits into from
Nov 15, 2022
8 changes: 5 additions & 3 deletions api/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ type handler struct {
options serverOptions
}

type ctxDB struct{}

type ctxPeerID struct{}
// context variables
type (
ctxDB struct{}
ctxPeerID struct{}
)

// DataResponse is the GQL top level object holding data for the response payload.
type DataResponse struct {
Expand Down
42 changes: 41 additions & 1 deletion api/http/handlerfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package http

import (
"encoding/json"
"fmt"
"io"
"mime"
"net/http"
Expand All @@ -23,7 +25,9 @@ import (
"github.com/multiformats/go-multihash"
"github.com/pkg/errors"

"github.com/sourcenetwork/defradb/client"
corecrdt "github.com/sourcenetwork/defradb/core/crdt"
"github.com/sourcenetwork/defradb/events"
)

const (
Expand Down Expand Up @@ -143,7 +147,12 @@ func execGQLHandler(rw http.ResponseWriter, req *http.Request) {
}
result := db.ExecQuery(req.Context(), query)

sendJSON(req.Context(), rw, result, http.StatusOK)
if result.Pub != nil {
subscriptionHandler(result.Pub, rw, req)
return
}

sendJSON(req.Context(), rw, result.GQL, http.StatusOK)
}

func loadSchemaHandler(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -256,3 +265,34 @@ func peerIDHandler(rw http.ResponseWriter, req *http.Request) {
http.StatusOK,
)
}

func subscriptionHandler(pub *events.Publisher[client.UpdateEvent], rw http.ResponseWriter, req *http.Request) {
flusher, ok := rw.(http.Flusher)
if !ok {
handleErr(req.Context(), rw, errors.New("streaming unsupported"), http.StatusInternalServerError)
return
}

rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")

for {
select {
case <-req.Context().Done():
pub.Unsubscribe()
return
case s, open := <-pub.Stream():
if !open {
return
}
b, err := json.Marshal(s)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}
fmt.Fprintf(rw, "data: %s\n\n", b)
flusher.Flush()
}
}
}
107 changes: 106 additions & 1 deletion api/http/handlerfuncs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

badger "github.com/dgraph-io/badger/v3"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -475,6 +477,78 @@ mutation {
assert.Contains(t, users[0].Key, "bae-")
}

func TestExecGQLHandlerWithSubsctiption(t *testing.T) {
ctx := context.Background()
defra := testNewInMemoryDB(t, ctx)

// load schema
testLoadSchema(t, ctx, defra)

stmt := `
subscription {
user {
_key
age
name
}
}`

buf := bytes.NewBuffer([]byte(stmt))

ch := make(chan []byte)
errCh := make(chan error)

wg := &sync.WaitGroup{}
wg.Add(1)

// We need to set a timeout otherwise the testSubscriptionRequest function will block until the
// http.ServeHTTP call returns, which in this case will only happen with a timeout.
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

go testSubscriptionRequest(ctxTimeout, testOptions{
Testing: t,
DB: defra,
Method: "POST",
Path: GraphQLPath,
Body: buf,
Headers: map[string]string{"Content-Type": contentTypeGraphQL},
ExpectedStatus: 200,
}, wg, ch, errCh)

// We wait as long as possible before sending the mutation request.
wg.Wait()

// add document
stmt2 := `
mutation {
create_user(data: "{\"age\": 31, \"verified\": true, \"points\": 90, \"name\": \"Bob\"}") {
_key
}
}`

buf2 := bytes.NewBuffer([]byte(stmt2))
users := []testUser{}
resp := DataResponse{
Data: &users,
}
testRequest(testOptions{
Testing: t,
DB: defra,
Method: "POST",
Path: GraphQLPath,
Body: buf2,
ExpectedStatus: 200,
ResponseData: &resp,
})
select {
case data := <-ch:
assert.Contains(t, string(data), "bae-91171025-ed21-50e3-b0dc-e31bccdfa1ab")
case err := <-errCh:
t.Fatal(err)
}
}

func TestLoadSchemaHandlerWithReadBodyError(t *testing.T) {
t.Cleanup(CleanupEnv)
env = "dev"
Expand Down Expand Up @@ -848,6 +922,34 @@ func testRequest(opt testOptions) {
}
}

func testSubscriptionRequest(ctx context.Context, opt testOptions, wg *sync.WaitGroup, ch chan []byte, errCh chan error) {
req, err := http.NewRequest(opt.Method, opt.Path, opt.Body)
if err != nil {
errCh <- err
return
}

req = req.WithContext(ctx)

for k, v := range opt.Headers {
req.Header.Set(k, v)
}

h := newHandler(opt.DB, opt.ServerOptions)
rec := httptest.NewRecorder()
wg.Done()
h.ServeHTTP(rec, req)
assert.Equal(opt.Testing, opt.ExpectedStatus, rec.Result().StatusCode)

respBody, err := io.ReadAll(rec.Result().Body)
if err != nil {
errCh <- err
return
}

ch <- respBody
}

func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB {
// init in memory DB
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
Expand All @@ -856,12 +958,15 @@ func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB {
t.Fatal(err)
}

var options []db.Option
options := []db.Option{
db.WithUpdateEvents(),
}

defra, err := db.NewDB(ctx, rootstore, options...)
if err != nil {
t.Fatal(err)
}

return defra
}

Expand Down
16 changes: 8 additions & 8 deletions api/http/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package http

import (
"net/http"
"strconv"
"time"

"github.com/sourcenetwork/defradb/logging"
Expand All @@ -33,20 +32,21 @@ func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
}
}

func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}

func (lrw *loggingResponseWriter) Header() http.Header {
return lrw.ResponseWriter.Header()
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}

func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
// used for chucked payloads. Content-Length should not be set
// for each chunk.
if lrw.ResponseWriter.Header().Get("Content-Length") != "" {
return lrw.ResponseWriter.Write(b)
}

lrw.contentLength = len(b)
lrw.ResponseWriter.Header().Set("Content-Length", strconv.Itoa(lrw.contentLength))
return lrw.ResponseWriter.Write(b)
}

Expand Down
16 changes: 11 additions & 5 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/http"
"path"
"strings"
"time"

"golang.org/x/crypto/acme/autocert"

Expand All @@ -28,11 +27,17 @@ import (
)

const (
// these constants are best effort durations that fit our current API
// These constants are best effort durations that fit our current API
// and possibly prevent from running out of file descriptors.
readTimeout = 5 * time.Second
writeTimeout = 10 * time.Second
idleTimeout = 120 * time.Second
// readTimeout = 5 * time.Second
// writeTimeout = 10 * time.Second
// idleTimeout = 120 * time.Second

// Temparily disabling timeouts until [this proposal](https://github.com/golang/go/issues/54136) is merged.
// https://github.com/sourcenetwork/defradb/issues/927
readTimeout = 0
writeTimeout = 0
idleTimeout = 0
)

// Server struct holds the Handler for the HTTP API.
Expand Down Expand Up @@ -256,6 +261,7 @@ func (s *Server) Run(ctx context.Context) error {
if s.listener == nil {
return errNoListener
}

if s.certManager != nil {
// When using TLS it's important to redirect http requests to https
go func() {
Expand Down
7 changes: 2 additions & 5 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,8 @@ func start(ctx context.Context) (*defraInstance, error) {
return nil, errors.Wrap("failed to open datastore", err)
}

var options []db.Option

// check for p2p
if !cfg.Net.P2PDisabled {
options = append(options, db.WithUpdateEvents())
options := []db.Option{
db.WithUpdateEvents(),
}

db, err := db.NewDB(ctx, rootstore, options...)
Expand Down
8 changes: 7 additions & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/events"
)

type DB interface {
Expand All @@ -40,7 +41,12 @@ type DB interface {
PrintDump(ctx context.Context) error
}

type QueryResult struct {
type GQLResult struct {
Errors []any `json:"errors,omitempty"`
Data any `json:"data"`
}

type QueryResult struct {
GQL GQLResult
Pub *events.Publisher[UpdateEvent]
}
5 changes: 3 additions & 2 deletions client/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
package request

type Request struct {
Queries []*OperationDefinition
Mutations []*OperationDefinition
Queries []*OperationDefinition
Mutations []*OperationDefinition
Subscription []*OperationDefinition
}

type Selection any
Expand Down
44 changes: 44 additions & 0 deletions client/request/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package request

import (
"github.com/sourcenetwork/defradb/client"
)

// ObjectSubscription is a field on the SubscriptionType
// of a graphql query. It includes all the possible
// arguments
type ObjectSubscription struct {
fredcarle marked this conversation as resolved.
Show resolved Hide resolved
Field

// Schema is the target schema/collection
Schema string
fredcarle marked this conversation as resolved.
Show resolved Hide resolved

Filter client.Option[Filter]

Fields []Selection
}

// ToSelect returns a basic Select object, with the same Name, Alias, and Fields as
// the Subscription object. Used to create a Select planNode for the event stream return objects.
func (m ObjectSubscription) ToSelect(docKey, cid string) *Select {
return &Select{
Field: Field{
Name: m.Schema,
Alias: m.Alias,
},
DocKeys: client.Some([]string{docKey}),
CID: client.Some(cid),
Fields: m.Fields,
Filter: m.Filter,
}
}
Loading