Skip to content

Commit b049a7c

Browse files
committed
Implement the core federation protocol
First pass at implementing the IPNI federation protocol: * core data structures/functionality * periodic snapshot * basic vector clock reconciliation * HTTP serve mux for `/ipni/v1/fed`
1 parent 885bd54 commit b049a7c

10 files changed

+1115
-0
lines changed

federation/federation.go

+411
Large diffs are not rendered by default.

federation/federation_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package federation_test
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/libp2p/go-libp2p"
9+
"github.com/libp2p/go-libp2p/core/peer"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestFederationStandalone(t *testing.T) {
14+
provider, err := libp2p.New()
15+
require.NoError(t, err)
16+
17+
subject := newTestFederationMember(t)
18+
require.NoError(t, subject.Start(context.Background()))
19+
20+
madeUpAdCid := requireRandomCid(t)
21+
require.NoError(t, subject.ingester.MarkAdProcessed(provider.ID(), madeUpAdCid))
22+
providerAddrInfo := peer.AddrInfo{
23+
ID: provider.ID(),
24+
Addrs: provider.Addrs(),
25+
}
26+
require.NoError(t, subject.registry.Update(context.TODO(), providerAddrInfo, providerAddrInfo, madeUpAdCid, nil, 0))
27+
28+
head := subject.requireHeadEventually(t, 10*time.Second, time.Second)
29+
snapshot := subject.requireSnapshot(t, head.Head)
30+
31+
require.True(t, snapshot.Epoch > 0)
32+
require.True(t, snapshot.VectorClock > 0)
33+
require.Len(t, snapshot.Providers.Keys, 1)
34+
require.Len(t, snapshot.Providers.Values, 1)
35+
status, found := snapshot.Providers.Get(provider.ID().String())
36+
require.True(t, found)
37+
require.Equal(t, madeUpAdCid.String(), status.LastAdvertisement.String())
38+
require.Nil(t, status.HeightHint)
39+
40+
require.NoError(t, subject.Shutdown(context.TODO()))
41+
}

federation/handlers.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package federation
2+
3+
import (
4+
"errors"
5+
"net/http"
6+
"path"
7+
8+
"github.com/ipfs/go-cid"
9+
"github.com/ipfs/go-datastore"
10+
"github.com/ipld/go-ipld-prime"
11+
"github.com/ipld/go-ipld-prime/codec/dagjson"
12+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
13+
"github.com/ipld/go-ipld-prime/multicodec"
14+
"github.com/ipld/go-ipld-prime/schema"
15+
)
16+
17+
func (f *Federation) handleV1FedHead(w http.ResponseWriter, r *http.Request) {
18+
if r.Method != http.MethodGet {
19+
w.Header().Set("Allow", http.MethodGet)
20+
http.Error(w, "", http.StatusMethodNotAllowed)
21+
return
22+
}
23+
24+
switch headNode, err := f.getHeadNode(r.Context()); {
25+
case err != nil:
26+
logger.Errorw("Failed to load head snapshot link", "err", err)
27+
http.Error(w, "", http.StatusInternalServerError)
28+
default:
29+
// TODO: Support alternative content types based on request Accept header.
30+
w.Header().Add("Content-Type", "application/json")
31+
if err := dagjson.Encode(headNode, w); err != nil {
32+
logger.Errorw("Failed to encode head", "err", err)
33+
http.Error(w, "", http.StatusInternalServerError)
34+
}
35+
}
36+
}
37+
38+
func (f *Federation) handleV1FedSubtree(w http.ResponseWriter, r *http.Request) {
39+
if r.Method != http.MethodGet {
40+
w.Header().Set("Allow", http.MethodGet)
41+
http.Error(w, "", http.StatusMethodNotAllowed)
42+
return
43+
}
44+
cidStr := path.Base(r.URL.Path)
45+
c, err := cid.Decode(cidStr)
46+
if err != nil {
47+
http.Error(w, "invalid cid", http.StatusBadRequest)
48+
return
49+
}
50+
logger := logger.With("link", cidStr)
51+
52+
// Fail fast if codec ask for is not known.
53+
encoder, err := multicodec.LookupEncoder(c.Prefix().Codec)
54+
if err != nil {
55+
logger.Errorw("Failed to find codec for link", "err", err)
56+
http.Error(w, "codec not supported", http.StatusBadRequest)
57+
return
58+
}
59+
60+
ctx := ipld.LinkContext{Ctx: r.Context()}
61+
lnk := cidlink.Link{Cid: c}
62+
node, err := f.linkSystem.Load(ctx, lnk, Prototypes.Snapshot)
63+
if err != nil {
64+
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ipld.ErrNotExists{}) {
65+
http.Error(w, "", http.StatusNotFound)
66+
return
67+
}
68+
logger.Errorw("Failed to load link", "err", err)
69+
http.Error(w, "", http.StatusInternalServerError)
70+
return
71+
}
72+
if err := encoder(node.(schema.TypedNode).Representation(), w); err != nil {
73+
logger.Errorw("Failed to encode node", "err", err)
74+
}
75+
}

