Skip to content

Commit

Permalink
Feat/migration ipfs download (#8064)
Browse files Browse the repository at this point in the history
* Enable downloading migrations over IPFS

There are now options in the config file that control how migrations are downloaded. One option enables downloading migrations over IPFS: when a migration is required, a temporary node is spun up to fetch the migrations before they are run. Another option decides what to do with the migration binaries once they have been downloaded (e.g. cache or pin them in your node, or just discard the data).

Co-authored-by: Steven Allen <[email protected]>
  • Loading branch information
gammazero and Stebalien authored May 12, 2021
1 parent 4f4c947 commit c54cdaa
Show file tree
Hide file tree
Showing 16 changed files with 1,186 additions and 27 deletions.
58 changes: 56 additions & 2 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
_ "expvar"
"fmt"
"io/ioutil"
"net"
"net/http"
_ "net/http/pprof"
Expand Down Expand Up @@ -268,6 +269,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
}
}

var cacheMigrations, pinMigrations bool
var fetcher migrations.Fetcher

// acquire the repo lock _before_ constructing a node. we need to make
// sure we are permitted to access the resources (datastore, etc.)
repo, err := fsrepo.Open(cctx.ConfigRoot)
Expand All @@ -288,8 +292,38 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
return fmt.Errorf("fs-repo requires migration")
}

// Fetch migrations from current distribution, or location from environ
fetcher := migrations.NewHttpFetcher(migrations.GetDistPathEnv(migrations.CurrentIpfsDist), "", "go-ipfs", 0)
migrationCfg, err := readMigrationConfig(cctx.ConfigRoot)
if err != nil {
return err
}

fetcher, err = getMigrationFetcher(migrationCfg, &cctx.ConfigRoot)
if err != nil {
return err
}
defer fetcher.Close()

if migrationCfg.Keep == "cache" {
cacheMigrations = true
} else if migrationCfg.Keep == "pin" {
pinMigrations = true
}

if cacheMigrations || pinMigrations {
// Create temp directory to store downloaded migration archives
migrations.DownloadDirectory, err = ioutil.TempDir("", "migrations")
if err != nil {
return err
}
// Defer cleanup of download directory so that it gets cleaned up
// if daemon returns early due to error
defer func() {
if migrations.DownloadDirectory != "" {
os.RemoveAll(migrations.DownloadDirectory)
}
}()
}

err = migrations.RunMigration(cctx.Context(), fetcher, fsrepo.RepoVersion, "", false)
if err != nil {
fmt.Println("The migrations of fs-repo failed:")
Expand Down Expand Up @@ -420,6 +454,26 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
return err
}

// Add any files downloaded by migration.
if cacheMigrations || pinMigrations {
err = addMigrations(cctx.Context(), node, fetcher, pinMigrations)
if err != nil {
fmt.Fprintln(os.Stderr, "Could not add migragion to IPFS:", err)
}
// Remove download directory so that it does not remain for lifetime of
// daemon or get left behind if daemon has a hard exit
os.RemoveAll(migrations.DownloadDirectory)
migrations.DownloadDirectory = ""
}
if fetcher != nil {
// If there is an error closing the IpfsFetcher, then print error, but
// do not fail because of it.
err = fetcher.Close()
if err != nil {
log.Errorf("error closing IPFS fetcher: %s", err)
}
}

// construct http gateway
gwErrc, err := serveHTTPGateway(req, cctx)
if err != nil {
Expand Down
300 changes: 300 additions & 0 deletions cmd/ipfs/migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
package main

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"strings"

config "github.com/ipfs/go-ipfs-config"
"github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
"github.com/ipfs/go-ipfs/repo/fsrepo/migrations/ipfsfetcher"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
ipath "github.com/ipfs/interface-go-ipfs-core/path"
"github.com/libp2p/go-libp2p-core/peer"
)

