From ed3cb1a923e3e391fab50d2cdae08e47d7aea937 Mon Sep 17 00:00:00 2001 From: ffranr Date: Wed, 11 Oct 2023 18:36:28 +0100 Subject: [PATCH] universe: syncer filters target roots to sync using fed sync config --- rpcserver.go | 16 +++++++++++- universe/auto_syncer.go | 57 ++++++++++++++++++++++++++++++++++++++--- universe/interface.go | 2 +- universe/syncer.go | 46 +++++++++++++++++++++------------ 4 files changed, 99 insertions(+), 22 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index f072a56a01..fb7b41f113 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3457,10 +3457,24 @@ func (r *rpcServer) SyncUniverse(ctx context.Context, uniAddr := universe.NewServerAddrFromStr(req.UniverseHost) + // Obtain the general and universe specific federation sync configs. + queryFedSyncConfigs := r.cfg.FederationDB.QueryFederationSyncConfigs + globalConfigs, uniSyncConfigs, err := queryFedSyncConfigs(ctx) + if err != nil { + return nil, fmt.Errorf("unable to query federation sync "+ + "config(s): %w", err) + } + + syncConfigs := universe.SyncConfigs{ + GlobalSyncConfigs: globalConfigs, + UniSyncConfigs: uniSyncConfigs, + } + // TODO(roasbeef): add layer of indirection in front of? // * just interface interaction + // TODO(ffranr): Sync via the FederationEnvoy rather than syncer. universeDiff, err := r.cfg.UniverseSyncer.SyncUniverse( - ctx, uniAddr, syncMode, syncTargets..., + ctx, uniAddr, syncMode, syncConfigs, syncTargets..., ) if err != nil { return nil, fmt.Errorf("unable to sync universe: %w", err) diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index 67731cd40a..f4530e08a3 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -184,14 +184,14 @@ func (f *FederationEnvoy) reportErr(err error) { // If the sync is successful (even if no diff is generated), then a new sync // event will be logged. func (f *FederationEnvoy) syncServerState(ctx context.Context, - addr ServerAddr) error { + addr ServerAddr, syncConfigs SyncConfigs) error { log.Infof("Syncing Universe state with server=%v", spew.Sdump(addr)) // Attempt to sync with the remote Universe server, if this errors then // we'll bail out early as something wrong happened. diff, err := f.cfg.UniverseSyncer.SyncUniverse( - ctx, addr, SyncIssuance, + ctx, addr, SyncFull, syncConfigs, ) if err != nil { return err @@ -468,10 +468,61 @@ func (f *FederationEnvoy) SyncServers(serverAddrs []ServerAddr) error { ctx, cancel := f.WithCtxQuitNoTimeout() defer cancel() - err := fn.ParSlice(ctx, serverAddrs, f.syncServerState) + // Obtain the general and universe specific federation sync configs. + queryFedSyncConfigs := f.cfg.FederationDB.QueryFederationSyncConfigs + globalConfigs, uniSyncConfigs, err := queryFedSyncConfigs(ctx) + if err != nil { + return fmt.Errorf("unable to query federation sync "+ + "config(s): %w", err) + } + + syncConfigs := SyncConfigs{ + GlobalSyncConfigs: globalConfigs, + UniSyncConfigs: uniSyncConfigs, + } + + syncServer := func(ctx context.Context, serverAddr ServerAddr) error { + return f.syncServerState(ctx, serverAddr, syncConfigs) + } + + err = fn.ParSlice(ctx, serverAddrs, syncServer) if err != nil { log.Warnf("unable to sync with server: %w", err) } return nil } + +// SyncConfigs is a set of configs that are used to control which universes to +// synchronize with the federation. +type SyncConfigs struct { + // GlobalSyncConfigs are the global proof type specific configs. + GlobalSyncConfigs []*FedGlobalSyncConfig + + // UniSyncConfigs are the universe specific configs. + UniSyncConfigs []*FedUniSyncConfig +} + +// IsSyncEnabled returns true if the given universe is configured to be +// synchronized with the federation. +// +// TODO(ffranr): Add support for insert/export only sync enabled status. +func (s *SyncConfigs) IsSyncEnabled(id Identifier) bool { + // Check for universe specific config. This takes precedence over the + // global config. + for _, cfg := range s.UniSyncConfigs { + if cfg.UniverseID == id { + + return cfg.AllowSyncInsert && cfg.AllowSyncExport + } + } + + // Check for global config. + for _, cfg := range s.GlobalSyncConfigs { + if cfg.ProofType == id.ProofType { + return cfg.AllowSyncInsert && cfg.AllowSyncExport + } + } + + return false +} diff --git a/universe/interface.go b/universe/interface.go index a2e32e2578..4eccc82d0a 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -473,7 +473,7 @@ type Syncer interface { // remote universe, governed by the sync type and the set of universe // IDs to sync. SyncUniverse(ctx context.Context, host ServerAddr, - syncType SyncType, + syncType SyncType, syncConfigs SyncConfigs, idsToSync ...Identifier) ([]AssetSyncDiff, error) } diff --git a/universe/syncer.go b/universe/syncer.go index c988833cfd..32927179c8 100644 --- a/universe/syncer.go +++ b/universe/syncer.go @@ -60,7 +60,8 @@ func NewSimpleSyncer(cfg SimpleSyncCfg) *SimpleSyncer { // A simple approach where a set difference is used to find the set of assets // that need to be synced is used. func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, - syncType SyncType, idsToSync []Identifier) ([]AssetSyncDiff, error) { + syncType SyncType, syncConfigs SyncConfigs, + idsToSync []Identifier) ([]AssetSyncDiff, error) { // Prevent the syncer from running twice. if !s.isSyncing.CompareAndSwap(false, true) { @@ -115,6 +116,30 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine, } } + targetRoots = fn.Filter( + targetRoots, func(r BaseRoot) bool { + // If we're syncing issuance proofs, then we'll only + // sync issuance roots. + if syncType == SyncIssuance && + r.ID.ProofType != ProofTypeIssuance { + + return false + } + + return syncConfigs.IsSyncEnabled(r.ID) + }, + ) + + for i := range targetRoots { + targetRoot := &targetRoots[i] + + if targetRoot.ID.ProofType == ProofTypeIssuance { + + } + } + + // + log.Infof("Obtained %v roots from remote Universe server", len(targetRoots)) log.Tracef("Obtained %v roots from remote Universe server: %v", @@ -322,21 +347,8 @@ func (s *SimpleSyncer) batchStreamNewItems(ctx context.Context, // SyncUniverse attempts to synchronize the local universe with the remote // universe, governed by the sync type and the set of universe IDs to sync. func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr, - syncType SyncType, idsToSync ...Identifier) ([]AssetSyncDiff, error) { - - // First, we'll make sure that the user requested a sync with the set - // of supported sync types. - switch syncType { - case SyncIssuance: - break - - // For now, we only support issuance syncs. - case SyncFull: - fallthrough - - default: - return nil, ErrUnsupportedSync - } + syncType SyncType, syncConfigs SyncConfigs, + idsToSync ...Identifier) ([]AssetSyncDiff, error) { log.Infof("Attempting to sync universe: host=%v, sync_type=%v, ids=%v", host.HostStr(), syncType, spew.Sdump(idsToSync)) @@ -351,5 +363,5 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr, // With the engine created, we can now sync the local Universe with the // remote instance. - return s.executeSync(ctx, diffEngine, syncType, idsToSync) + return s.executeSync(ctx, diffEngine, syncType, syncConfigs, idsToSync) }