Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add GraphQL subscriptions #934

Merged
merged 16 commits into from
Nov 15, 2022
8 changes: 5 additions & 3 deletions api/http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ type handler struct {
options serverOptions
}

type ctxDB struct{}

type ctxPeerID struct{}
// context variables
type (
ctxDB struct{}
ctxPeerID struct{}
)

// DataResponse is the GQL top level object holding data for the response payload.
type DataResponse struct {
Expand Down
42 changes: 41 additions & 1 deletion api/http/handlerfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package http

import (
"encoding/json"
"fmt"
"io"
"mime"
"net/http"
Expand All @@ -23,7 +25,9 @@ import (
"github.com/multiformats/go-multihash"
"github.com/pkg/errors"

"github.com/sourcenetwork/defradb/client"
corecrdt "github.com/sourcenetwork/defradb/core/crdt"
"github.com/sourcenetwork/defradb/events"
)

const (
Expand Down Expand Up @@ -143,7 +147,12 @@ func execGQLHandler(rw http.ResponseWriter, req *http.Request) {
}
result := db.ExecQuery(req.Context(), query)

sendJSON(req.Context(), rw, result, http.StatusOK)
if result.Pub != nil {
subscriptionHandler(result.Pub, rw, req)
return
}

sendJSON(req.Context(), rw, result.GQL, http.StatusOK)
}

func loadSchemaHandler(rw http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -256,3 +265,34 @@ func peerIDHandler(rw http.ResponseWriter, req *http.Request) {
http.StatusOK,
)
}

func subscriptionHandler(pub *events.Publisher[client.UpdateEvent], rw http.ResponseWriter, req *http.Request) {
flusher, ok := rw.(http.Flusher)
if !ok {
handleErr(req.Context(), rw, errors.New("streaming unsupported"), http.StatusInternalServerError)
return
}

rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")

for {
select {
case <-req.Context().Done():
pub.Unsubscribe()
return
case s, open := <-pub.Stream():
if !open {
return
}
b, err := json.Marshal(s)
if err != nil {
handleErr(req.Context(), rw, err, http.StatusInternalServerError)
return
}
fmt.Fprintf(rw, "data: %s\n\n", b)
flusher.Flush()
}
}
}
107 changes: 106 additions & 1 deletion api/http/handlerfuncs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"

badger "github.com/dgraph-io/badger/v3"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -475,6 +477,78 @@ mutation {
assert.Contains(t, users[0].Key, "bae-")
}

func TestExecGQLHandlerWithSubsctiption(t *testing.T) {
ctx := context.Background()
defra := testNewInMemoryDB(t, ctx)

// load schema
testLoadSchema(t, ctx, defra)

stmt := `
subscription {
user {
_key
age
name
}
}`

buf := bytes.NewBuffer([]byte(stmt))

ch := make(chan []byte)
errCh := make(chan error)

wg := &sync.WaitGroup{}
wg.Add(1)

// We need to set a timeout otherwise the testSubscriptionRequest function will block until the
// http.ServeHTTP call returns, which in this case will only happen with a timeout.
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()

go testSubscriptionRequest(ctxTimeout, testOptions{
Testing: t,
DB: defra,
Method: "POST",
Path: GraphQLPath,
Body: buf,
Headers: map[string]string{"Content-Type": contentTypeGraphQL},
ExpectedStatus: 200,
}, wg, ch, errCh)

// We wait as long as possible before sending the mutation request.
wg.Wait()

// add document
stmt2 := `
mutation {
create_user(data: "{\"age\": 31, \"verified\": true, \"points\": 90, \"name\": \"Bob\"}") {
_key
}
}`

buf2 := bytes.NewBuffer([]byte(stmt2))
users := []testUser{}
resp := DataResponse{
Data: &users,
}
testRequest(testOptions{
Testing: t,
DB: defra,
Method: "POST",
Path: GraphQLPath,
Body: buf2,
ExpectedStatus: 200,
ResponseData: &resp,
})
select {
case data := <-ch:
assert.Contains(t, string(data), "bae-91171025-ed21-50e3-b0dc-e31bccdfa1ab")
case err := <-errCh:
t.Fatal(err)
}
}

func TestLoadSchemaHandlerWithReadBodyError(t *testing.T) {
t.Cleanup(CleanupEnv)
env = "dev"
Expand Down Expand Up @@ -848,6 +922,34 @@ func testRequest(opt testOptions) {
}
}

func testSubscriptionRequest(ctx context.Context, opt testOptions, wg *sync.WaitGroup, ch chan []byte, errCh chan error) {
req, err := http.NewRequest(opt.Method, opt.Path, opt.Body)
if err != nil {
errCh <- err
return
}

req = req.WithContext(ctx)

for k, v := range opt.Headers {
req.Header.Set(k, v)
}

h := newHandler(opt.DB, opt.ServerOptions)
rec := httptest.NewRecorder()
wg.Done()
h.ServeHTTP(rec, req)
assert.Equal(opt.Testing, opt.ExpectedStatus, rec.Result().StatusCode)

respBody, err := io.ReadAll(rec.Result().Body)
if err != nil {
errCh <- err
return
}

ch <- respBody
}

func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB {
// init in memory DB
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
Expand All @@ -856,12 +958,15 @@ func testNewInMemoryDB(t *testing.T, ctx context.Context) client.DB {
t.Fatal(err)
}

var options []db.Option
options := []db.Option{
db.WithUpdateEvents(),
}

defra, err := db.NewDB(ctx, rootstore, options...)
if err != nil {
t.Fatal(err)
}

return defra
}

Expand Down
16 changes: 8 additions & 8 deletions api/http/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package http

import (
"net/http"
"strconv"
"time"

"github.com/sourcenetwork/defradb/logging"
Expand All @@ -33,20 +32,21 @@ func newLoggingResponseWriter(w http.ResponseWriter) *loggingResponseWriter {
}
}

func (lrw *loggingResponseWriter) Flush() {
lrw.ResponseWriter.(http.Flusher).Flush()
}

func (lrw *loggingResponseWriter) Header() http.Header {
return lrw.ResponseWriter.Header()
}

func (lrw *loggingResponseWriter) WriteHeader(code int) {
lrw.statusCode = code
lrw.ResponseWriter.WriteHeader(code)
}

func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
// used for chucked payloads. Content-Length should not be set
// for each chunk.
if lrw.ResponseWriter.Header().Get("Content-Length") != "" {
return lrw.ResponseWriter.Write(b)
}

lrw.contentLength = len(b)
lrw.ResponseWriter.Header().Set("Content-Length", strconv.Itoa(lrw.contentLength))
return lrw.ResponseWriter.Write(b)
}

Expand Down
16 changes: 11 additions & 5 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"net/http"
"path"
"strings"
"time"

"golang.org/x/crypto/acme/autocert"

Expand All @@ -28,11 +27,17 @@ import (
)

const (
// these constants are best effort durations that fit our current API
// These constants are best effort durations that fit our current API
// and possibly prevent from running out of file descriptors.
readTimeout = 5 * time.Second
writeTimeout = 10 * time.Second
idleTimeout = 120 * time.Second
// readTimeout = 5 * time.Second
// writeTimeout = 10 * time.Second
// idleTimeout = 120 * time.Second

// Temparily disabling timeouts until [this proposal](https://github.com/golang/go/issues/54136) is merged.
// https://github.com/sourcenetwork/defradb/issues/927
readTimeout = 0
writeTimeout = 0
idleTimeout = 0
)

// Server struct holds the Handler for the HTTP API.
Expand Down Expand Up @@ -256,6 +261,7 @@ func (s *Server) Run(ctx context.Context) error {
if s.listener == nil {
return errNoListener
}

if s.certManager != nil {
// When using TLS it's important to redirect http requests to https
go func() {
Expand Down
7 changes: 2 additions & 5 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,8 @@ func start(ctx context.Context) (*defraInstance, error) {
return nil, errors.Wrap("failed to open datastore", err)
}

var options []db.Option

// check for p2p
if !cfg.Net.P2PDisabled {
options = append(options, db.WithUpdateEvents())
options := []db.Option{
db.WithUpdateEvents(),
}

db, err := db.NewDB(ctx, rootstore, options...)
Expand Down
8 changes: 7 additions & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/events"
)

type DB interface {
Expand All @@ -40,7 +41,12 @@ type DB interface {
PrintDump(ctx context.Context) error
}

type QueryResult struct {
type GQLResult struct {
Errors []any `json:"errors,omitempty"`
Data any `json:"data"`
}

type QueryResult struct {
GQL GQLResult
Pub *events.Publisher[UpdateEvent]
}
6 changes: 3 additions & 3 deletions client/request/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type Mutation struct {
Field
Type MutationType

// Schema is the target schema/collection
// Collection is the target collection name
// if this mutation is on an object.
Schema string
Collection string

IDs client.Option[[]string]
Filter client.Option[Filter]
Expand All @@ -47,7 +47,7 @@ type Mutation struct {
func (m Mutation) ToSelect() *Select {
return &Select{
Field: Field{
Name: m.Schema,
Name: m.Collection,
Alias: m.Alias,
},
Fields: m.Fields,
Expand Down
5 changes: 3 additions & 2 deletions client/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
package request

type Request struct {
Queries []*OperationDefinition
Mutations []*OperationDefinition
Queries []*OperationDefinition
Mutations []*OperationDefinition
Subscription []*OperationDefinition
}

type Selection any
Expand Down
Loading