// readMigrationConfig reads the migration config out of the config file found
// under repoRoot, decoding nothing other than the "Migration" section. That
// way, we're free to make arbitrary changes to all _other_ sections in
// migrations.
//
// Defaults are applied before returning: an empty Keep becomes
// config.DefaultMigrationKeep and an empty DownloadSources becomes
// config.DefaultMigrationDownloadSources. An error is returned if the config
// file cannot be located, opened, or decoded, or if Keep holds an unknown
// value.
func readMigrationConfig(repoRoot string) (*config.Migration, error) {
	// Anonymous wrapper so that only the "Migration" key is decoded.
	var cfg struct {
		Migration config.Migration
	}

	cfgPath, err := config.Filename(repoRoot)
	if err != nil {
		return nil, err
	}

	cfgFile, err := os.Open(cfgPath)
	if err != nil {
		return nil, err
	}
	defer cfgFile.Close()

	err = json.NewDecoder(cfgFile).Decode(&cfg)
	if err != nil {
		return nil, err
	}

	switch cfg.Migration.Keep {
	case "":
		cfg.Migration.Keep = config.DefaultMigrationKeep
	case "discard", "cache", "pin":
		// Documented values; "pin" must be accepted here or pinning of
		// downloaded migrations can never be enabled.
	case "keep":
		// Previously accepted by this check, so it is still tolerated to
		// avoid rejecting existing config files.
	default:
		return nil, errors.New("unknown config value, Migrations.Keep must be 'cache', 'pin', or 'discard'")
	}

	if len(cfg.Migration.DownloadSources) == 0 {
		cfg.Migration.DownloadSources = config.DefaultMigrationDownloadSources
	}

	return &cfg.Migration, nil
}

// readIpfsConfig makes a best-effort attempt to read the bootstrap addresses
// and the peering peers from the config file under repoRoot. Failures are
// reported on stderr and never returned: whatever could not be read is simply
// left nil. A nil repoRoot yields nil for both results without touching the
// filesystem.
func readIpfsConfig(repoRoot *string) (bootstrap []string, peers []peer.AddrInfo) {
	if repoRoot == nil {
		return nil, nil
	}

	path, err := config.Filename(*repoRoot)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return nil, nil
	}

	f, err := os.Open(path)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		return nil, nil
	}
	defer f.Close()

	// First pass over the file: decode only the "Bootstrap" key.
	var bootstrapSection struct {
		Bootstrap []string
	}
	if decodeErr := json.NewDecoder(f).Decode(&bootstrapSection); decodeErr == nil {
		bootstrap = bootstrapSection.Bootstrap
	} else {
		fmt.Fprintln(os.Stderr, "cannot read bootstrap peers from config")
	}

	// Rewind so that the second decoder starts from the top of the file.
	if _, seekErr := f.Seek(0, 0); seekErr != nil {
		fmt.Fprintln(os.Stderr, seekErr)
	}

	// Second pass over the file: decode only the "Peering" key.
	var peeringSection struct {
		Peering config.Peering
	}
	if decodeErr := json.NewDecoder(f).Decode(&peeringSection); decodeErr == nil {
		peers = peeringSection.Peering.Peers
	} else {
		fmt.Fprintln(os.Stderr, "cannot read peering from config")
	}

	return bootstrap, peers
}

