diff --git a/.dockerignore b/.dockerignore index 5670b8c15bf2..1c6905b1bbb3 100644 --- a/.dockerignore +++ b/.dockerignore @@ -8,6 +8,7 @@ !README.rst !pyproject.toml !poetry.lock +!Cargo.lock !build_rust.py rust/target diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml index 8366ac9393e3..9a708286a465 100644 --- a/.github/workflows/latest_deps.yml +++ b/.github/workflows/latest_deps.yml @@ -201,10 +201,11 @@ jobs: open-issue: if: "failure() && github.event_name != 'push' && github.event_name != 'pull_request'" needs: - # TODO: should mypy be included here? It feels more brittle than the other two. + # TODO: should mypy be included here? It feels more brittle than the others. - mypy - trial - sytest + - complement runs-on: ubuntu-latest diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 91a080cca0e3..9fe61930a55a 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -94,7 +94,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.61.0 + toolchain: 1.58.1 override: true components: clippy - uses: Swatinem/rust-cache@v2 @@ -112,7 +112,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.61.0 + toolchain: 1.58.1 override: true components: rustfmt - uses: Swatinem/rust-cache@v2 @@ -204,7 +204,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.61.0 + toolchain: 1.58.1 override: true - uses: Swatinem/rust-cache@v2 @@ -320,7 +320,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.61.0 + toolchain: 1.58.1 override: true - uses: Swatinem/rust-cache@v2 @@ -452,7 +452,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.61.0 + toolchain: 1.58.1 override: true - uses: Swatinem/rust-cache@v2 @@ -478,7 +478,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: 1.61.0 + toolchain: 1.58.1 override: true - uses: 
Swatinem/rust-cache@v2 diff --git a/.gitignore b/.gitignore index 31a60bb7bd38..15fbfdddf195 100644 --- a/.gitignore +++ b/.gitignore @@ -15,8 +15,9 @@ _trial_temp*/ .DS_Store __pycache__/ -# We do want the poetry lockfile. +# We do want the poetry and cargo lockfile. !poetry.lock +!Cargo.lock # stuff that is likely to exist when you run a server locally /*.db diff --git a/CHANGES.md b/CHANGES.md index fb91bc5f20c0..82b5526f94e7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,4 @@ -Synapse 1.68.0rc1 (2022-09-20) +Synapse 1.68.0rc2 (2022-09-23) ============================== Please note that Synapse will now refuse to start if configured to use a version of SQLite earlier than 3.27. @@ -8,6 +8,23 @@ Those using packages will not be affected. On most platforms, installing with `p See the [upgrade notes](https://matrix-org.github.io/synapse/v1.68/upgrade.html#upgrading-to-v1670). +Bugfixes +-------- + +- Fix building from packaged sdist. Broke in v1.68.0rc1. ([\#13866](https://github.com/matrix-org/synapse/issues/13866)) + + +Internal Changes +---------------- + +- Fix the release script not publishing binary wheels. ([\#13850](https://github.com/matrix-org/synapse/issues/13850)) +- Lower minimum supported rustc version to 1.58.1. ([\#13857](https://github.com/matrix-org/synapse/issues/13857)) +- Lock Rust dependencies versions. ([\#13858](https://github.com/matrix-org/synapse/issues/13858)) + + +Synapse 1.68.0rc1 (2022-09-20) +============================== + Features -------- diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 000000000000..b952b6b4c01d --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,466 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. 
+version = 3 + +[[package]] +name = "aho-corasick" +version = "0.7.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98161a4e3e2184da77bb14f02184cdd111e83bbbcc9979dfee3c44b9a85f5602" + +[[package]] +name = "arc-swap" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "blake2" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388" +dependencies = [ + "digest", +] + +[[package]] +name = "block-buffer" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +dependencies = [ + "generic-array", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" 
+version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + +[[package]] +name = "generic-array" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "indoc" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adab1eaa3408fb7f0c777a73e7465fd5656136fc93b670eb6df3c88c2c1344e3" + +[[package]] +name = "itoa" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.132" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" + +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.5.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "memoffset" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" + +[[package]] +name = "parking_lot" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] + +[[package]] +name = "proc-macro2" +version = "1.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "pyo3" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12f72538a0230791398a0986a6518ebd88abc3fded89007b506ed072acc831e1" +dependencies = [ + "anyhow", + "cfg-if", + "indoc", + "libc", + "memoffset", + "parking_lot", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4cf18c20f4f09995f3554e6bcf9b09bd5e4d6b67c562fdfaafa644526ba479" +dependencies = [ + "once_cell", + "target-lexicon", 
+] + +[[package]] +name = "pyo3-ffi" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a41877f28d8ebd600b6aa21a17b40c3b0fc4dfe73a27b6e81ab3d895e401b0e9" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-log" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5695ccff5060c13ca1751cf8c857a12da9b0bf0378cb071c5e0326f7c7e4c1b" +dependencies = [ + "arc-swap", + "log", + "pyo3", +] + +[[package]] +name = "pyo3-macros" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e81c8d4bcc2f216dc1b665412df35e46d12ee8d3d046b381aad05f1fcf30547" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85752a767ee19399a78272cc2ab625cd7d373b2e112b4b13db28de71fa892784" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pythonize" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f7f0c136f5fbc01868185eef462800e49659eb23acca83b9e884367a006acb6" +dependencies = [ + "pyo3", + "serde", +] + +[[package]] +name = "quote" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags", +] + +[[package]] +name = "regex" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +dependencies = [ + 
"aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" + +[[package]] +name = "ryu" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "serde" +version = "1.0.145" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.145" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "smallvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" + +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + +[[package]] +name = "syn" +version = "1.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58dbef6ec655055e20b86b15a8cc6d439cca19b667537ac6a1369572d151ab13" +dependencies = 
[ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "synapse" +version = "0.1.0" +dependencies = [ + "anyhow", + "blake2", + "hex", + "lazy_static", + "log", + "pyo3", + "pyo3-log", + "pythonize", + "regex", + "serde", + "serde_json", +] + +[[package]] +name = "target-lexicon" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c02424087780c9b71cc96799eaeddff35af2bc513278cda5c99fc1f5d026d3c1" + +[[package]] +name = "typenum" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" + +[[package]] +name = "unicode-ident" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" + +[[package]] +name = "unindent" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ee9362deb4a96cef4d437d1ad49cffc9b9e92d202b6995674e928ce684f112" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + +[[package]] +name = "windows-sys" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" + +[[package]] +name = "windows_i686_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" + +[[package]] +name = "windows_i686_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.36.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" diff --git a/changelog.d/13635.feature b/changelog.d/13635.feature new file mode 100644 index 000000000000..d86bf7ed809f --- /dev/null +++ b/changelog.d/13635.feature @@ -0,0 +1 @@ +Exponentially backoff from backfilling the same event over and over. diff --git a/changelog.d/13782.feature b/changelog.d/13782.feature new file mode 100644 index 000000000000..d0cb902dffd0 --- /dev/null +++ b/changelog.d/13782.feature @@ -0,0 +1 @@ +Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)). diff --git a/changelog.d/13796.misc b/changelog.d/13796.misc new file mode 100644 index 000000000000..9ed16623945e --- /dev/null +++ b/changelog.d/13796.misc @@ -0,0 +1 @@ +Use shared methods for cache invalidation when persisting events, remove duplicate codepaths. Contributed by Nick @ Beeper (@fizzadar). diff --git a/changelog.d/13818.doc b/changelog.d/13818.doc new file mode 100644 index 000000000000..16b31f507179 --- /dev/null +++ b/changelog.d/13818.doc @@ -0,0 +1 @@ +Update URL for the NixOS module for Synapse. 
diff --git a/changelog.d/13823.misc b/changelog.d/13823.misc new file mode 100644 index 000000000000..527d79f4b225 --- /dev/null +++ b/changelog.d/13823.misc @@ -0,0 +1 @@ +Faster Remote Room Joins: tell remote homeservers that we are unable to authorise them if they query a room which has partial state on our server. \ No newline at end of file diff --git a/changelog.d/13830.bugfix b/changelog.d/13830.bugfix new file mode 100644 index 000000000000..e6215806cd86 --- /dev/null +++ b/changelog.d/13830.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where typing events would be accepted from remote servers not present in a room. Also fix a bug where incoming typing events would cause other incoming events to get stuck during a fast join. diff --git a/changelog.d/13840.bugfix b/changelog.d/13840.bugfix new file mode 100644 index 000000000000..0f014439a8ed --- /dev/null +++ b/changelog.d/13840.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse v1.53.0 where the experimental implementation of [MSC3715](https://github.com/matrix-org/matrix-spec-proposals/pull/3715) would give incorrect results when paginating forward. diff --git a/changelog.d/13855.bugfix b/changelog.d/13855.bugfix new file mode 100644 index 000000000000..5ea8539bd8e6 --- /dev/null +++ b/changelog.d/13855.bugfix @@ -0,0 +1 @@ +Fix access token leak to logs from proxy agent. diff --git a/changelog.d/13859.misc b/changelog.d/13859.misc new file mode 100644 index 000000000000..2780a4af3c5b --- /dev/null +++ b/changelog.d/13859.misc @@ -0,0 +1 @@ +Raise issue if complement fails with latest deps. diff --git a/changelog.d/13870.doc b/changelog.d/13870.doc new file mode 100644 index 000000000000..2598bc270c56 --- /dev/null +++ b/changelog.d/13870.doc @@ -0,0 +1 @@ +Fix a cross-link from the register admin API to the `registration_shared_secret` configuration documentation. 
diff --git a/changelog.d/13874.misc b/changelog.d/13874.misc new file mode 100644 index 000000000000..499e488c35ad --- /dev/null +++ b/changelog.d/13874.misc @@ -0,0 +1 @@ +Faster room joins: Send device list updates to most servers in rooms with partial state. diff --git a/changelog.d/13876.misc b/changelog.d/13876.misc new file mode 100644 index 000000000000..ef371001151a --- /dev/null +++ b/changelog.d/13876.misc @@ -0,0 +1 @@ +Add comments to the Prometheus recording rules to make it clear which set of rules you need for Grafana or Prometheus Console. \ No newline at end of file diff --git a/changelog.d/13888.misc b/changelog.d/13888.misc new file mode 100644 index 000000000000..4ffd9bcede46 --- /dev/null +++ b/changelog.d/13888.misc @@ -0,0 +1 @@ +Faster room joins: Avoid waiting for full state when processing `/keys/changes` requests. diff --git a/changelog.d/13889.misc b/changelog.d/13889.misc new file mode 100644 index 000000000000..28bddb705967 --- /dev/null +++ b/changelog.d/13889.misc @@ -0,0 +1 @@ +Port push rules to using Rust. diff --git a/changelog.d/13905.misc b/changelog.d/13905.misc new file mode 100644 index 000000000000..efe3bed5f110 --- /dev/null +++ b/changelog.d/13905.misc @@ -0,0 +1 @@ +Fix mypy errors with canonicaljson 1.6.3. diff --git a/changelog.d/13909.bugfix b/changelog.d/13909.bugfix new file mode 100644 index 000000000000..883dd72919e5 --- /dev/null +++ b/changelog.d/13909.bugfix @@ -0,0 +1 @@ +Fix packaging to include `Cargo.lock` in `sdist`. 
diff --git a/contrib/prometheus/synapse-v2.rules b/contrib/prometheus/synapse-v2.rules index cbe6f7bebaa4..dde311322f52 100644 --- a/contrib/prometheus/synapse-v2.rules +++ b/contrib/prometheus/synapse-v2.rules @@ -1,7 +1,12 @@ groups: - name: synapse rules: - # These 3 rules are used in the included Prometheus console + + ### + ### Prometheus Console Only + ### The following rules are only needed if you use the Prometheus Console + ### in contrib/prometheus/consoles/synapse.html + ### - record: 'synapse_federation_client_sent' labels: type: "EDU" @@ -15,7 +20,6 @@ groups: type: "Query" expr: 'sum(synapse_federation_client_sent_queries) by (job)' - # These 3 rules are used in the included Prometheus console - record: 'synapse_federation_server_received' labels: type: "EDU" @@ -29,7 +33,6 @@ groups: type: "Query" expr: 'sum(synapse_federation_server_received_queries) by (job)' - # These 2 rules are used in the included Prometheus console - record: 'synapse_federation_transaction_queue_pending' labels: type: "EDU" @@ -38,8 +41,16 @@ groups: labels: type: "PDU" expr: 'synapse_federation_transaction_queue_pending_pdus + 0' + ### + ### End of 'Prometheus Console Only' rules block + ### + - # These 3 rules are used in the included Grafana dashboard + ### + ### Grafana Only + ### The following rules are only needed if you use the Grafana dashboard + ### in contrib/grafana/synapse.json + ### - record: synapse_storage_events_persisted_by_source_type expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_type="remote"}) labels: @@ -53,11 +64,11 @@ groups: labels: type: bridges - # This rule is used in the included Grafana dashboard - record: synapse_storage_events_persisted_by_event_type expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep_total) - # This rule is used in the included Grafana dashboard - record: synapse_storage_events_persisted_by_origin expr: sum without(type) 
(synapse_storage_events_persisted_events_sep_total) - + ### + ### End of 'Grafana Only' rules block + ### diff --git a/debian/changelog b/debian/changelog index 6325ce29942c..339d477319f2 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.68.0~rc2) stable; urgency=medium + + * New Synapse release 1.68.0rc2. + + -- Synapse Packaging team Fri, 23 Sep 2022 09:40:10 +0100 + matrix-synapse-py3 (1.68.0~rc1) stable; urgency=medium * New Synapse release 1.68.0rc1. diff --git a/docs/admin_api/register_api.md b/docs/admin_api/register_api.md index f6be31b44312..dd2830f3a18a 100644 --- a/docs/admin_api/register_api.md +++ b/docs/admin_api/register_api.md @@ -5,7 +5,7 @@ non-interactive way. This is generally used for bootstrapping a Synapse instance with administrator accounts. To authenticate yourself to the server, you will need both the shared secret -([`registration_shared_secret`](../configuration/config_documentation.md#registration_shared_secret) +([`registration_shared_secret`](../usage/configuration/config_documentation.md#registration_shared_secret) in the homeserver configuration), and a one-time nonce. If the registration shared secret is not configured, this API is not enabled. 
diff --git a/docs/setup/installation.md b/docs/setup/installation.md index 96833effc6b9..dcd8f17c5e98 100644 --- a/docs/setup/installation.md +++ b/docs/setup/installation.md @@ -181,7 +181,7 @@ doas pkg_add synapse #### NixOS Robin Lambertz has packaged Synapse for NixOS at: - + ### Installing as a Python module from PyPI diff --git a/poetry.lock b/poetry.lock index 291f3c51e676..0f6d1cfa6944 100644 --- a/poetry.lock +++ b/poetry.lock @@ -95,14 +95,15 @@ webencodings = "*" [[package]] name = "canonicaljson" -version = "1.6.0" +version = "1.6.3" description = "Canonical JSON" category = "main" optional = false -python-versions = "~=3.7" +python-versions = ">=3.7" [package.dependencies] simplejson = ">=3.14.0" +typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.8\""} [package.extras] frozendict = ["frozendict (>=1.0)"] @@ -1682,8 +1683,8 @@ bleach = [ {file = "bleach-4.1.0.tar.gz", hash = "sha256:0900d8b37eba61a802ee40ac0061f8c2b5dee29c1927dd1d233e075ebf5a71da"}, ] canonicaljson = [ - {file = "canonicaljson-1.6.0-py3-none-any.whl", hash = "sha256:7230c2a2a3db07874f622af84effe41a655e07bf23734830e18a454e65d5b998"}, - {file = "canonicaljson-1.6.0.tar.gz", hash = "sha256:8739d5fd91aca7281d425660ae65af7663808c8177778965f67e90b16a2b2427"}, + {file = "canonicaljson-1.6.3-py3-none-any.whl", hash = "sha256:6ba3cf1702fa3d209b3e915a4e9a3e4ef194f1e8fca189c1f0b7a2a7686a27e6"}, + {file = "canonicaljson-1.6.3.tar.gz", hash = "sha256:ca59760bc274a899a0da75809d6909ae43e5123381fd6ef040a44d1952c0b448"}, ] certifi = [ {file = "certifi-2021.10.8-py2.py3-none-any.whl", hash = "sha256:d62a0163eb4c2344ac042ab2bdf75399a71a2d8c7d47eac2e2ee91b9d6339569"}, diff --git a/pyproject.toml b/pyproject.toml index 43f165b8d052..0a4242fb7201 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ manifest-path = "rust/Cargo.toml" [tool.poetry] name = "matrix-synapse" -version = "1.68.0rc1" +version = "1.68.0rc2" description = "Homeserver for the Matrix decentralised 
comms protocol" authors = ["Matrix.org Team and Contributors "] license = "Apache-2.0" @@ -86,8 +86,9 @@ include = [ { path = "tests", format = "sdist" }, { path = "UPGRADE.rst", format = "sdist" }, { path = "Cargo.toml", format = "sdist" }, + { path = "Cargo.lock", format = "sdist" }, { path = "rust/Cargo.toml", format = "sdist" }, - { path = "rust/Cargo.lock", format = "sdist" }, + { path = "rust/build.rs", format = "sdist" }, { path = "rust/src/**", format = "sdist" }, ] exclude = [ diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 8dc5f93ff111..44263bf77e5f 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ name = "synapse" version = "0.1.0" edition = "2021" -rust-version = "1.61.0" +rust-version = "1.58.1" [lib] name = "synapse" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index a2f1d9e1c357..de5a5ea5da35 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -100,6 +100,12 @@ class Codes(str, Enum): UNREDACTED_CONTENT_DELETED = "FI.MAU.MSC2815_UNREDACTED_CONTENT_DELETED" + # Returned for federation requests where we can't process a request as we + # can't ensure the sending server is in a room which is partial-stated on + # our side. + # Part of MSC3895. 
+ UNABLE_DUE_TO_PARTIAL_STATE = "ORG.MATRIX.MSC3895_UNABLE_DUE_TO_PARTIAL_STATE" + USER_AWAITING_APPROVAL = "ORG.MATRIX.MSC3866_USER_AWAITING_APPROVAL" diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index f3c54ee1d388..31834fb27dc4 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -78,7 +78,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # MSC3706 (server-side support for partial state in /send_join responses) self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False) - # experimental support for faster joins over federation (msc2775, msc3706) + # experimental support for faster joins over federation + # (MSC2775, MSC3706, MSC3895) # requires a target server with msc3706_enabled enabled. self.faster_joins_enabled: bool = experimental.get("faster_joins", False) @@ -97,6 +98,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # MSC3786 (Add a default push rule to ignore m.room.server_acl events) self.msc3786_enabled: bool = experimental.get("msc3786_enabled", False) + # MSC3771: Thread read receipts + self.msc3771_enabled: bool = experimental.get("msc3771_enabled", False) # MSC3772: A push rule for mutual relations. 
self.msc3772_enabled: bool = experimental.get("msc3772_enabled", False) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3bf84cf62515..907940e19eb0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -530,13 +530,10 @@ async def _process_edu(edu_dict: JsonDict) -> None: async def on_room_state_request( self, origin: str, room_id: str, event_id: str ) -> Tuple[int, JsonDict]: + await self._event_auth_handler.assert_host_in_room(room_id, origin) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - in_room = await self._event_auth_handler.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - # we grab the linearizer to protect ourselves from servers which hammer # us. In theory we might already have the response to this query # in the cache so we could return it without waiting for the linearizer @@ -560,13 +557,10 @@ async def on_state_ids_request( if not event_id: raise NotImplementedError("Specify an event") + await self._event_auth_handler.assert_host_in_room(room_id, origin) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - in_room = await self._event_auth_handler.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - resp = await self._state_ids_resp_cache.wrap( (room_id, event_id), self._on_state_ids_request_compute, @@ -955,6 +949,7 @@ async def on_event_auth( self, origin: str, room_id: str, event_id: str ) -> Tuple[int, Dict[str, Any]]: async with self._server_linearizer.queue((origin, room_id)): + await self._event_auth_handler.assert_host_in_room(room_id, origin) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 901e2310b706..bad262731c91 
100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -195,7 +195,9 @@ async def get_user_ids_changed( possibly_changed = set(changed) possibly_left = set() for room_id in rooms_changed: - current_state_ids = await self._state_storage.get_current_state_ids(room_id) + current_state_ids = await self._state_storage.get_current_state_ids( + room_id, await_full_state=False + ) # The user may have left the room # TODO: Check if they actually did or if we were just invited. @@ -234,7 +236,8 @@ async def get_user_ids_changed( # mapping from event_id -> state_dict prev_state_ids = await self._state_storage.get_state_ids_for_events( - event_ids + event_ids, + await_full_state=False, ) # Check if we've joined the room? If so we just blindly add all the users to @@ -688,11 +691,15 @@ async def _handle_new_device_update_async(self) -> None: # Ignore any users that aren't ours if self.hs.is_mine_id(user_id): hosts = set( - await self._storage_controllers.state.get_current_hosts_in_room( + await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) ) hosts.discard(self.server_name) + # For rooms with partial state, `hosts` is merely an + # approximation. When we transition to a full state room, we + # will have to send out device list updates to any servers we + # missed. 
# Check if we've already sent this update to some hosts if current_stream_id == stream_id: diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py index c3ddc5d18253..8249ca1ed26c 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py @@ -31,7 +31,6 @@ from synapse.events.builder import EventBuilder from synapse.events.snapshot import EventContext from synapse.types import StateMap, get_domain_from_id -from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.server import HomeServer @@ -156,9 +155,33 @@ async def get_user_which_could_invite( Codes.UNABLE_TO_GRANT_JOIN, ) - async def check_host_in_room(self, room_id: str, host: str) -> bool: - with Measure(self._clock, "check_host_in_room"): - return await self._store.is_host_joined(room_id, host) + async def is_host_in_room(self, room_id: str, host: str) -> bool: + return await self._store.is_host_joined(room_id, host) + + async def assert_host_in_room( + self, room_id: str, host: str, allow_partial_state_rooms: bool = False + ) -> None: + """ + Asserts that the host is in the room, or raises an AuthError. + + If the room is partial-stated, we raise an AuthError with the + UNABLE_DUE_TO_PARTIAL_STATE error code, unless `allow_partial_state_rooms` is true. + + If allow_partial_state_rooms is True and the room is partial-stated, + this function may return an incorrect result as we are not able to fully + track server membership in a room without full state. 
+ """ + if not allow_partial_state_rooms and await self._store.is_partial_state_room( + room_id + ): + raise AuthError( + 403, + "Unable to authorise you right now; room is partial-stated here.", + errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE, + ) + + if not await self.is_host_in_room(room_id, host): + raise AuthError(403, "Host not in room.") async def check_restricted_join_rules( self, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dd4b9f66d10e..e1a4265a640f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -226,9 +226,7 @@ async def _maybe_backfill_inner( """ backwards_extremities = [ _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY) - for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room( - room_id - ) + for event_id, depth in await self.store.get_backfill_points_in_room(room_id) ] insertion_events_to_be_backfilled: List[_BackfillPoint] = [] @@ -804,7 +802,7 @@ async def on_make_join_request( ) # now check that we are *still* in the room - is_in_room = await self._event_auth_handler.check_host_in_room( + is_in_room = await self._event_auth_handler.is_host_in_room( room_id, self.server_name ) if not is_in_room: @@ -1150,9 +1148,7 @@ async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]: async def on_backfill_request( self, origin: str, room_id: str, pdu_list: List[str], limit: int ) -> List[EventBase]: - in_room = await self._event_auth_handler.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") + await self._event_auth_handler.assert_host_in_room(room_id, origin) # Synapse asks for 100 events per backfill request. Do not allow more. 
limit = min(limit, 100) @@ -1198,21 +1194,17 @@ async def get_persisted_pdu( event_id, allow_none=True, allow_rejected=True ) - if event: - in_room = await self._event_auth_handler.check_host_in_room( - event.room_id, origin - ) - if not in_room: - raise AuthError(403, "Host not in room.") - - events = await filter_events_for_server( - self._storage_controllers, origin, [event] - ) - event = events[0] - return event - else: + if not event: return None + await self._event_auth_handler.assert_host_in_room(event.room_id, origin) + + events = await filter_events_for_server( + self._storage_controllers, origin, [event] + ) + event = events[0] + return event + async def on_get_missing_events( self, origin: str, @@ -1221,9 +1213,7 @@ async def on_get_missing_events( latest_events: List[str], limit: int, ) -> List[EventBase]: - in_room = await self._event_auth_handler.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") + await self._event_auth_handler.assert_host_in_room(room_id, origin) # Only allow up to 20 events to be retrieved per request. limit = min(limit, 20) @@ -1257,7 +1247,7 @@ async def exchange_third_party_invite( "state_key": target_user_id, } - if await self._event_auth_handler.check_host_in_room(room_id, self.hs.hostname): + if await self._event_auth_handler.is_host_in_room(room_id, self.hs.hostname): room_version_obj = await self.store.get_room_version(room_id) builder = self.event_builder_factory.for_room_version( room_version_obj, event_dict diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index efcdb8405783..2d7cde750609 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -238,7 +238,7 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: # # Note that if we were never in the room then we would have already # dropped the event, since we wouldn't know the room version. 
- is_in_room = await self._event_auth_handler.check_host_in_room( + is_in_room = await self._event_auth_handler.is_host_in_room( room_id, self._server_name ) if not is_in_room: diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index d2bdb9c8be79..4768a34c07d5 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -63,6 +63,8 @@ def __init__(self, hs: "HomeServer"): self.clock = self.hs.get_clock() self.state = hs.get_state_handler() + self._msc3771_enabled = hs.config.experimental.msc3771_enabled + async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None: """Called when we receive an EDU of type m.receipt from a remote HS.""" receipts = [] @@ -70,7 +72,7 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None # If we're not in the room just ditch the event entirely. This is # probably an old server that has come back and thinks we're still in # the room (or we've been rejoined to the room by a state reset). - is_in_room = await self.event_auth_handler.check_host_in_room( + is_in_room = await self.event_auth_handler.is_host_in_room( room_id, self.server_name ) if not is_in_room: @@ -91,13 +93,23 @@ async def _received_remote_receipt(self, origin: str, content: JsonDict) -> None ) continue + # Check if these receipts apply to a thread. + thread_id = None + data = user_values.get("data", {}) + if self._msc3771_enabled and isinstance(data, dict): + thread_id = data.get("thread_id") + # If the thread ID is invalid, consider it missing. 
+ if not isinstance(thread_id, str): + thread_id = None + receipts.append( ReadReceipt( room_id=room_id, receipt_type=receipt_type, user_id=user_id, event_ids=user_values["event_ids"], - data=user_values.get("data", {}), + thread_id=thread_id, + data=data, ) ) @@ -114,6 +126,7 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: receipt.receipt_type, receipt.user_id, receipt.event_ids, + receipt.thread_id, receipt.data, ) @@ -146,7 +159,12 @@ async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: return True async def received_client_receipt( - self, room_id: str, receipt_type: str, user_id: str, event_id: str + self, + room_id: str, + receipt_type: str, + user_id: str, + event_id: str, + thread_id: Optional[str], ) -> None: """Called when a client tells us a local user has read up to the given event_id in the room. @@ -156,6 +174,7 @@ async def received_client_receipt( receipt_type=receipt_type, user_id=user_id, event_ids=[event_id], + thread_id=thread_id, data={"ts": int(self.clock.time_msec())}, ) diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index ebd445adcaf3..8d08625237bc 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -609,7 +609,7 @@ async def _is_local_room_accessible( # If this is a request over federation, check if the host is in the room or # has a user who could join the room. 
elif origin: - if await self._event_auth_handler.check_host_in_room( + if await self._event_auth_handler.is_host_in_room( room_id, origin ) or await self._store.is_host_invited(room_id, origin): return True @@ -624,9 +624,7 @@ async def _is_local_room_accessible( await self._event_auth_handler.get_rooms_that_allow_join(state_ids) ) for space_id in allowed_rooms: - if await self._event_auth_handler.check_host_in_room( - space_id, origin - ): + if await self._event_auth_handler.is_host_in_room(space_id, origin): return True logger.info( diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a4cd8b8f0cee..f95369166985 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -340,7 +340,7 @@ async def _recv_edu(self, origin: str, content: JsonDict) -> None: # If we're not in the room just ditch the event entirely. This is # probably an old server that has come back and thinks we're still in # the room (or we've been rejoined to the room by a state reset). - is_in_room = await self.event_auth_handler.check_host_in_room( + is_in_room = await self.event_auth_handler.is_host_in_room( room_id, self.server_name ) if not is_in_room: @@ -362,11 +362,14 @@ async def _recv_edu(self, origin: str, content: JsonDict) -> None: ) return - domains = await self._storage_controllers.state.get_current_hosts_in_room( + # Let's check that the origin server is in the room before accepting the typing + # event. We don't want to block waiting on a partial state so take an + # approximation if needed. 
+ domains = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) - if self.server_name in domains: + if user.domain in domains: logger.info("Got typing update from %s: %r", user_id, content) now = self.clock.time_msec() self._member_typing_until[member] = now + FEDERATION_TIMEOUT diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index b2a50c910507..1f8227896f65 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -36,6 +36,7 @@ from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS +from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials from synapse.types import ISynapseReactor @@ -220,7 +221,11 @@ def request( self._reactor, parsed_uri.host, parsed_uri.port, **self._endpoint_kwargs ) - logger.debug("Requesting %s via %s", uri, endpoint) + logger.debug( + "Requesting %s via %s", + redact_uri(uri.decode("ascii", errors="replace")), + endpoint, + ) if parsed_uri.scheme == b"https": tls_connection_creator = self._policy_for_https.creatorForNetloc( diff --git a/synapse/http/server.py b/synapse/http/server.py index 6068a94b409a..bcbfac2c9fff 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -705,7 +705,7 @@ def stopProducing(self) -> None: self._request = None -def _encode_json_bytes(json_object: Any) -> bytes: +def _encode_json_bytes(json_object: object) -> bytes: """ Encode an object into JSON. Returns an iterator of bytes. 
""" @@ -746,7 +746,7 @@ def respond_with_json( return None if canonical_json: - encoder = encode_canonical_json + encoder: Callable[[object], bytes] = encode_canonical_json else: encoder = _encode_json_bytes diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index cf9cd6833ba9..b2522f98cade 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -427,7 +427,8 @@ async def _on_new_receipts( receipt.receipt_type, receipt.user_id, [receipt.event_id], - receipt.data, + thread_id=receipt.thread_id, + data=receipt.data, ) await self.federation_sender.send_read_receipt(receipt_info) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 398bebeaa659..e01155ad597b 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -361,6 +361,7 @@ class ReceiptsStreamRow: receipt_type: str user_id: str event_id: str + thread_id: Optional[str] data: dict NAME = "receipts" diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index 5e5309653971..852838515cce 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -83,6 +83,8 @@ async def on_POST( receipt_type, user_id=requester.user.to_string(), event_id=event_id, + # Setting the thread ID is not possible with the /read_markers endpoint. 
+ thread_id=None, ) return 200, {} diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 5b7fad740265..f3ff156abe97 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -49,6 +49,7 @@ def __init__(self, hs: "HomeServer"): ReceiptTypes.READ_PRIVATE, ReceiptTypes.FULLY_READ, } + self._msc3771_enabled = hs.config.experimental.msc3771_enabled async def on_POST( self, request: SynapseRequest, room_id: str, receipt_type: str, event_id: str @@ -61,7 +62,17 @@ async def on_POST( f"Receipt type must be {', '.join(self._known_receipt_types)}", ) - parse_json_object_from_request(request, allow_empty_body=False) + body = parse_json_object_from_request(request) + + # Pull the thread ID, if one exists. + thread_id = None + if self._msc3771_enabled: + if "thread_id" in body: + thread_id = body.get("thread_id") + if not thread_id or not isinstance(thread_id, str): + raise SynapseError( + 400, "thread_id field must be a non-empty string" + ) await self.presence_handler.bump_presence_active_time(requester.user) @@ -77,6 +88,7 @@ async def on_POST( receipt_type, user_id=requester.user.to_string(), event_id=event_id, + thread_id=thread_id, ) return 200, {} diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index b3917a5abcc2..c95b0d6f197b 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -103,6 +103,8 @@ def on_GET(self, request: Request) -> Tuple[int, JsonDict]: "org.matrix.msc3030": self.config.experimental.msc3030_enabled, # Adds support for thread relations, per MSC3440. "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above + # Support for thread read receipts. 
+ "org.matrix.msc3771": self.config.experimental.msc3771_enabled, # Allows moderators to fetch redacted event content as described in MSC2815 "fi.mau.msc2815": self.config.experimental.msc2815_enabled, # Adds support for login token requests as per MSC3882 diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 303a5d529800..313e8aca7d0b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -91,6 +91,9 @@ def _invalidate_state_caches( self._attempt_to_invalidate_cache( "get_user_in_room_with_profile", (room_id, user_id) ) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", (user_id,) + ) # Purge other caches based on room state. self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index bbe568bf053e..bb60130afed7 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -23,6 +23,7 @@ List, Mapping, Optional, + Sequence, Tuple, ) @@ -406,6 +407,7 @@ async def get_current_state_ids( self, room_id: str, state_filter: Optional[StateFilter] = None, + await_full_state: bool = True, on_invalidate: Optional[Callable[[], None]] = None, ) -> StateMap[str]: """Get the current state event ids for a room based on the @@ -418,13 +420,17 @@ async def get_current_state_ids( room_id: The room to get the state IDs of. state_filter: The state filter used to fetch state from the database. + await_full_state: if true, will block if we do not yet have complete + state for the room. on_invalidate: Callback for when the `get_current_state_ids` cache for the room gets invalidated. Returns: The current state of the room. 
""" - if not state_filter or state_filter.must_await_full_state(self._is_mine_id): + if await_full_state and ( + not state_filter or state_filter.must_await_full_state(self._is_mine_id) + ): await self._partial_state_room_tracker.await_full_state(room_id) if state_filter and not state_filter.is_full(): @@ -524,12 +530,53 @@ async def get_current_state_event( return state_map.get(key) async def get_current_hosts_in_room(self, room_id: str) -> List[str]: - """Get current hosts in room based on current state.""" + """Get current hosts in room based on current state. + + Blocks until we have full state for the given room. This only happens for rooms + with partial state. + + Returns: + A list of hosts in the room, sorted by longest in the room first. (aka. + sorted by join with the lowest depth first). + """ await self._partial_state_room_tracker.await_full_state(room_id) return await self.stores.main.get_current_hosts_in_room(room_id) + async def get_current_hosts_in_room_or_partial_state_approximation( + self, room_id: str + ) -> Sequence[str]: + """Get approximation of current hosts in room based on current state. + + For rooms with full state, this is equivalent to `get_current_hosts_in_room`, + with the same order of results. + + For rooms with partial state, no blocking occurs. Instead, the list of hosts + in the room at the time of joining is combined with the list of hosts which + joined the room afterwards. The returned list may include hosts that are not + actually in the room and exclude hosts that are in the room, since we may + calculate state incorrectly during the partial state phase. The order of results + is arbitrary for rooms with partial state. + """ + # We have to read this list first to mitigate races with un-partial stating. + # This will be empty for rooms with full state. 
+ hosts_at_join = await self.stores.main.get_partial_state_servers_at_join( + room_id + ) + + hosts_from_state = await self.stores.main.get_current_hosts_in_room(room_id) + hosts_from_state_set = set(hosts_from_state) + + # First take the list of hosts based on the current state. + # For rooms with partial state, this will be missing most hosts. + hosts = list(hosts_from_state) + # Then add in the list of hosts in the room at the time we joined. + # This will be an empty list for rooms with full state. + hosts.extend(host for host in hosts_at_join if host not in hosts_from_state_set) + + return hosts + async def get_users_in_room_with_profiles( self, room_id: str ) -> Dict[str, ProfileInfo]: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 921cd4dc5ee0..9d116f6925e3 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -95,6 +95,8 @@ "local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx", "remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx", "event_push_summary": "event_push_summary_unique_index", + "receipts_linearized": "receipts_linearized_unique_index", + "receipts_graph": "receipts_graph_unique_index", } diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2c421151c1be..db6ce83a2b32 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -223,15 +223,16 @@ def _invalidate_caches_for_event( # process triggering the invalidation is responsible for clearing any external # cached objects. 
self._invalidate_local_get_event_cache(event_id) - self.have_seen_event.invalidate((room_id, event_id)) - self.get_latest_event_ids_in_room.invalidate((room_id,)) - - self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,)) + self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id)) + self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) # The `_get_membership_from_event_id` is immutable, except for the # case where we look up an event *before* persisting it. - self._get_membership_from_event_id.invalidate((event_id,)) + self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,)) if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) @@ -240,19 +241,26 @@ def _invalidate_caches_for_event( self._invalidate_local_get_event_cache(redacts) # Caches which might leak edits must be invalidated for the event being # redacted. 
- self.get_relations_for_event.invalidate((redacts,)) - self.get_applicable_edit.invalidate((redacts,)) + self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,)) + self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,)) if etype == EventTypes.Member: self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) - self.get_invited_rooms_for_local_user.invalidate((state_key,)) + self._attempt_to_invalidate_cache( + "get_invited_rooms_for_local_user", (state_key,) + ) if relates_to: - self.get_relations_for_event.invalidate((relates_to,)) - self.get_aggregation_groups_for_event.invalidate((relates_to,)) - self.get_applicable_edit.invalidate((relates_to,)) - self.get_thread_summary.invalidate((relates_to,)) - self.get_thread_participated.invalidate((relates_to,)) + self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,)) + self._attempt_to_invalidate_cache( + "get_aggregation_groups_for_event", (relates_to,) + ) + self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,)) + self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,)) + self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) + self._attempt_to_invalidate_cache( + "get_mutual_event_relations_for_rel_type", (relates_to,) + ) async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ef477978ed63..3251fca6fbf2 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import datetime import itertools import logging from queue import Empty, PriorityQueue @@ -43,7 +44,7 @@ ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -72,6 +73,13 @@ logger = logging.getLogger(__name__) +BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int( + datetime.timedelta(days=7).total_seconds() +) +BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int( + datetime.timedelta(hours=1).total_seconds() +) + # All the info we need while iterating the DAG while backfilling @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -715,96 +723,189 @@ def _get_auth_chain_difference_txn( @trace @tag_args - async def get_oldest_event_ids_with_depth_in_room( - self, room_id: str + async def get_backfill_points_in_room( + self, + room_id: str, ) -> List[Tuple[str, int]]: - """Gets the oldest events(backwards extremities) in the room along with the - aproximate depth. - - We use this function so that we can compare and see if someones current - depth at their current scrollback is within pagination range of the - event extremeties. If the current depth is close to the depth of given - oldest event, we can trigger a backfill. + """ + Gets the oldest events(backwards extremities) in the room along with the + approximate depth. Sorted by depth, highest to lowest (descending). Args: room_id: Room where we want to find the oldest events Returns: - List of (event_id, depth) tuples + List of (event_id, depth) tuples. 
Sorted by depth, highest to lowest + (descending) """ - def get_oldest_event_ids_with_depth_in_room_txn( + def get_backfill_points_in_room_txn( txn: LoggingTransaction, room_id: str ) -> List[Tuple[str, int]]: - # Assemble a dictionary with event_id -> depth for the oldest events + # Assemble a tuple lookup of event_id -> depth for the oldest events # we know of in the room. Backwards extremeties are the oldest # events we know of in the room but we only know of them because - # some other event referenced them by prev_event and aren't peristed - # in our database yet (meaning we don't know their depth - # specifically). So we need to look for the aproximate depth from + # some other event referenced them by prev_event and aren't + # persisted in our database yet (meaning we don't know their depth + # specifically). So we need to look for the approximate depth from # the events connected to the current backwards extremeties. sql = """ - SELECT b.event_id, MAX(e.depth) FROM events as e + SELECT backward_extrem.event_id, event.depth FROM events AS event /** * Get the edge connections from the event_edges table * so we can see whether this event's prev_events points * to a backward extremity in the next join. */ - INNER JOIN event_edges as g - ON g.event_id = e.event_id + INNER JOIN event_edges AS edge + ON edge.event_id = event.event_id /** * We find the "oldest" events in the room by looking for * events connected to backwards extremeties (oldest events * in the room that we know of so far). */ - INNER JOIN event_backward_extremities as b - ON g.prev_event_id = b.event_id - WHERE b.room_id = ? AND g.is_state is ? - GROUP BY b.event_id + INNER JOIN event_backward_extremities AS backward_extrem + ON edge.prev_event_id = backward_extrem.event_id + /** + * We use this info to make sure we don't retry to use a backfill point + * if we've already attempted to backfill from it recently. 
+ */ + LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info + ON + failed_backfill_attempt_info.room_id = backward_extrem.room_id + AND failed_backfill_attempt_info.event_id = backward_extrem.event_id + WHERE + backward_extrem.room_id = ? + /* We only care about non-state edges because we used to use + * `event_edges` for two different sorts of "edges" (the current + * event DAG, but also a link to the previous state, for state + * events). These legacy state event edges can be distinguished by + * `is_state` and are removed from the codebase and schema but + * because the schema change is in a background update, it's not + * necessarily safe to assume that it will have been completed. + */ + AND edge.is_state is ? /* False */ + /** + * Exponential back-off (up to the upper bound) so we don't retry the + * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ( + failed_backfill_attempt_info.event_id IS NULL + OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */) + ) + /** + * Sort from highest to the lowest depth. Then tie-break on + * alphabetical order of the event_ids so we get a consistent + * ordering which is nice when asserting things in tests. 
+ */ + ORDER BY event.depth DESC, backward_extrem.event_id DESC """ - txn.execute(sql, (room_id, False)) + if isinstance(self.database_engine, PostgresEngine): + least_function = "least" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "min" + else: + raise RuntimeError("Unknown database engine") + + txn.execute( + sql % (least_function,), + ( + room_id, + False, + self._clock.time_msec(), + 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, + 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS, + ), + ) return cast(List[Tuple[str, int]], txn.fetchall()) return await self.db_pool.runInteraction( - "get_oldest_event_ids_with_depth_in_room", - get_oldest_event_ids_with_depth_in_room_txn, + "get_backfill_points_in_room", + get_backfill_points_in_room_txn, room_id, ) @trace async def get_insertion_event_backward_extremities_in_room( - self, room_id: str + self, + room_id: str, ) -> List[Tuple[str, int]]: - """Get the insertion events we know about that we haven't backfilled yet. - - We use this function so that we can compare and see if someones current - depth at their current scrollback is within pagination range of the - insertion event. If the current depth is close to the depth of given - insertion event, we can trigger a backfill. + """ + Get the insertion events we know about that we haven't backfilled yet + along with the approximate depth. Sorted by depth, highest to lowest + (descending). Args: room_id: Room where we want to find the oldest events Returns: - List of (event_id, depth) tuples + List of (event_id, depth) tuples. 
Sorted by depth, highest to lowest + (descending) """ def get_insertion_event_backward_extremities_in_room_txn( txn: LoggingTransaction, room_id: str ) -> List[Tuple[str, int]]: sql = """ - SELECT b.event_id, MAX(e.depth) FROM insertion_events as i + SELECT + insertion_event_extremity.event_id, event.depth /* We only want insertion events that are also marked as backwards extremities */ - INNER JOIN insertion_event_extremities as b USING (event_id) + FROM insertion_event_extremities AS insertion_event_extremity /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS e USING (event_id) - WHERE b.room_id = ? - GROUP BY b.event_id + INNER JOIN events AS event USING (event_id) + /** + * We use this info to make sure we don't retry to use a backfill point + * if we've already attempted to backfill from it recently. + */ + LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info + ON + failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id + AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id + WHERE + insertion_event_extremity.room_id = ? + /** + * Exponential back-off (up to the upper bound) so we don't retry the + * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ( + failed_backfill_attempt_info.event_id IS NULL + OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */) + ) + /** + * Sort from highest to the lowest depth. Then tie-break on + * alphabetical order of the event_ids so we get a consistent + * ordering which is nice when asserting things in tests. 
+ */ + ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC """ - txn.execute(sql, (room_id,)) + if isinstance(self.database_engine, PostgresEngine): + least_function = "least" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "min" + else: + raise RuntimeError("Unknown database engine") + + txn.execute( + sql % (least_function,), + ( + room_id, + self._clock.time_msec(), + 1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, + 1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS, + ), + ) return cast(List[Tuple[str, int]], txn.fetchall()) return await self.db_pool.runInteraction( @@ -1539,7 +1640,12 @@ async def get_next_staged_event_id_for_room( self, room_id: str, ) -> Optional[Tuple[str, str]]: - """Get the next event ID in the staging area for the given room.""" + """ + Get the next event ID in the staging area for the given room. + + Returns: + Tuple of the `origin` and `event_id` + """ def _get_next_staged_event_id_for_room_txn( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1b54a2eb5768..2e156a4a11fa 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -35,7 +35,7 @@ from prometheus_client import Counter import synapse.metrics -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.constants import EventContentFields, EventTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event @@ -410,6 +410,31 @@ def _persist_events_txn( assert min_stream_order assert max_stream_order + # Once the txn completes, invalidate all of the relevant caches. Note that we do this + # up here because it captures all the events_and_contexts before any are removed. 
+ for event, _ in events_and_contexts: + self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) + if event.redacts: + self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) + + relates_to = None + relation = relation_from_event(event) + if relation: + relates_to = relation.parent_id + + assert event.internal_metadata.stream_ordering is not None + txn.call_after( + self.store._invalidate_caches_for_event, + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.type, + getattr(event, "state_key", None), + event.redacts, + relates_to, + backfilled=False, + ) + self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremities, @@ -459,6 +484,7 @@ def _persist_events_txn( # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. + # NB: This function invalidates all state related caches self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) def _persist_event_auth_chain_txn( @@ -1172,13 +1198,6 @@ def _update_current_state_txn( ) # Invalidate the various caches - - for member in members_changed: - txn.call_after( - self.store.get_rooms_for_user_with_stream_ordering.invalidate, - (member,), - ) - self.store._invalidate_state_caches_and_stream( txn, room_id, members_changed ) @@ -1222,9 +1241,6 @@ def _update_forward_extremities_txn( self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) - txn.call_after( - self.store.get_latest_event_ids_in_room.invalidate, (room_id,) - ) self.db_pool.simple_insert_many_txn( txn, @@ -1294,8 +1310,6 @@ def _update_room_depths_txn( """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: - # Remove the any existing cache entries for the event_ids - self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) # Then update the `stream_ordering` position to mark the latest # event as the front of the room. 
This should not be done for # backfilled events because backfilled events have negative @@ -1697,16 +1711,7 @@ async def prefill() -> None: txn.async_call_after(prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: - """Invalidate the caches for the redacted event. - - Note that these caches are also cleared as part of event replication in - _invalidate_caches_for_event. - """ assert event.redacts is not None - self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) - txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,)) - txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,)) - self.db_pool.simple_upsert_txn( txn, table="redactions", @@ -1807,34 +1812,6 @@ def _store_room_members_txn( for event in events: assert event.internal_metadata.stream_ordering is not None - txn.call_after( - self.store._membership_stream_cache.entity_has_changed, - event.state_key, - event.internal_metadata.stream_ordering, - ) - txn.call_after( - self.store.get_invited_rooms_for_local_user.invalidate, - (event.state_key,), - ) - txn.call_after( - self.store.get_local_users_in_room.invalidate, - (event.room_id,), - ) - txn.call_after( - self.store.get_number_joined_users_in_room.invalidate, - (event.room_id,), - ) - txn.call_after( - self.store.get_user_in_room_with_profile.invalidate, - (event.room_id, event.state_key), - ) - - # The `_get_membership_from_event_id` is immutable, except for the - # case where we look up an event *before* persisting it. - txn.call_after( - self.store._get_membership_from_event_id.invalidate, - (event.event_id,), - ) # We update the local_current_membership table only if the event is # "current", i.e., its something that has just happened. 
@@ -1883,35 +1860,6 @@ def _handle_event_relations( }, ) - txn.call_after( - self.store.get_relations_for_event.invalidate, (relation.parent_id,) - ) - txn.call_after( - self.store.get_aggregation_groups_for_event.invalidate, - (relation.parent_id,), - ) - txn.call_after( - self.store.get_mutual_event_relations_for_rel_type.invalidate, - (relation.parent_id,), - ) - - if relation.rel_type == RelationTypes.REPLACE: - txn.call_after( - self.store.get_applicable_edit.invalidate, (relation.parent_id,) - ) - - if relation.rel_type == RelationTypes.THREAD: - txn.call_after( - self.store.get_thread_summary.invalidate, (relation.parent_id,) - ) - # It should be safe to only invalidate the cache if the user has not - # previously participated in the thread, but that's difficult (and - # potentially error-prone) so it is always invalidated. - txn.call_after( - self.store.get_thread_participated.invalidate, - (relation.parent_id, event.sender), - ) - def _handle_insertion_event( self, txn: LoggingTransaction, event: EventBase ) -> None: @@ -2213,28 +2161,6 @@ def _set_push_actions_for_event_and_users_txn( ), ) - room_to_event_ids: Dict[str, List[str]] = {} - for e in non_outlier_events: - room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) - - for room_id, event_ids in room_to_event_ids.items(): - rows = self.db_pool.simple_select_many_txn( - txn, - table="event_push_actions_staging", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("user_id",), - ) - - user_ids = {row["user_id"] for row in rows} - - for user_id in user_ids: - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id, user_id), - ) - # Now we delete the staging area for *all* events that were being # persisted. 
txn.execute_batch( @@ -2249,11 +2175,6 @@ def _set_push_actions_for_event_and_users_txn( def _remove_push_actions_for_event_id_txn( self, txn: LoggingTransaction, room_id: str, event_id: str ) -> None: - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id,), - ) txn.execute( "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", (room_id, event_id), diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index ddb8e80b69be..52fe0db92405 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -540,7 +540,9 @@ def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]: async def get_all_updated_receipts( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[ + List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], int, bool + ]: """Get updates for receipts replication stream. Args: @@ -567,9 +569,13 @@ async def get_all_updated_receipts( def get_all_updated_receipts_txn( txn: LoggingTransaction, - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[ + List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], + int, + bool, + ]: sql = """ - SELECT stream_id, room_id, receipt_type, user_id, event_id, data + SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data FROM receipts_linearized WHERE ? < stream_id AND stream_id <= ? 
ORDER BY stream_id ASC @@ -578,8 +584,8 @@ def get_all_updated_receipts_txn( txn.execute(sql, (last_id, current_id, limit)) updates = cast( - List[Tuple[int, list]], - [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn], + List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], + [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn], ) limited = False @@ -631,6 +637,7 @@ def _insert_linearized_receipt_txn( receipt_type: str, user_id: str, event_id: str, + thread_id: Optional[str], data: JsonDict, stream_id: int, ) -> Optional[int]: @@ -657,12 +664,27 @@ def _insert_linearized_receipt_txn( # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts if stream_ordering is not None: - sql = ( - "SELECT stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized AS r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + if thread_id is None: + thread_clause = "r.thread_id IS NULL" + thread_args: Tuple[str, ...] = () + else: + thread_clause = "r.thread_id = ?" + thread_args = (thread_id,) + + sql = f""" + SELECT stream_ordering, event_id FROM events + INNER JOIN receipts_linearized AS r USING (event_id, room_id) + WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? 
AND {thread_clause} + """ + txn.execute( + sql, + ( + room_id, + receipt_type, + user_id, + ) + + thread_args, ) - txn.execute(sql, (room_id, receipt_type, user_id)) for so, eid in txn: if int(so) >= stream_ordering: @@ -682,21 +704,28 @@ def _insert_linearized_receipt_txn( self._receipts_stream_cache.entity_has_changed, room_id, stream_id ) + keyvalues = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + where_clause = "" + if thread_id is None: + where_clause = "thread_id IS NULL" + else: + keyvalues["thread_id"] = thread_id + self.db_pool.simple_upsert_txn( txn, table="receipts_linearized", - keyvalues={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - }, + keyvalues=keyvalues, values={ "stream_id": stream_id, "event_id": event_id, "event_stream_ordering": stream_ordering, "data": json_encoder.encode(data), - "thread_id": None, }, + where_clause=where_clause, # receipts_linearized has a unique constraint on # (user_id, room_id, receipt_type), so no need to lock lock=False, @@ -748,6 +777,7 @@ async def insert_receipt( receipt_type: str, user_id: str, event_ids: List[str], + thread_id: Optional[str], data: dict, ) -> Optional[Tuple[int, int]]: """Insert a receipt, either from local client or remote server. 
@@ -780,6 +810,7 @@ async def insert_receipt( receipt_type, user_id, linearized_event_id, + thread_id, data, stream_id=stream_id, # Read committed is actually beneficial here because we check for a receipt with @@ -794,7 +825,8 @@ async def insert_receipt( now = self._clock.time_msec() logger.debug( - "RR for event %s in %s (%i ms old)", + "Receipt %s for event %s in %s (%i ms old)", + receipt_type, linearized_event_id, room_id, now - event_ts, @@ -807,6 +839,7 @@ async def insert_receipt( receipt_type, user_id, event_ids, + thread_id, data, ) @@ -821,6 +854,7 @@ def _insert_graph_receipt_txn( receipt_type: str, user_id: str, event_ids: List[str], + thread_id: Optional[str], data: JsonDict, ) -> None: assert self._can_write_to_receipts @@ -832,19 +866,26 @@ def _insert_graph_receipt_txn( # FIXME: This shouldn't invalidate the whole cache txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) + keyvalues = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + where_clause = "" + if thread_id is None: + where_clause = "thread_id IS NULL" + else: + keyvalues["thread_id"] = thread_id + self.db_pool.simple_upsert_txn( txn, table="receipts_graph", - keyvalues={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - }, + keyvalues=keyvalues, values={ "event_ids": json_encoder.encode(event_ids), "data": json_encoder.encode(data), - "thread_id": None, }, + where_clause=where_clause, # receipts_graph has a unique constraint on # (user_id, room_id, receipt_type), so no need to lock lock=False, diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 7bd27790ebfe..898947af9536 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -51,6 +51,8 @@ class _RelatedEvent: event_id: str # The sender of the related event. 
sender: str + topological_ordering: Optional[int] + stream_ordering: int class RelationsWorkerStore(SQLBaseStore): @@ -91,6 +93,9 @@ async def get_relations_for_event( # it. The `event_id` must match the `event.event_id`. assert event.event_id == event_id + # Ensure bad limits aren't being passed in. + assert limit >= 0 + where_clause = ["relates_to_id = ?", "room_id = ?"] where_args: List[Union[str, int]] = [event.event_id, room_id] is_redacted = event.internal_metadata.is_redacted() @@ -139,21 +144,34 @@ def _get_recent_references_for_event_txn( ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: txn.execute(sql, where_args + [limit + 1]) - last_topo_id = None - last_stream_id = None events = [] - for row in txn: + for event_id, relation_type, sender, topo_ordering, stream_ordering in txn: # Do not include edits for redacted events as they leak event # content. - if not is_redacted or row[1] != RelationTypes.REPLACE: - events.append(_RelatedEvent(row[0], row[2])) - last_topo_id = row[3] - last_stream_id = row[4] + if not is_redacted or relation_type != RelationTypes.REPLACE: + events.append( + _RelatedEvent(event_id, sender, topo_ordering, stream_ordering) + ) - # If there are more events, generate the next pagination key. + # If there are more events, generate the next pagination key from the + # last event returned. next_token = None - if len(events) > limit and last_topo_id and last_stream_id: - next_key = RoomStreamToken(last_topo_id, last_stream_id) + if len(events) > limit: + # Instead of using the last row (which tells us there is more + # data), use the last row to be returned. + events = events[:limit] + + topo = events[-1].topological_ordering + token = events[-1].stream_ordering + if direction == "b": + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. 
+ token -= 1 + next_key = RoomStreamToken(topo, token) + if from_token: next_token = from_token.copy_and_replace( StreamKeyType.ROOM, next_key diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index bef66f199269..5dd116d76653 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -25,6 +25,7 @@ List, Mapping, Optional, + Sequence, Tuple, Union, cast, @@ -1133,6 +1134,22 @@ def get_rooms_for_retention_period_in_range_txn( get_rooms_for_retention_period_in_range_txn, ) + async def get_partial_state_servers_at_join(self, room_id: str) -> Sequence[str]: + """Gets the list of servers in a partial state room at the time we joined it. + + Returns: + The `servers_in_room` list from the `/send_join` response for partial state + rooms. May not be accurate or complete, as it comes from a remote + homeserver. + An empty list for full state rooms. + """ + return await self.db_pool.simple_select_onecol( + "partial_state_rooms_servers", + keyvalues={"room_id": room_id}, + retcol="server_name", + desc="get_partial_state_servers_at_join", + ) + async def get_partial_state_rooms_and_servers( self, ) -> Mapping[str, Collection[str]]: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 3f9bfaeac5cb..530f04e149b3 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1334,15 +1334,15 @@ def _paginate_room_events_txn( if rows: topo = rows[-1].topological_ordering - toke = rows[-1].stream_ordering + token = rows[-1].stream_ordering if direction == "b": # Tokens are positions between events. # This token points *after* the last event in the chunk. # We need it to point to the event before it in the chunk # when we are going backwards so we subtract one from the # stream part. 
- toke -= 1 - next_token = RoomStreamToken(topo, toke) + token -= 1 + next_token = RoomStreamToken(topo, token) else: # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token diff --git a/synapse/types.py b/synapse/types.py index ec44601f5424..773f0438d5bd 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -835,6 +835,7 @@ class ReadReceipt: receipt_type: str user_id: str event_ids: List[str] + thread_id: Optional[str] data: JsonDict diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index a5aa500ef85d..f1e357764ff4 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -49,7 +49,12 @@ def test_send_receipts(self): sender = self.hs.get_federation_sender() receipt = ReadReceipt( - "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234} + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, ) self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) @@ -89,7 +94,12 @@ def test_send_receipts_with_backoff(self): sender = self.hs.get_federation_sender() receipt = ReadReceipt( - "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234} + "room_id", + "m.read", + "user_id", + ["event_id"], + thread_id=None, + data={"ts": 1234}, ) self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) @@ -121,7 +131,12 @@ def test_send_receipts_with_backoff(self): # send the second RR receipt = ReadReceipt( - "room_id", "m.read", "user_id", ["other_id"], {"ts": 1234} + "room_id", + "m.read", + "user_id", + ["other_id"], + thread_id=None, + data={"ts": 1234}, ) self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) self.pump() diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index b17af2725b3d..af24c4984da5 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py 
@@ -447,6 +447,7 @@ def test_sending_read_receipt_batches_to_application_services(self): receipt_type="m.read", user_id=self.local_user, event_ids=[f"$eventid_{i}"], + thread_id=None, data={}, ) ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 8adba29d7f9c..9c821b3042b8 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -129,7 +129,7 @@ async def check_user_in_room(room_id: str, requester: Requester) -> None: async def check_host_in_room(room_id: str, server_name: str) -> bool: return room_id == ROOM_ID - hs.get_event_auth_handler().check_host_in_room = check_host_in_room + hs.get_event_auth_handler().is_host_in_room = check_host_in_room async def get_current_hosts_in_room(room_id: str): return {member.domain for member in self.room_members} @@ -138,6 +138,10 @@ async def get_current_hosts_in_room(room_id: str): get_current_hosts_in_room ) + hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = ( + get_current_hosts_in_room + ) + async def get_users_in_room(room_id: str): return {str(u) for u in self.room_members} diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 49a21e2e8581..efd92793c078 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -171,7 +171,7 @@ def test_push_actions_for_user(self, send_receipt: bool): if send_receipt: self.get_success( self.master_store.insert_receipt( - ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], {} + ROOM_ID, ReceiptTypes.READ, USER_ID_2, [event1.event_id], None, {} ) ) diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index eb0011784518..ede6d0c11877 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -33,7 +33,12 @@ def test_receipt(self): # tell the master to send a 
new receipt self.get_success( self.hs.get_datastores().main.insert_receipt( - "!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1} + "!room:blue", + "m.read", + USER_ID, + ["$event:blue"], + thread_id=None, + data={"a": 1}, ) ) self.replicate() @@ -48,6 +53,7 @@ def test_receipt(self): self.assertEqual("m.read", row.receipt_type) self.assertEqual(USER_ID, row.user_id) self.assertEqual("$event:blue", row.event_id) + self.assertIsNone(row.thread_id) self.assertEqual({"a": 1}, row.data) # Now let's disconnect and insert some data. @@ -57,7 +63,12 @@ def test_receipt(self): self.get_success( self.hs.get_datastores().main.insert_receipt( - "!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2} + "!room2:blue", + "m.read", + USER_ID, + ["$event2:foo"], + thread_id=None, + data={"a": 2}, ) ) self.replicate() diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 651f4f415d1d..d33e34d82957 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -788,6 +788,7 @@ def test_basic_paginate_relations(self) -> None: channel.json_body["chunk"][0], ) + @unittest.override_config({"experimental_features": {"msc3715_enabled": True}}) def test_repeated_paginate_relations(self) -> None: """Test that if we paginate using a limit and tokens then we get the expected events. @@ -809,7 +810,7 @@ def test_repeated_paginate_relations(self) -> None: channel = self.make_request( "GET", - f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}?limit=1{from_token}", + f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}?limit=3{from_token}", access_token=self.user_token, ) self.assertEqual(200, channel.code, channel.json_body) @@ -827,6 +828,32 @@ def test_repeated_paginate_relations(self) -> None: found_event_ids.reverse() self.assertEqual(found_event_ids, expected_event_ids) + # Test forward pagination. 
+ prev_token = "" + found_event_ids = [] + for _ in range(20): + from_token = "" + if prev_token: + from_token = "&from=" + prev_token + + channel = self.make_request( + "GET", + f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}?org.matrix.msc3715.dir=f&limit=3{from_token}", + access_token=self.user_token, + ) + self.assertEqual(200, channel.code, channel.json_body) + + found_event_ids.extend(e["event_id"] for e in channel.json_body["chunk"]) + next_batch = channel.json_body.get("next_batch") + + self.assertNotEqual(prev_token, next_batch) + prev_token = next_batch + + if not prev_token: + break + + self.assertEqual(found_event_ids, expected_event_ids) + def test_pagination_from_sync_and_messages(self) -> None: """Pagination tokens from /sync and /messages can be used to paginate /relations.""" channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "A") diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index a6679e131201..85739c464ec7 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -12,25 +12,38 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Tuple, Union +import datetime +from typing import Dict, List, Tuple, Union import attr from parameterized import parameterized +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.constants import EventTypes from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, RoomVersion, ) from synapse.events import _EventInternalMetadata -from synapse.util import json_encoder +from synapse.server import HomeServer +from synapse.storage.database import LoggingTransaction +from synapse.types import JsonDict +from synapse.util import Clock, json_encoder import tests.unittest import tests.utils +@attr.s(auto_attribs=True, frozen=True, slots=True) +class _BackfillSetupInfo: + room_id: str + depth_map: Dict[str, int] + + class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): - def prepare(self, reactor, clock, hs): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main def test_get_prev_events_for_room(self): @@ -571,11 +584,471 @@ def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]: ) self.assertEqual(count, 1) - _, event_id = self.get_success( + next_staged_event_info = self.get_success( self.store.get_next_staged_event_id_for_room(room_id) ) + assert next_staged_event_info + _, event_id = next_staged_event_info self.assertEqual(event_id, "$fake_event_id_500") + def _setup_room_for_backfill_tests(self) -> _BackfillSetupInfo: + """ + Sets up a room with various events and backward extremities to test + backfill functions against. + + Returns: + _BackfillSetupInfo including the `room_id` to test against and + `depth_map` of events in the room + """ + room_id = "!backfill-room-test:some-host" + + # The silly graph we use to test grabbing backward extremities, + # where the top is the oldest events. 
+ # 1 (oldest) + # | + # 2 ⹁ + # | \ + # | [b1, b2, b3] + # | | + # | A + # | / + # 3 { + # | \ + # | [b4, b5, b6] + # | | + # | B + # | / + # 4 ´ + # | + # 5 (newest) + + event_graph: Dict[str, List[str]] = { + "1": [], + "2": ["1"], + "3": ["2", "A"], + "4": ["3", "B"], + "5": ["4"], + "A": ["b1", "b2", "b3"], + "b1": ["2"], + "b2": ["2"], + "b3": ["2"], + "B": ["b4", "b5", "b6"], + "b4": ["3"], + "b5": ["3"], + "b6": ["3"], + } + + depth_map: Dict[str, int] = { + "1": 1, + "2": 2, + "b1": 3, + "b2": 3, + "b3": 3, + "A": 4, + "3": 5, + "b4": 6, + "b5": 6, + "b6": 6, + "B": 7, + "4": 8, + "5": 9, + } + + # The events we have persisted on our server. + # The rest are events in the room but not backfilled yet. + our_server_events = {"5", "4", "B", "3", "A"} + + complete_event_dict_map: Dict[str, JsonDict] = {} + stream_ordering = 0 + for (event_id, prev_event_ids) in event_graph.items(): + depth = depth_map[event_id] + + complete_event_dict_map[event_id] = { + "event_id": event_id, + "type": "test_regular_type", + "room_id": room_id, + "sender": "@sender", + "prev_event_ids": prev_event_ids, + "auth_event_ids": [], + "origin_server_ts": stream_ordering, + "depth": depth, + "stream_ordering": stream_ordering, + "content": {"body": "event" + event_id}, + } + + stream_ordering += 1 + + def populate_db(txn: LoggingTransaction): + # Insert the room to satisfy the foreign key constraint of + # `event_failed_pull_attempts` + self.store.db_pool.simple_insert_txn( + txn, + "rooms", + { + "room_id": room_id, + "creator": "room_creator_user_id", + "is_public": True, + "room_version": "6", + }, + ) + + # Insert our server events + for event_id in our_server_events: + event_dict = complete_event_dict_map[event_id] + + self.store.db_pool.simple_insert_txn( + txn, + table="events", + values={ + "event_id": event_dict.get("event_id"), + "type": event_dict.get("type"), + "room_id": event_dict.get("room_id"), + "depth": event_dict.get("depth"), + "topological_ordering": 
event_dict.get("depth"), + "stream_ordering": event_dict.get("stream_ordering"), + "processed": True, + "outlier": False, + }, + ) + + # Insert the event edges + for event_id in our_server_events: + for prev_event_id in event_graph[event_id]: + self.store.db_pool.simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event_id, + "prev_event_id": prev_event_id, + "room_id": room_id, + }, + ) + + # Insert the backward extremities + prev_events_of_our_events = { + prev_event_id + for our_server_event in our_server_events + for prev_event_id in complete_event_dict_map[our_server_event][ + "prev_event_ids" + ] + } + backward_extremities = prev_events_of_our_events - our_server_events + for backward_extremity in backward_extremities: + self.store.db_pool.simple_insert_txn( + txn, + table="event_backward_extremities", + values={ + "event_id": backward_extremity, + "room_id": room_id, + }, + ) + + self.get_success( + self.store.db_pool.runInteraction( + "_setup_room_for_backfill_tests_populate_db", + populate_db, + ) + ) + + return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map) + + def test_get_backfill_points_in_room(self): + """ + Test to make sure we get some backfill points + """ + setup_info = self._setup_room_for_backfill_tests() + room_id = setup_info.room_id + + backfill_points = self.get_success( + self.store.get_backfill_points_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + self.assertListEqual( + backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"] + ) + + def test_get_backfill_points_in_room_excludes_events_we_have_attempted( + self, + ): + """ + Test to make sure that events we have attempted to backfill (and within + backoff timeout duration) do not show up as an event to backfill again. 
+ """ + setup_info = self._setup_room_for_backfill_tests() + room_id = setup_info.room_id + + # Record some attempts to backfill these events which will make + # `get_backfill_points_in_room` exclude them because we + # haven't passed the backoff interval. + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b5", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b4", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b2", "fake cause") + ) + + # No time has passed since we attempted to backfill ^ + + backfill_points = self.get_success( + self.store.get_backfill_points_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + # Only the backfill points that we didn't record earlier exist here. + self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"]) + + def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration( + self, + ): + """ + Test to make sure after we fake attempt to backfill event "b3" many times, + we can see retry and see the "b3" again after the backoff timeout duration + has exceeded. + """ + setup_info = self._setup_room_for_backfill_tests() + room_id = setup_info.room_id + + # Record some attempts to backfill these events which will make + # `get_backfill_points_in_room` exclude them because we + # haven't passed the backoff interval. 
+ self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b3", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause") + ) + self.get_success( + self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause") + ) + + # Now advance time by 2 hours and we should only be able to see "b3" + # because we have waited long enough for the single attempt (2^1 hours) + # but we still shouldn't see "b1" because we haven't waited long enough + # for this many attempts. We didn't do anything to "b2" so it should be + # visible regardless. + self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) + + # Make sure that "b1" is not in the list because we've + # already attempted many times + backfill_points = self.get_success( + self.store.get_backfill_points_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + self.assertListEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2"]) + + # Now advance time by 20 hours (above 2^4 because we made 4 attempts) and + # see if we can now backfill it + self.reactor.advance(datetime.timedelta(hours=20).total_seconds()) + + # Try again after we advanced enough time and we should see "b1" again + backfill_points = self.get_success( + self.store.get_backfill_points_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + self.assertListEqual( + backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"] + ) + + def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo: + """ + Sets up a room with various insertion event backward extremities to test + backfill functions against. 
+ + Returns: + _BackfillSetupInfo including the `room_id` to test against and + `depth_map` of events in the room + """ + room_id = "!backfill-room-test:some-host" + + depth_map: Dict[str, int] = { + "1": 1, + "2": 2, + "insertion_eventA": 3, + "3": 4, + "insertion_eventB": 5, + "4": 6, + "5": 7, + } + + def populate_db(txn: LoggingTransaction): + # Insert the room to satisfy the foreign key constraint of + # `event_failed_pull_attempts` + self.store.db_pool.simple_insert_txn( + txn, + "rooms", + { + "room_id": room_id, + "creator": "room_creator_user_id", + "is_public": True, + "room_version": "6", + }, + ) + + # Insert our server events + stream_ordering = 0 + for event_id, depth in depth_map.items(): + self.store.db_pool.simple_insert_txn( + txn, + table="events", + values={ + "event_id": event_id, + "type": EventTypes.MSC2716_INSERTION + if event_id.startswith("insertion_event") + else "test_regular_type", + "room_id": room_id, + "depth": depth, + "topological_ordering": depth, + "stream_ordering": stream_ordering, + "processed": True, + "outlier": False, + }, + ) + + if event_id.startswith("insertion_event"): + self.store.db_pool.simple_insert_txn( + txn, + table="insertion_event_extremities", + values={ + "event_id": event_id, + "room_id": room_id, + }, + ) + + stream_ordering += 1 + + self.get_success( + self.store.db_pool.runInteraction( + "_setup_room_for_insertion_backfill_tests_populate_db", + populate_db, + ) + ) + + return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map) + + def test_get_insertion_event_backward_extremities_in_room(self): + """ + Test to make sure insertion event backward extremities are returned. 
+ """ + setup_info = self._setup_room_for_insertion_backfill_tests() + room_id = setup_info.room_id + + backfill_points = self.get_success( + self.store.get_insertion_event_backward_extremities_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + self.assertListEqual( + backfill_event_ids, ["insertion_eventB", "insertion_eventA"] + ) + + def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted( + self, + ): + """ + Test to make sure that insertion events we have attempted to backfill + (and within backoff timeout duration) do not show up as an event to + backfill again. + """ + setup_info = self._setup_room_for_insertion_backfill_tests() + room_id = setup_info.room_id + + # Record some attempts to backfill these events which will make + # `get_insertion_event_backward_extremities_in_room` exclude them + # because we haven't passed the backoff interval. + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "insertion_eventA", "fake cause" + ) + ) + + # No time has passed since we attempted to backfill ^ + + backfill_points = self.get_success( + self.store.get_insertion_event_backward_extremities_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + # Only the backfill points that we didn't record earlier exist here. + self.assertListEqual(backfill_event_ids, ["insertion_eventB"]) + + def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration( + self, + ): + """ + Test to make sure after we fake attempt to backfill event + "insertion_eventA" many times, we can see retry and see the + "insertion_eventA" again after the backoff timeout duration has + exceeded. 
+ """ + setup_info = self._setup_room_for_insertion_backfill_tests() + room_id = setup_info.room_id + + # Record some attempts to backfill these events which will make + # `get_backfill_points_in_room` exclude them because we + # haven't passed the backoff interval. + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "insertion_eventB", "fake cause" + ) + ) + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "insertion_eventA", "fake cause" + ) + ) + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "insertion_eventA", "fake cause" + ) + ) + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "insertion_eventA", "fake cause" + ) + ) + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "insertion_eventA", "fake cause" + ) + ) + + # Now advance time by 2 hours and we should only be able to see + # "insertion_eventB" because we have waited long enough for the single + # attempt (2^1 hours) but we still shouldn't see "insertion_eventA" + # because we haven't waited long enough for this many attempts. 
+ self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) + + # Make sure that "insertion_eventA" is not in the list because we've + # already attempted many times + backfill_points = self.get_success( + self.store.get_insertion_event_backward_extremities_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + self.assertListEqual(backfill_event_ids, ["insertion_eventB"]) + + # Now advance time by 20 hours (above 2^4 because we made 4 attempts) and + # see if we can now backfill it + self.reactor.advance(datetime.timedelta(hours=20).total_seconds()) + + # Try at "insertion_eventA" again after we advanced enough time and we + # should see "insertion_eventA" again + backfill_points = self.get_success( + self.store.get_insertion_event_backward_extremities_in_room(room_id) + ) + backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] + self.assertListEqual( + backfill_event_ids, ["insertion_eventB", "insertion_eventA"] + ) + @attr.s class FakeEvent: diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index fc43d7edd183..08c74b93e3a5 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -106,6 +106,7 @@ def _mark_read(event_id: str) -> None: "m.read", user_id=user_id, event_ids=[event_id], + thread_id=None, data={}, ) ) diff --git a/tests/storage/test_receipts.py b/tests/storage/test_receipts.py index c89bfff24171..9459ee1705c9 100644 --- a/tests/storage/test_receipts.py +++ b/tests/storage/test_receipts.py @@ -131,13 +131,18 @@ def test_get_receipts_for_user(self) -> None: # Send public read receipt for the first event self.get_success( self.store.insert_receipt( - self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {} + self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {} ) ) # Send private read receipt for the second event self.get_success( self.store.insert_receipt(
- self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {} + self.room_id1, + ReceiptTypes.READ_PRIVATE, + OUR_USER_ID, + [event1_2_id], + None, + {}, ) ) @@ -164,7 +169,7 @@ def test_get_receipts_for_user(self) -> None: # Test receipt updating self.get_success( self.store.insert_receipt( - self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {} + self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {} ) ) res = self.get_success( @@ -180,7 +185,12 @@ def test_get_receipts_for_user(self) -> None: # Test new room is reflected in what the method returns self.get_success( self.store.insert_receipt( - self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {} + self.room_id2, + ReceiptTypes.READ_PRIVATE, + OUR_USER_ID, + [event2_1_id], + None, + {}, ) ) res = self.get_success( @@ -202,13 +212,18 @@ def test_get_last_receipt_event_id_for_user(self) -> None: # Send public read receipt for the first event self.get_success( self.store.insert_receipt( - self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], {} + self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_1_id], None, {} ) ) # Send private read receipt for the second event self.get_success( self.store.insert_receipt( - self.room_id1, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event1_2_id], {} + self.room_id1, + ReceiptTypes.READ_PRIVATE, + OUR_USER_ID, + [event1_2_id], + None, + {}, ) ) @@ -241,7 +256,7 @@ def test_get_last_receipt_event_id_for_user(self) -> None: # Test receipt updating self.get_success( self.store.insert_receipt( - self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], {} + self.room_id1, ReceiptTypes.READ, OUR_USER_ID, [event1_2_id], None, {} ) ) res = self.get_success( @@ -259,7 +274,12 @@ def test_get_last_receipt_event_id_for_user(self) -> None: # Test new room is reflected in what the method returns self.get_success( self.store.insert_receipt( - self.room_id2, ReceiptTypes.READ_PRIVATE, OUR_USER_ID, [event2_1_id], {} + 
self.room_id2, + ReceiptTypes.READ_PRIVATE, + OUR_USER_ID, + [event2_1_id], + None, + {}, ) ) res = self.get_success(