diff --git a/federation/federation.go b/federation/federation.go new file mode 100644 index 000000000..1061ede76 --- /dev/null +++ b/federation/federation.go @@ -0,0 +1,414 @@ +package federation + +import ( + "context" + "errors" + "net" + "net/http" + "net/url" + "sync" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/multicodec" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipni/go-libipni/find/client" + "github.com/ipni/go-libipni/find/model" + "github.com/ipni/go-libipni/maurl" + "github.com/ipni/go-libipni/mautil" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" +) + +var ( + headSnapshotKey = datastore.NewKey("head-snapshot") + logger = logging.Logger("federation") +) + +type ( + Federation struct { + *options + vc vectorClock + + mu sync.Mutex + headNodeCache ipld.Node + httpServer http.Server + shutdown chan struct{} + + // lastSeenSnapshotByMember is the cache of already reconciled snapshots by federation member. + // An in-memory data structure is enough, because upon indexer restart we want to check if + // the local node has missed anything since reconciliation process is cheap enough. + lastSeenSnapshotByMember map[peer.ID]cid.Cid + } +) + +func New(o ...Option) (*Federation, error) { + opts, err := newOptions(o...) + if err != nil { + return nil, err + } + return &Federation{ + options: opts, + vc: newVectorClock(), + shutdown: make(chan struct{}), + lastSeenSnapshotByMember: make(map[peer.ID]cid.Cid), + }, nil +} + +func (f *Federation) Start(_ context.Context) error { + + listen, err := net.Listen("tcp", f.httpListenAddr) + if err != nil { + return err + } + go func() { + f.httpServer.Handler = f.ServeMux() + switch err := f.httpServer.Serve(listen); { + case errors.Is(err, http.ErrServerClosed): + logger.Infow("Federation HTTP server stopped") + default: + logger.Errorw("Federation HTTP server stopped unexpectedly") + } + }() + + go func() { + snapshotTicker := time.NewTicker(f.snapshotInterval) + reconciliationTicker := time.NewTicker(f.reconciliationInterval) + defer func() { + snapshotTicker.Stop() + reconciliationTicker.Stop() + }() + for { + select { + case <-f.shutdown: + logger.Warnw("Stopping federation event loop") + return + case t := <-snapshotTicker.C: + if err := f.snapshot(context.TODO(), t); err != nil { + logger.Errorw("Failed to take snapshot", "err", err) + } + case t := <-reconciliationTicker.C: + if err := f.reconcile(context.TODO(), t); err != nil { + logger.Errorw("Failed to reconcile", "err", err) + } + } + } + }() + return nil +} + +func (f *Federation) ServeMux() *http.ServeMux { + mux := http.NewServeMux() + mux.HandleFunc("/ipni/v1/fed/head", f.handleV1FedHead) + mux.HandleFunc("/ipni/v1/fed/", f.handleV1FedSubtree) + return mux +} + +func (f *Federation) snapshot(ctx context.Context, t time.Time) error { + f.mu.Lock() + defer f.mu.Unlock() + + previous, err := f.getHeadLink(ctx) + if err != nil { + return err + } + newSnapshot := &Snapshot{ + // TODO: consider reducing the granularity of epoch. Millisecond increments in terms of unix clock may be too + // fine for the purposes of IPNI federation. 
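+		//       Note: as implemented, Epoch is recorded at second granularity (Unix seconds), and
+		//       reconciliation compares the vector clock below rather than the epoch.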
+ Epoch: uint64(t.UTC().Unix()), + VectorClock: f.vc.tick(f.host.ID()), + Providers: ProvidersIngestStatusMap{ + Values: make(map[string]IngestStatus), + }, + Previous: previous, + } + for _, info := range f.registry.AllProviderInfo() { + var latestProcessedAdLink ipld.Link + if !info.LastAdvertisement.Equals(cid.Undef) { + latestProcessedAdLink = cidlink.Link{Cid: info.LastAdvertisement} + } + key := info.AddrInfo.ID.String() + value := IngestStatus{ + LastAdvertisement: latestProcessedAdLink, + // TODO: Add height hint once it is measured by indexers. + } + newSnapshot.Providers.Put(key, value) + } + + node := bindnode.Wrap(newSnapshot, Prototypes.Snapshot.Type()).Representation() + headSnapshotLink, err := f.linkSystem.Store(ipld.LinkContext{Ctx: ctx}, Prototypes.link, node) + if err != nil { + return err + } + if err := f.setHeadLink(ctx, headSnapshotLink); err != nil { + return err + } + return nil +} + +func (f *Federation) getHeadNode(ctx context.Context) (ipld.Node, error) { + f.mu.Lock() + defer f.mu.Unlock() + + if f.headNodeCache != nil { + return f.headNodeCache, nil + } + + headLink, err := f.getHeadLink(ctx) + if err != nil { + if errors.Is(err, datastore.ErrNotFound) { + return nil, nil + } + return nil, err + } + key, err := f.host.ID().ExtractPublicKey() + if err != nil { + logger.Errorw("Failed to get public key", "err", err) + return nil, err + } + marshalledPubKey, err := crypto.MarshalPublicKey(key) + if err != nil { + logger.Errorw("Failed to marshal public key", "err", err) + return nil, err + } + head := &Head{ + Head: headLink, + PublicKey: marshalledPubKey, + } + if err := head.Sign(f.host.Peerstore().PrivKey(f.host.ID())); err != nil { + logger.Errorw("Failed to sign head", "err", err) + return nil, err + } + f.headNodeCache = bindnode.Wrap(head, Prototypes.Head.Type()).Representation() + return f.headNodeCache, nil +} + +func (f *Federation) getHeadLink(ctx context.Context) (ipld.Link, error) { + v, err := f.datastore.Get(ctx, headSnapshotKey) + if err != nil { + if errors.Is(err, datastore.ErrNotFound) { + return nil, nil + } + return nil, err + } + _, c, err := cid.CidFromBytes(v) + if err != nil { + return nil, err + } + return cidlink.Link{Cid: c}, nil +} + +func (f *Federation) setHeadLink(ctx context.Context, head ipld.Link) error { + return f.datastore.Put(ctx, headSnapshotKey, head.(cidlink.Link).Cid.Bytes()) +} + +func (f *Federation) reconcile(ctx context.Context, t time.Time) error { + for _, member := range f.members { + logger := logger.With("id", member.ID) + memberPubKey, err := member.ID.ExtractPublicKey() + if err != nil { + logger.Errorw("Failed to extract public key fom federation member peer ID", "err", err) + continue + } + + var endpoint *url.URL + { // Get URL of federation member server + + httpAddrs := mautil.FindHTTPAddrs(member.Addrs) + switch len(httpAddrs) { + case 0: + logger.Errorw("No HTTP(S) address found for federation member", "addrs", member.Addrs) + continue + case 1: + // Exactly one HTTP(S) multiaddr was found. As you were; proceed. + default: + logger.Warnw("Multiple HTTP(S) multiaddrs found for federation member. Picking the first one.", "httpAddrs", httpAddrs) + } + endpoint, err = maurl.ToURL(httpAddrs[0]) + if err != nil { + logger.Errorw("Failed to extract URL from HTTP(S) multiaddr", "httpAddr", httpAddrs[0], "err", err) + continue + } + } + + var memberProviders map[peer.ID]*model.ProviderInfo + { // List providers known by the federation member. 
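+			// The member's own provider listing is needed further down to resolve a publisher
+			// (or provider) address for each provider found in the member's snapshot before
+			// announcing that provider to the local ingester.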
+			findClient, err := client.New(endpoint.String())
+			if err != nil {
+				logger.Errorw("Failed to instantiate find client for member", "endpoint", endpoint, "err", err)
+				continue
+			}
+			providers, err := findClient.ListProviders(ctx)
+			if err != nil {
+				logger.Errorw("Failed to list providers for member", "endpoint", endpoint, "err", err)
+				continue
+			}
+			memberProviders = make(map[peer.ID]*model.ProviderInfo)
+			for _, provider := range providers {
+				memberProviders[provider.AddrInfo.ID] = provider
+			}
+		}
+
+		var head *Head
+		{ // Get federation head from member
+
+			req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.JoinPath("ipni", "v1", "fed", "head").String(), nil)
+			if err != nil {
+				logger.Errorw("Failed to instantiate GET request", "err", err)
+				continue
+			}
+			// TODO: accept other kinds too, like dagcbor.
+			req.Header.Set("Accept", "application/json")
+			resp, err := f.reconciliationHttpClient.Do(req)
+			if err != nil {
+				logger.Errorw("Failed to GET federation state head from member", "err", err)
+				continue
+			}
+			if resp.StatusCode != http.StatusOK {
+				logger.Errorw("GET federation state head response status code is not OK", "status", resp.Status, "err", err)
+				continue
+			}
+			defer func() { _ = resp.Body.Close() }()
+			// TODO: add checks for Content-Type.
+			builder := Prototypes.Head.NewBuilder()
+			if err := dagjson.Decode(builder, resp.Body); err != nil {
+				logger.Errorw("Failed to decode head", "status", resp.Status, "err", err)
+				continue
+			}
+			headNode := builder.Build()
+			head = bindnode.Unwrap(headNode).(*Head)
+			if err := head.Verify(memberPubKey); err != nil {
+				logger.Errorw("Invalid federation state head", "err", err)
+				continue
+			}
+
+			if head.Head == nil {
+				logger.Debug("Head has no link to snapshot.")
+				continue
+			}
+		}
+
+		var (
+			snapshot    *Snapshot
+			snapshotCid cid.Cid
+		)
+		{ // Get the federation member snapshot
+			snapshotCid = head.Head.(cidlink.Link).Cid
+			logger = logger.With("snapshotCid", snapshotCid)
+
+			if cid.Undef.Equals(snapshotCid) {
+				logger.Debug("Head link is empty")
+				continue
+			}
+
+			if seenCid, ok := f.lastSeenSnapshotByMember[member.ID]; ok && seenCid.Equals(snapshotCid) {
+				logger.Debugw("Snapshot already seen; skipping reconciliation")
+				continue
+			}
+
+			sDecoder, err := multicodec.LookupDecoder(snapshotCid.Prefix().Codec)
+			if err != nil {
+				logger.Errorw("Failed to find decoder for snapshot link", "err", err)
+				continue
+			}
+
+			sReq, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.JoinPath("ipni", "v1", "fed", snapshotCid.String()).String(), nil)
+			if err != nil {
+				logger.Errorw("Failed to instantiate GET snapshot request", "err", err)
+				continue
+			}
+			sResp, err := f.reconciliationHttpClient.Do(sReq)
+			if err != nil {
+				logger.Errorw("Failed to GET snapshot from member", "err", err)
+				continue
+			}
+			if sResp.StatusCode != http.StatusOK {
+				logger.Errorw("GET snapshot response status code is not OK", "status", sResp.Status, "err", err)
+				continue
+			}
+			defer func() { _ = sResp.Body.Close() }()
+			// TODO: add checks for Content-Type.
+			sBuilder := Prototypes.Snapshot.NewBuilder()
+			if err := sDecoder(sBuilder, sResp.Body); err != nil {
+				logger.Errorw("Failed to decode snapshot", "status", sResp.Status, "err", err)
+				continue
+			}
+			snapshotNode := sBuilder.Build()
+			snapshot = bindnode.Unwrap(snapshotNode).(*Snapshot)
+			gotLink, err := f.linkSystem.ComputeLink(cidlink.LinkPrototype{Prefix: snapshotCid.Prefix()}, snapshotNode)
+			if err != nil {
+				logger.Errorw("Failed to compute link for snapshot", "err", err)
+				continue
+			}
+			gotSnapshotCid := gotLink.(cidlink.Link).Cid
+			if !snapshotCid.Equals(gotSnapshotCid) {
+				logger.Errorw("Snapshot link mismatch", "got", gotSnapshotCid)
+				continue
+			}
+		}
+
+		{ // Reconcile local registry based on snapshot
+			if f.vc.reconcile(member.ID, snapshot.VectorClock) {
+				for providerID, snapshotIngestStatus := range snapshot.Providers.Values {
+					pid, err := peer.Decode(providerID)
+					if err != nil {
+						logger.Warnw("Failed to decode provider ID in snapshot", "id", providerID)
+						continue
+					}
+
+					providerInfo, ok := memberProviders[pid]
+					if !ok {
+						logger.Warnw("Provider in snapshot is not known by the member; skipping reconciliation for provider", "provider", pid)
+						continue
+					}
+
+					// TODO: confirm this is the right way to deduce publisher addrinfo from model.ProviderInfo
+					var pai peer.AddrInfo
+					if providerInfo.Publisher != nil {
+						pai = *providerInfo.Publisher
+					} else {
+						pai = providerInfo.AddrInfo
+					}
+
+					info, found := f.registry.ProviderInfo(pid)
+					// If the local indexer does not know about the provider in the snapshot, then
+					// simply announce it to the local ingester.
+					if !found {
+						if err := f.ingester.Announce(ctx, snapshotIngestStatus.LastAdvertisement.(cidlink.Link).Cid, pai); err != nil {
+							logger.Warnw("Failed to announce newly discovered provider to ingester", "provider", pid, "err", err)
+						}
+						continue
+					}
+
+					// If the local and remote indexers have the same latest processed ad CID, then
+					// they are in sync and there is nothing to do.
+					if info.LastAdvertisement.Equals(snapshotIngestStatus.LastAdvertisement.(cidlink.Link).Cid) {
+						continue
+					}
+
+					// TODO: Optimise what CID we announce by finding which is most recent.
+					//       For now, simply announcing an ad works since indexers keep the chain of ads they have processed.
+					if err := f.ingester.Announce(ctx, snapshotIngestStatus.LastAdvertisement.(cidlink.Link).Cid, pai); err != nil {
+						logger.Warnw("Failed to announce provider advertisement to ingester", "provider", pid, "err", err)
+					}
+				}
+			}
+
+			// Remember the latest processed snapshot for the member to avoid re-processing already seen snapshots by that member.
+ f.lastSeenSnapshotByMember[member.ID] = snapshotCid + } + } + + logger.Infow("Finished reconciliation cycle", "triggeredAt", t) + return nil +} + +func (f *Federation) Shutdown(ctx context.Context) error { + close(f.shutdown) + return f.httpServer.Shutdown(ctx) +} diff --git a/federation/federation_test.go b/federation/federation_test.go new file mode 100644 index 000000000..653651b0e --- /dev/null +++ b/federation/federation_test.go @@ -0,0 +1,41 @@ +package federation_test + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" +) + +func TestFederationStandalone(t *testing.T) { + provider, err := libp2p.New() + require.NoError(t, err) + + subject := newTestFederationMember(t) + require.NoError(t, subject.Start(context.Background())) + + madeUpAdCid := requireRandomCid(t) + require.NoError(t, subject.ingester.MarkAdProcessed(provider.ID(), madeUpAdCid)) + providerAddrInfo := peer.AddrInfo{ + ID: provider.ID(), + Addrs: provider.Addrs(), + } + require.NoError(t, subject.registry.Update(context.TODO(), providerAddrInfo, providerAddrInfo, madeUpAdCid, nil, 0)) + + head := subject.requireHeadEventually(t, 20*time.Second, time.Second) + snapshot := subject.requireSnapshot(t, head.Head) + + require.True(t, snapshot.Epoch > 0) + require.True(t, snapshot.VectorClock > 0) + require.Len(t, snapshot.Providers.Keys, 1) + require.Len(t, snapshot.Providers.Values, 1) + status, found := snapshot.Providers.Get(provider.ID().String()) + require.True(t, found) + require.Equal(t, madeUpAdCid.String(), status.LastAdvertisement.String()) + require.Nil(t, status.HeightHint) + + require.NoError(t, subject.Shutdown(context.TODO())) +} diff --git a/federation/handlers.go b/federation/handlers.go new file mode 100644 index 000000000..3ea0420ad --- /dev/null +++ b/federation/handlers.go @@ -0,0 +1,77 @@ +package federation + +import ( + "errors" + "net/http" + "path" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/multicodec" + "github.com/ipld/go-ipld-prime/schema" +) + +func (f *Federation) handleV1FedHead(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "", http.StatusMethodNotAllowed) + return + } + + switch headNode, err := f.getHeadNode(r.Context()); { + case err != nil: + logger.Errorw("Failed to load head snapshot link", "err", err) + http.Error(w, "", http.StatusInternalServerError) + case headNode == nil: + w.WriteHeader(http.StatusNoContent) + default: + // TODO: Support alternative content types based on request Accept header. + w.Header().Add("Content-Type", "application/json") + if err := dagjson.Encode(headNode, w); err != nil { + logger.Errorw("Failed to encode head", "err", err) + http.Error(w, "", http.StatusInternalServerError) + } + } +} + +func (f *Federation) handleV1FedSubtree(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "", http.StatusMethodNotAllowed) + return + } + cidStr := path.Base(r.URL.Path) + c, err := cid.Decode(cidStr) + if err != nil { + http.Error(w, "invalid cid", http.StatusBadRequest) + return + } + logger := logger.With("link", cidStr) + + // Fail fast if codec ask for is not known. 
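+	// The response encoding is taken from the codec embedded in the requested CID; snapshot links
+	// are currently minted as dag-json (see Prototypes.link in schema.go), so that is what clients
+	// will typically request here.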
+ encoder, err := multicodec.LookupEncoder(c.Prefix().Codec) + if err != nil { + logger.Errorw("Failed to find codec for link", "err", err) + http.Error(w, "codec not supported", http.StatusBadRequest) + return + } + + ctx := ipld.LinkContext{Ctx: r.Context()} + lnk := cidlink.Link{Cid: c} + node, err := f.linkSystem.Load(ctx, lnk, Prototypes.Snapshot) + if err != nil { + if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ipld.ErrNotExists{}) { + http.Error(w, "", http.StatusNotFound) + return + } + logger.Errorw("Failed to load link", "err", err) + http.Error(w, "", http.StatusInternalServerError) + return + } + if err := encoder(node.(schema.TypedNode).Representation(), w); err != nil { + logger.Errorw("Failed to encode node", "err", err) + } +} diff --git a/federation/options.go b/federation/options.go new file mode 100644 index 000000000..c980eed74 --- /dev/null +++ b/federation/options.go @@ -0,0 +1,140 @@ +package federation + +import ( + "errors" + "net/http" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/dsadapter" + "github.com/ipni/storetheindex/internal/ingest" + "github.com/ipni/storetheindex/internal/registry" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" +) + +type ( + Option func(*options) error + + options struct { + datastore datastore.Datastore + host host.Host + httpListenAddr string + linkSystem *ipld.LinkSystem + members []peer.AddrInfo + registry *registry.Registry + ingester *ingest.Ingester + reconciliationHttpClient *http.Client + reconciliationInterval time.Duration + snapshotInterval time.Duration + } +) + +func newOptions(o ...Option) (*options, error) { + opt := options{ + httpListenAddr: "0.0.0.0:3004", + reconciliationHttpClient: http.DefaultClient, + reconciliationInterval: 1 * time.Hour, + snapshotInterval: 30 * time.Minute, + } + for _, apply := range o { + if err := apply(&opt); err != nil { + return nil, err + } + } + // Check required options + if opt.ingester == nil { + return nil, errors.New("ingester must be specified") + } + if opt.registry == nil { + return nil, errors.New("registry must be specified") + } + + // Set defaults + if opt.datastore == nil { + opt.datastore = sync.MutexWrap(datastore.NewMapDatastore()) + } + if opt.host == nil { + var err error + opt.host, err = libp2p.New() + if err != nil { + return nil, err + } + } + if opt.linkSystem == nil { + ls := cidlink.DefaultLinkSystem() + storage := dsadapter.Adapter{ + Wrapped: opt.datastore, + } + ls.SetReadStorage(&storage) + ls.SetWriteStorage(&storage) + opt.linkSystem = &ls + } + + return &opt, nil +} + +func WithDatastore(v datastore.Datastore) Option { + return func(o *options) error { + o.datastore = v + return nil + } +} + +func WithHost(v host.Host) Option { + return func(o *options) error { + o.host = v + return nil + } +} +func WithHttpListenAddr(v string) Option { + return func(o *options) error { + o.httpListenAddr = v + return nil + } +} + +func WithMembers(v ...peer.AddrInfo) Option { + return func(o *options) error { + o.members = append(o.members, v...) 
+ return nil + } +} +func WithLinkSystem(v *ipld.LinkSystem) Option { + return func(o *options) error { + o.linkSystem = v + return nil + } +} + +func WithIngester(v *ingest.Ingester) Option { + return func(o *options) error { + o.ingester = v + return nil + } +} + +func WithRegistry(v *registry.Registry) Option { + return func(o *options) error { + o.registry = v + return nil + } +} + +func WithReconciliationInterval(v time.Duration) Option { + return func(o *options) error { + o.reconciliationInterval = v + return nil + } +} + +func WithSnapshotInterval(v time.Duration) Option { + return func(o *options) error { + o.snapshotInterval = v + return nil + } +} diff --git a/federation/schema.go b/federation/schema.go new file mode 100644 index 000000000..4cce65e98 --- /dev/null +++ b/federation/schema.go @@ -0,0 +1,166 @@ +package federation + +import ( + "bytes" + "crypto/sha256" + _ "embed" + "errors" + "fmt" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + _ "github.com/ipld/go-ipld-prime/codec/dagcbor" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/schema" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/record" + "github.com/multiformats/go-multicodec" +) + +const ( + stateHeadRecordDomain = "indexer" + stateHeadRecordCodec = "/indexer/fed/stateHeadSignature" +) + +var ( + Prototypes struct { + Head schema.TypedPrototype + Snapshot schema.TypedPrototype + ProvidersIngestStatusMap schema.TypedPrototype + IngestStatus schema.TypedPrototype + + link ipld.LinkPrototype + } + + //go:embed schema.ipldsch + schemaBytes []byte + + _ record.Record = (*Head)(nil) + stateHeadRecordCodecBytes = []byte(stateHeadRecordCodec) +) + +type ( + Head struct { + Head ipld.Link + Topic *string + PublicKey []byte + Signature []byte + } + Snapshot struct { + Epoch uint64 + VectorClock uint64 + Providers ProvidersIngestStatusMap + Previous ipld.Link + } + ProvidersIngestStatusMap struct { + Keys []string + Values map[string]IngestStatus + } + IngestStatus struct { + LastAdvertisement ipld.Link + HeightHint *uint64 + } +) + +func init() { + switch ts, err := ipld.LoadSchemaBytes(schemaBytes); { + case err != nil: + // Fatal error; IPLD schema is likely invalid. 
+ logger.Fatalw("Cannot load federation IPLD schema", "err", err) + default: + Prototypes.Head = bindnode.Prototype((*Head)(nil), ts.TypeByName("FederationHead")) + Prototypes.Snapshot = bindnode.Prototype((*Snapshot)(nil), ts.TypeByName("FederationSnapshot")) + Prototypes.ProvidersIngestStatusMap = bindnode.Prototype((*ProvidersIngestStatusMap)(nil), ts.TypeByName("ProvidersIngestStatusMap")) + Prototypes.IngestStatus = bindnode.Prototype((*IngestStatus)(nil), ts.TypeByName("IngestStatus")) + } + Prototypes.link = cidlink.LinkPrototype{ + Prefix: cid.Prefix{ + Version: 1, + Codec: uint64(multicodec.DagJson), + MhType: uint64(multicodec.Sha2_256), + MhLength: -1, + }, + } +} + +func (h *Head) Domain() string { + return stateHeadRecordDomain +} + +func (h *Head) Codec() []byte { + return stateHeadRecordCodecBytes +} + +func (h *Head) MarshalRecord() ([]byte, error) { + hasher := sha256.New() + if h.Head != nil { + hasher.Write(h.Head.(cidlink.Link).Cid.Bytes()) + } + if h.Topic != nil && *h.Topic != "" { + hasher.Write([]byte(*h.Topic)) + } + hasher.Write(h.PublicKey) + return hasher.Sum(nil), nil +} + +func (h *Head) UnmarshalRecord(payload []byte) error { + if len(payload) != sha256.Size { + return errors.New("malformed payload: unexpected length") + } + wantHash, err := h.MarshalRecord() + if err != nil { + return err + } + if !bytes.Equal(payload, wantHash) { + return errors.New("malformed payload: hashes do not match") + } + return nil +} + +func (h *Head) Sign(signer crypto.PrivKey) error { + seal, err := record.Seal(h, signer) + if err != nil { + return err + } + sig, err := seal.Marshal() + if err != nil { + return err + } + h.Signature = sig + return nil +} + +func (h *Head) Verify(signer crypto.PubKey) error { + if len(h.Signature) == 0 { + return errors.New("unsigned head") + } + + headPubKey, err := crypto.UnmarshalPublicKey(h.PublicKey) + if err != nil { + return fmt.Errorf("cannot unmarshal head public key: %w", err) + } + + switch envelope, err := record.ConsumeTypedEnvelope(h.Signature, h); { + case err != nil: + return err + case !envelope.PublicKey.Equals(signer): + return errors.New("envelope and signer public keys do not match") + case !signer.Equals(headPubKey): + return errors.New("head public key and signer public keys do not match") + default: + return nil + } +} + +func (m *ProvidersIngestStatusMap) Put(k string, v IngestStatus) { + if _, ok := m.Values[k]; !ok { + m.Keys = append(m.Keys, k) + } + m.Values[k] = v +} + +func (m *ProvidersIngestStatusMap) Get(k string) (IngestStatus, bool) { + status, ok := m.Values[k] + return status, ok +} diff --git a/federation/schema.ipldsch b/federation/schema.ipldsch new file mode 100644 index 000000000..be531da86 --- /dev/null +++ b/federation/schema.ipldsch @@ -0,0 +1,20 @@ +type FederationHead struct { + Head optional Link + Topic optional String + PublicKey Bytes + Signature Bytes +} + +type FederationSnapshot struct { + Epoch Int + VectorClock Int + Providers ProvidersIngestStatusMap + Previous optional Link +} + +type ProvidersIngestStatusMap {String:IngestStatus} + +type IngestStatus struct { + LastAdvertisement Link + HeightHint optional Int +} diff --git a/federation/schema_test.go b/federation/schema_test.go new file mode 100644 index 000000000..1a5135164 --- /dev/null +++ b/federation/schema_test.go @@ -0,0 +1,48 @@ +package federation + +import ( + "crypto/rand" + "testing" + + "github.com/ipfs/go-cid" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/libp2p/go-libp2p/core/crypto" + 
"github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestHeadSignVerify(t *testing.T) { + mh, err := multihash.Sum([]byte("fish"), multihash.SHA2_256, -1) + require.NoError(t, err) + link := cidlink.Link{Cid: cid.NewCidV1(cid.Raw, mh)} + + t.Run("Correct Signature Verifies", func(t *testing.T) { + key, pubKey, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + mpk, err := crypto.MarshalPublicKey(pubKey) + require.NoError(t, err) + + head := Head{ + Head: link, + PublicKey: mpk, + } + require.NoError(t, head.Sign(key)) + require.NoError(t, head.Verify(pubKey)) + }) + + t.Run("Key Missmatch Fails", func(t *testing.T) { + key, pubKey, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + _, anotherPubKey, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + + mpk, err := crypto.MarshalPublicKey(anotherPubKey) + require.NoError(t, err) + head := Head{ + Head: link, + PublicKey: mpk, + } + require.NoError(t, head.Sign(key)) + require.ErrorContains(t, head.Verify(pubKey), "head public key and signer public keys do not match") + }) +} diff --git a/federation/tooling_test.go b/federation/tooling_test.go new file mode 100644 index 000000000..2b3e9d99d --- /dev/null +++ b/federation/tooling_test.go @@ -0,0 +1,143 @@ +package federation_test + +import ( + "context" + "crypto/rand" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/sync" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipni/go-indexer-core" + "github.com/ipni/storetheindex/config" + "github.com/ipni/storetheindex/federation" + "github.com/ipni/storetheindex/internal/ingest" + "github.com/ipni/storetheindex/internal/registry" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +var _ indexer.Interface = (*testIndexer)(nil) + +type ( + testIndexer struct { + store map[string][]indexer.Value + } + + testFederationMember struct { + *federation.Federation + host host.Host + indexer *testIndexer + ingester *ingest.Ingester + registry *registry.Registry + } +) + +func (t testIndexer) Get(multihash multihash.Multihash) ([]indexer.Value, bool, error) { + values, found := t.store[string(multihash)] + return values, found, nil +} + +func (t testIndexer) Put(value indexer.Value, multihash ...multihash.Multihash) error { + for _, m := range multihash { + key := string(m) + t.store[key] = append(t.store[key], value) + } + return nil +} + +func (t testIndexer) Remove(indexer.Value, ...multihash.Multihash) error { panic("unimplemented") } +func (t testIndexer) RemoveProvider(context.Context, peer.ID) error { panic("unimplemented") } +func (t testIndexer) RemoveProviderContext(peer.ID, []byte) error { panic("unimplemented") } +func (t testIndexer) Size() (int64, error) { panic("unimplemented") } +func (t testIndexer) Flush() error { panic("unimplemented") } +func (t testIndexer) Close() error { return nil } +func (t testIndexer) Iter() (indexer.Iterator, error) { panic("unimplemented") } +func (t testIndexer) Stats() (*indexer.Stats, error) { return nil, nil } + +func newTestFederationMember(t *testing.T, members ...peer.AddrInfo) *testFederationMember { + h, err := libp2p.New() + require.NoError(t, err) + idxr := 
&testIndexer{store: make(map[string][]indexer.Value)} + ds := sync.MutexWrap(datastore.NewMapDatastore()) + reg, err := registry.New(context.Background(), config.NewDiscovery(), ds) + require.NoError(t, err) + igstr, err := ingest.NewIngester(config.NewIngest(), h, idxr, reg, ds, ds) + require.NoError(t, err) + f, err := federation.New( + federation.WithHost(h), + federation.WithRegistry(reg), + federation.WithIngester(igstr), + federation.WithReconciliationInterval(time.Second), + federation.WithSnapshotInterval(time.Second), + federation.WithHttpListenAddr("0.0.0.0:0"), + federation.WithMembers(members...), + ) + require.NoError(t, err) + return &testFederationMember{ + Federation: f, + host: h, + indexer: idxr, + ingester: igstr, + registry: reg, + } +} + +func (tfm *testFederationMember) requireHeadEventually(t *testing.T, timeout time.Duration, retry time.Duration) *federation.Head { + var head *federation.Head + require.Eventually(t, func() bool { + recorder := httptest.NewRecorder() + request := httptest.NewRequest(http.MethodGet, "/ipni/v1/fed/head", nil) + tfm.ServeMux().ServeHTTP(recorder, request) + + if recorder.Code != http.StatusOK { + return false + } + builder := federation.Prototypes.Head.NewBuilder() + err := dagjson.Decode(builder, recorder.Body) + if err != nil { + t.Log(err) + return false + } + head = bindnode.Unwrap(builder.Build()).(*federation.Head) + if err := head.Verify(tfm.host.Peerstore().PubKey(tfm.host.ID())); err != nil { + t.Log(err) + return false + } + if head.Head == nil { + return false + } + return true + }, timeout, retry) + return head +} + +func (tfm *testFederationMember) requireSnapshot(t *testing.T, l ipld.Link) *federation.Snapshot { + recorder := httptest.NewRecorder() + request := httptest.NewRequest(http.MethodGet, "/ipni/v1/fed/"+l.String(), nil) + tfm.ServeMux().ServeHTTP(recorder, request) + require.Equal(t, http.StatusOK, recorder.Code) + + builder := federation.Prototypes.Snapshot.NewBuilder() + require.NoError(t, dagjson.Decode(builder, recorder.Body)) + return bindnode.Unwrap(builder.Build()).(*federation.Snapshot) +} + +var buf [32]byte + +func requireRandomCid(t *testing.T) cid.Cid { + n, err := rand.Reader.Read(buf[:]) + require.NoError(t, err) + mh, err := multihash.Sum(buf[:n], multihash.SHA2_256, -1) + require.NoError(t, err) + return cid.NewCidV1(cid.Raw, mh) +} diff --git a/federation/vector_clock.go b/federation/vector_clock.go new file mode 100644 index 000000000..b78e46e3d --- /dev/null +++ b/federation/vector_clock.go @@ -0,0 +1,30 @@ +package federation + +import ( + "github.com/libp2p/go-libp2p/core/peer" +) + +type vectorClock map[peer.ID]uint64 + +func newVectorClock() vectorClock { + return make(map[peer.ID]uint64) +} + +func (vc vectorClock) clock(id peer.ID) (uint64, bool) { + clock, found := vc[id] + return clock, found +} + +func (vc vectorClock) tick(id peer.ID) uint64 { + vc[id] = vc[id] + 1 + return vc[id] + +} + +func (vc vectorClock) reconcile(id peer.ID, c uint64) bool { + if vc[id] < c { + vc[id] = c + return true + } + return false +} diff --git a/federation/vector_clock_test.go b/federation/vector_clock_test.go new file mode 100644 index 000000000..df2263410 --- /dev/null +++ b/federation/vector_clock_test.go @@ -0,0 +1,31 @@ +package federation + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestVectorClock(t *testing.T) { + subject := newVectorClock() + + subject.tick("one") + subject.tick("another") + + clock, found := subject.clock("one") + require.True(t, found) + 
require.Equal(t, uint64(1), clock) + + clock, found = subject.clock("another") + require.True(t, found) + require.Equal(t, uint64(1), clock) + + clock, found = subject.clock("and another") + require.False(t, found) + require.Equal(t, uint64(0), clock) + + subject.reconcile("another", 1413) + clock, found = subject.clock("another") + require.True(t, found) + require.Equal(t, uint64(1413), clock) +}
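A minimal sketch of how the new package might be wired into the daemon, using only the exported options, `Start` and `Shutdown` added above. The `startFederation` helper and the `daemon` package are hypothetical and not part of this change; it assumes the libp2p host, registry, ingester and member addresses are already constructed elsewhere in storetheindex (the `internal` packages are only importable from within this module).

```go
package daemon // hypothetical location inside the storetheindex module

import (
	"context"
	"time"

	"github.com/ipni/storetheindex/federation"
	"github.com/ipni/storetheindex/internal/ingest"
	"github.com/ipni/storetheindex/internal/registry"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// startFederation constructs and starts a Federation instance. All parameters
// are assumed to be built by the existing daemon start-up code.
func startFederation(ctx context.Context, h host.Host, reg *registry.Registry, ing *ingest.Ingester, members []peer.AddrInfo) (*federation.Federation, error) {
	fed, err := federation.New(
		federation.WithHost(h),
		federation.WithRegistry(reg), // required
		federation.WithIngester(ing), // required
		federation.WithMembers(members...),
		federation.WithHttpListenAddr("0.0.0.0:3004"),    // default from options.go
		federation.WithSnapshotInterval(30*time.Minute),  // default
		federation.WithReconciliationInterval(time.Hour), // default
	)
	if err != nil {
		return nil, err
	}
	// Start serves /ipni/v1/fed/* and runs the snapshot and reconciliation
	// tickers until Shutdown is called.
	if err := fed.Start(ctx); err != nil {
		return nil, err
	}
	return fed, nil
}
```

Shutting down is symmetric: `fed.Shutdown(ctx)` stops the event loop and the federation HTTP server.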