This repository has been archived by the owner on Jun 17, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclass_email_functions.py
242 lines (192 loc) · 10.2 KB
/
class_email_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import base64
import logging
import requests
from datetime import datetime, timezone, timedelta
logging.basicConfig(filename='rmm_email_automation.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_access_token(api_key):
"""Obtains and returns an access token for the Wild Apricot API."""
api_base_url = 'https://api.wildapricot.org/v2.2'
auth_url = 'https://oauth.wildapricot.org/auth/token'
# Encode the API key in base64 format
encoded_key = base64.b64encode(f'APIKEY:{api_key}'.encode()).decode()
# Set the headers for authentication
auth_headers = {
'Authorization': f'Basic {encoded_key}',
'Content-Type': 'application/x-www-form-urlencoded',
}
# Obtain the access token
auth_data = {'grant_type': 'client_credentials', 'scope': 'auto'}
auth_response = requests.post(auth_url, headers=auth_headers, data=auth_data)
access_token = auth_response.json()['access_token']
return access_token
def get_event_attendees(event_id, access_token):
"""Retrieves event details from the Wild Apricot API given an event ID and prints the number of checked-in attendees."""
api_base_url = 'https://api.wildapricot.org/v2.2'
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# Make an API request to retrieve the account details
account_response = requests.get(f'{api_base_url}/accounts', headers=headers)
if account_response.status_code != 200:
logging.error(f'Error: Unable to retrieve account details. Status code: {account_response.status_code}')
return
account_id = account_response.json()[0]['Id']
# Make an API request to retrieve event registrations
registrations_response = requests.get(
f'{api_base_url}/accounts/{account_id}/eventregistrations?eventId={event_id}', headers=headers
)
if registrations_response.status_code != 200:
logging.error(f'Error: Unable to retrieve event registrations. Status code: {registrations_response.status_code}')
return
registrations = registrations_response.json()
# Print the individual contact IDs
contact_ids = [registration['Contact']['Id'] for registration in registrations]
#logging.info(f'Contact IDs: {contact_ids}')
return contact_ids
def get_contact_info(contact_id, access_token):
"""Retrieves the email address and first name of a contact given a contact ID."""
api_base_url = 'https://api.wildapricot.org/v2.1'
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# Make an API request to retrieve the account details
account_response = requests.get(f'{api_base_url}/accounts', headers=headers)
if account_response.status_code != 200:
logging.error(f'Error: Unable to retrieve account details. Status code: {account_response.status_code}')
return
account_id = account_response.json()[0]['Id']
# Make an API request to retrieve the contact details
contact_response = requests.get(f'{api_base_url}/accounts/{account_id}/contacts/{contact_id}', headers=headers)
if contact_response.status_code != 200:
logging.error(f'Error: Unable to retrieve contact details. Status code: {contact_response.status_code}')
return
contact_details = contact_response.json()
# Get the email address, first name, and membership status from the contact details
email = contact_details.get('Email', 'Unknown')
first_name = contact_details.get('FirstName', 'Unknown')
membership_enabled = contact_details.get('MembershipEnabled', False)
return email, first_name, contact_id, membership_enabled
def send_email(access_token,body, contact_id,first_name, email):
"""Sends a test email using the Wild Apricot API."""
api_base_url = 'https://api.wildapricot.org/v2.2'
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
account_response = requests.get(f'{api_base_url}/accounts', headers=headers)
if account_response.status_code != 200:
logging.error(f'Error: Unable to retrieve account details. Status code: {account_response.status_code}')
return
account_id = account_response.json()[0]['Id']
# Prepare email data
email_data = {
"Subject": "Free Month Promo at Red Mountain Makers!",
"Body": body,
"ReplyToAddress": "[email protected]",
"ReplyToName": "Red Mountain Makers",
"Recipients": [
{
"Id": contact_id,
"Type": "IndividualContactRecipient",
"Name": first_name,
"Email": email,
}
],
}
# Make an API request to send the email
send_email_response = requests.post(f'{api_base_url}/rpc/{account_id}/email/SendEmail', headers=headers, json=email_data)
if send_email_response.status_code != 200:
logging.error(f'Error: Unable to send email. Status code: {send_email_response.status_code}, contact ID {contact_id}')
return
def fill_email_template(Contact_First_Name, Event_Title,Discount_Code, template):
return template.format(Contact_First_Name=Contact_First_Name, Event_Title=Event_Title, Discount_Code=Discount_Code)
def read_template_file(file_path):
with open(file_path, 'r') as file:
return file.read()
def event_title(event_id, access_token):
"""Retrieves event details from the Wild Apricot API given an event ID and prints the number of checked-in attendees."""
api_base_url = 'https://api.wildapricot.org/v2.2'
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# Make an API request to retrieve the account details
account_response = requests.get(f'{api_base_url}/accounts', headers=headers)
if account_response.status_code != 200:
logging.error(f'Error: Unable to retrieve account details. Status code: {account_response.status_code}')
return
account_id = account_response.json()[0]['Id']
# Make an API request to retrieve event details
event_response = requests.get(f'{api_base_url}/accounts/{account_id}/Events/{event_id}', headers=headers)
if event_response.status_code != 200:
logging.error(f'Error: Unable to retrieve event details. Status code: {event_response.status_code}')
return
event_details = event_response.json()
# Return the number of checked-in attendees
event_name = event_details.get('Name', 'Unknown')
return event_name
def get_past_event_ids(access_token, current_datetime=None):
"""Retrieves past public event data from the Wild Apricot API and returns a list of event IDs."""
api_base_url = 'https://api.wildapricot.org/v2.2'
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
'Accept': 'application/json',
}
# Make an API request to retrieve the account details
account_response = requests.get(f'{api_base_url}/accounts', headers=headers)
account_id = account_response.json()[0]['Id']
# Get the current date and time, if not provided
if current_datetime is None:
current_datetime = datetime.now(timezone.utc)
# Check if today is Monday
hours_to_check = 72 if current_datetime.weekday() == 0 else 24
past_datetime = current_datetime - timedelta(hours=hours_to_check)
# Make an API request to retrieve event data
events_response = requests.get(f'{api_base_url}/accounts/{account_id}/Events', headers=headers)
events = events_response.json()['Events']
# Filter events that occurred within the specified time frame, are visible to the public, and do not have "free" or "awa" in the title
past_event_ids = [event['Id'] for event in events if event.get('EndDate') is not None and
past_datetime <= datetime.fromisoformat(event['EndDate'].replace('Z', '+00:00')) < current_datetime and
event.get('AccessLevel') == 'Public' and
'free' not in event.get('Name', '').lower() and
'awa' not in event.get('Name', '').lower() and
'summer' not in event.get('Name', '').lower()]
return past_event_ids
def send_discount_emails(access_token, event_id_list, template_file_path, Discount_Code):
html_template = read_template_file(template_file_path)
total_num_emails = 0
event_num_emails = 0
for event_id in event_id_list:
#logging.info(event_id)
Event_Title = event_title(event_id, access_token)
contact_ids = get_event_attendees(event_id, access_token)
if not contact_ids:
logging.info(f'No attendees found for event {event_id}')
else:
event_num_emails = 0
for id in contact_ids:
contact_info = get_contact_info(id, access_token)
membership_enabled = contact_info[3]
# Skip over members
if membership_enabled:
logging.info(f"A class attendee for {event_id} is already a member, skipping to the next attendee")
continue
event_num_emails +=1
total_num_emails += 1
email = contact_info[0]
Contact_First_Name = contact_info[1]
contact_id = contact_info[2]
#logging.info(f" Found past event: {Event_Title}")
# print(Contact_First_Name)
# print(email)
filled_template = fill_email_template(Contact_First_Name, Event_Title, Discount_Code, html_template)
send_email(access_token, filled_template, contact_id, Contact_First_Name, email)
logging.info(f"Sent {event_num_emails} emails for event {event_id}")
logging.info(f'Sent {total_num_emails} emails in total.')