
Commit

Remove dependence on joriks/opentracing_e2e
JorikSchellekens committed Jul 17, 2019
1 parent 68ee84c commit 5de6656
Showing 2 changed files with 2 additions and 103 deletions.
12 changes: 1 addition & 11 deletions synapse/handlers/device.py
@@ -18,7 +18,6 @@
 
 from twisted.internet import defer
 
-import synapse.logging.opentracing as opentracing
 from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.api.errors import (
@@ -572,7 +571,6 @@ def _need_to_do_resync(self, user_id, updates):
 
         defer.returnValue(False)
 
-    @opentracing.trace_deferred
     @defer.inlineCallbacks
     def user_device_resync(self, user_id):
         """Fetches all devices for a user and updates the device cache with them.
@@ -584,7 +582,6 @@ def user_device_resync(self, user_id):
             request:
             https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
         """
-        opentracing.log_kv({"message": "Doing resync to update device list."})
         # Fetch all devices for the user.
         origin = get_domain_from_id(user_id)
         try:
@@ -601,20 +598,13 @@ def user_device_resync(self, user_id):
             # eventually become consistent.
             return
         except FederationDeniedError as e:
-            opentracing.set_tag("error", True)
-            opentracing.log_kv({"reason": "FederationDeniedError"})
             logger.info(e)
             return
-        except Exception as e:
+        except Exception:
             # TODO: Remember that we are now out of sync and try again
             # later
-            opentracing.set_tag("error", True)
-            opentracing.log_kv(
-                {"message": "Exception raised by federation request", "exception": e}
-            )
             logger.exception("Failed to handle device list update for %s", user_id)
             return
-        opentracing.log_kv({"result": result})
         stream_id = result["stream_id"]
         devices = result["devices"]
 
93 changes: 1 addition & 92 deletions synapse/handlers/e2e_keys.py
@@ -22,7 +22,6 @@
 
 from twisted.internet import defer
 
-import synapse.logging.opentracing as opentracing
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import (
     LoggingContext,
@@ -51,7 +50,6 @@ def __init__(self, hs):
             "client_keys", self.on_federation_query_client_keys
         )
 
-    @opentracing.trace_deferred
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -87,9 +85,6 @@ def query_devices(self, query_body, timeout):
             else:
                 remote_queries[user_id] = device_ids
 
-        opentracing.set_tag("local_key_query", local_query)
-        opentracing.set_tag("remote_key_query", remote_queries)
-
         # First get local devices.
         failures = {}
         results = {}
@@ -130,13 +125,10 @@ def query_devices(self, query_body, timeout):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
-        @opentracing.trace_deferred
         @defer.inlineCallbacks
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
 
-            opentracing.set_tag("key_query", destination_query)
-
             # We first consider whether we wish to update the dive list cache with
             # the users device list. We want to track a user's devices when the
             # authenticated user shares a room with the queried user and the query
@@ -151,43 +143,19 @@ def do_remote_query(destination):
                     with PreserveLoggingContext(LoggingContext.current_context()):
                         room_ids = yield self.store.get_rooms_for_user(user_id)
                         if not device_list and room_ids:
-                            opentracing.log_kv(
-                                {
-                                    "message": "Resyncing devices for user",
-                                    "user_id": user_id,
-                                }
-                            )
                             user_devices = yield self.device_handler.device_list_updater.user_device_resync(
                                 user_id
                             )
                             user_devices = user_devices["devices"]
-                            opentracing.log_kv(
-                                {
-                                    "message": "got user devices",
-                                    "user_devices": user_devices,
-                                }
-                            )
                             for device in user_devices:
                                 results[user_id] = {
                                     device["device_id"]: device["keys"]
                                 }
-                            opentracing.log_kv(
-                                {"adding user to user_ids_updated": user_id}
-                            )
                             user_ids_updated.append(user_id)
-                        else:
-                            opentracing.log_kv(
-                                {
-                                    "message": "Not resyncing devices for user",
-                                    "user_id": user_id,
-                                }
-                            )
                 except Exception as e:
                     failures[destination] = failures.get(destination, []).append(
                         _exception_to_failure(e)
                     )
-                    opentracing.set_tag("error", True)
-                    opentracing.log_kv({"exception": e})
 
             if len(destination_query) == len(user_ids_updated):
                 # We've updated all the users in the query and we do not need to
@@ -198,13 +166,6 @@ def do_remote_query(destination):
             for user_id in user_ids_updated:
                 destination_query.pop(user_id)
 
-            opentracing.log_kv(
-                {
-                    "message": "Querying remote servers for keys",
-                    "destination_query": destination_query,
-                    "not querying": user_ids_updated,
-                }
-            )
             try:
                 remote_result = yield self.federation.query_client_keys(
                     destination, {"device_keys": destination_query}, timeout=timeout
@@ -227,10 +188,8 @@ def do_remote_query(destination):
                 consumeErrors=True,
             )
         )
