Skip to content

Commit ea7f478

Browse files
committed
Reflect PR comments
1 parent 3239acc commit ea7f478

File tree

3 files changed

+144
-82
lines changed

3 files changed

+144
-82
lines changed

federation/federation.go

+117-64
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import (
1111
"github.com/ipfs/go-datastore"
1212
logging "github.com/ipfs/go-log/v2"
1313
"github.com/ipld/go-ipld-prime"
14+
"github.com/ipld/go-ipld-prime/codec/dagjson"
1415
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
1516
"github.com/ipld/go-ipld-prime/node/basicnode"
1617
"github.com/ipld/go-ipld-prime/node/bindnode"
18+
"github.com/ipni/go-libipni/maurl"
19+
"github.com/ipni/go-libipni/mautil"
1720
"github.com/libp2p/go-libp2p/core/crypto"
18-
"github.com/libp2p/go-libp2p/core/peer"
1921
)
2022

2123
var (
@@ -26,15 +28,14 @@ var (
2628
type (
2729
Federation struct {
2830
*options
29-
members []peer.ID
30-
vc vectorClock
31+
vc vectorClock
3132

32-
mu sync.Mutex
33-
snapshotTicker *time.Ticker
34-
reconciliationTicker *time.Ticker
35-
headNodeCache ipld.Node
36-
ctx context.Context
37-
cancel context.CancelFunc
33+
mu sync.Mutex
34+
headNodeCache ipld.Node
35+
http http.Server
36+
shutdown chan struct{}
37+
38+
lastSeenSnapshotByMember map[string]ipld.Link
3839
}
3940
)
4041

@@ -43,30 +44,33 @@ func New(o ...Option) (*Federation, error) {
4344
if err != nil {
4445
return nil, err
4546
}
46-
ctx, cancel := context.WithCancel(context.Background())
4747
return &Federation{
48-
options: opts,
49-
snapshotTicker: time.NewTicker(opts.snapshotInterval),
50-
reconciliationTicker: time.NewTicker(opts.reconciliationInterval),
51-
vc: newVectorClock(),
52-
ctx: ctx,
53-
cancel: cancel,
48+
options: opts,
49+
vc: newVectorClock(),
50+
shutdown: make(chan struct{}),
51+
lastSeenSnapshotByMember: make(map[string]ipld.Link),
5452
}, nil
5553
}
5654

5755
func (f *Federation) Start(_ context.Context) error {
5856
go func() {
57+
snapshotTicker := time.NewTicker(f.snapshotInterval)
58+
reconciliationTicker := time.NewTicker(f.reconciliationInterval)
59+
defer func() {
60+
snapshotTicker.Stop()
61+
reconciliationTicker.Stop()
62+
}()
5963
for {
6064
select {
61-
case <-f.ctx.Done():
62-
logger.Warnw("Stopping federation event loop", "err", f.ctx.Err())
65+
case <-f.shutdown:
66+
logger.Warnw("Stopping federation event loop")
6367
return
64-
case t := <-f.snapshotTicker.C:
65-
if err := f.snapshot(f.ctx, t); err != nil {
68+
case t := <-snapshotTicker.C:
69+
if err := f.snapshot(context.TODO(), t); err != nil {
6670
logger.Errorw("Failed to take snapshot", "err", err)
6771
}
68-
case t := <-f.reconciliationTicker.C:
69-
if err := f.reconcile(f.ctx, t); err != nil {
72+
case t := <-reconciliationTicker.C:
73+
if err := f.reconcile(context.TODO(), t); err != nil {
7074
logger.Errorw("Failed to reconcile", "err", err)
7175
}
7276
}
@@ -117,11 +121,10 @@ func (f *Federation) snapshot(ctx context.Context, t time.Time) error {
117121
f.vc.untick(f.host.ID())
118122
return err
119123
}
120-
err = f.setHeadLink(ctx, headSnapshotLink)
121-
if err != nil {
124+
if err := f.setHeadLink(ctx, headSnapshotLink); err != nil {
122125
f.vc.untick(f.host.ID())
123126
}
124-
return err
127+
return nil
125128
}
126129

127130
func (f *Federation) getHeadNode(ctx context.Context) (ipld.Node, error) {
@@ -133,63 +136,113 @@ func (f *Federation) getHeadNode(ctx context.Context) (ipld.Node, error) {
133136
}
134137

135138
headLink, err := f.getHeadLink(ctx)
136-
switch {
137-
case errors.Is(err, datastore.ErrNotFound):
138-
// Set headNodeCache to an empty node to avoid hitting datastore every time head is fetched.
139-
f.headNodeCache = basicnode.Prototype.Any.NewBuilder().Build()
140-
return f.headNodeCache, nil
141-
case err != nil:
142-
return nil, err
143-
default:
144-
key, err := f.host.ID().ExtractPublicKey()
145-
if err != nil {
146-
logger.Errorw("Failed to get public key", "err", err)
147-
return nil, err
148-
}
149-
marshalledPubKey, err := crypto.MarshalPublicKey(key)
150-
if err != nil {
151-
logger.Errorw("Failed to marshal public key", "err", err)
152-
return nil, err
153-
}
154-
head := Head{
155-
Head: headLink,
156-
PublicKey: marshalledPubKey,
157-
}
158-
if err := head.Sign(f.host.Peerstore().PrivKey(f.host.ID())); err != nil {
159-
logger.Errorw("Failed to sign head", "err", err)
160-
return nil, err
139+
if err != nil {
140+
if errors.Is(err, datastore.ErrNotFound) {
141+
// Set headNodeCache to an empty node to avoid hitting datastore every time head is fetched.
142+
f.headNodeCache = basicnode.Prototype.Any.NewBuilder().Build()
143+
return f.headNodeCache, nil
161144
}
162-
f.headNodeCache = bindnode.Wrap(head, Prototypes.Head.Type())
163-
return f.headNodeCache, nil
145+
return nil, err
146+
}
147+
key, err := f.host.ID().ExtractPublicKey()
148+
if err != nil {
149+
logger.Errorw("Failed to get public key", "err", err)
150+
return nil, err
151+
}
152+
marshalledPubKey, err := crypto.MarshalPublicKey(key)
153+
if err != nil {
154+
logger.Errorw("Failed to marshal public key", "err", err)
155+
return nil, err
156+
}
157+
head := Head{
158+
Head: headLink,
159+
PublicKey: marshalledPubKey,
160+
}
161+
if err := head.Sign(f.host.Peerstore().PrivKey(f.host.ID())); err != nil {
162+
logger.Errorw("Failed to sign head", "err", err)
163+
return nil, err
164164
}
165+
f.headNodeCache = bindnode.Wrap(head, Prototypes.Head.Type())
166+
return f.headNodeCache, nil
165167
}
166168

167169
func (f *Federation) getHeadLink(ctx context.Context) (ipld.Link, error) {
168-
switch v, err := f.datastore.Get(ctx, headSnapshotKey); {
169-
case errors.Is(err, datastore.ErrNotFound):
170-
return nil, nil
171-
case err != nil:
172-
return nil, err
173-
default:
174-
_, c, err := cid.CidFromBytes(v)
175-
if err != nil {
176-
return nil, err
170+
v, err := f.datastore.Get(ctx, headSnapshotKey)
171+
if err != nil {
172+
if errors.Is(err, datastore.ErrNotFound) {
173+
return nil, nil
177174
}
178-
return cidlink.Link{Cid: c}, nil
175+
return nil, err
179176
}
177+
_, c, err := cid.CidFromBytes(v)
178+
if err != nil {
179+
return nil, err
180+
}
181+
return cidlink.Link{Cid: c}, nil
180182
}
181183

182184
func (f *Federation) setHeadLink(ctx context.Context, head ipld.Link) error {
183185
return f.datastore.Put(ctx, headSnapshotKey, head.(cidlink.Link).Cid.Bytes())
184186
}
185187

186188
func (f *Federation) Shutdown(_ context.Context) error {
187-
f.snapshotTicker.Stop()
188-
f.cancel()
189+
close(f.shutdown)
189190
return nil
190191
}
191192

192193
func (f *Federation) reconcile(ctx context.Context, t time.Time) error {
194+
for _, member := range f.members {
195+
logger := logger.With("id", member.ID)
196+
memberPubKey, err := member.ID.ExtractPublicKey()
197+
if err != nil {
198+
logger.Errorw("Failed to extract public key fom federation member peer ID", "err", err)
199+
continue
200+
}
201+
httpAddrs := mautil.FindHTTPAddrs(member.Addrs)
202+
switch len(httpAddrs) {
203+
case 0:
204+
logger.Errorw("No HTTP(S) address found for federation member", "addrs", member.Addrs)
205+
continue
206+
case 1:
207+
// Exactly one HTTP(S) multiaddr was found. As you were; proceed.
208+
default:
209+
logger.Warnw("Multiple HTTP(S) multiaddrs found for federation member. Picking the first one.", "httpAddrs", httpAddrs)
210+
}
211+
endpoint, err := maurl.ToURL(httpAddrs[0])
212+
if err != nil {
213+
logger.Errorw("Failed to extract URL from HTTP(S) multiaddr", "httpAddr", httpAddrs[0], "err", err)
214+
continue
215+
}
216+
req, err := http.NewRequest(http.MethodGet, endpoint.JoinPath("ipni", "v1", "fed", "head").String(), nil)
217+
if err != nil {
218+
logger.Errorw("Failed instantiate GET request", "err", err)
219+
continue
220+
}
221+
// TODO: accept other kinds too, like dagcbor.
222+
req.Header.Set("Accept", "application/json")
223+
resp, err := f.reconciliationHttpClient.Do(req)
224+
if err != nil {
225+
logger.Errorw("Failed to GET federation state head from member", "err", err)
226+
continue
227+
}
228+
if resp.StatusCode != http.StatusOK {
229+
logger.Errorw("GET federation state head response status code is not OK", "status", resp.Status, "err", err)
230+
continue
231+
}
232+
defer func() { _ = resp.Body.Close() }()
233+
// TODO: add checks for Content-Type.
234+
var head Head
235+
builder := Prototypes.Head.NewBuilder()
236+
if err := dagjson.Decode(builder, resp.Body); err != nil {
237+
logger.Errorw("Failed to decode ", "status", resp.Status, "err", err)
238+
continue
239+
}
240+
if err := head.Verify(memberPubKey); err != nil {
241+
logger.Errorw("Invalid federation state head", "err", err)
242+
continue
243+
}
244+
}
245+
193246
// TODO: For each f.peers, get head, get snapshot, f.vc.reconcile, aggregate providers.
194247
return nil
195248
}

federation/handlers.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
func (f *Federation) handleV1FedHead(w http.ResponseWriter, r *http.Request) {
1818
if r.Method != http.MethodGet {
19+
w.Header().Set("Allow", http.MethodGet)
1920
http.Error(w, "", http.StatusMethodNotAllowed)
2021
return
2122
}
@@ -35,6 +36,7 @@ func (f *Federation) handleV1FedHead(w http.ResponseWriter, r *http.Request) {
3536

3637
func (f *Federation) handleV1FedSubtree(w http.ResponseWriter, r *http.Request) {
3738
if r.Method != http.MethodGet {
39+
w.Header().Set("Allow", http.MethodGet)
3840
http.Error(w, "", http.StatusMethodNotAllowed)
3941
return
4042
}
@@ -56,15 +58,17 @@ func (f *Federation) handleV1FedSubtree(w http.ResponseWriter, r *http.Request)
5658

5759
ctx := ipld.LinkContext{Ctx: r.Context()}
5860
lnk := cidlink.Link{Cid: c}
59-
switch node, err := f.linkSystem.Load(ctx, lnk, basicnode.Prototype.Any); {
60-
case errors.Is(err, datastore.ErrNotFound), errors.Is(err, ipld.ErrNotExists{}):
61-
http.Error(w, "", http.StatusNotFound)
62-
case err != nil:
61+
node, err := f.linkSystem.Load(ctx, lnk, basicnode.Prototype.Any)
62+
if err != nil {
63+
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ipld.ErrNotExists{}) {
64+
http.Error(w, "", http.StatusNotFound)
65+
return
66+
}
6367
logger.Errorw("Failed to load link", "err", err)
6468
http.Error(w, "", http.StatusInternalServerError)
65-
default:
66-
if err := encoder(node, w); err != nil {
67-
logger.Errorw("Failed to encode node", "err", err)
68-
}
69+
return
70+
}
71+
if err := encoder(node, w); err != nil {
72+
logger.Errorw("Failed to encode node", "err", err)
6973
}
7074
}

federation/options.go

+15-10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package federation
22

33
import (
44
"errors"
5+
"net/http"
56
"time"
67

78
"github.com/ipfs/go-datastore"
@@ -19,20 +20,24 @@ type (
1920
Option func(*options) error
2021

2122
options struct {
22-
datastore datastore.Datastore
23-
host host.Host
24-
linkSystem *ipld.LinkSystem
25-
peers []peer.AddrInfo
26-
registry *registry.Registry
27-
reconciliationInterval time.Duration
28-
snapshotInterval time.Duration
23+
datastore datastore.Datastore
24+
host host.Host
25+
httpListenAddr string
26+
linkSystem *ipld.LinkSystem
27+
members []peer.AddrInfo
28+
registry *registry.Registry
29+
reconciliationHttpClient *http.Client
30+
reconciliationInterval time.Duration
31+
snapshotInterval time.Duration
2932
}
3033
)
3134

3235
func newOptions(o ...Option) (*options, error) {
3336
opt := options{
34-
snapshotInterval: 30 * time.Minute,
35-
reconciliationInterval: 1 * time.Hour,
37+
snapshotInterval: 30 * time.Minute,
38+
reconciliationHttpClient: http.DefaultClient,
39+
reconciliationInterval: 1 * time.Hour,
40+
httpListenAddr: "0.0.0.0:3004",
3641
}
3742
for _, apply := range o {
3843
if err := apply(&opt); err != nil {
@@ -80,7 +85,7 @@ func WithHost(v host.Host) Option {
8085

8186
func WithPeers(v ...peer.AddrInfo) Option {
8287
return func(o *options) error {
83-
o.peers = append(o.peers, v...)
88+
o.members = append(o.members, v...)
8489
return nil
8590
}
8691
}

0 commit comments

Comments
 (0)