Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Return events in correct order for /events #570

Merged
merged 1 commit into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,7 @@ def get_new_events(
from_key=from_key,
to_key=to_key,
limit=limit or 10,
order='ASC',
)

events = list(room_events)
Expand Down
19 changes: 11 additions & 8 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ def app_service_interested(row):
defer.returnValue(results)

@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
order='DESC'):
from_id = RoomStreamToken.parse_stream_token(from_key).stream

room_ids = yield self._events_stream_cache.get_entities_changed(
Expand All @@ -172,7 +173,7 @@ def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
res = yield defer.gatherResults([
preserve_fn(self.get_room_events_stream_for_room)(
room_id, from_key, to_key, limit,
room_id, from_key, to_key, limit, order=order,
)
for room_id in room_ids
])
Expand All @@ -181,7 +182,8 @@ def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
defer.returnValue(results)

@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
Expand All @@ -206,17 +208,17 @@ def f(txn):
" room_id = ?"
" AND not outlier"
" AND stream_ordering > ? AND stream_ordering <= ?"
" ORDER BY stream_ordering DESC LIMIT ?"
)
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit))
else:
sql = (
"SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
" ORDER BY stream_ordering DESC LIMIT ?"
)
" ORDER BY stream_ordering %s LIMIT ?"
) % (order,)
txn.execute(sql, (room_id, to_id, limit))

rows = self.cursor_to_dict(txn)
Expand All @@ -232,7 +234,8 @@ def f(txn):

self._set_before_and_after(ret, rows, topo_order=False)

ret.reverse()
if order.lower() == "desc":
ret.reverse()

if rows:
key = "s%d" % min(r["stream_ordering"] for r in rows)
Expand Down