// getMigrationFetcher creates one or more fetchers according to
// config.Migration.DownloadSources. If an IpfsFetcher is required, then
// bootstrap and peer information is read from the config file in repoRoot,
// unless repoRoot is nil.
//
// Each source is trimmed of surrounding whitespace and then interpreted as:
//   - "IPFS" / "ipfs": an IpfsFetcher;
//   - "HTTPS" / "https" / "HTTP" / "http": an HttpFetcher with an empty
//     gateway argument;
//   - "": ignored;
//   - anything else: a gateway URL, which must use the http or https scheme
//     (https is assumed when no scheme is given).
//
// A single resulting fetcher is returned as-is; several are wrapped in a
// MultiFetcher that tries them in order. An error is returned when a gateway
// URL is invalid or when no usable source is configured.
func getMigrationFetcher(cfg *config.Migration, repoRoot *string) (migrations.Fetcher, error) {
	const httpUserAgent = "go-ipfs"

	// Fetch migrations from current distribution, or location from environ
	fetchDistPath := migrations.GetDistPathEnv(migrations.CurrentIpfsDist)

	var fetchers []migrations.Fetcher
	for _, src := range cfg.DownloadSources {
		src = strings.TrimSpace(src)
		switch src {
		case "":
			// Ignore empty string
		case "IPFS", "ipfs":
			bootstrap, peers := readIpfsConfig(repoRoot)
			fetchers = append(fetchers, ipfsfetcher.NewIpfsFetcher(fetchDistPath, 0, bootstrap, peers))
		case "HTTPS", "https", "HTTP", "http":
			fetchers = append(fetchers, migrations.NewHttpFetcher(fetchDistPath, "", httpUserAgent, 0))
		default:
			u, err := url.Parse(src)
			if err != nil {
				// Wrap with %w so callers can still inspect the parse error.
				return nil, fmt.Errorf("bad gateway address: %w", err)
			}
			switch u.Scheme {
			case "":
				u.Scheme = "https"
			case "https", "http":
			default:
				return nil, errors.New("bad gateway address: url scheme must be http or https")
			}
			fetchers = append(fetchers, migrations.NewHttpFetcher(fetchDistPath, u.String(), httpUserAgent, 0))
		}
	}
	if len(fetchers) == 0 {
		return nil, errors.New("no sources specified")
	}

	if len(fetchers) == 1 {
		return fetchers[0], nil
	}

	// Wrap fetchers in a MultiFetcher to try them in order
	return migrations.NewMultiFetcher(fetchers...), nil
}

// addMigrations adds the migrations obtained through fetcher to node,
// pinning them when pin is true. A MultiFetcher is expanded into its
// underlying fetchers, and each one is handled according to its type:
//   - IpfsFetcher: the paths it fetched are retrieved by connecting node to
//     the fetcher's node;
//   - HttpFetcher: every regular file found under
//     migrations.DownloadDirectory is added (nothing is done when that
//     directory is unset).
//
// The first failure aborts the operation and is returned. An unrecognized
// fetcher type is an error.
func addMigrations(ctx context.Context, node *core.IpfsNode, fetcher migrations.Fetcher, pin bool) error {
	var fetchers []migrations.Fetcher
	if mf, ok := fetcher.(*migrations.MultiFetcher); ok {
		fetchers = mf.Fetchers()
	} else {
		fetchers = []migrations.Fetcher{fetcher}
	}

	for _, fetcher := range fetchers {
		switch f := fetcher.(type) {
		case *ipfsfetcher.IpfsFetcher:
			// Add migrations by connecting to temp node and getting from IPFS
			err := addMigrationPaths(ctx, node, f.AddrInfo(), f.FetchedPaths(), pin)
			if err != nil {
				return err
			}
		case *migrations.HttpFetcher:
			// Add the downloaded migration files directly
			if migrations.DownloadDirectory != "" {
				var paths []string
				err := filepath.Walk(migrations.DownloadDirectory, func(filePath string, info os.FileInfo, err error) error {
					// Walk reports failures through err, in which case info
					// may be nil; bail out before dereferencing it.
					if err != nil {
						return err
					}
					if info.IsDir() {
						return nil
					}
					paths = append(paths, filePath)
					return nil
				})
				if err != nil {
					return err
				}
				err = addMigrationFiles(ctx, node, paths, pin)
				if err != nil {
					return err
				}
			}
		default:
			return errors.New("Cannot get migrations from unknown fetcher type")
		}
	}

	return nil
}

