-
-
Notifications
You must be signed in to change notification settings - Fork 687
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(silence): allow to silence threads #3122
base: main
Are you sure you want to change the base?
Changes from all commits
c156036
332bbbd
352dd56
3eb787a
23fd132
b488e80
f19fc4a
9035d90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,9 +2,11 @@ | |
from collections import OrderedDict | ||
from contextlib import suppress | ||
from datetime import UTC, datetime, timedelta | ||
from typing import Any | ||
|
||
from async_rediscache import RedisCache | ||
from discord import Guild, PermissionOverwrite, TextChannel, Thread, VoiceChannel | ||
from discord.abc import Messageable | ||
from discord.ext import commands, tasks | ||
from discord.ext.commands import Context | ||
from discord.utils import MISSING | ||
|
@@ -33,6 +35,7 @@ | |
MSG_UNSILENCE_SUCCESS = f"{constants.Emojis.check_mark} unsilenced {{channel}}." | ||
|
||
TextOrVoiceChannel = TextChannel | VoiceChannel | ||
SilenceableChannel = TextChannel | VoiceChannel | Thread | ||
|
||
VOICE_CHANNELS = { | ||
constants.Channels.code_help_voice_0: constants.Channels.code_help_chat_0, | ||
|
@@ -57,17 +60,17 @@ def __init__(self, alert_channel: TextChannel): | |
time=MISSING, | ||
name=None | ||
) | ||
self._silenced_channels = {} | ||
self._silenced_channels: dict[SilenceableChannel, int] = {} | ||
self._alert_channel = alert_channel | ||
|
||
def add_channel(self, channel: TextOrVoiceChannel) -> None: | ||
def add_channel(self, channel: SilenceableChannel) -> None: | ||
"""Add channel to `_silenced_channels` and start loop if not launched.""" | ||
if not self._silenced_channels: | ||
self.start() | ||
log.info("Starting notifier loop.") | ||
self._silenced_channels[channel] = self._current_loop | ||
|
||
def remove_channel(self, channel: TextChannel) -> None: | ||
def remove_channel(self, channel: SilenceableChannel) -> None: | ||
"""Remove channel from `_silenced_channels` and stop loop if no channels remain.""" | ||
with suppress(KeyError): | ||
del self._silenced_channels[channel] | ||
|
@@ -92,7 +95,7 @@ async def _notifier(self) -> None: | |
) | ||
|
||
|
||
async def _select_lock_channel(args: OrderedDict[str, any]) -> TextOrVoiceChannel: | ||
async def _select_lock_channel(args: OrderedDict[str, Any]) -> SilenceableChannel: | ||
"""Passes the channel to be silenced to the resource lock.""" | ||
channel, _ = Silence.parse_silence_args(args["ctx"], args["duration_or_channel"], args["duration"]) | ||
return channel | ||
|
@@ -130,8 +133,8 @@ async def cog_load(self) -> None: | |
async def send_message( | ||
self, | ||
message: str, | ||
source_channel: TextChannel, | ||
target_channel: TextOrVoiceChannel, | ||
source_channel: Messageable, | ||
target_channel: SilenceableChannel, | ||
*, | ||
alert_target: bool = False | ||
) -> None: | ||
|
@@ -159,7 +162,7 @@ async def send_message( | |
async def silence( | ||
self, | ||
ctx: Context, | ||
duration_or_channel: TextOrVoiceChannel | HushDurationConverter = None, | ||
duration_or_channel: SilenceableChannel | HushDurationConverter | None = None, | ||
duration: HushDurationConverter = 10, | ||
*, | ||
kick: bool = False | ||
|
@@ -178,12 +181,6 @@ async def silence( | |
channel_info = f"#{channel} ({channel.id})" | ||
log.debug(f"{ctx.author} is silencing channel {channel_info}.") | ||
|
||
# Since threads don't have specific overrides, we cannot silence them individually. | ||
# The parent channel has to be muted or the thread should be archived. | ||
if isinstance(channel, Thread): | ||
await ctx.send(":x: Threads cannot be silenced.") | ||
return | ||
|
||
if not await self._set_silence_overwrites(channel, kick=kick): | ||
log.info(f"Tried to silence channel {channel_info} but the channel was already silenced.") | ||
await self.send_message(MSG_SILENCE_FAIL, ctx.channel, channel, alert_target=False) | ||
|
@@ -210,12 +207,12 @@ async def silence( | |
@staticmethod | ||
def parse_silence_args( | ||
ctx: Context, | ||
duration_or_channel: TextOrVoiceChannel | int, | ||
duration_or_channel: SilenceableChannel | int | None, | ||
duration: HushDurationConverter | ||
) -> tuple[TextOrVoiceChannel, int | None]: | ||
) -> tuple[SilenceableChannel, int | None]: | ||
"""Helper method to parse the arguments of the silence command.""" | ||
if duration_or_channel: | ||
if isinstance(duration_or_channel, TextChannel | VoiceChannel): | ||
if isinstance(duration_or_channel, TextChannel | VoiceChannel | Thread): | ||
channel = duration_or_channel | ||
else: | ||
channel = ctx.channel | ||
|
@@ -228,8 +225,11 @@ def parse_silence_args( | |
|
||
return channel, duration | ||
|
||
async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bool = False) -> bool: | ||
async def _set_silence_overwrites(self, channel: SilenceableChannel, *, kick: bool = False) -> bool: | ||
"""Set silence permission overwrites for `channel` and return True if successful.""" | ||
if channel.id in self.scheduler: | ||
return False | ||
|
||
# Get the original channel overwrites | ||
if isinstance(channel, TextChannel): | ||
role = self._everyone_role | ||
|
@@ -242,6 +242,11 @@ async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bo | |
send_messages_in_threads=overwrite.send_messages_in_threads | ||
) | ||
|
||
elif isinstance(channel, Thread): | ||
log.info(f"Silencing thread #{channel.parent}/{channel} ({channel.parent.id}/{channel.id}).") | ||
await channel.edit(locked=True) | ||
Snipy7374 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the thread is already locked, this will just schedule an unlock in the set amount of time. Failing is probably better to keep consistent with how it behaves with text channels. |
||
return True | ||
|
||
else: | ||
role = self._verified_voice_role | ||
overwrite = channel.overwrites_for(role) | ||
|
@@ -250,7 +255,7 @@ async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bo | |
prev_overwrites.update(connect=overwrite.connect) | ||
|
||
# Stop if channel was already silenced | ||
if channel.id in self.scheduler or all(val is False for val in prev_overwrites.values()): | ||
if all(val is False for val in prev_overwrites.values()): | ||
return False | ||
|
||
# Set new permissions, store | ||
|
@@ -260,7 +265,7 @@ async def _set_silence_overwrites(self, channel: TextOrVoiceChannel, *, kick: bo | |
|
||
return True | ||
|
||
async def _schedule_unsilence(self, ctx: Context, channel: TextOrVoiceChannel, duration: int | None) -> None: | ||
async def _schedule_unsilence(self, ctx: Context, channel: SilenceableChannel, duration: int | None) -> None: | ||
"""Schedule `ctx.channel` to be unsilenced if `duration` is not None.""" | ||
if duration is None: | ||
await self.unsilence_timestamps.set(channel.id, -1) | ||
|
@@ -270,7 +275,7 @@ async def _schedule_unsilence(self, ctx: Context, channel: TextOrVoiceChannel, d | |
await self.unsilence_timestamps.set(channel.id, unsilence_time.timestamp()) | ||
|
||
@commands.command(aliases=("unhush",)) | ||
async def unsilence(self, ctx: Context, *, channel: TextOrVoiceChannel = None) -> None: | ||
async def unsilence(self, ctx: Context, *, channel: SilenceableChannel | None = None) -> None: | ||
""" | ||
Unsilence the given channel if given, else the current one. | ||
|
||
|
@@ -282,7 +287,7 @@ async def unsilence(self, ctx: Context, *, channel: TextOrVoiceChannel = None) - | |
await self._unsilence_wrapper(channel, ctx) | ||
|
||
@lock_arg(LOCK_NAMESPACE, "channel", raise_error=True) | ||
async def _unsilence_wrapper(self, channel: TextOrVoiceChannel, ctx: Context | None = None) -> None: | ||
async def _unsilence_wrapper(self, channel: SilenceableChannel, ctx: Context | None = None) -> None: | ||
""" | ||
Unsilence `channel` and send a success/failure message to ctx.channel. | ||
|
||
|
@@ -297,6 +302,10 @@ async def _unsilence_wrapper(self, channel: TextOrVoiceChannel, ctx: Context | N | |
if isinstance(channel, VoiceChannel): | ||
overwrite = channel.overwrites_for(self._verified_voice_role) | ||
has_channel_overwrites = overwrite.speak is False | ||
|
||
elif isinstance(channel, Thread): | ||
has_channel_overwrites = False | ||
|
||
else: | ||
overwrite = channel.overwrites_for(self._everyone_role) | ||
has_channel_overwrites = overwrite.send_messages is False or overwrite.add_reactions is False | ||
|
@@ -310,15 +319,15 @@ async def _unsilence_wrapper(self, channel: TextOrVoiceChannel, ctx: Context | N | |
else: | ||
await self.send_message(MSG_UNSILENCE_SUCCESS, msg_channel, channel, alert_target=True) | ||
|
||
async def _unsilence(self, channel: TextOrVoiceChannel) -> bool: | ||
async def _unsilence(self, channel: SilenceableChannel) -> bool: | ||
""" | ||
Unsilence `channel`. | ||
|
||
If `channel` has a silence task scheduled or has its previous overwrites cached, unsilence | ||
it, cancel the task, and remove it from the notifier. Notify admins if it has a task but | ||
not cached overwrites. | ||
|
||
Return `True` if channel permissions were changed, `False` otherwise. | ||
Return `True` if channel was unsilenced, `False` otherwise. | ||
""" | ||
# Get stored overwrites, and return if channel is unsilenced | ||
prev_overwrites = await self.previous_overwrites.get(channel.id) | ||
|
@@ -331,6 +340,12 @@ async def _unsilence(self, channel: TextOrVoiceChannel) -> bool: | |
role = self._everyone_role | ||
overwrite = channel.overwrites_for(role) | ||
permissions = "`Send Messages` and `Add Reactions`" | ||
elif isinstance(channel, Thread): | ||
await channel.edit(locked=False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above: if the thread was manually unlocked before the scheduled time, there won't be any indication of it. |
||
log.info(f"Unsilenced thread #{channel.parent}/{channel} ({channel.parent_id}/{channel.id}).") | ||
self.scheduler.cancel(channel.id) | ||
self.notifier.remove_channel(channel) | ||
return True | ||
else: | ||
role = self._verified_voice_role | ||
overwrite = channel.overwrites_for(role) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, logging this is handled in
_silence()
instead so this wasn't necessary.