Skip to content

Commit

Permalink
Add InputDeviceListUpdate to the keyserver, remove old input API (m…
Browse files Browse the repository at this point in the history
…atrix-org#2536)

* Add `InputDeviceListUpdate` to the keyserver, remove old input API

* Fix copyright

* Log more information when a device list update fails
  • Loading branch information
neilalexander authored Jun 15, 2022
1 parent 1b90cc9 commit 7120eb6
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 56 deletions.
1 change: 1 addition & 0 deletions federationapi/federationapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func AddPublicRoutes(
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
TopicDeviceListUpdate: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
ServerName: cfg.Matrix.ServerName,
UserAPI: userAPI,
}
Expand Down
17 changes: 17 additions & 0 deletions federationapi/producers/syncapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package producers
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"

Expand All @@ -34,6 +35,7 @@ type SyncAPIProducer struct {
TopicSendToDeviceEvent string
TopicTypingEvent string
TopicPresenceEvent string
TopicDeviceListUpdate string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
Expand Down Expand Up @@ -161,3 +163,18 @@ func (p *SyncAPIProducer) SendPresence(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}

func (p *SyncAPIProducer) SendDeviceListUpdate(
ctx context.Context, deviceListUpdate *gomatrixserverlib.DeviceListUpdateEvent,
) (err error) {
m := nats.NewMsg(p.TopicDeviceListUpdate)
m.Header.Set(jetstream.UserID, deviceListUpdate.UserID)
m.Data, err = json.Marshal(deviceListUpdate)
if err != nil {
return fmt.Errorf("json.Marshal: %w", err)
}

log.Debugf("Sending device list update: %+v", m.Header)
_, err = p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
8 changes: 2 additions & 6 deletions federationapi/routing/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,11 +501,7 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
} else if serverName != t.Origin {
return
}
var inputRes keyapi.InputDeviceListUpdateResponse
t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{
Event: payload,
}, &inputRes)
if inputRes.Error != nil {
util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
if err := t.producer.SendDeviceListUpdate(ctx, &payload); err != nil {
util.GetLogger(ctx).WithError(err).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
}
}
10 changes: 0 additions & 10 deletions keyserver/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ type FederationKeyAPI interface {
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
// InputDeviceListUpdate from a federated server EDU
InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse)
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
}
Expand Down Expand Up @@ -337,11 +335,3 @@ type QuerySignaturesResponse struct {
// The request error, if any
Error *KeyError
}

type InputDeviceListUpdateRequest struct {
Event gomatrixserverlib.DeviceListUpdateEvent
}

type InputDeviceListUpdateResponse struct {
Error *KeyError
}
82 changes: 82 additions & 0 deletions keyserver/consumers/devicelistupdate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consumers

import (
"context"
"encoding/json"

"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)

// DeviceListUpdateConsumer consumes device list updates that came in over federation.
type DeviceListUpdateConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
updater *internal.DeviceListUpdater
}

// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
func NewDeviceListUpdateConsumer(
process *process.ProcessContext,
cfg *config.KeyServer,
js nats.JetStreamContext,
updater *internal.DeviceListUpdater,
) *DeviceListUpdateConsumer {
return &DeviceListUpdateConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
updater: updater,
}
}

// Start consuming from key servers
func (t *DeviceListUpdateConsumer) Start() error {
return jetstream.JetStreamConsumer(
t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
nats.DeliverAll(), nats.ManualAck(),
)
}

// onMessage is called in response to a message received on the
// key change events topic from the key server.
func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("Failed to read from device list update input topic")
return true
}
err := t.updater.Update(ctx, m)
if err != nil {
logrus.WithFields(logrus.Fields{
"user_id": m.UserID,
"device_id": m.DeviceID,
"stream_id": m.StreamID,
"prev_id": m.PrevID,
}).WithError(err).Errorf("Failed to update device list")
return false
}
return true
}
11 changes: 0 additions & 11 deletions keyserver/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
a.UserAPI = i
}

func (a *KeyInternalAPI) InputDeviceListUpdate(
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
) {
err := a.Updater.Update(ctx, req.Event)
if err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("failed to update device list: %s", err),
}
}
}

func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset)
if err != nil {
Expand Down
14 changes: 0 additions & 14 deletions keyserver/inthttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,6 @@ type httpKeyInternalAPI struct {
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
// no-op: doesn't need it
}
func (h *httpKeyInternalAPI) InputDeviceListUpdate(
ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
) {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate")
defer span.Finish()

apiURL := h.apiURL + InputDeviceListUpdatePath
err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
if err != nil {
res.Error = &api.KeyError{
Err: err.Error(),
}
}
}

func (h *httpKeyInternalAPI) PerformClaimKeys(
ctx context.Context,
Expand Down
11 changes: 0 additions & 11 deletions keyserver/inthttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@ import (
)

func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
internalAPIMux.Handle(InputDeviceListUpdatePath,
httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse {
request := api.InputDeviceListUpdateRequest{}
response := api.InputDeviceListUpdateResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.InputDeviceListUpdate(req.Context(), &request, &response)
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformClaimKeysPath,
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformClaimKeysRequest{}
Expand Down
10 changes: 9 additions & 1 deletion keyserver/keyserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers"
Expand Down Expand Up @@ -59,10 +60,17 @@ func NewInternalAPI(
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
ap.Updater = updater
go func() {
if err := updater.Start(); err != nil {
if err = updater.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start device list updater")
}
}()

dlConsumer := consumers.NewDeviceListUpdateConsumer(
base.ProcessContext, cfg, js, updater,
)
if err = dlConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start device list consumer")
}

return ap
}
6 changes: 6 additions & 0 deletions setup/jetstream/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (

var (
InputRoomEvent = "InputRoomEvent"
InputDeviceListUpdate = "InputDeviceListUpdate"
OutputRoomEvent = "OutputRoomEvent"
OutputSendToDeviceEvent = "OutputSendToDeviceEvent"
OutputKeyChangeEvent = "OutputKeyChangeEvent"
Expand Down Expand Up @@ -45,6 +46,11 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: InputDeviceListUpdate,
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputRoomEvent,
Retention: nats.InterestPolicy,
Expand Down
3 changes: 0 additions & 3 deletions syncapi/internal/keychange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ func (k *mockKeyAPI) QueryOneTimeKeys(ctx context.Context, req *keyapi.QueryOneT
}
func (k *mockKeyAPI) QueryDeviceMessages(ctx context.Context, req *keyapi.QueryDeviceMessagesRequest, res *keyapi.QueryDeviceMessagesResponse) {

}
func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.InputDeviceListUpdateRequest, res *keyapi.InputDeviceListUpdateResponse) {

}
func (k *mockKeyAPI) QuerySignatures(ctx context.Context, req *keyapi.QuerySignaturesRequest, res *keyapi.QuerySignaturesResponse) {
}
Expand Down

0 comments on commit 7120eb6

Please sign in to comment.