// addMigrationFiles adds the files at paths to IPFS through node's Unixfs
// API, optionally pinning them. A line naming each added file and its
// resulting IPFS path is printed to stdout.
//
// An empty paths slice is a no-op. Processing stops at the first file that
// cannot be opened, stat'ed, or added, and that error is returned. Every
// opened file is closed before moving on, so no descriptors are left open
// once this function returns.
func addMigrationFiles(ctx context.Context, node *core.IpfsNode, paths []string, pin bool) error {
	if len(paths) == 0 {
		return nil
	}
	ifaceCore, err := coreapi.NewCoreAPI(node)
	if err != nil {
		return err
	}
	ufs := ifaceCore.Unixfs()

	// Add migration files
	for _, filePath := range paths {
		f, err := os.Open(filePath)
		if err != nil {
			return err
		}

		fi, err := f.Stat()
		if err != nil {
			f.Close()
			return err
		}

		ipfsPath, err := ufs.Add(ctx, files.NewReaderStatFile(f, fi), options.Unixfs.Pin(pin))
		// The file was only read from, so a Close error carries no useful
		// information and is deliberately ignored. Closing here rather than
		// with defer releases the descriptor on every loop iteration.
		f.Close()
		if err != nil {
			return err
		}
		fmt.Printf("Added migration file %q: %s\n", filepath.Base(filePath), ipfsPath)
	}

	return nil
}

// addMigrationPaths adds the files at paths to IPFS, optionally pinning
// them. This is done after connecting node to the peer described by
// peerInfo.
//
// When pin is true each path is pinned through the Pin API. Otherwise each
// path is fetched with ipfsGet, which reads and discards the file content.
//
// An error is returned if paths is empty, if peerInfo has no addresses, if
// the connection to the peer fails, or if any path cannot be pinned or
// fetched. The connection error is wrapped, so the underlying cause remains
// available through errors.Is / errors.As.
func addMigrationPaths(ctx context.Context, node *core.IpfsNode, peerInfo peer.AddrInfo, paths []ipath.Path, pin bool) error {
	if len(paths) == 0 {
		return errors.New("nothing downloaded by ipfs fetcher")
	}
	if len(peerInfo.Addrs) == 0 {
		return errors.New("no local swarm address for migration node")
	}

	ipfs, err := coreapi.NewCoreAPI(node)
	if err != nil {
		return err
	}

	// Connect to temp node
	if err := ipfs.Swarm().Connect(ctx, peerInfo); err != nil {
		return fmt.Errorf("could not connect to migration peer %q: %w", peerInfo.ID, err)
	}
	fmt.Printf("connected to migration peer %q\n", peerInfo)

	if pin {
		pinApi := ipfs.Pin()
		for _, ipfsPath := range paths {
			err := pinApi.Add(ctx, ipfsPath)
			if err != nil {
				return err
			}
			fmt.Printf("Added and pinned migration file: %q\n", ipfsPath)
		}
		return nil
	}

	ufs := ipfs.Unixfs()

	// Add migration files
	for _, ipfsPath := range paths {
		err = ipfsGet(ctx, ufs, ipfsPath)
		if err != nil {
			return err
		}
	}

	return nil
}

// ipfsGet retrieves ipfsPath through ufs and reads the whole file, discarding
// the bytes. It returns an error if the path cannot be retrieved, if it does
// not resolve to a file node, or if reading its content fails. On success a
// confirmation line is printed to stdout.
func ipfsGet(ctx context.Context, ufs coreiface.UnixfsAPI, ipfsPath ipath.Path) error {
	node, err := ufs.Get(ctx, ipfsPath)
	if err != nil {
		return err
	}
	defer node.Close()

	file, isFile := node.(files.File)
	if !isFile {
		return fmt.Errorf("not a file node: %q", ipfsPath)
	}

	// Drain the file completely; the content itself is not needed here.
	if _, copyErr := io.Copy(ioutil.Discard, file); copyErr != nil {
		return fmt.Errorf("cannot read migration: %w", copyErr)
	}

	fmt.Printf("Added migration file: %q\n", ipfsPath)
	return nil
}
Loading

0 comments on commit c54cdaa

Please sign in to comment.