Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PagerDuty Migrator: Add filtering capabilities and fix user notification rule preservation #5454

Merged
merged 8 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 53 additions & 29 deletions tools/migrators/README.md

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions tools/migrators/lib/pagerduty/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"Zabbix Webhook (for 5.0 and 5.2)": "zabbix",
"Elastic Alerts": "elastalert",
"Firebase": "fabric",
"Amazon CloudWatch": "amazon_sns",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a request to map Amazon CloudWatch Integration in PagerDuty to our Amazon SNS Integration (apparently these are essentially the same thing?)

}

# Experimental feature to migrate PD rulesets to OnCall integrations
Expand All @@ -38,3 +39,25 @@
)

MIGRATE_USERS = os.getenv("MIGRATE_USERS", "true").lower() == "true"

# Filter resources by team
PAGERDUTY_FILTER_TEAM = os.getenv("PAGERDUTY_FILTER_TEAM")

# Filter resources by users (comma-separated list of PagerDuty user IDs)
PAGERDUTY_FILTER_USERS = [
user_id.strip()
for user_id in os.getenv("PAGERDUTY_FILTER_USERS", "").split(",")
if user_id.strip()
]

# Filter resources by name regex patterns
PAGERDUTY_FILTER_SCHEDULE_REGEX = os.getenv("PAGERDUTY_FILTER_SCHEDULE_REGEX")
PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX = os.getenv(
"PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX"
)
PAGERDUTY_FILTER_INTEGRATION_REGEX = os.getenv("PAGERDUTY_FILTER_INTEGRATION_REGEX")

# Whether to preserve existing notification rules when migrating users
PRESERVE_EXISTING_USER_NOTIFICATION_RULES = (
os.getenv("PRESERVE_EXISTING_USER_NOTIFICATION_RULES", "true").lower() == "true"
)
157 changes: 154 additions & 3 deletions tools/migrators/lib/pagerduty/migrate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import re

from pdpyras import APISession

Expand All @@ -11,6 +12,11 @@
MODE,
MODE_PLAN,
PAGERDUTY_API_TOKEN,
PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX,
PAGERDUTY_FILTER_INTEGRATION_REGEX,
PAGERDUTY_FILTER_SCHEDULE_REGEX,
PAGERDUTY_FILTER_TEAM,
PAGERDUTY_FILTER_USERS,
)
from lib.pagerduty.report import (
escalation_policy_report,
Expand Down Expand Up @@ -43,6 +49,136 @@
)


def filter_schedules(schedules):
"""Filter schedules based on configured filters"""
filtered_schedules = []
filtered_out = 0

for schedule in schedules:
should_include = True
reason = None

# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = schedule.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

# Filter by users
if should_include and PAGERDUTY_FILTER_USERS:
schedule_users = set()
for layer in schedule.get("schedule_layers", []):
for user in layer.get("users", []):
schedule_users.add(user["user"]["id"])