-        opentracing.log_kv({"device_keys": results, "failures": failures})
         defer.returnValue({"device_keys": results, "failures": failures})
 
-    @opentracing.trace_deferred
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -243,22 +202,13 @@ def query_local_devices(self, query):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
             map from user_id -> device_id -> device details
         """
-        opentracing.set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
         for user_id, device_ids in query.items():
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
-                opentracing.log_kv(
-                    {
-                        "message": "Requested a local key for a user which"
-                        + " was not local to the homeserver",
-                        "user_id": user_id,
-                    }
-                )
-                opentracing.set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -283,7 +233,6 @@ def query_local_devices(self, query):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
-        opentracing.log_kv(results)
         defer.returnValue(result_dict)
 
     @defer.inlineCallbacks
@@ -294,7 +243,6 @@ def on_federation_query_client_keys(self, query_body):
         res = yield self.query_local_devices(device_keys_query)
         defer.returnValue({"device_keys": res})
 
-    @opentracing.trace_deferred
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -309,9 +257,6 @@ def claim_one_time_keys(self, query, timeout):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
-        opentracing.set_tag("local_key_query", local_query)
-        opentracing.set_tag("remote_key_query", remote_queries)
-
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -323,10 +268,8 @@ def claim_one_time_keys(self, query, timeout):
                         key_id: json.loads(json_bytes)
                     }
 
-        @opentracing.trace_deferred
         @defer.inlineCallbacks
         def claim_client_keys(destination):
-            opentracing.set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -339,8 +282,6 @@ def claim_client_keys(destination):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
-                opentracing.set_tag("error", True)
-                opentracing.set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -364,63 +305,35 @@ def claim_client_keys(destination):
             ),
         )
 
-        opentracing.log_kv({"one_time_keys": json_result, "failures": failures})
-
         defer.returnValue({"one_time_keys": json_result, "failures": failures})
 
-    @opentracing.trace_deferred
-    @opentracing.tag_args
     @defer.inlineCallbacks
-    @opentracing.tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
+
         time_now = self.clock.time_msec()
 
         # TODO: Validate the JSON to make sure it has the right keys.
         device_keys = keys.get("device_keys", None)
-        opentracing.set_tag("device_keys", device_keys)
         if device_keys:
             logger.info(
                 "Updating device_keys for device %r for user %s at %d",
                 device_id,
                 user_id,
                 time_now,
             )
-            opentracing.log_kv(
-                {
-                    "message": "Updating device_keys for user.",
-                    "user_id": user_id,
-                    "device_id": device_id,
-                }
-            )
             # TODO: Sign the JSON with the server key
             changed = yield self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
             )
             if changed:
                 # Only notify about device updates *if* the keys actually changed
                 yield self.device_handler.notify_device_update(user_id, [device_id])
-            else:
-                opentracing.log_kv(
-                    {"message": "Not updating device_keys for user", "user_id": user_id}
-                )
 
         one_time_keys = keys.get("one_time_keys", None)
-        opentracing.set_tag("one_time_keys", one_time_keys)
         if one_time_keys:
-            opentracing.log_kv(
-                {
-                    "message": "Updating one_time_keys for device.",
-                    "user_id": user_id,
-                    "device_id": device_id,
-                }
-            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
-        else:
-            opentracing.log_kv(
-                {"message": "Did not update one_time_keys", "reason": "no keys given"}
-            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -431,7 +344,6 @@ def upload_keys_for_user(self, user_id, device_id, keys):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
-        opentracing.set_tag("one_time_key_counts", result)
         defer.returnValue({"one_time_key_counts": result})
 
     @defer.inlineCallbacks
@@ -475,9 +387,6 @@ def _upload_one_time_keys_for_user(
                 (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
             )
 
-        opentracing.log_kv(
-            {"message": "Inserting new one_time_keys.", "keys": new_keys}
-        )
         yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
