Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PagerDuty Migrator: Add filtering capabilities and fix user notification rule preservation #5454

Merged
merged 8 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions tools/migrators/lib/pagerduty/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"Zabbix Webhook (for 5.0 and 5.2)": "zabbix",
"Elastic Alerts": "elastalert",
"Firebase": "fabric",
"Amazon CloudWatch": "amazon_sns",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a request to map Amazon CloudWatch Integration in PagerDuty to our Amazon SNS Integration (apparently these are essentially the same thing?)

}

# Experimental feature to migrate PD rulesets to OnCall integrations
Expand All @@ -38,3 +39,21 @@
)

MIGRATE_USERS = os.getenv("MIGRATE_USERS", "true").lower() == "true"

# Filter resources by team
PAGERDUTY_FILTER_TEAM = os.getenv("PAGERDUTY_FILTER_TEAM")

# Filter resources by users (comma-separated list of PagerDuty user IDs)
PAGERDUTY_FILTER_USERS = [
user_id.strip()
for user_id in os.getenv("PAGERDUTY_FILTER_USERS", "").split(",")
if user_id.strip()
]

# Filter resources by name regex patterns
PAGERDUTY_FILTER_SCHEDULE_REGEX = os.getenv("PAGERDUTY_FILTER_SCHEDULE_REGEX")
PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX = os.getenv("PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX")
PAGERDUTY_FILTER_INTEGRATION_REGEX = os.getenv("PAGERDUTY_FILTER_INTEGRATION_REGEX")

# Whether to preserve existing notification policies when migrating users
PRESERVE_EXISTING_NOTIFICATION_POLICIES = os.getenv("PRESERVE_EXISTING_NOTIFICATION_POLICIES", "true").lower() == "true"
144 changes: 141 additions & 3 deletions tools/migrators/lib/pagerduty/migrate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import re

from pdpyras import APISession

Expand All @@ -11,6 +12,11 @@
MODE,
MODE_PLAN,
PAGERDUTY_API_TOKEN,
PAGERDUTY_FILTER_TEAM,
PAGERDUTY_FILTER_USERS,
PAGERDUTY_FILTER_SCHEDULE_REGEX,
PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX,
PAGERDUTY_FILTER_INTEGRATION_REGEX,
)
from lib.pagerduty.report import (
escalation_policy_report,
Expand Down Expand Up @@ -42,6 +48,128 @@
match_users_for_schedule,
)

def filter_schedules(schedules):
"""Filter schedules based on configured filters"""
filtered_schedules = []
filtered_out = 0

for schedule in schedules:
should_include = True
reason = None

# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = schedule.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

# Filter by users
if should_include and PAGERDUTY_FILTER_USERS:
schedule_users = set()
for layer in schedule.get("schedule_layers", []):
for user in layer.get("users", []):
schedule_users.add(user["user"]["id"])

