diff --git a/rpcserver.go b/rpcserver.go index f072a56a0..405b213ec 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3457,8 +3457,11 @@ func (r *rpcServer) SyncUniverse(ctx context.Context, uniAddr := universe.NewServerAddrFromStr(req.UniverseHost) + // TODO(ffranr): obtain federation sync config and pass to syncer. + // TODO(roasbeef): add layer of indirection in front of? // * just interface interaction + // TODO(ffranr): Sync via the FederationEnvoy rather than syncer. universeDiff, err := r.cfg.UniverseSyncer.SyncUniverse( ctx, uniAddr, syncMode, syncTargets..., ) diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index 67731cd40..444c9b05c 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -184,14 +184,14 @@ func (f *FederationEnvoy) reportErr(err error) { // If the sync is successful (even if no diff is generated), then a new sync // event will be logged. func (f *FederationEnvoy) syncServerState(ctx context.Context, - addr ServerAddr) error { + addr ServerAddr, syncConfigs SyncConfigs) error { log.Infof("Syncing Universe state with server=%v", spew.Sdump(addr)) // Attempt to sync with the remote Universe server, if this errors then // we'll bail out early as something wrong happened. diff, err := f.cfg.UniverseSyncer.SyncUniverse( - ctx, addr, SyncIssuance, + ctx, addr, SyncFull, syncConfigs, ) if err != nil { return err @@ -468,10 +468,37 @@ func (f *FederationEnvoy) SyncServers(serverAddrs []ServerAddr) error { ctx, cancel := f.WithCtxQuitNoTimeout() defer cancel() - err := fn.ParSlice(ctx, serverAddrs, f.syncServerState) + // Obtain the general and universe specific federation sync configs. + queryFedSyncConfigs := f.cfg.FederationDB.QueryFederationSyncConfigs + globalConfigs, uniSyncConfigs, err := queryFedSyncConfigs(ctx) + if err != nil { + return fmt.Errorf("unable to query federation sync "+ + "config(s): %w", err) + } + + syncConfigs := SyncConfigs{ + GlobalSyncConfigs: globalConfigs, + UniSyncConfigs: uniSyncConfigs, + } + + syncServer := func(ctx context.Context, serverAddr ServerAddr) error { + return f.syncServerState(ctx, serverAddr, syncConfigs) + } + + err = fn.ParSlice(ctx, serverAddrs, syncServer) if err != nil { log.Warnf("unable to sync with server: %w", err) } return nil } + +// SyncConfigs is a set of configs that are used to control which universes to +// synchronize with the federation. +type SyncConfigs struct { + // GlobalSyncConfigs are the global proof type specific configs. + GlobalSyncConfigs []*FedGlobalSyncConfig + + // UniSyncConfigs are the universe specific configs. + UniSyncConfigs []*FedUniSyncConfig +} diff --git a/universe/interface.go b/universe/interface.go index a2e32e257..4eccc82d0 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -473,7 +473,7 @@ type Syncer interface { // remote universe, governed by the sync type and the set of universe // IDs to sync. SyncUniverse(ctx context.Context, host ServerAddr, - syncType SyncType, + syncType SyncType, syncConfigs SyncConfigs, idsToSync ...Identifier) ([]AssetSyncDiff, error) } diff --git a/universe/syncer.go b/universe/syncer.go index c988833cf..cf338be22 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -60,7 +60,8 @@ func NewSimpleSyncer(cfg SimpleSyncCfg) *SimpleSyncer { // A simple approach where a set difference is used to find the set of assets // that need to be synced is used. func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, - syncType SyncType, idsToSync []Identifier) ([]AssetSyncDiff, error) { + syncType SyncType, syncConfigs SyncConfigs, + idsToSync []Identifier) ([]AssetSyncDiff, error) { // Prevent the syncer from running twice. if !s.isSyncing.CompareAndSwap(false, true) { @@ -115,6 +116,18 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, } } + // Filter target roots based on sync configs and sync type. + if syncType == SyncIssuance { + // Filter out all roots that are not issuance roots. + targetRoots = fn.Filter( + targetRoots, func(r BaseRoot) bool { + return r.ID.ProofType == ProofTypeIssuance + }, + ) + } + + // + log.Infof("Obtained %v roots from remote Universe server", len(targetRoots)) log.Tracef("Obtained %v roots from remote Universe server: %v", @@ -322,21 +335,8 @@ func (s *SimpleSyncer) batchStreamNewItems(ctx context.Context, // SyncUniverse attempts to synchronize the local universe with the remote // universe, governed by the sync type and the set of universe IDs to sync. func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr, - syncType SyncType, idsToSync ...Identifier) ([]AssetSyncDiff, error) { - - // First, we'll make sure that the user requested a sync with the set - // of supported sync types. - switch syncType { - case SyncIssuance: - break - - // For now, we only support issuance syncs. - case SyncFull: - fallthrough - - default: - return nil, ErrUnsupportedSync - } + syncType SyncType, syncConfigs SyncConfigs, + idsToSync ...Identifier) ([]AssetSyncDiff, error) { log.Infof("Attempting to sync universe: host=%v, sync_type=%v, ids=%v", host.HostStr(), syncType, spew.Sdump(idsToSync)) @@ -351,5 +351,5 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr, // With the engine created, we can now sync the local Universe with the // remote instance. - return s.executeSync(ctx, diffEngine, syncType, idsToSync) + return s.executeSync(ctx, diffEngine, syncType, syncConfigs, idsToSync) }