Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.

Peeking over federation via MSC2444 #1391

Merged
merged 92 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
b9342d9
a very very WIP first cut of peeking via MSC2753.
ara4n Aug 30, 2020
d7bdf71
make PeekingDeviceSet private
ara4n Aug 30, 2020
cfa0be5
merge master
ara4n Aug 31, 2020
9b79f9a
add server_name param
ara4n Aug 31, 2020
d343b8f
blind stab at adding a `peek` section to /sync
ara4n Aug 31, 2020
c4e5f60
make it build
ara4n Aug 31, 2020
d1e4d66
make it launch
ara4n Aug 31, 2020
f006b37
add peeking to getResponseWithPDUsForCompleteSync
ara4n Aug 31, 2020
6c3a896
cancel any peeks when we join a room
ara4n Aug 31, 2020
7b38d48
spell out how to runoutside of docker if you want speed
ara4n Aug 31, 2020
e589984
fix SQL
ara4n Aug 31, 2020
0bb2c2c
remove unnecessary txn for SelectPeeks
ara4n Aug 31, 2020
28219c6
Merge branch 'master' into matthew/peeking
ara4n Sep 1, 2020
86e9736
fix s/join/peek/ cargocult fail
ara4n Sep 1, 2020
d0d5f70
Merge branch 'master' into matthew/peeking
ara4n Sep 1, 2020
bfecc8e
HACK: Track goroutine IDs to determine when we write by the wrong thread
kegsay Sep 1, 2020
7bf2a27
Track partition offsets and only log unsafe for non-selects
kegsay Sep 1, 2020
fcdb90c
Put redactions in the writer goroutine
kegsay Sep 1, 2020
6410b70
Update filters on writer goroutine
kegsay Sep 1, 2020
ed4b3a5
Merge branch 'kegan/redact-txn' into matthew/peeking
ara4n Sep 1, 2020
3cebd8d
Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matthew/peeking
ara4n Sep 1, 2020
5d7f688
wrap peek storage in goid hack
ara4n Sep 1, 2020
6424117
use exclusive writer, and MarkPeeksAsOld more efficiently
ara4n Sep 1, 2020
85bce11
don't log ascii in binary at sql trace...
ara4n Sep 1, 2020
75b91ac
strip out empty roomd deltas
ara4n Sep 1, 2020
b6cc441
re-add txn to SelectPeeks
ara4n Sep 2, 2020
f6af656
re-add accidentally deleted field
ara4n Sep 2, 2020
8712ea3
Merge branch 'master' into matthew/peeking
ara4n Sep 3, 2020
eda84cd
reject peeks for non-worldreadable rooms
ara4n Sep 3, 2020
da3742c
move perform_peek
ara4n Sep 3, 2020
3c5e079
fix package
ara4n Sep 3, 2020
994cc18
correctly refactor perform_peek
ara4n Sep 3, 2020
c1f1fcd
WIP of implementing MSC2444
ara4n Sep 4, 2020
4ca2cf4
typo
ara4n Sep 4, 2020
98cf898
Revert "Merge branch 'kegan/HACK-goid-sqlite-db-is-locked' into matth…
ara4n Sep 11, 2020
f236e82
Merge branch 'master' into matthew/peeking-over-fed
ara4n Sep 11, 2020
baee97b
(almost) make it build
ara4n Sep 11, 2020
4ef6a3c
clean up bad merge
ara4n Sep 11, 2020
65e59a1
support SendEventWithState with optional event
ara4n Sep 11, 2020
a5c0521
fix build & lint
ara4n Sep 11, 2020
df29509
fix build & lint
ara4n Sep 11, 2020
410ac72
reinstate federated peeks in the roomserver (doh)
ara4n Sep 11, 2020
f8bb448
fix sql thinko
ara4n Sep 11, 2020
fff1845
todo for authenticating state returned by /peek
ara4n Sep 12, 2020
c6a2604
support returning current state from QueryStateAndAuthChain
ara4n Sep 12, 2020
803647b
handle SS /peek
ara4n Sep 12, 2020
0ae0d11
reimplement SS /peek to prod the RS to tell the FS about the peek
ara4n Sep 12, 2020
4e96e62
rename RemotePeeks as OutboundPeeks
ara4n Sep 12, 2020
59e2be7
rename remote_peeks_table as outbound_peeks_table
ara4n Sep 12, 2020
36e32f1
add perform_handle_remote_peek.go
ara4n Sep 12, 2020
0dc422c
flesh out federation doc
ara4n Sep 12, 2020
71732f2
add inbound peeks table and hook it up
ara4n Sep 12, 2020
8f203fe
rename ambiguous RemotePeek as InboundPeek
ara4n Sep 12, 2020
3caae79
rename FSAPI's PerformPeek as PerformOutboundPeek
ara4n Sep 12, 2020
a160c07
setup inbound peeks db correctly
ara4n Sep 13, 2020
32f898d
fix api.SendEventWithState with no event
ara4n Sep 13, 2020
41b9b66
Merge branch 'master' into matthew/peeking-over-fed
ara4n Sep 19, 2020
20e2cb4
track latestevent on /peek
ara4n Sep 22, 2020
3202c7e
go fmt
ara4n Sep 22, 2020
0ab4bc9
document the peek send stream race better
ara4n Sep 22, 2020
75c3f2d
merge master
ara4n Sep 26, 2020
2b4353e
fix SendEventWithRewrite not to bail if handed a non-state event
ara4n Sep 26, 2020
927a62a
add fixme
ara4n Sep 26, 2020
1e6e23d
switch SS /peek to use SendEventWithRewrite
ara4n Sep 26, 2020
fd90849
fix comment
ara4n Sep 26, 2020
ed9e3fc
use reverse topo ordering to find latest extrem
ara4n Sep 26, 2020
efd8656
support postgres for federated peeking
ara4n Sep 27, 2020
ebeff8d
go fmt
ara4n Sep 27, 2020
4262095
back out bogus go.mod change
ara4n Sep 27, 2020
797085f
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Oct 20, 2020
814c220
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Oct 20, 2020
be5a4e6
Fix performOutboundPeekUsingServer
neilalexander Oct 20, 2020
90017d0
Fix getAuthChain -> GetAuthChain
neilalexander Oct 20, 2020
0fd9e96
Merge branch 'master' into matthew/peeking-over-fed
kegsay Oct 22, 2020
a2a5c7e
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 2, 2020
3ba3530
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 2, 2020
7fc3852
Fix build issues
neilalexander Dec 2, 2020
d25345d
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 3, 2020
c2f7c80
Fix build again
neilalexander Dec 3, 2020
fe1d2f8
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 3, 2020
45f0fdd
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 10, 2020
d47ab1f
Fix getAuthChain -> GetAuthChain
neilalexander Dec 10, 2020
0fe0e23
Don't repeat outbound peeks for the same room ID to the same servers
neilalexander Dec 10, 2020
e0a35c0
Fix lint
neilalexander Dec 10, 2020
8508af3
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Dec 18, 2020
3985d03
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Jan 13, 2021
f2dec90
Merge branch 'master' into matthew/peeking-over-fed
neilalexander Jan 18, 2021
609743b
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 19, 2021
c71bf5d
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 20, 2021
4491e53
Don't omitempty to appease sytest
kegsay Jan 22, 2021
552f583
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 22, 2021
dd3c6ff
Merge branch 'master' into matthew/peeking-over-fed
kegsay Jan 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename RemotePeeks as OutboundPeeks
  • Loading branch information
