Commit

WIP
orpheuslummis committed Jun 27, 2022
1 parent 10f5fb0 commit 542df2d
Showing 56 changed files with 245 additions and 276 deletions.
26 changes: 13 additions & 13 deletions README.md
Expand Up @@ -282,25 +282,25 @@ This only scratches the surface of the DefraDB Query Language, see below for the

You can access the official DefraDB Query Language documentation online here: [https://hackmd.io/@source/BksQY6Qfw](https://hackmd.io/@source/BksQY6Qfw)

## Peer-to-Peer Data Syncronization
## Peer-to-Peer Data Synchronization
DefraDB has a native P2P network built into each node, allowing nodes to exchange, synchronize, and replicate documents and commits.

The P2P network uses a combination of server to server gRPC commands, gossip based pub-sub network, and a shared Distributed Hash Table, all powered by
[LibP2P](https://libp2p.io/).

Unless specifying `--no-p2p` option when running `start` the default behaviour for a DefraDB node is to intialize the P2P network stack.
Unless specifying `--no-p2p` option when running `start` the default behaviour for a DefraDB node is to initialize the P2P network stack.

When you start a node for the first time, DefraDB will auto generate a private key pair and store it in the `data` folder specified in the config or `--data` CLI option. Each node has a unique `Peer ID` generated based on the public key, which is printed to the console during startup.

You'll see a printed line: `Created LibP2P host with Peer ID XXX` where `XXX` is your nodes `Peer ID`. This is important to know if we want other nodes to connect to this node.
You'll see a printed line: `Created LibP2P host with Peer ID XXX` where `XXX` is your node's `Peer ID`. This is important to know if we want other nodes to connect to this node.

A DefraDB node can establish two types of relationship with another peer: pubsub peer or replicator peer.

Pubsub peers can be specified on the command line with `--peers` which accepts a comma seperated list of peer [MultiAddress](https://docs.libp2p.io/concepts/addressing/). Which take the form of `/ip4/IP_ADDRESS/tcp/PORT/p2p/PEER_ID`.
Pubsub peers can be specified on the command line with `--peers`, which accepts a comma-separated list of peer [MultiAddress](https://docs.libp2p.io/concepts/addressing/) values, each of the form `/ip4/IP_ADDRESS/tcp/PORT/p2p/PEER_ID`.

> If a node is listening on port 9000 with the IP address `192.168.1.12` and a Peer ID of `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B` then the fully qualified multiaddress is `/ip4/192.168.1.12/tcp/9000/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`.
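
To make the address layout concrete, here is a minimal Go sketch that assembles the textual form from its three parts. The helper name `formatMultiaddr` is hypothetical and is not part of DefraDB or libp2p; it only concatenates strings and does not validate the IP address or Peer ID.

```go
package main

import "fmt"

// formatMultiaddr is a hypothetical helper (an assumption for illustration,
// not a DefraDB or libp2p function). It builds the textual multiaddress
// /ip4/IP_ADDRESS/tcp/PORT/p2p/PEER_ID and performs no validation.
func formatMultiaddr(ip string, port int, peerID string) string {
	return fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", ip, port, peerID)
}

func main() {
	// Values taken from the README example above.
	addr := formatMultiaddr("192.168.1.12", 9000,
		"12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B")
	fmt.Println(addr)
}
```

The resulting string is what would be passed to `--peers`; several such strings would be joined with commas.
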
Pubsub nodes *passively* synchronize data between nodes by broadcasting Document Commit updates over the pubsub channel with the document `DocKey` as the topic. This requires nodes to already be listening on this pubsub channel to recieve updates for. This is used when two nodes *already* have a shared document, and want to keep both their changes in sync with one another.
Pubsub nodes *passively* synchronize data between nodes by broadcasting Document Commit updates over the pubsub channel with the document `DocKey` as the topic. This requires nodes to already be listening on this pubsub channel to receive updates for. This is used when two nodes *already* have a shared document, and want to keep both their changes in sync with one another.

Replicator nodes are specified using the CLI `rpc` command after a node has already started with `defradb rpc add-replicator <collection> <peer_multiaddress>`.

Expand All @@ -310,38 +310,38 @@ Replicator nodes *actively* push changes from the specific collection *to* the t
### PubSub Example

Lets construct a simple example of two nodes (node1 & node2) connecting to one another over the pubsub network on the same machine.
Let's construct a simple example of two nodes (node1 & node2) connecting to one another over the pubsub network on the same machine.

On Node1 start a regular node with all the defaults:
```
defradb start
```

Make sure to get the `Peer ID` from the console output. Lets assume its `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`.
Make sure to get the `Peer ID` from the console output. Let's assume it's `12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B`.

On Node2 we need to change some of the default config options if we are running on the same machine.
```
defradb start --data $HOME/.defradb/data-node2 --p2paddr /ip4/0.0.0.0/tcp/9172 --url localhost:9182 --peers /ip4/0.0.0.0/tcp/9171/p2p/12D3KooWNXm3dmrwCYSxGoRUyZstaKYiHPdt8uZH5vgVaEJyzU8B
```

Lets break this down
Let's break this down
- `--data` specifies the data folder
- `--p2paddr` is the multiaddress to listen on for the p2p network (default is port 9171)
- `--url` is the HTTP address to listen on for the client HTTP and GraphQL API.
- `--peers` is a comma-sperated list of peer multiaddresses. This will be our first node we started, with the default config options.
- `--peers` is a comma-separated list of peer multiaddresses. This will be our first node we started, with the default config options.

This will startup two nodes, connect to eachother, and establish the P2P gossib pubsub network.
This will start up two nodes, connect them to each other, and establish the P2P gossip pubsub network.

### Replicator Example

Lets construct a simple example of Node1 *replicating* to Node2.
Let's construct a simple example of Node1 *replicating* to Node2.

Node1 is the leader, lets startup the node **and** define a collection.
Node1 is the leader; let's start up the node **and** define a collection.
```
defradb start
```

On Node2 lets startup a node
On Node2 let's startup a node
```
defradb start --data $HOME/.defradb/data-node2 --p2paddr /ip4/0.0.0.0/tcp/9172 --url localhost:9182
```
Expand Down
2 changes: 1 addition & 1 deletion api/http/handler.go
Expand Up @@ -78,7 +78,7 @@ func (h *handler) handle(f http.HandlerFunc) http.HandlerFunc {
func getJSON(req *http.Request, v interface{}) error {
err := json.NewDecoder(req.Body).Decode(v)
if err != nil {
return errors.Wrap(err, "unmarshall error")
return errors.Wrap(err, "unmarshal error")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion api/http/handlerfuncs_test.go
Expand Up @@ -267,7 +267,7 @@ func TestExecGQLHandlerContentTypeJSONWithJSONError(t *testing.T) {
assert.Contains(t, errResponse.Errors[0].Extensions.Stack, "invalid character")
assert.Equal(t, http.StatusBadRequest, errResponse.Errors[0].Extensions.Status)
assert.Equal(t, "Bad Request", errResponse.Errors[0].Extensions.HTTPError)
assert.Equal(t, "unmarshall error: invalid character ':' after array element", errResponse.Errors[0].Message)
assert.Equal(t, "unmarshal error: invalid character ':' after array element", errResponse.Errors[0].Message)
}

func TestExecGQLHandlerContentTypeJSON(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions api/http/http.go
Expand Up @@ -8,6 +8,9 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

/*
Package http provides DefraDB's HTTP API, offering various capabilities.
*/
package http

import "github.com/sourcenetwork/defradb/logging"
Expand Down
2 changes: 1 addition & 1 deletion api/http/router.go
Expand Up @@ -61,7 +61,7 @@ func setRoutes(h *handler) *handler {
return h
}

// JoinPaths takes a base path and any number of additionnal paths
// JoinPaths takes a base path and any number of additional paths
// and combines them safely to form a full URL path.
// The base must start with a http or https.
func JoinPaths(base string, paths ...string) (*url.URL, error) {
Expand Down
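
The body of `JoinPaths` is not shown in this diff. As a rough illustration of what its comment describes, the following sketch uses only the standard library. The lowercase name `joinPaths`, the error messages, and the choice of `path.Join` are assumptions, not DefraDB's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"path"
	"strings"
)

// joinPaths is a hypothetical stand-in for JoinPaths (an assumption for
// illustration only). It rejects bases without an http or https scheme and
// joins the remaining segments onto the base path.
func joinPaths(base string, paths ...string) (*url.URL, error) {
	if !strings.HasPrefix(base, "http://") && !strings.HasPrefix(base, "https://") {
		return nil, errors.New("base must start with http:// or https://")
	}
	u, err := url.Parse(base)
	if err != nil {
		return nil, fmt.Errorf("parsing base: %w", err)
	}
	// path.Join cleans the result, so duplicate slashes are collapsed and
	// the leading "/" keeps the joined path rooted.
	u.Path = path.Join(append([]string{"/", u.Path}, paths...)...)
	return u, nil
}

func main() {
	u, err := joinPaths("http://localhost:9182", "api", "v1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(u.String())
}
```

Returning a `*url.URL` rather than a string leaves it to the caller to add query parameters before rendering the URL.
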
4 changes: 2 additions & 2 deletions api/http/server.go
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/sourcenetwork/defradb/client"
)

// The Server struct holds the Handler for the HTTP API
// Server struct holds the Handler for the HTTP API.
type Server struct {
options serverOptions
http.Server
Expand All @@ -26,7 +26,7 @@ type serverOptions struct {
allowedOrigins []string
}

// NewServer instantiated a new server with the given http.Handler.
// NewServer instantiates a new server with the given http.Handler.
func NewServer(db client.DB, options ...func(*Server)) *Server {
svr := &Server{}

Expand Down
3 changes: 3 additions & 0 deletions cli/root.go
Expand Up @@ -8,6 +8,9 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

/*
Package cli provides the command-line interface.
*/
package cli

import (
Expand Down
4 changes: 2 additions & 2 deletions client/collection.go
Expand Up @@ -78,7 +78,7 @@ type Collection interface {
//
// Target can be a Filter statement, a single docKey, a single document,
// an array of docKeys, or an array of documents.
// It is recommened to use the respective typed versions of Update
// It is recommended to use the respective typed versions of Update
// (e.g. UpdateWithFilter or UpdateWithKey) over this function if you can.
//
// Returns an ErrInvalidUpdateTarget error if the target type is not supported.
Expand Down Expand Up @@ -107,7 +107,7 @@ type Collection interface {
// DeleteWith deletes a target document.
//
// Target can be a Filter statement, a single docKey, a single document, an array of docKeys,
// or an array of documents. It is recommened to use the respective typed versions of Delete
// or an array of documents. It is recommended to use the respective typed versions of Delete
// (e.g. DeleteWithFilter or DeleteWithKey) over this function if you can.
// This operation will hard-delete all state relating to the given DocKey. This includes data, block, and head storage.
//
Expand Down
7 changes: 3 additions & 4 deletions client/doc.go
Expand Up @@ -9,10 +9,9 @@
// licenses/APL.txt.

/*
The client package provides public members for interacting with a Defra DB instance.
Package client provides public members for interacting with a Defra DB instance.
Only calls made via the `DB` and `Collection` interfaces interact with the underlying datastores.
Currently the only provided implementation of `DB` is found in the `defra/db` package and can be
instantiated via the `NewDB` function.
Only calls made via the `DB` and `Collection` interfaces interact with the underlying datastores. Currently the only
provided implementation of `DB` is found in the `defra/db` package and can be instantiated via the `NewDB` function.
*/
package client
3 changes: 1 addition & 2 deletions client/dockey.go
Expand Up @@ -53,8 +53,7 @@ type DocKey struct {
}

// NewDocKeyV0 creates a new doc key identified by the root data CID, peer ID, and
// namespaced by the versionNS
// TODO: Parameterize namespace Version
// namespaced by the versionNS.
func NewDocKeyV0(dataCID cid.Cid) DocKey {
return DocKey{
version: v0,
Expand Down
3 changes: 3 additions & 0 deletions cmd/genclidocs/genclidocs.go
Expand Up @@ -8,6 +8,9 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

/*
genclidocs is a tool to generate the command line interface documentation.
*/
package main

import (
Expand Down
6 changes: 3 additions & 3 deletions core/crdt/composite.go
Expand Up @@ -37,12 +37,12 @@ type CompositeDAGDelta struct {
SubDAGs []core.DAGLink
}

// GetPriority gets the current priority for this delta
// GetPriority gets the current priority for this delta.
func (delta *CompositeDAGDelta) GetPriority() uint64 {
return delta.Priority
}

// SetPriority will set the priority for this delta
// SetPriority will set the priority for this delta.
func (delta *CompositeDAGDelta) SetPriority(prio uint64) {
delta.Priority = prio
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta, id string) er

// DeltaDecode is a typed helper to extract
// a LWWRegDelta from a ipld.Node
// for now lets do cbor (quick to implement)
// for now let's do cbor (quick to implement)
func (c CompositeDAG) DeltaDecode(node ipld.Node) (core.Delta, error) {
delta := &CompositeDAGDelta{}
pbNode, ok := node.(*dag.ProtoNode)
Expand Down
32 changes: 15 additions & 17 deletions core/crdt/doc.go
Expand Up @@ -8,24 +8,22 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package crdt

// Conflict-Free Replicated Data Types (CRDT)
// are a data structure which can be replicated across multiple computers in a network,
// where the replicas can be updated independently and concurrently without coordination
// between the replicas and are able to deterministically converge to the same state.
/*
Package crdt implements a collection of CRDT types specifically to be used in DefraDB, and uses the Delta-State CRDT
architecture to update and replicate state. It is based on the Go Merkle-CRDT project.
// This package implements a collection of CRDT types specifically to be used in DefraDB,
// and use the Delta-State CRDT architecture to update and replicate state. It is based on
// the go Merkle-CRDT project
Conflict-Free Replicated Data Types (CRDT) are a data structure which can be replicated across multiple computers in a
network, where the replicas can be updated independently and concurrently without coordination between the replicas and
are able to deterministically converge to the same state.
// The CRDTs shall satisfy the ReplicatedData interface which is a single merge function
// which given two states of the same data type will merge into a single state.
The CRDTs shall satisfy the ReplicatedData interface which is a single merge function which given two states of the
same data type will merge into a single state.
// Unless the explicitly enabling the entire state to be fully loaded into memory as an object,
// all data will reside inside the BadgerDB datastore.
Unless explicitly enabling the entire state to be fully loaded into memory as an object, all data will reside inside
the BadgerDB datastore.
// In general, each CRDT type will be implemented independent, and oblivious to its underlying
// datastore, and to how it will be structured as Merkle-CRDT. Instead they will focus on their
// core semantics and implementation and will be wrapped in handlers to ensure state persistence
// to DBs, DAG creation, and replication to peers.
In general, each CRDT type will be implemented independently, and oblivious to its underlying datastore, and to how it
will be structured as Merkle-CRDT. Instead they will focus on their core semantics and implementation and will be
wrapped in handlers to ensure state persistence to DBs, DAG creation, and replication to peers.
*/
package crdt
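
The "single merge function" idea in the package comment can be illustrated with a small last-writer-wins sketch. The type `lwwState`, the function `merge`, and the byte-wise tie-break on equal priorities are all assumptions made for illustration; this is not the package's `ReplicatedData` interface or DefraDB's actual merge logic.

```go
package main

import (
	"bytes"
	"fmt"
)

// lwwState is a hypothetical in-memory register state (an assumption for
// illustration). Priority plays the role of a logical clock.
type lwwState struct {
	Priority uint64
	Data     []byte
}

// merge returns the winning state of two replicas. The higher priority wins;
// on a tie the byte-wise larger value wins, so the result does not depend on
// argument order. The tie-break rule is an assumption of this sketch.
func merge(a, b lwwState) lwwState {
	switch {
	case a.Priority > b.Priority:
		return a
	case b.Priority > a.Priority:
		return b
	case bytes.Compare(a.Data, b.Data) >= 0:
		return a
	default:
		return b
	}
}

func main() {
	older := lwwState{Priority: 1, Data: []byte("old")}
	newer := lwwState{Priority: 2, Data: []byte("new")}
	// Both merge orders converge to the same value.
	fmt.Println(string(merge(older, newer).Data), string(merge(newer, older).Data))
}
```

Because `merge` gives the same result regardless of argument order and is idempotent, replicas that exchange states in any order end up with the same value, which is the convergence property the package comment describes.
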
17 changes: 8 additions & 9 deletions core/crdt/lwwreg.go
Expand Up @@ -43,18 +43,18 @@ type LWWRegDelta struct {
DocKey []byte
}

// GetPriority gets the current priority for this delta
// GetPriority gets the current priority for this delta.
func (delta *LWWRegDelta) GetPriority() uint64 {
return delta.Priority
}

// SetPriority will set the priority for this delta
// SetPriority will set the priority for this delta.
func (delta *LWWRegDelta) SetPriority(prio uint64) {
delta.Priority = prio
}

// Marshal encodes the delta using CBOR
// for now lets do cbor (quick to implement)
// Marshal encodes the delta using CBOR.
// for now let's do cbor (quick to implement)
func (delta *LWWRegDelta) Marshal() ([]byte, error) {
h := &codec.CborHandle{}
buf := bytes.NewBuffer(nil)
Expand All @@ -74,14 +74,13 @@ func (delta *LWWRegDelta) Value() interface{} {
return delta.Data
}

// LWWRegister Last-Writer-Wins Register
// a simple CRDT type that allows set/get of an
// arbitrary data type that ensures convergence
// LWWRegister, Last-Writer-Wins Register, is a simple CRDT type that allows set/get
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
baseCRDT
}

// NewLWWRegister returns a new instance of the LWWReg with the given ID
// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(store datastore.DSReaderWriter, key core.DataStoreKey) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, key),
Expand Down Expand Up @@ -171,7 +170,7 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64

// DeltaDecode is a typed helper to extract
// a LWWRegDelta from a ipld.Node
// for now lets do cbor (quick to implement)
// for now let's do cbor (quick to implement)
func (reg LWWRegister) DeltaDecode(node ipld.Node) (core.Delta, error) {
delta := &LWWRegDelta{}
pbNode, ok := node.(*dag.ProtoNode)
Expand Down
26 changes: 13 additions & 13 deletions core/data.go
Expand Up @@ -12,17 +12,17 @@ package core

import "strings"

// Span is a range of keys from [Start, End)
// Span is a range of keys from [Start, End).
type Span interface {
// Start returns the starting key of the Span
// Start returns the starting key of the Span.
Start() DataStoreKey
// End returns the ending key of the Span
// End returns the ending key of the Span.
End() DataStoreKey
// Contains returns true of the Span contains the provided Span's range
// Contains returns true if the Span contains the provided Span's range.
Contains(Span) bool
// Equal returns true if the provided Span is equal to the current
// Equal returns true if the provided Span is equal to the current.
Equal(Span) bool
// Compare returns -1 if the provided span is less, 0 if it is equal, and 1 if its greater
// Compare returns -1 if the provided span is less, 0 if it is equal, and 1 if it is greater.
Compare(Span) SpanComparisonResult
}
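
Since `Contains` and `Equal` are still unimplemented in this commit, here is a sketch of how half-open `[Start, End)` semantics could work. It uses a hypothetical `keySpan` type over plain string keys instead of `DataStoreKey`, and is only one possible reading of the interface comments.

```go
package main

import "fmt"

// keySpan is a hypothetical, simplified span over string keys covering the
// half-open range [start, end). It is an assumption for illustration and
// does not use core.DataStoreKey.
type keySpan struct {
	start, end string
}

// contains reports whether other lies entirely within s. Because both ranges
// are half-open, sharing the same end key still counts as contained.
func (s keySpan) contains(other keySpan) bool {
	return s.start <= other.start && other.end <= s.end
}

// equal reports whether both spans cover exactly the same range.
func (s keySpan) equal(other keySpan) bool {
	return s == other
}

func main() {
	outer := keySpan{start: "a", end: "m"}
	inner := keySpan{start: "c", end: "f"}
	fmt.Println(outer.contains(inner), inner.contains(outer), outer.equal(outer))
}
```
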

Expand All @@ -39,22 +39,22 @@ func NewSpan(start, end DataStoreKey) Span {
}
}

// Start returns the starting key of the Span
// Start returns the starting key of the Span.
func (s span) Start() DataStoreKey {
return s.start
}

// End returns the ending key of the Span
// End returns the ending key of the Span.
func (s span) End() DataStoreKey {
return s.end
}

// Contains returns true of the Span contains the provided Span's range
// Contains returns true if the Span contains the provided Span's range.
func (s span) Contains(s2 Span) bool {
panic("not implemented") // TODO: Implement
}

// Equal returns true if the provided Span is equal to the current
// Equal returns true if the provided Span is equal to the current.
func (s span) Equal(s2 Span) bool {
panic("not implemented") // TODO: Implement
}
Expand Down Expand Up @@ -153,10 +153,10 @@ func isAdjacent(this DataStoreKey, other DataStoreKey) bool {
this.ToString() == other.PrefixEnd().ToString())
}

// Spans is a collection of individual spans
// Spans is a collection of individual spans.
type Spans []Span

// KeyValue is a KV store response containing the resulting core.Key and byte array value
// KeyValue is a KV store response containing the resulting core.Key and byte array value.
type KeyValue struct {
Key DataStoreKey
Value []byte
Expand Down Expand Up @@ -237,7 +237,7 @@ func (spans Spans) MergeAscending() Spans {
}

// Removes any items from the collection (given index onwards) whose end key is smaller
// than the given value. The returned collection will be a different instance to the given
// than the given value. The returned collection will be a different instance to the given
// and the given collection will not be mutated.
func (spans Spans) removeBefore(startIndex int, end string) Spans {
indexOfLastMatchingItem := -1
Expand Down