if not any(user_id in schedule_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"

# Filter by name regex
if should_include and PAGERDUTY_FILTER_SCHEDULE_REGEX:
if not re.match(PAGERDUTY_FILTER_SCHEDULE_REGEX, schedule["name"]):
should_include = False
reason = f"Schedule regex filter: {PAGERDUTY_FILTER_SCHEDULE_REGEX}"

if should_include:
filtered_schedules.append(schedule)
else:
filtered_out += 1
print(f"{TAB}Schedule {schedule['id']}: {reason}")

if filtered_out > 0:
print(f"Filtered out {filtered_out} schedules")

return filtered_schedules


def filter_escalation_policies(policies):
"""Filter escalation policies based on configured filters"""
filtered_policies = []
filtered_out = 0

for policy in policies:
should_include = True
reason = None

# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = policy.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

# Filter by users
if should_include and PAGERDUTY_FILTER_USERS:
policy_users = set()
for rule in policy.get("escalation_rules", []):
for target in rule.get("targets", []):
if target["type"] == "user":
policy_users.add(target["id"])

if not any(user_id in policy_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"

# Filter by name regex
if should_include and PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX:
if not re.match(PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX, policy["name"]):
should_include = False
reason = f"Escalation policy regex filter: {PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX}"

if should_include:
filtered_policies.append(policy)
else:
filtered_out += 1
print(f"{TAB}Policy {policy['id']}: {reason}")

if filtered_out > 0:
print(f"Filtered out {filtered_out} escalation policies")

return filtered_policies


def filter_integrations(integrations):
"""Filter integrations based on configured filters"""
filtered_integrations = []
filtered_out = 0

for integration in integrations:
should_include = True
reason = None

# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = integration["service"].get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

# Filter by name regex
if should_include and PAGERDUTY_FILTER_INTEGRATION_REGEX:
integration_name = (
f"{integration['service']['name']} - {integration['name']}"
)
if not re.match(PAGERDUTY_FILTER_INTEGRATION_REGEX, integration_name):
should_include = False
reason = (
f"Integration regex filter: {PAGERDUTY_FILTER_INTEGRATION_REGEX}"
)

if should_include:
filtered_integrations.append(integration)
else:
filtered_out += 1
print(f"{TAB}Integration {integration['id']}: {reason}")

if filtered_out > 0:
print(f"Filtered out {filtered_out} integrations")

return filtered_integrations


def migrate() -> None:
session = APISession(PAGERDUTY_API_TOKEN)
session.timeout = 20
Expand All @@ -59,9 +195,13 @@ def migrate() -> None:
print("▶ Fetching schedules...")
# Fetch schedules from PagerDuty
schedules = session.list_all(
"schedules", params={"include[]": "schedule_layers", "time_zone": "UTC"}
"schedules",
params={"include[]": ["schedule_layers", "teams"], "time_zone": "UTC"},
)

# Apply filters to schedules
schedules = filter_schedules(schedules)

# Fetch overrides from PagerDuty
since = datetime.datetime.now(datetime.timezone.utc)
until = since + datetime.timedelta(
Expand All @@ -78,11 +218,19 @@ def migrate() -> None:
oncall_schedules = OnCallAPIClient.list_all("schedules")

print("▶ Fetching escalation policies...")
escalation_policies = session.list_all("escalation_policies")
escalation_policies = session.list_all(
"escalation_policies", params={"include[]": "teams"}
)

# Apply filters to escalation policies
escalation_policies = filter_escalation_policies(escalation_policies)

oncall_escalation_chains = OnCallAPIClient.list_all("escalation_chains")

print("▶ Fetching integrations...")
services = session.list_all("services", params={"include[]": "integrations"})
services = session.list_all(
"services", params={"include[]": ["integrations", "teams"]}
)
vendors = session.list_all("vendors")

integrations = []
Expand All @@ -92,6 +240,9 @@ def migrate() -> None:
integration["service"] = service
integrations.append(integration)

# Apply filters to integrations
integrations = filter_integrations(integrations)

oncall_integrations = OnCallAPIClient.list_all("integrations")

rulesets = None
Expand Down
19 changes: 17 additions & 2 deletions tools/migrators/lib/pagerduty/report.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from lib.common.report import ERROR_SIGN, SUCCESS_SIGN, TAB, WARNING_SIGN
from lib.pagerduty.config import PRESERVE_EXISTING_USER_NOTIFICATION_RULES


def format_user(user: dict) -> str:
Expand Down Expand Up @@ -88,8 +89,22 @@ def user_report(users: list[dict]) -> str:
for user in sorted(users, key=lambda u: bool(u["oncall_user"]), reverse=True):
result += "\n" + TAB + format_user(user)

if user["oncall_user"] and user["notification_rules"]:
result += " (existing notification rules will be deleted)"
if user["oncall_user"]:
if (
user["oncall_user"]["notification_rules"]
and PRESERVE_EXISTING_USER_NOTIFICATION_RULES
):
# already has user notification rules defined in OnCall.. we won't touch these
result += " (existing notification rules will be preserved due to the PRESERVE_EXISTING_USER_NOTIFICATION_RULES being set to True and this user already having notification rules defined in OnCall)"
elif (
user["oncall_user"]["notification_rules"]
and not PRESERVE_EXISTING_USER_NOTIFICATION_RULES
):
# already has user notification rules defined in OnCall.. we will overwrite these
result += " (existing notification rules will be overwritten due to the PRESERVE_EXISTING_USER_NOTIFICATION_RULES being set to False)"
elif user["notification_rules"]:
# user has notification rules defined in PagerDuty, but none defined in OnCall, we will migrate these
result += " (existing PagerDuty notification rules will be migrated due to this user not having any notification rules defined in OnCall)"

return result

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ def match_escalation_policy_for_integration(
policy_id = integration["service"]["escalation_policy"]["id"]
policy = find_by_id(escalation_policies, policy_id)

if policy is None:
integration["is_escalation_policy_flawed"] = True
return

integration["is_escalation_policy_flawed"] = bool(
policy["unmatched_users"] or policy["flawed_schedules"]
)
Expand Down
12 changes: 11 additions & 1 deletion tools/migrators/lib/pagerduty/resources/notification_rules.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import copy

from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP
from lib.pagerduty.config import (
PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP,
PRESERVE_EXISTING_USER_NOTIFICATION_RULES,
)
from lib.utils import remove_duplicates, transform_wait_delay


Expand All @@ -23,6 +26,13 @@ def remove_duplicate_rules_between_waits(rules: list[dict]) -> list[dict]:


def migrate_notification_rules(user: dict) -> None:
if (
PRESERVE_EXISTING_USER_NOTIFICATION_RULES
and user["oncall_user"]["notification_rules"]
):
print(f"Preserving existing notification rules for {user['email']}")
return

notification_rules = [
rule for rule in user["notification_rules"] if rule["urgency"] == "high"
]
Expand Down
6 changes: 3 additions & 3 deletions tools/migrators/lib/tests/pagerduty/test_matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@
"scheduled_actions": [],
},
"oncall_integration": None,
"oncall_type": None,
"oncall_type": "amazon_sns",
"is_escalation_policy_flawed": False,
},
{
Expand Down Expand Up @@ -1420,7 +1420,7 @@
"scheduled_actions": [],
},
"oncall_integration": None,
"oncall_type": None,
"oncall_type": "amazon_sns",
"is_escalation_policy_flawed": True,
},
{
Expand Down Expand Up @@ -1510,7 +1510,7 @@
"scheduled_actions": [],
},
"oncall_integration": None,
"oncall_type": None,
"oncall_type": "amazon_sns",
"is_escalation_policy_flawed": True,
},
{
Expand Down
Loading
Loading