Skip to content

Commit

Permalink
universe: syncer filters target roots to sync using fed sync config
Browse files Browse the repository at this point in the history
  • Loading branch information
ffranr committed Oct 11, 2023
1 parent 13b097f commit ed3cb1a
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 22 deletions.
16 changes: 15 additions & 1 deletion rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3457,10 +3457,24 @@ func (r *rpcServer) SyncUniverse(ctx context.Context,

uniAddr := universe.NewServerAddrFromStr(req.UniverseHost)

// Obtain the general and universe specific federation sync configs.
queryFedSyncConfigs := r.cfg.FederationDB.QueryFederationSyncConfigs
globalConfigs, uniSyncConfigs, err := queryFedSyncConfigs(ctx)
if err != nil {
return nil, fmt.Errorf("unable to query federation sync "+
"config(s): %w", err)
}

syncConfigs := universe.SyncConfigs{
GlobalSyncConfigs: globalConfigs,
UniSyncConfigs: uniSyncConfigs,
}

// TODO(roasbeef): add layer of indirection in front of?
// * just interface interaction
// TODO(ffranr): Sync via the FederationEnvoy rather than syncer.
universeDiff, err := r.cfg.UniverseSyncer.SyncUniverse(
ctx, uniAddr, syncMode, syncTargets...,
ctx, uniAddr, syncMode, syncConfigs, syncTargets...,
)
if err != nil {
return nil, fmt.Errorf("unable to sync universe: %w", err)
Expand Down
57 changes: 54 additions & 3 deletions universe/auto_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ func (f *FederationEnvoy) reportErr(err error) {
// If the sync is successful (even if no diff is generated), then a new sync
// event will be logged.
func (f *FederationEnvoy) syncServerState(ctx context.Context,
addr ServerAddr) error {
addr ServerAddr, syncConfigs SyncConfigs) error {

log.Infof("Syncing Universe state with server=%v", spew.Sdump(addr))

// Attempt to sync with the remote Universe server, if this errors then
// we'll bail out early as something wrong happened.
diff, err := f.cfg.UniverseSyncer.SyncUniverse(
ctx, addr, SyncIssuance,
ctx, addr, SyncFull, syncConfigs,
)
if err != nil {
return err
Expand Down Expand Up @@ -468,10 +468,61 @@ func (f *FederationEnvoy) SyncServers(serverAddrs []ServerAddr) error {
ctx, cancel := f.WithCtxQuitNoTimeout()
defer cancel()

err := fn.ParSlice(ctx, serverAddrs, f.syncServerState)
// Obtain the general and universe specific federation sync configs.
queryFedSyncConfigs := f.cfg.FederationDB.QueryFederationSyncConfigs
globalConfigs, uniSyncConfigs, err := queryFedSyncConfigs(ctx)
if err != nil {
return fmt.Errorf("unable to query federation sync "+
"config(s): %w", err)
}

syncConfigs := SyncConfigs{
GlobalSyncConfigs: globalConfigs,
UniSyncConfigs: uniSyncConfigs,
}

syncServer := func(ctx context.Context, serverAddr ServerAddr) error {
return f.syncServerState(ctx, serverAddr, syncConfigs)
}

err = fn.ParSlice(ctx, serverAddrs, syncServer)
if err != nil {
log.Warnf("unable to sync with server: %w", err)
}

return nil
}

// SyncConfigs is a set of configs that are used to control which universes to
// synchronize with the federation.
type SyncConfigs struct {
// GlobalSyncConfigs are the global proof type specific configs.
GlobalSyncConfigs []*FedGlobalSyncConfig

// UniSyncConfigs are the universe specific configs.
UniSyncConfigs []*FedUniSyncConfig
}

// IsSyncEnabled returns true if the given universe is configured to be
// synchronized with the federation.
//
// TODO(ffranr): Add support for insert/export only sync enabled status.
func (s *SyncConfigs) IsSyncEnabled(id Identifier) bool {
// Check for universe specific config. This takes precedence over the
// global config.
for _, cfg := range s.UniSyncConfigs {
if cfg.UniverseID == id {

return cfg.AllowSyncInsert && cfg.AllowSyncExport
}
}

// Check for global config.
for _, cfg := range s.GlobalSyncConfigs {
if cfg.ProofType == id.ProofType {
return cfg.AllowSyncInsert && cfg.AllowSyncExport
}
}

return false
}
2 changes: 1 addition & 1 deletion universe/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ type Syncer interface {
// remote universe, governed by the sync type and the set of universe
// IDs to sync.
SyncUniverse(ctx context.Context, host ServerAddr,
syncType SyncType,
syncType SyncType, syncConfigs SyncConfigs,
idsToSync ...Identifier) ([]AssetSyncDiff, error)
}

Expand Down
46 changes: 29 additions & 17 deletions universe/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ func NewSimpleSyncer(cfg SimpleSyncCfg) *SimpleSyncer {
// A simple approach where a set difference is used to find the set of assets
// that need to be synced is used.
func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
syncType SyncType, idsToSync []Identifier) ([]AssetSyncDiff, error) {
syncType SyncType, syncConfigs SyncConfigs,
idsToSync []Identifier) ([]AssetSyncDiff, error) {

// Prevent the syncer from running twice.
if !s.isSyncing.CompareAndSwap(false, true) {
Expand Down Expand Up @@ -115,6 +116,30 @@ func (s *SimpleSyncer) executeSync(ctx context.Context, diffEngine DiffEngine,
}
}

targetRoots = fn.Filter(
targetRoots, func(r BaseRoot) bool {
// If we're syncing issuance proofs, then we'll only
// sync issuance roots.
if syncType == SyncIssuance &&
r.ID.ProofType != ProofTypeIssuance {

return false
}

return syncConfigs.IsSyncEnabled(r.ID)
},
)

for i := range targetRoots {
targetRoot := &targetRoots[i]

if targetRoot.ID.ProofType == ProofTypeIssuance {

}
}

//

log.Infof("Obtained %v roots from remote Universe server",
len(targetRoots))
log.Tracef("Obtained %v roots from remote Universe server: %v",
Expand Down Expand Up @@ -322,21 +347,8 @@ func (s *SimpleSyncer) batchStreamNewItems(ctx context.Context,
// SyncUniverse attempts to synchronize the local universe with the remote
// universe, governed by the sync type and the set of universe IDs to sync.
func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr,
syncType SyncType, idsToSync ...Identifier) ([]AssetSyncDiff, error) {

// First, we'll make sure that the user requested a sync with the set
// of supported sync types.
switch syncType {
case SyncIssuance:
break

// For now, we only support issuance syncs.
case SyncFull:
fallthrough

default:
return nil, ErrUnsupportedSync
}
syncType SyncType, syncConfigs SyncConfigs,
idsToSync ...Identifier) ([]AssetSyncDiff, error) {

log.Infof("Attempting to sync universe: host=%v, sync_type=%v, ids=%v",
host.HostStr(), syncType, spew.Sdump(idsToSync))
Expand All @@ -351,5 +363,5 @@ func (s *SimpleSyncer) SyncUniverse(ctx context.Context, host ServerAddr,

// With the engine created, we can now sync the local Universe with the
// remote instance.
return s.executeSync(ctx, diffEngine, syncType, idsToSync)
return s.executeSync(ctx, diffEngine, syncType, syncConfigs, idsToSync)
}

0 comments on commit ed3cb1a

Please sign in to comment.