if not any(user_id in schedule_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"

# Filter by name regex
if should_include and PAGERDUTY_FILTER_SCHEDULE_REGEX:
if not re.match(PAGERDUTY_FILTER_SCHEDULE_REGEX, schedule["name"]):
should_include = False
reason = f"Schedule regex filter: {PAGERDUTY_FILTER_SCHEDULE_REGEX}"

if should_include:
filtered_schedules.append(schedule)
else:
filtered_out += 1
print(f"{TAB}Schedule {schedule['id']}: {reason}")

if filtered_out > 0:
print(f"Filtered out {filtered_out} schedules")

return filtered_schedules

def filter_escalation_policies(policies):
"""Filter escalation policies based on configured filters"""
filtered_policies = []
filtered_out = 0

for policy in policies:
should_include = True
reason = None

# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = policy.get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

# Filter by users
if should_include and PAGERDUTY_FILTER_USERS:
policy_users = set()
for rule in policy.get("escalation_rules", []):
for target in rule.get("targets", []):
if target["type"] == "user":
policy_users.add(target["id"])

if not any(user_id in policy_users for user_id in PAGERDUTY_FILTER_USERS):
should_include = False
reason = f"No users found for user filter: {','.join(PAGERDUTY_FILTER_USERS)}"

# Filter by name regex
if should_include and PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX:
if not re.match(PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX, policy["name"]):
should_include = False
reason = f"Escalation policy regex filter: {PAGERDUTY_FILTER_ESCALATION_POLICY_REGEX}"

if should_include:
filtered_policies.append(policy)
else:
filtered_out += 1
print(f"{TAB}Policy {policy['id']}: {reason}")

if filtered_out > 0:
print(f"Filtered out {filtered_out} escalation policies")

return filtered_policies

def filter_integrations(integrations):
"""Filter integrations based on configured filters"""
filtered_integrations = []
filtered_out = 0

for integration in integrations:
should_include = True
reason = None

# Filter by team
if PAGERDUTY_FILTER_TEAM:
teams = integration["service"].get("teams", [])
if not any(team["summary"] == PAGERDUTY_FILTER_TEAM for team in teams):
should_include = False
reason = f"No teams found for team filter: {PAGERDUTY_FILTER_TEAM}"

# Filter by name regex
if should_include and PAGERDUTY_FILTER_INTEGRATION_REGEX:
integration_name = f"{integration['service']['name']} - {integration['name']}"
if not re.match(PAGERDUTY_FILTER_INTEGRATION_REGEX, integration_name):
should_include = False
reason = f"Integration regex filter: {PAGERDUTY_FILTER_INTEGRATION_REGEX}"

if should_include:
filtered_integrations.append(integration)
else:
filtered_out += 1
print(f"{TAB}Integration {integration['id']}: {reason}")

if filtered_out > 0:
print(f"Filtered out {filtered_out} integrations")

return filtered_integrations

def migrate() -> None:
session = APISession(PAGERDUTY_API_TOKEN)
Expand All @@ -59,9 +187,12 @@ def migrate() -> None:
print("▶ Fetching schedules...")
# Fetch schedules from PagerDuty
schedules = session.list_all(
"schedules", params={"include[]": "schedule_layers", "time_zone": "UTC"}
"schedules", params={"include[]": ["schedule_layers", "teams"], "time_zone": "UTC"}
)

# Apply filters to schedules
schedules = filter_schedules(schedules)

# Fetch overrides from PagerDuty
since = datetime.datetime.now(datetime.timezone.utc)
until = since + datetime.timedelta(
Expand All @@ -78,11 +209,15 @@ def migrate() -> None:
oncall_schedules = OnCallAPIClient.list_all("schedules")

print("▶ Fetching escalation policies...")
escalation_policies = session.list_all("escalation_policies")
escalation_policies = session.list_all("escalation_policies", params={"include[]": "teams"})

# Apply filters to escalation policies
escalation_policies = filter_escalation_policies(escalation_policies)

oncall_escalation_chains = OnCallAPIClient.list_all("escalation_chains")

print("▶ Fetching integrations...")
services = session.list_all("services", params={"include[]": "integrations"})
services = session.list_all("services", params={"include[]": ["integrations", "teams"]})
vendors = session.list_all("vendors")

integrations = []
Expand All @@ -92,6 +227,9 @@ def migrate() -> None:
integration["service"] = service
integrations.append(integration)

# Apply filters to integrations
integrations = filter_integrations(integrations)

oncall_integrations = OnCallAPIClient.list_all("integrations")

rulesets = None
Expand Down
19 changes: 10 additions & 9 deletions tools/migrators/lib/pagerduty/resources/notification_rules.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy

from lib.oncall.api_client import OnCallAPIClient
from lib.pagerduty.config import PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP
from lib.pagerduty.config import PAGERDUTY_TO_ONCALL_CONTACT_METHOD_MAP, PRESERVE_EXISTING_NOTIFICATION_POLICIES
from lib.utils import remove_duplicates, transform_wait_delay


Expand All @@ -23,6 +23,15 @@ def remove_duplicate_rules_between_waits(rules: list[dict]) -> list[dict]:


def migrate_notification_rules(user: dict) -> None:
if PRESERVE_EXISTING_NOTIFICATION_POLICIES and user["oncall_user"]["notification_rules"]:
print(f"Preserving existing notification rules for {user['email']}")
return

# Delete existing rules if not preserving
if not PRESERVE_EXISTING_NOTIFICATION_POLICIES:
for rule in user["oncall_user"]["notification_rules"]:
OnCallAPIClient.delete(f"personal_notification_rules/{rule['id']}")

notification_rules = [
rule for rule in user["notification_rules"] if rule["urgency"] == "high"
]
Expand All @@ -35,14 +44,6 @@ def migrate_notification_rules(user: dict) -> None:
for rule in oncall_rules:
OnCallAPIClient.create("personal_notification_rules", rule)

if oncall_rules:
# delete old notification rules if any new rules were created
for rule in user["oncall_user"]["notification_rules"]:
if rule["important"] == important:
OnCallAPIClient.delete(
"personal_notification_rules/{}".format(rule["id"])
)


def transform_notification_rules(
notification_rules: list[dict], user_id: str, important: bool
Expand Down
125 changes: 125 additions & 0 deletions tools/migrators/lib/tests/pagerduty/test_notification_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
from unittest.mock import call, patch

# Mock environment variables before importing any modules that use them
os.environ["MIGRATING_FROM"] = "pagerduty"
os.environ["ONCALL_API_TOKEN"] = "test-token"
os.environ["ONCALL_API_URL"] = "http://test"
os.environ["PAGERDUTY_API_TOKEN"] = "test-pd-token"
os.environ["MODE"] = "plan"

from lib.pagerduty.resources.notification_rules import migrate_notification_rules


class TestNotificationRulesMigration:
def setup_method(self):
self.pd_user = {
"id": "U1",
"name": "Test User",
"email": "[email protected]",
"notification_rules": [{
"id": "PD1",
"urgency": "high",
"start_delay_in_minutes": 0,
"contact_method": {
"type": "email_contact_method"
}
}]
}
self.oncall_user = {
"id": "OC1",
"email": "[email protected]",
"notification_rules": []
}
self.pd_user["oncall_user"] = self.oncall_user

@patch("lib.pagerduty.resources.notification_rules.PRESERVE_EXISTING_NOTIFICATION_POLICIES", True)
@patch("lib.pagerduty.resources.notification_rules.OnCallAPIClient")
def test_existing_notification_policies_are_preserved(self, MockOnCallAPIClient):
# Setup user with existing notification rules
self.oncall_user["notification_rules"] = [{"id": "NR1"}]

# Run migration
migrate_notification_rules(self.pd_user)

# Verify no notification rules were migrated
MockOnCallAPIClient.create.assert_not_called()

@patch("lib.pagerduty.resources.notification_rules.PRESERVE_EXISTING_NOTIFICATION_POLICIES", True)
@patch("lib.pagerduty.resources.notification_rules.OnCallAPIClient")
def test_notification_policies_migrated_when_none_exist(self, MockOnCallAPIClient):
# Run migration
migrate_notification_rules(self.pd_user)

# Verify notification rules were migrated for both important and non-important cases
expected_calls = [
call("personal_notification_rules", {
"user_id": "OC1",
"type": "notify_by_email",
"important": False
}),
call("personal_notification_rules", {
"user_id": "OC1",
"type": "notify_by_email",
"important": True
})
]
MockOnCallAPIClient.create.assert_has_calls(expected_calls)

@patch("lib.pagerduty.resources.notification_rules.PRESERVE_EXISTING_NOTIFICATION_POLICIES", False)
@patch("lib.pagerduty.resources.notification_rules.OnCallAPIClient")
def test_existing_notification_policies_are_replaced_when_preserve_is_false(self, MockOnCallAPIClient):
# Setup user with existing notification rules
self.oncall_user["notification_rules"] = [
{"id": "NR1", "important": False},
{"id": "NR2", "important": True}
]

# Run migration
migrate_notification_rules(self.pd_user)

# Verify old rules were deleted
expected_delete_calls = [
call("personal_notification_rules/NR1"),
call("personal_notification_rules/NR2")
]
MockOnCallAPIClient.delete.assert_has_calls(expected_delete_calls, any_order=True)

# Verify new rules were created
expected_create_calls = [
call("personal_notification_rules", {
"user_id": "OC1",
"type": "notify_by_email",
"important": False
}),
call("personal_notification_rules", {
"user_id": "OC1",
"type": "notify_by_email",
"important": True
})
]
MockOnCallAPIClient.create.assert_has_calls(expected_create_calls)

@patch("lib.pagerduty.resources.notification_rules.PRESERVE_EXISTING_NOTIFICATION_POLICIES", False)
@patch("lib.pagerduty.resources.notification_rules.OnCallAPIClient")
def test_notification_policies_migrated_when_none_exist_and_preserve_is_false(self, MockOnCallAPIClient):
# Run migration
migrate_notification_rules(self.pd_user)

# Verify no rules were deleted (since none existed)
MockOnCallAPIClient.delete.assert_not_called()

# Verify new rules were created
expected_create_calls = [
call("personal_notification_rules", {
"user_id": "OC1",
"type": "notify_by_email",
"important": False
}),
call("personal_notification_rules", {
"user_id": "OC1",
"type": "notify_by_email",
"important": True
})
]
MockOnCallAPIClient.create.assert_has_calls(expected_create_calls)
Loading