Skip to content

Commit 3239acc

Browse files
committed
Implement the core federation protocol
First pass at implementing the IPNI federation protocol: * core data structures/functionality * periodic snapshot * basic vector clock reconciliation * HTTP serve mux for `/ipni/v1/fed`
1 parent 01d7008 commit 3239acc

7 files changed

+635
-0
lines changed

federation/federation.go

+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package federation
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"sync"
8+
"time"
9+
10+
"github.com/ipfs/go-cid"
11+
"github.com/ipfs/go-datastore"
12+
logging "github.com/ipfs/go-log/v2"
13+
"github.com/ipld/go-ipld-prime"
14+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
15+
"github.com/ipld/go-ipld-prime/node/basicnode"
16+
"github.com/ipld/go-ipld-prime/node/bindnode"
17+
"github.com/libp2p/go-libp2p/core/crypto"
18+
"github.com/libp2p/go-libp2p/core/peer"
19+
)
20+
21+
var (
22+
headSnapshotKey = datastore.NewKey("head-snapshot")
23+
logger = logging.Logger("federation")
24+
)
25+
26+
type (
27+
Federation struct {
28+
*options
29+
members []peer.ID
30+
vc vectorClock
31+
32+
mu sync.Mutex
33+
snapshotTicker *time.Ticker
34+
reconciliationTicker *time.Ticker
35+
headNodeCache ipld.Node
36+
ctx context.Context
37+
cancel context.CancelFunc
38+
}
39+
)
40+
41+
func New(o ...Option) (*Federation, error) {
42+
opts, err := newOptions(o...)
43+
if err != nil {
44+
return nil, err
45+
}
46+
ctx, cancel := context.WithCancel(context.Background())
47+
return &Federation{
48+
options: opts,
49+
snapshotTicker: time.NewTicker(opts.snapshotInterval),
50+
reconciliationTicker: time.NewTicker(opts.reconciliationInterval),
51+
vc: newVectorClock(),
52+
ctx: ctx,
53+
cancel: cancel,
54+
}, nil
55+
}
56+
57+
func (f *Federation) Start(_ context.Context) error {
58+
go func() {
59+
for {
60+
select {
61+
case <-f.ctx.Done():
62+
logger.Warnw("Stopping federation event loop", "err", f.ctx.Err())
63+
return
64+
case t := <-f.snapshotTicker.C:
65+
if err := f.snapshot(f.ctx, t); err != nil {
66+
logger.Errorw("Failed to take snapshot", "err", err)
67+
}
68+
case t := <-f.reconciliationTicker.C:
69+
if err := f.reconcile(f.ctx, t); err != nil {
70+
logger.Errorw("Failed to reconcile", "err", err)
71+
}
72+
}
73+
}
74+
}()
75+
return nil
76+
}
77+
78+
func (f *Federation) ServeMux() *http.ServeMux {
79+
mux := http.NewServeMux()
80+
mux.HandleFunc("/ipni/v1/fed/head", f.handleV1FedHead)
81+
mux.HandleFunc("/ipni/v1/fed/", f.handleV1FedHead)
82+
return mux
83+
}
84+
85+
func (f *Federation) snapshot(ctx context.Context, t time.Time) error {
86+
f.mu.Lock()
87+
defer f.mu.Unlock()
88+
89+
previous, err := f.getHeadLink(ctx)
90+
if err != nil {
91+
return err
92+
}
93+
newSnapshot := &Snapshot{
94+
Epoch: uint64(t.UTC().Unix()), // TODO: consider time dilution?
95+
VectorClock: f.vc.tick(f.host.ID()),
96+
Providers: ProvidersIngestStatusMap{
97+
Values: make(map[string]IngestStatus),
98+
},
99+
Previous: previous,
100+
}
101+
for _, info := range f.registry.AllProviderInfo() {
102+
var latestProcessedAdLink ipld.Link
103+
if !info.LastAdvertisement.Equals(cid.Undef) {
104+
latestProcessedAdLink = cidlink.Link{Cid: info.LastAdvertisement}
105+
}
106+
key := info.AddrInfo.ID.String()
107+
value := IngestStatus{
108+
LastAdvertisement: latestProcessedAdLink,
109+
// TODO: Add height hint once it is measured by indexers.
110+
}
111+
newSnapshot.Providers.Put(key, value)
112+
}
113+
114+
node := bindnode.Wrap(newSnapshot, Prototypes.Snapshot.Type())
115+
headSnapshotLink, err := f.linkSystem.Store(ipld.LinkContext{Ctx: ctx}, Prototypes.link, node)
116+
if err != nil {
117+
f.vc.untick(f.host.ID())
118+
return err
119+
}
120+
err = f.setHeadLink(ctx, headSnapshotLink)
121+
if err != nil {
122+
f.vc.untick(f.host.ID())
123+
}
124+
return err
125+
}
126+
127+
func (f *Federation) getHeadNode(ctx context.Context) (ipld.Node, error) {
128+
f.mu.Lock()
129+
defer f.mu.Unlock()
130+
131+
if f.headNodeCache != nil {
132+
return f.headNodeCache, nil
133+
}
134+
135+
headLink, err := f.getHeadLink(ctx)
136+
switch {
137+
case errors.Is(err, datastore.ErrNotFound):
138+
// Set headNodeCache to an empty node to avoid hitting datastore every time head is fetched.
139+
f.headNodeCache = basicnode.Prototype.Any.NewBuilder().Build()
140+
return f.headNodeCache, nil
141+
case err != nil:
142+
return nil, err
143+
default:
144+
key, err := f.host.ID().ExtractPublicKey()
145+
if err != nil {
146+
logger.Errorw("Failed to get public key", "err", err)
147+
return nil, err
148+
}
149+
marshalledPubKey, err := crypto.MarshalPublicKey(key)
150+
if err != nil {
151+
logger.Errorw("Failed to marshal public key", "err", err)
152+
return nil, err
153+
}
154+
head := Head{
155+
Head: headLink,
156+
PublicKey: marshalledPubKey,
157+
}
158+
if err := head.Sign(f.host.Peerstore().PrivKey(f.host.ID())); err != nil {
159+
logger.Errorw("Failed to sign head", "err", err)
160+
return nil, err
161+
}
162+
f.headNodeCache = bindnode.Wrap(head, Prototypes.Head.Type())
163+
return f.headNodeCache, nil
164+
}
165+
}
166+
167+
func (f *Federation) getHeadLink(ctx context.Context) (ipld.Link, error) {
168+
switch v, err := f.datastore.Get(ctx, headSnapshotKey); {
169+
case errors.Is(err, datastore.ErrNotFound):
170+
return nil, nil
171+
case err != nil:
172+
return nil, err
173+
default:
174+
_, c, err := cid.CidFromBytes(v)
175+
if err != nil {
176+
return nil, err
177+
}
178+
return cidlink.Link{Cid: c}, nil
179+
}
180+
}
181+
182+
func (f *Federation) setHeadLink(ctx context.Context, head ipld.Link) error {
183+
return f.datastore.Put(ctx, headSnapshotKey, head.(cidlink.Link).Cid.Bytes())
184+
}
185+
186+
func (f *Federation) Shutdown(_ context.Context) error {
187+
f.snapshotTicker.Stop()
188+
f.cancel()
189+
return nil
190+
}
191+
192+
func (f *Federation) reconcile(ctx context.Context, t time.Time) error {
193+
// TODO: For each f.peers, get head, get snapshot, f.vc.reconcile, aggregate providers.
194+
return nil
195+
}

