diff --git a/ipfs/api_router.go b/ipfs/api_router.go index e2e20fa714..f11a7971ea 100644 --- a/ipfs/api_router.go +++ b/ipfs/api_router.go @@ -15,6 +15,7 @@ import ( routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing" ropts "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing/options" pstore "gx/ipfs/QmaCTz9RkrU13bm9kMB54f7atgqM4qkjDZpRwRoJiWXEqs/go-libp2p-peerstore" + record "gx/ipfs/QmbeHtaBy9nZsW4cHRcvgVY4CnDhXudE2Dr6qDxS7yg9rX/go-libp2p-record" ) var apiRouterHTTPClient = &http.Client{ @@ -32,13 +33,14 @@ var ErrNotStarted = errors.New("API router not started") // provides the features offerened by routing.ValueStore and marks the others as // unsupported. type APIRouter struct { - uri string - started chan (struct{}) + uri string + started chan (struct{}) + validator record.Validator } // NewAPIRouter creates a new APIRouter backed by the given URI. -func NewAPIRouter(uri string) APIRouter { - return APIRouter{uri: uri, started: make(chan (struct{}))} +func NewAPIRouter(uri string, validator record.Validator) APIRouter { + return APIRouter{uri: uri, started: make(chan (struct{})), validator: validator} } func (r *APIRouter) Start(proxyDialer proxy.Dialer) { @@ -79,7 +81,11 @@ func (r APIRouter) GetValue(ctx context.Context, key string, opts ...ropts.Optio defer resp.Body.Close() log.Debugf("read value from %s", path) - return ioutil.ReadAll(resp.Body) + value, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return value, r.validator.Validate(key, value) } // GetValues reads the value for the given key. The API does not return multiple diff --git a/ipfs/caching_router.go b/ipfs/caching_router.go index 5cd055cd73..4c850467b5 100644 --- a/ipfs/caching_router.go +++ b/ipfs/caching_router.go @@ -2,12 +2,14 @@ package ipfs import ( "context" + "encoding/hex" "errors" - routinghelpers "gx/ipfs/QmRCrPXk2oUwpK1Cj2FXrUotRpddUxz56setkny2gz13Cx/go-libp2p-routing-helpers" - dht "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht" - routing "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing" - ropts "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing/options" - record "gx/ipfs/QmbeHtaBy9nZsW4cHRcvgVY4CnDhXudE2Dr6qDxS7yg9rX/go-libp2p-record" + + "gx/ipfs/QmSY3nkMNLzh9GdbFKK5tT7YMfLpf52iUZ8ZRkr29MJaa5/go-libp2p-kad-dht" + ci "gx/ipfs/QmTW4SdgBWq9GjsBsHeUx8WuGxzhgzAf88UMH2w62PC8yK/go-libp2p-crypto" + "gx/ipfs/QmYVXrKrKHDC9FobgmcmshCDyWwdrfwfanNQN4oxJ9Fk3h/go-libp2p-peer" + "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing" + "gx/ipfs/QmYxUdYY9S6yg5tSPVin5GFTvtfsLauVcr7reHDD3dM8xf/go-libp2p-routing/options" ) var ( @@ -17,14 +19,12 @@ var ( type CachingRouter struct { apiRouter *APIRouter routing.IpfsRouting - RecordValidator record.Validator } func NewCachingRouter(dht *dht.IpfsDHT, apiRouter *APIRouter) *CachingRouter { return &CachingRouter{ - apiRouter: apiRouter, - IpfsRouting: dht, - RecordValidator: dht.Validator, + apiRouter: apiRouter, + IpfsRouting: dht, } } @@ -43,28 +43,53 @@ func (r *CachingRouter) APIRouter() *APIRouter { func (r *CachingRouter) PutValue(ctx context.Context, key string, value []byte, opts ...ropts.Option) error { // Write to the tiered router in the background then write to the caching // router and return - go r.IpfsRouting.PutValue(ctx, key, value, opts...) - return r.apiRouter.PutValue(ctx, key, value, opts...) + var err error + if err = r.IpfsRouting.PutValue(ctx, key, value, opts...); err != nil { + log.Errorf("ipfs dht put (%s): %s", hex.EncodeToString([]byte(key)), err) + return err + } + if err = r.apiRouter.PutValue(ctx, key, value, opts...); err != nil { + log.Errorf("api cache put (%s): %s", hex.EncodeToString([]byte(key)), err) + } + return err } func (r *CachingRouter) GetValue(ctx context.Context, key string, opts ...ropts.Option) ([]byte, error) { // First check the DHT router. If it's successful return the value otherwise // continue on to check the other routers. val, err := r.IpfsRouting.GetValue(ctx, key, opts...) - if err == nil { - return val, r.apiRouter.PutValue(ctx, key, val, opts...) + if err != nil && len(val) == 0 { + // No values from the DHT, check the API cache + log.Warningf("ipfs dht lookup was empty: %s", err.Error()) + if val, err = r.apiRouter.GetValue(ctx, key, opts...); err != nil && len(val) == 0 { + // No values still, report NotFound + return nil, routing.ErrNotFound + } } + if err := r.apiRouter.PutValue(ctx, key, val, opts...); err != nil { + log.Errorf("api cache put found dht value (%s): %s", hex.EncodeToString([]byte(key)), err.Error()) + } + return val, nil +} - // Value miss; Check API router - return r.apiRouter.GetValue(ctx, key, opts...) +func (r *CachingRouter) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) { + if dht, ok := r.IpfsRouting.(routing.PubKeyFetcher); ok { + return dht.GetPublicKey(ctx, p) + } + return nil, routing.ErrNotSupported } func (r *CachingRouter) SearchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { - return routinghelpers.Parallel{ - Routers: []routing.IpfsRouting{ - r.IpfsRouting, - r.apiRouter, - }, - Validator: r.RecordValidator, - }.SearchValue(ctx, key, opts...) + // TODO: Restore parallel lookup once validation is properly applied to + // the apiRouter results ensuring it doesn't return invalid records before the + // IpfsRouting object can. For some reason the validation is not being considered + // on returned results. + return r.IpfsRouting.SearchValue(ctx, key, opts...) + //return routinghelpers.Parallel{ + //Routers: []routing.IpfsRouting{ + //r.IpfsRouting, + //r.apiRouter, + //}, + //Validator: r.RecordValidator, + //}.SearchValue(ctx, key, opts...) } diff --git a/ipfs/config.go b/ipfs/config.go index 5376d98bea..27e6e0f9af 100644 --- a/ipfs/config.go +++ b/ipfs/config.go @@ -67,7 +67,7 @@ func constructRouting(ctx context.Context, host p2phost.Host, dstore ds.Batching if err != nil { return nil, err } - apiRouter := NewAPIRouter(routerCacheURI) + apiRouter := NewAPIRouter(routerCacheURI, dhtRouting.Validator) cachingRouter := NewCachingRouter(dhtRouting, &apiRouter) return cachingRouter, nil } @@ -99,7 +99,7 @@ func constructTestnetRouting(ctx context.Context, host p2phost.Host, dstore ds.B if err != nil { return nil, err } - apiRouter := NewAPIRouter(routerCacheURI) + apiRouter := NewAPIRouter(routerCacheURI, dhtRouting.Validator) cachingRouter := NewCachingRouter(dhtRouting, &apiRouter) return cachingRouter, nil } diff --git a/mobile/node.go b/mobile/node.go index 85f9a12f19..cfa6acd24f 100644 --- a/mobile/node.go +++ b/mobile/node.go @@ -269,7 +269,7 @@ func constructMobileRouting(ctx context.Context, host p2phost.Host, dstore ds.Ba if err != nil { return nil, err } - apiRouter := ipfs.NewAPIRouter(schema.IPFSCachingRouterDefaultURI) + apiRouter := ipfs.NewAPIRouter(schema.IPFSCachingRouterDefaultURI, dhtRouting.Validator) apiRouter.Start(nil) cachingRouter := ipfs.NewCachingRouter(dhtRouting, &apiRouter) return cachingRouter, nil