Skip to content

Blocklist rewrite #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 67 additions & 26 deletions bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from emoji import UNICODE_EMOJI
from pkg_resources import parse_version

from core.blocklist import Blocklist, BlockReason

try:
# noinspection PyUnresolvedReferences
from colorama import init
Expand Down Expand Up @@ -87,6 +89,9 @@ def __init__(self):
self._configure_logging()

self.plugin_db = PluginDatabaseClient(self) # Deprecated

self.blocklist = Blocklist(bot=self)

self.startup()

def get_guild_icon(
Expand Down Expand Up @@ -168,6 +173,11 @@ def startup(self):
logger.line()
logger.info("discord.py: v%s", discord.__version__)
logger.line()
if not self.config["blocked"] or not self.config["blocked_roles"]:
logger.warning(
"Un-migrated blocklists found. Please run the '[p]migrate blocklist' command after backing "
"up your config/database. Blocklist functionality will be disabled until this is done."
)

async def load_extensions(self):
for cog in self.loaded_cogs:
Expand Down Expand Up @@ -462,10 +472,14 @@ def main_category(self) -> typing.Optional[discord.CategoryChannel]:

@property
def blocked_users(self) -> typing.Dict[str, str]:
"""DEPRECATED, used blocklist instead"""
logger.warning("blocked_users is deprecated and does not function, its usage is a bug")
return self.config["blocked"]

@property
def blocked_roles(self) -> typing.Dict[str, str]:
"""DEPRECATED, used blocklist instead"""
logger.warning("blocked_roles is deprecated and does not function, its usage is a bug")
return self.config["blocked_roles"]

@property
Expand Down Expand Up @@ -524,6 +538,7 @@ async def on_connect(self):
logger.debug("Connected to gateway.")
await self.config.refresh()
await self.api.setup_indexes()
await self.blocklist.setup()
await self.load_extensions()
self._connected.set()

Expand Down Expand Up @@ -718,6 +733,8 @@ def check_guild_age(self, author: discord.Member) -> bool:
return True

def check_manual_blocked_roles(self, author: discord.Member) -> bool:
"""DEPRECATED"""
logger.error("check_manual_blocked_roles is deprecated, usage is a bug.")
for role in author.roles:
if str(role.id) not in self.blocked_roles:
continue
Expand All @@ -734,6 +751,8 @@ def check_manual_blocked_roles(self, author: discord.Member) -> bool:
return True

def check_manual_blocked(self, author: discord.User) -> bool:
"""DEPRECATED"""
logger.error("check_manual_blocked is deprecated, usage is a bug.")
if str(author.id) not in self.blocked_users:
return True

Expand All @@ -758,13 +777,32 @@ async def _process_blocked(self, message):
# This is to store blocked message cooldown in memory
_block_msg_cooldown = dict()

# This has a bunch of side effects
async def is_blocked(
self,
author: discord.User,
*,
channel: discord.TextChannel = None,
send_message: bool = False,
) -> bool:
"""
Check if a user is blocked for any reason and send a message if they are (if send_message is true).

If you are using this method with send_message set to false or not set,
You should be using blocklist.is_user_blocked() or blocklist.is_id_blocked()
if you only care whether a user is manually blocked then use blocklist.is_id_blocked().

Parameters
----------
author
channel
send_message

Returns
-------
bool
Whether the user is blocked or not.
"""
member = self.guild.get_member(author.id) or await MemberConverter.convert(author)
if member is None:
# try to find in other guilds
Expand Down Expand Up @@ -794,33 +832,36 @@ async def send_embed(title=None, desc=None):
self._block_msg_cooldown[str(author)] = now + timedelta(minutes=5)
return await channel.send(embed=emb)

if str(author.id) in self.blocked_whitelisted_users:
if str(author.id) in self.blocked_users:
self.blocked_users.pop(str(author.id))
await self.config.update()
return False

if not self.check_account_age(author):
blocked_reason = "Sorry, your account is too new to initiate a ticket."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

if not self.check_manual_blocked(author):
blocked_reason = "You have been blocked from contacting Modmail."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

if not self.check_guild_age(member):
blocked_reason = "Sorry, you joined the server too recently to initiate a ticket."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

if not self.check_manual_blocked_roles(member):
blocked_reason = "Sorry, your role(s) has been blacklisted from contacting Modmail."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True
if member is not None:
blocked, block_type = self.blocklist.is_user_blocked(member)
else:
blocked, block_type = self.blocklist.is_id_blocked(author.id)
if not self.blocklist.is_valid_account_age(author):
blocked = True
block_type = BlockReason.ACCOUNT_AGE

return False
if blocked:
if block_type == BlockReason.ACCOUNT_AGE:
blocked_reason = "Sorry, your account is too new to initiate a ticket."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

if block_type == BlockReason.BLOCKED_USER:
blocked_reason = "You have been blocked from contacting Modmail."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

if block_type == BlockReason.GUILD_AGE:
blocked_reason = "Sorry, you joined the server too recently to initiate a ticket."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

if block_type == BlockReason.BLOCKED_ROLE:
blocked_reason = "Sorry, your role(s) has been blacklisted from contacting Modmail."
await send_embed(title="Message not sent!", desc=blocked_reason)
return True

return blocked

async def get_thread_cooldown(self, author: discord.Member):
thread_cooldown = self.config.get("thread_cooldown")
Expand Down
130 changes: 48 additions & 82 deletions cogs/modmail.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import datetime
import re
from datetime import timezone
from itertools import zip_longest
Expand All @@ -12,7 +11,8 @@
from discord.ext.commands.cooldowns import BucketType
from discord.ext.commands.view import StringView

from core import checks
from core import blocklist, checks
from core.blocklist import BlockType
from core.models import DMDisabled, PermissionLevel, SimilarCategoryConverter, getLogger
from core.paginator import EmbedPaginatorSession
from core.thread import Thread
Expand Down Expand Up @@ -1562,7 +1562,7 @@ async def contact(
elif u.bot:
errors.append(f"{u} is a bot, cannot add to thread.")
users.remove(u)
elif await self.bot.is_blocked(u):
elif await self.bot.blocklist.is_user_blocked(u):
ref = f"{u.mention} is" if ctx.author != u else "You are"
errors.append(f"{ref} currently blocked from contacting {self.bot.user.name}.")
users.remove(u)
Expand Down Expand Up @@ -1645,57 +1645,34 @@ async def contact(
async def blocked(self, ctx):
"""Retrieve a list of blocked users."""

roles, users, now = [], [], discord.utils.utcnow()
roles, users = [], []

blocked_users = list(self.bot.blocked_users.items())
for id_, data in blocked_users:
blocked_by_id = data["blocked_by"]
blocked_at = parser.parse(data["blocked_at"])
human_blocked_at = discord.utils.format_dt(blocked_at, style="R")
if "until" in data:
blocked_until = parser.parse(data["until"])
human_blocked_until = discord.utils.format_dt(blocked_until, style="R")
else:
blocked_until = human_blocked_until = "Permanent"

if isinstance(blocked_until, datetime.datetime) and blocked_until < now:
self.bot.blocked_users.pop(str(id_))
logger.debug("No longer blocked, user %s.", id_)
continue

string = f"<@{id_}> ({human_blocked_until})"
string += f"\n- Issued {human_blocked_at} by <@{blocked_by_id}>"

reason = data.get("reason")
if reason:
string += f"\n- Blocked for {reason}"
blocked: list[blocklist.BlocklistEntry] = await self.bot.blocklist.get_all_blocks()

users.append(string + "\n")
for item in blocked:
human_blocked_at = discord.utils.format_dt(item.timestamp, style="R")
if item.expires_at is not None:
human_blocked_until = discord.utils.format_dt(item.expires_at, style="R")
else:
human_blocked_until = "Permanent"

blocked_roles = list(self.bot.blocked_roles.items())
for id_, data in blocked_roles:
blocked_by_id = data["blocked_by"]
blocked_at = parser.parse(data["blocked_at"])
human_blocked_at = discord.utils.format_dt(blocked_at, style="R")
if "until" in data:
blocked_until = parser.parse(data["until"])
human_blocked_until = discord.utils.format_dt(blocked_until, style="R")
if item.type == blocklist.BlockType.USER:
string = f"<@{item.id}>"
else:
blocked_until = human_blocked_until = "Permanent"
string = f"<@&{item.id}>"

if isinstance(blocked_until, datetime.datetime) and blocked_until < now:
self.bot.blocked_users.pop(str(id_))
logger.debug("No longer blocked, user %s.", id_)
continue
string += f" ({human_blocked_until})"

string = f"<@&{id_}> ({human_blocked_until})"
string += f"\n- Issued {human_blocked_at} by <@{blocked_by_id}>"
string += f"\n- Issued {human_blocked_at} by <@{item.blocking_user_id}>"

reason = data.get("reason")
if reason:
string += f"\n- Blocked for {reason}"
if item.reason is not None:
string += f"\n- Blocked for {item.reason}"
string += "\n"

roles.append(string + "\n")
if item.type == blocklist.BlockType.USER:
users.append(string)
elif item.type == blocklist.BlockType.ROLE:
roles.append(string)

user_embeds = [discord.Embed(title="Blocked Users", color=self.bot.main_color, description="")]

Expand All @@ -1713,7 +1690,7 @@ async def blocked(self, ctx):
else:
embed.description += line
else:
user_embeds[0].description = "Currently there are no blocked users."
user_embeds[0].description = "No users are currently blocked."

if len(user_embeds) > 1:
for n, em in enumerate(user_embeds):
Expand All @@ -1736,7 +1713,7 @@ async def blocked(self, ctx):
else:
embed.description += line
else:
role_embeds[-1].description = "Currently there are no blocked roles."
role_embeds[-1].description = "No roles are currently blocked."

if len(role_embeds) > 1:
for n, em in enumerate(role_embeds):
Expand All @@ -1763,7 +1740,6 @@ async def blocked_whitelist(self, ctx, *, user: User = None):
return await ctx.send_help(ctx.command)

mention = getattr(user, "mention", f"`{user.id}`")
msg = ""

if str(user.id) in self.bot.blocked_whitelisted_users:
embed = discord.Embed(
Expand All @@ -1775,21 +1751,20 @@ async def blocked_whitelist(self, ctx, *, user: User = None):
return await ctx.send(embed=embed)

self.bot.blocked_whitelisted_users.append(str(user.id))

if str(user.id) in self.bot.blocked_users:
msg = self.bot.blocked_users.get(str(user.id)) or ""
self.bot.blocked_users.pop(str(user.id))

await self.bot.config.update()

if msg.startswith("System Message: "):
# If the user is blocked internally (for example: below minimum account age)
# Show an extended message stating the original internal message
reason = msg[16:].strip().rstrip(".")
blocked: bool
blocklist_entry: blocklist.BlocklistEntry

blocked, blocklist_entry = await self.bot.blocklist.is_id_blocked(user.id)
if blocked:
await self.bot.blocklist.unblock_id(user.id)
embed = discord.Embed(
title="Success",
description=f"{mention} was previously blocked internally for "
f'"{reason}". {mention} is now whitelisted.',
description=f"""
{mention} has been whitelisted.
They were previously blocked by <@{blocklist_entry.blocking_user_id}> {" for "+blocklist_entry.reason if blocklist_entry.reason is not None else ""}.
""",
color=self.bot.main_color,
)
else:
Expand Down Expand Up @@ -1843,33 +1818,31 @@ async def send_embed(title: str, message: str):
):
return await send_embed("Error", f"Cannot block {mention}, user is whitelisted.")

now, blocked = discord.utils.utcnow(), dict()

desc = f"{mention} is now blocked."
if duration:
desc += f"\n- Expires: {discord.utils.format_dt(duration.dt, style='R')}"
desc += f"\n- By: {ctx.author.mention}"
if reason:
desc += f"\n- Reason: {reason}"

blocked["blocked_at"] = str(now)
blocked["blocked_by"] = ctx.author.id
if duration:
blocked["until"] = str(duration.dt)
if reason:
blocked["reason"] = reason
blocktype: BlockType

if isinstance(user_or_role, discord.Role):
self.bot.blocked_roles[str(user_or_role.id)] = blocked
blocktype = BlockType.ROLE
elif isinstance(user_or_role, discord.User):
blocked_users = self.bot.blocked_users
blocked_users[str(user_or_role.id)] = blocked
blocktype = BlockType.USER
else:
return logger.warning(
f"{__name__}: cannot block user, user is neither an instance of Discord Role or User"
)

await self.bot.config.update()
await self.bot.blocklist.block_id(
user_id=user_or_role.id,
reason=reason,
expires_at=duration.dt if duration is not None else None,
blocked_by=ctx.author.id,
block_type=blocktype,
)

return await send_embed("Success", desc)

Expand Down Expand Up @@ -1901,20 +1874,13 @@ async def send_embed(title: str, message: str):

title, desc = "Error", f"{mention} is not blocked."

if isinstance(user_or_role, discord.Role):
if str(user_or_role.id) not in self.bot.blocked_roles:
return await send_embed(title, desc)
self.bot.blocked_roles.pop(str(user_or_role.id))
elif isinstance(user_or_role, discord.User):
if str(user_or_role.id) not in self.bot.blocked_users:
return await send_embed(title, desc)
self.bot.blocked_users.pop(str(user_or_role.id))
else:
if not isinstance(user_or_role, (discord.Role, discord.User)):
return logger.warning(
f"{__name__}: cannot unblock, user is neither an instance of Discord Role or User"
)

await self.bot.config.update()
if not await self.bot.blocklist.unblock_id(user_or_role.id):
return await send_embed(title, desc)

return await send_embed("Success", f"{mention} has been unblocked.")

Expand Down
Loading