federation/handlers.go

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package federation
2+
3+
import (
4+
"errors"
5+
"net/http"
6+
"path"
7+
8+
"github.com/ipfs/go-cid"
9+
"github.com/ipfs/go-datastore"
10+
"github.com/ipld/go-ipld-prime"
11+
"github.com/ipld/go-ipld-prime/codec/dagjson"
12+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
13+
"github.com/ipld/go-ipld-prime/multicodec"
14+
"github.com/ipld/go-ipld-prime/node/basicnode"
15+
)
16+
17+
func (f *Federation) handleV1FedHead(w http.ResponseWriter, r *http.Request) {
18+
if r.Method != http.MethodGet {
19+
http.Error(w, "", http.StatusMethodNotAllowed)
20+
return
21+
}
22+
23+
switch headNode, err := f.getHeadNode(r.Context()); {
24+
case err != nil:
25+
logger.Errorw("Failed to load head snapshot link", "err", err)
26+
http.Error(w, "", http.StatusInternalServerError)
27+
default:
28+
w.Header().Add("Content-Type", "application/json")
29+
if err := dagjson.Encode(headNode, w); err != nil {
30+
logger.Errorw("Failed to encode head", "err", err)
31+
http.Error(w, "", http.StatusInternalServerError)
32+
}
33+
}
34+
}
35+
36+
func (f *Federation) handleV1FedSubtree(w http.ResponseWriter, r *http.Request) {
37+
if r.Method != http.MethodGet {
38+
http.Error(w, "", http.StatusMethodNotAllowed)
39+
return
40+
}
41+
cidStr := path.Base(r.URL.Path)
42+
c, err := cid.Decode(cidStr)
43+
if err != nil {
44+
http.Error(w, "invalid cid", http.StatusBadRequest)
45+
return
46+
}
47+
logger := logger.With("link", cidStr)
48+
49+
// Fail fast if codec ask for is not known.
50+
encoder, err := multicodec.LookupEncoder(c.Prefix().Codec)
51+
if err != nil {
52+
logger.Errorw("Failed to find codec for link", "err", err)
53+
http.Error(w, "codec not supported", http.StatusBadRequest)
54+
return
55+
}
56+
57+
ctx := ipld.LinkContext{Ctx: r.Context()}
58+
lnk := cidlink.Link{Cid: c}
59+
switch node, err := f.linkSystem.Load(ctx, lnk, basicnode.Prototype.Any); {
60+
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, ipld.ErrNotExists{}):
61+
http.Error(w, "", http.StatusNotFound)
62+
case err != nil:
63+
logger.Errorw("Failed to load link", "err", err)
64+
http.Error(w, "", http.StatusInternalServerError)
65+
default:
66+
if err := encoder(node, w); err != nil {
67+
logger.Errorw("Failed to encode node", "err", err)
68+
}
69+
}
70+
}