ara4n committed Sep 12, 2020
commit 4e96e62923779157a05182f9346392a90b512130
2 changes: 1 addition & 1 deletion federationsender/consumers/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// TODO: track what hosts are peeking (federationsender_received_peeks)
// TODO: rename federationsender_remote_peeks as federationsender_sent_peeks

// TOOD: add peeking hosts to the joinedHosts list
// TODO: add peeking hosts to the joinedHosts list

// TODO: do housekeeping to evict unrenewed peeking hosts

Expand Down
14 changes: 7 additions & 7 deletions federationsender/internal/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,18 +294,18 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer(
// check whether we're peeking already to try to avoid needlessly
// re-peeking on the server. we don't need a transaction for this,
// given this is a nice-to-have.
remotePeek, err := r.db.GetRemotePeek(ctx, serverName, roomID, peekID)
outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
if err != nil {
return err
}
renewing := false
if remotePeek != nil {
if outboundPeek != nil {
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
if nowMilli > remotePeek.RenewedTimestamp+remotePeek.RenewalInterval {
logrus.Infof("stale remote peek to %s for %s already exists; renewing", serverName, roomID)
if nowMilli > outboundPeek.RenewedTimestamp + outboundPeek.RenewalInterval {
logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
renewing = true
} else {
logrus.Infof("live remote peek to %s for %s already exists", serverName, roomID)
logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
return nil
}
}
Expand Down Expand Up @@ -339,11 +339,11 @@ func (r *FederationSenderInternalAPI) performPeekUsingServer(

// If we've got this far, the remote server is peeking.
if renewing {
if err = r.db.RenewRemotePeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
} else {
if err = r.db.AddRemotePeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions federationsender/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Database interface {
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)

AddRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
RenewRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
GetRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.RemotePeek, error)
GetRemotePeeks(ctx context.Context, roomID string) ([]types.RemotePeek, error)
AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error
GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
}
34 changes: 17 additions & 17 deletions federationsender/storage/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import (
)

type Database struct {
DB *sql.DB
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderRemotePeeks tables.FederationSenderRemotePeeks
DB *sql.DB
Writer sqlutil.Writer
FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
FederationSenderQueueJSON tables.FederationSenderQueueJSON
FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
FederationSenderRooms tables.FederationSenderRooms
FederationSenderBlacklist tables.FederationSenderBlacklist
FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
}

// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
Expand Down Expand Up @@ -165,22 +165,22 @@ func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName)
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}

func (d *Database) AddRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderRemotePeeks.InsertRemotePeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}