federation/options.go

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package federation
2+
3+
import (
4+
"errors"
5+
"net/http"
6+
"time"
7+
8+
"github.com/ipfs/go-datastore"
9+
"github.com/ipfs/go-datastore/sync"
10+
"github.com/ipld/go-ipld-prime"
11+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
12+
"github.com/ipld/go-ipld-prime/storage/dsadapter"
13+
"github.com/ipni/storetheindex/internal/ingest"
14+
"github.com/ipni/storetheindex/internal/registry"
15+
"github.com/libp2p/go-libp2p"
16+
"github.com/libp2p/go-libp2p/core/host"
17+
"github.com/libp2p/go-libp2p/core/peer"
18+
)
19+
20+
type (
21+
Option func(*options) error
22+
23+
options struct {
24+
datastore datastore.Datastore
25+
host host.Host
26+
httpListenAddr string
27+
linkSystem *ipld.LinkSystem
28+
members []peer.AddrInfo
29+
registry *registry.Registry
30+
ingester *ingest.Ingester
31+
reconciliationHttpClient *http.Client
32+
reconciliationInterval time.Duration
33+
snapshotInterval time.Duration
34+
}
35+
)
36+
37+
func newOptions(o ...Option) (*options, error) {
38+
opt := options{
39+
httpListenAddr: "0.0.0.0:3004",
40+
reconciliationHttpClient: http.DefaultClient,
41+
reconciliationInterval: 1 * time.Hour,
42+
snapshotInterval: 30 * time.Minute,
43+
}
44+
for _, apply := range o {
45+
if err := apply(&opt); err != nil {
46+
return nil, err
47+
}
48+
}
49+
// Check required options
50+
if opt.ingester == nil {
51+
return nil, errors.New("ingester must be specified")
52+
}
53+
if opt.registry == nil {
54+
return nil, errors.New("registry must be specified")
55+
}
56+
57+
// Set defaults
58+
if opt.datastore == nil {
59+
opt.datastore = sync.MutexWrap(datastore.NewMapDatastore())
60+
}
61+
if opt.host == nil {
62+
var err error
63+
opt.host, err = libp2p.New()
64+
if err != nil {
65+
return nil, err
66+
}
67+
}
68+
if opt.linkSystem == nil {
69+
ls := cidlink.DefaultLinkSystem()
70+
storage := dsadapter.Adapter{
71+
Wrapped: opt.datastore,
72+
}
73+
ls.SetReadStorage(&storage)
74+
ls.SetWriteStorage(&storage)
75+
opt.linkSystem = &ls
76+
}
77+
78+
return &opt, nil
79+
}
80+
81+
func WithDatastore(v datastore.Datastore) Option {
82+
return func(o *options) error {
83+
o.datastore = v
84+
return nil
85+
}
86+
}
87+
88+
func WithHost(v host.Host) Option {
89+
return func(o *options) error {
90+
o.host = v
91+
return nil
92+
}
93+
}
94+
func WithHttpListenAddr(v string) Option {
95+
return func(o *options) error {
96+
o.httpListenAddr = v
97+
return nil
98+
}
99+
}
100+
101+
func WithMembers(v ...peer.AddrInfo) Option {
102+
return func(o *options) error {
103+
o.members = append(o.members, v...)
104+
return nil
105+
}
106+
}
107+
func WithLinkSystem(v *ipld.LinkSystem) Option {
108+
return func(o *options) error {
109+
o.linkSystem = v
110+
return nil
111+
}
112+
}
113+
114+
func WithIngester(v *ingest.Ingester) Option {
115+
return func(o *options) error {
116+
o.ingester = v
117+
return nil
118+
}
119+
}
120+
121+
func WithRegistry(v *registry.Registry) Option {
122+
return func(o *options) error {
123+
o.registry = v
124+
return nil
125+
}
126+
}
127+
128+
func WithReconciliationInterval(v time.Duration) Option {
129+
return func(o *options) error {
130+
o.reconciliationInterval = v
131+
return nil
132+
}
133+
}
134+
135+
func WithSnapshotInterval(v time.Duration) Option {
136+
return func(o *options) error {
137+
o.snapshotInterval = v
138+
return nil
139+
}
140+
}

0 commit comments

Comments
 (0)