federation/options.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package federation
2+
3+
import (
4+
"errors"
5+
"time"
6+
7+
"github.com/ipfs/go-datastore"
8+
"github.com/ipfs/go-datastore/sync"
9+
"github.com/ipld/go-ipld-prime"
10+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
11+
"github.com/ipld/go-ipld-prime/storage/dsadapter"
12+
"github.com/ipni/storetheindex/internal/registry"
13+
"github.com/libp2p/go-libp2p"
14+
"github.com/libp2p/go-libp2p/core/host"
15+
"github.com/libp2p/go-libp2p/core/peer"
16+
)
17+
18+
type (
19+
Option func(*options) error
20+
21+
options struct {
22+
datastore datastore.Datastore
23+
host host.Host
24+
linkSystem *ipld.LinkSystem
25+
peers []peer.AddrInfo
26+
registry *registry.Registry
27+
reconciliationInterval time.Duration
28+
snapshotInterval time.Duration
29+
}
30+
)
31+
32+
func newOptions(o ...Option) (*options, error) {
33+
opt := options{
34+
snapshotInterval: 30 * time.Minute,
35+
reconciliationInterval: 1 * time.Hour,
36+
}
37+
for _, apply := range o {
38+
if err := apply(&opt); err != nil {
39+
return nil, err
40+
}
41+
}
42+
if opt.datastore == nil {
43+
opt.datastore = sync.MutexWrap(datastore.NewMapDatastore())
44+
}
45+
if opt.host == nil {
46+
var err error
47+
opt.host, err = libp2p.New()
48+
if err != nil {
49+
return nil, err
50+
}
51+
}
52+
if opt.linkSystem == nil {
53+
ls := cidlink.DefaultLinkSystem()
54+
storage := dsadapter.Adapter{
55+
Wrapped: opt.datastore,
56+
}
57+
ls.SetReadStorage(&storage)
58+
ls.SetWriteStorage(&storage)
59+
opt.linkSystem = &ls
60+
}
61+
if opt.registry == nil {
62+
return nil, errors.New("registry must be specified")
63+
}
64+
return &opt, nil
65+
}
66+
67+
func WithDatastore(v datastore.Datastore) Option {
68+
return func(o *options) error {
69+
o.datastore = v
70+
return nil
71+
}
72+
}
73+
74+
func WithHost(v host.Host) Option {
75+
return func(o *options) error {
76+
o.host = v
77+
return nil
78+
}
79+
}
80+
81+
func WithPeers(v ...peer.AddrInfo) Option {
82+
return func(o *options) error {
83+
o.peers = append(o.peers, v...)
84+
return nil
85+
}
86+
}
87+
func WithLinkSystem(v *ipld.LinkSystem) Option {
88+
return func(o *options) error {
89+
o.linkSystem = v
90+
return nil
91+
}
92+
}
93+
94+
func WithRegistry(v *registry.Registry) Option {
95+
return func(o *options) error {
96+
o.registry = v
97+
return nil
98+
}
99+
}
100+
101+
func WithSnapshotInterval(v time.Duration) Option {
102+
return func(o *options) error {
103+
o.snapshotInterval = v
104+
return nil
105+
}
106+
}

0 commit comments

Comments
 (0)