func (d *Database) RenewRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.FederationSenderRemotePeeks.RenewRemotePeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
})
}

func (d *Database) GetRemotePeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.RemotePeek, error) {
return d.FederationSenderRemotePeeks.SelectRemotePeek(ctx, nil, serverName, roomID, peekID)
func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
}

func (d *Database) GetRemotePeeks(ctx context.Context, roomID string) ([]types.RemotePeek, error) {
return d.FederationSenderRemotePeeks.SelectRemotePeeks(ctx, nil, roomID)
func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
}
176 changes: 0 additions & 176 deletions federationsender/storage/sqlite3/remote_peeks_table.go

This file was deleted.

20 changes: 10 additions & 10 deletions federationsender/storage/sqlite3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
if err != nil {
return nil, err
}
remotePeeks, err := NewSQLiteRemotePeeksTable(d.db)
outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
FederationSenderRemotePeeks: remotePeeks,
DB: d.db,
Writer: d.writer,
FederationSenderJoinedHosts: joinedHosts,
FederationSenderQueuePDUs: queuePDUs,
FederationSenderQueueEDUs: queueEDUs,
FederationSenderQueueJSON: queueJSON,
FederationSenderRooms: rooms,
FederationSenderBlacklist: blacklist,
FederationSenderOutboundPeeks: outboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
Expand Down
14 changes: 7 additions & 7 deletions federationsender/storage/tables/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ type FederationSenderBlacklist interface {
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
}

type FederationSenderRemotePeeks interface {
InsertRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
RenewRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
SelectRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (remotePeek *types.RemotePeek, err error)
SelectRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (remotePeeks []types.RemotePeek, err error)
DeleteRemotePeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteRemotePeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
type FederationSenderOutboundPeeks interface {
InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int) (err error)
SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
}
2 changes: 1 addition & 1 deletion federationsender/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (e EventIDMismatchError) Error() string {
)
}

type RemotePeek struct {
type OutboundPeek struct {
PeekID string
RoomID string
ServerName gomatrixserverlib.ServerName
Expand Down