-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathemailpager
executable file
·452 lines (393 loc) · 18.4 KB
/
emailpager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
#!/usr/bin/env python
# stdlib imports
import argparse
from copy import copy
import configparser
import os.path
import sys
import smtplib
import unicodedata
from xml.dom import minidom
from datetime import datetime, timedelta
import logging
# third party imports
import pandas as pd
import numpy as np
from impactutils.transfer.emailsender import EmailSender
from impactutils.comcat.query import ComCatInfo
# local imports
from losspager.schema import emailschema as es
from losspager.mail.formatter import format_msg, generate_subject_line
from losspager.utils.config import read_mail_config
from losspager.io.pagerdata import PagerData
# Generic PAGER landing page.
DEFAULT_PAGER_URL = 'http://earthquake.usgs.gov/data/pager/'

# Alert color name -> integer level (green=0, yellow=1, orange=2, red=3).
ALERT_DICT = {color: level for level, color in
              enumerate(('green', 'yellow', 'orange', 'red'))}
def _convert_ascii(text):
"""Temporary patch to encode text in ascii.
Args:
text (str): Text to convert to ascii encoding.
any characters cannot be converted to ascii,
then they are removed.
Returns:
str: text converted to ascii.
"""
text = str(text)
# Normalize characters and exclude leftovers with accents
text = ''.join(c for c in unicodedata.normalize(
'NFKD', text) if unicodedata.category(c) != 'Mn')
# Remove any leftovers that cannot be normalized
text = bytes(text, 'ascii', errors='ignore')
return text.decode("utf-8")
def _is_ascii(text):
"""
Args:
text (str): Text to check.
Returns:
bool: whether or not the text can be encoded in ascii.
"""
return all(ord(char) < 128 for char in text)
def get_version(session, pdata, release=False, renotify=False):
    """Find (or create) the event for pdata and return the version to use.

    The event is looked up by ``pdata.id``. If that finds nothing, ComCat is
    asked for the ids associated with that id and each of them is tried in
    turn. If there is still no match, a new event is created under the
    authoritative id (or under ``pdata.id`` when the ComCat lookup failed).

    Args:
        session: Database session used to query ``es.Event`` rows.
        pdata: PagerData object describing the current PAGER run.
        release (bool): True if this run is a release message.
        renotify (bool): True if this run is a renotify message.

    Returns:
        tuple: (version, event, ccinfo). version is the most recent existing
            version when this is a release of a not-yet-released version or
            a renotify of a known event; otherwise it is a new version that
            has been appended to ``event.versions``. ccinfo is the
            ComCatInfo object, or None when none was created.
    """
    def find_event(code):
        # First stored event whose eventcode matches, or None.
        return session.query(es.Event).filter(
            es.Event.eventcode == code).first()

    eventid = pdata.id
    ccinfo = None
    authid = eventid
    event = find_event(eventid)

    if event is None:
        # Not stored under this id; try every id ComCat associates with it.
        try:
            logging.debug('Looking in Comcat for %s' % eventid)
            ccinfo = ComCatInfo(eventid)
            authid, allids = ccinfo.getAssociatedIds()
            logging.debug('Found authid %s for eventid %s' % (authid, eventid))
            # Authoritative id goes first; the id already tried is dropped.
            allids.insert(0, authid)
            allids.remove(eventid)
            for candidate in allids:
                event = find_event(candidate)
                if event is not None:
                    break
        except Exception as e:
            # Best effort: a failed lookup just means we fall through and
            # create the event below.
            logging.debug(
                'Could not retrieve event information: "%s"' % (str(e)))

    if event is None:
        # we need to create a new event
        event = es.Event(eventcode=authid)
        logging.debug('Created event %s.' % str(event))

    ordered = list(event.versions)
    ordered.sort(key=lambda ver: ver.number)
    latest = ordered[-1] if len(ordered) else None

    if latest is not None:
        # A release message for a pending version must not bump the PAGER
        # version number in the database: flag the existing version as
        # released instead of creating a new one.
        if not latest.released and release:
            latest.released = True
            return (latest, event, ccinfo)
        # A renotify for an event we already know reuses the newest version.
        if renotify:
            return (latest, event, ccinfo)

    # Otherwise build a brand new version from the PAGER results.
    country = pdata.toSeries()['Impacted Country ($)']
    is_pending = pdata.summary_alert_pending == 'pending'
    summary_color = pdata.summary_alert

    # NOTE(review): ecolevel is filled from fatality_alert, exactly like
    # fatlevel. This looks like a copy/paste slip (an economic alert level
    # was probably intended) - confirm against PagerData before changing.
    version = es.Version(versioncode=eventid,
                         time=pdata.time,
                         country=country,
                         lat=pdata.latitude,
                         lon=pdata.longitude,
                         depth=pdata.depth,
                         magnitude=pdata.magnitude,
                         number=len(event.versions) + 1,
                         fatlevel=ALERT_DICT[pdata.fatality_alert],
                         ecolevel=ALERT_DICT[pdata.fatality_alert],
                         summarylevel=ALERT_DICT[summary_color],
                         released=not is_pending,
                         was_pending=is_pending,
                         processtime=pdata.processing_time,
                         maxmmi=pdata.maxmmi)
    event.versions.append(version)
    return (version, event, ccinfo)
def send_emails(version, addresses, properties, msg, subject, DEBUG, attachments=None):
    """Send one message to a group of addresses and record them on version.

    Args:
        version: Version object; ``addresses`` is added to its
            ``addresses`` collection (in DEBUG mode as well).
        addresses (list): Address objects exposing an ``email`` attribute.
        properties (dict): Base sender properties. It is copied, so the
            caller's dictionary is not modified.
        msg (str): Message body.
        subject (str): Subject line.
        DEBUG (bool): If True, nothing is sent; the recipients are printed
            instead.
        attachments (list): Optional local files to attach. Defaults to no
            attachments.

    Returns:
        The same version object that was passed in.
    """
    # None sentinel instead of a shared mutable [] default.
    local_files = [] if attachments is None else attachments

    props = properties.copy()
    props['recipients'] = [address.email for address in addresses]
    props['message'] = msg
    props['subject'] = subject

    if not DEBUG:
        print('Sending message to %i recipients...' % len(addresses))
        sender = EmailSender(properties=props, local_files=local_files)
        sender.send()
    else:
        print('DEBUG on: would send message to %i recipients...' % len(addresses))
        for addr in addresses:
            print('\t%s' % addr.email)

    version.addresses += addresses
    return version
def main(args):
    """Email PAGER users about one processed event, then exit the process.

    Reads the mail configuration, validates the incoming product, loads the
    PAGER results from ``<args.directory>/json``, finds or creates the
    matching event/version in the database, works out which addresses
    should be alerted, and sends short, long and pdf format messages.

    Args:
        args (argparse.Namespace): Parsed command line. The attributes read
            here are ``debug``, ``eventid``, ``type``, ``status``,
            ``directory``, ``renotify``, ``release``, ``force_email`` and
            ``time``.

    This function always ends with ``sys.exit``:
        0 - the system is not primary, or processing reached the end
            (including the case where an error was caught and the new
            version was backed out).
        1 - wrong product type, DELETE status, missing JSON directory, or
            an event time in the future.
    """
    DEBUG = bool(args.debug)

    # get all of the information from the mail config file
    config = read_mail_config()

    log_format = '%(asctime)s %(message)s'
    if 'log_folder' in config:
        tnowstr = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        logfile = os.path.join(
            config['log_folder'],
            'emailpager_%s_%s.log' % (args.eventid, tnowstr))
        logging.basicConfig(filename=logfile, level=logging.DEBUG,
                            format=log_format)
    else:
        logging.basicConfig(level=logging.DEBUG, format=log_format)

    # check the status of the system - stop if we are NOT primary
    if 'status' not in config or config['status'] != 'primary':
        logging.warning(
            'This system is not configured to send email.  Stopping.')
        sys.exit(0)

    # first make sure this is a losspager product
    if args.type not in ['losspager', 'losspager-admin']:
        logging.warning(
            'emailpager is only configured to work with losspager/losspager-admin products. Exiting.')
        sys.exit(1)

    # TODO: Do something with delete messages
    if args.status == 'DELETE':
        # This message has no placeholder, so it must not be %-formatted
        # (doing so raises TypeError before the warning is ever logged).
        logging.warning('No action to take with delete messages.')
        sys.exit(1)

    jsondir = os.path.join(args.directory, 'json')
    if not os.path.isdir(jsondir):
        # Report the path that was actually tested.
        logging.warning(
            'JSON directory "%s" containing PAGER output not found. Exiting.' % jsondir)
        sys.exit(1)

    # --property-renotify / --property-release / --property-force-email
    # arrive as strings; only the literal 'true' switches them on.
    renotify = args.renotify == 'true'
    release = args.release == 'true'
    force_email = args.force_email == 'true'

    # scenario events are frequently set in the future.  Since we NEVER anticipate
    # wanting to send emails for scenario events for any reason, let's set a check
    # for this.  Times look like this: 2018-03-11T04:45:36.000Z
    etime = datetime.strptime(args.time, '%Y-%m-%dT%H:%M:%S.%fZ')
    if etime > datetime.utcnow():
        logging.warning('The event time stamp is in the future. Exiting.')
        sys.exit(1)

    # Everything is cool...
    logging.debug('Loading from data directory...')
    pdata = PagerData()
    pdata.loadFromJSON(jsondir)

    # Instantiate a session with our database
    logging.debug('Connecting to database...')
    dburl = config['email']['database']['url']
    session = es.get_session(url=dburl, create_db=False)

    # Find event in database, or create it if not found.  Return a new version for that event,
    # or (if event has just been released and previous version was not released) return the
    # most recent version with released column set to True.
    logging.debug('Finding event in database...')
    version, event, ccinfo = get_version(
        session, pdata, release=release, renotify=renotify)

    # check to see if we forced and the event is older than the configured threshold
    past_email_deadline = False
    if force_email:
        # release_threshold is multiplied by 3600 to get seconds
        threshtime = version.time + \
            timedelta(seconds=config['release_threshold'] * 3600)
        past_email_deadline = datetime.utcnow() > threshtime

    # add/commit the event for now, but we may have to delete it if we crash for any reason
    session.add(event)
    session.commit()

    try:
        # loop over all addresses, check to see if they should get notified
        # some of these users may have been notified about previous versions
        # of this event, so they will have to be put in separate bins
        # because their subject line will start with "UPDATE:".
        logging.debug('Getting list of all user addresses...')
        all_addresses = session.query(es.Address).all()

        # Bins keyed by (format, notified_before).
        formats = ('short', 'long', 'pdf')
        recipients = {(fmt, updated): []
                      for fmt in formats for updated in (True, False)}

        logging.debug('Determining which users should get emailed...')
        for address in all_addresses:
            logging.debug('Checking address "%s"...' % address)
            should_alert, notified_before = address.shouldAlert(
                version,
                renotify=renotify,
                release=release,
                ignore_time_limit=force_email)
            if not should_alert:
                continue
            logging.debug('Address "%s" should be notified.' % address)
            # Any format other than short/long is treated as pdf.
            fmt = address.format if address.format in ('short', 'long') else 'pdf'
            recipients[(fmt, bool(notified_before))].append(address)

        # how many emails are we sending ("new" = never notified before)
        for fmt in formats:
            logging.debug('%i new %s addresses.' %
                          (len(recipients[(fmt, False)]), fmt))
            logging.debug('%i %s addresses to update.' %
                          (len(recipients[(fmt, True)]), fmt))

        # try to find the event url
        logging.debug('Getting event url...')
        if ccinfo is not None:
            event_url = ccinfo.getURL() + '#pager'
        else:
            event_url = DEFAULT_PAGER_URL

        # create the short and long message texts
        logging.debug('Creating message text and subject...')
        short_msg = format_msg(version, pdata, 'short', event_url,
                               past_email_deadline=past_email_deadline)
        long_msg = format_msg(version, pdata, 'long', event_url,
                              past_email_deadline=past_email_deadline)

        # create the regular and "update" subjects
        subject, subject_update = generate_subject_line(version, pdata)

        all_props = {}
        all_props['smtp_servers'] = config['email']['smtp_servers']
        all_props['sender'] = config['email']['sender']
        if 'max_bcc' in config['email']:
            all_props['max_bcc'] = config['email']['max_bcc']

        # send emails to all short format addresses
        logging.debug('Sending short addresses...')
        if len(recipients[('short', True)]):
            version = send_emails(
                version, recipients[('short', True)], all_props,
                short_msg, subject_update, DEBUG)
        if len(recipients[('short', False)]):
            version = send_emails(
                version, recipients[('short', False)], all_props,
                short_msg, subject, DEBUG)

        # send emails to all long format addresses
        logging.debug('Sending long addresses...')
        if len(recipients[('long', True)]):
            version = send_emails(
                version, recipients[('long', True)], all_props,
                long_msg, subject_update, DEBUG)
        if len(recipients[('long', False)]):
            version = send_emails(
                version, recipients[('long', False)], all_props,
                long_msg, subject, DEBUG)

        # Temporary fix for messages that cannot be ascii encoded
        # TODO: Move this to earthquake-impact-utils when it is clear that it
        # will not affect other realtime products
        try:
            ascii_subject = subject
            if not _is_ascii(ascii_subject):
                ascii_subject = _convert_ascii(ascii_subject)
            ascii_subject_update = subject_update
            if not _is_ascii(ascii_subject_update):
                ascii_subject_update = _convert_ascii(ascii_subject_update)
            ascii_message = long_msg
            if not _is_ascii(ascii_message):
                ascii_message = _convert_ascii(ascii_message)
        except Exception:
            # Best effort: if the check/conversion fails, keep the original
            # text, but leave a trace in the log.
            logging.exception(
                'Could not convert message text to ascii; sending it unchanged.')
        else:
            # No errors in the check/convert so update all three together
            long_msg = ascii_message
            subject_update = ascii_subject_update
            subject = ascii_subject

        # send emails to all pdf format addresses
        logging.debug('Sending pdf addresses...')
        onepager_file = os.path.join(args.directory, 'onepager.pdf')
        # only send attachments if this event has been released
        if version.released:
            attachments = [onepager_file]
        else:
            attachments = []
        if len(recipients[('pdf', True)]):
            version = send_emails(version, recipients[('pdf', True)],
                                  all_props, long_msg, subject_update, DEBUG,
                                  attachments=attachments)
        if len(recipients[('pdf', False)]):
            version = send_emails(version, recipients[('pdf', False)],
                                  all_props, long_msg, subject, DEBUG,
                                  attachments=attachments)

        logging.debug('Done.')
    except Exception as e:
        # if we have any errors, we want to back out the event and version we added above.
        # todo - the event might not be new, we can't just delete it, only if its empty
        logging.exception('Error while notifying users; backing out.')
        print('Exception "%s" on input %s Backing out any new events/versions.' %
              (str(e), args.directory))
        session.delete(version)
        if len(event.versions) == 0:
            session.delete(event)
        session.commit()

    # NOTE(review): the exit status is 0 even when the except branch above
    # ran - confirm whether the caller should see a failure code instead.
    session.commit()
    session.close()
    sys.exit(0)
if __name__ == '__main__':
    # PDL passes property arguments wrapped in double quotes. A shell (or
    # ipython) strips them for you, but when called from PDL they are left
    # in place and the arguments do not parse correctly, so remove them here.
    sys.argv = [token.replace('"', '') for token in sys.argv]

    cli = argparse.ArgumentParser(
        description='Send emails to PAGER users.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    cli.add_argument("--debug", action='store_true', default=False,
                     help='Turn off emailing, print out who *would* be notified.')

    # Plain product options: (flag, help, metavar)
    product_options = (
        ("--directory", "Directory where PAGER data can be found", 'DIRECTORY'),
        ("--type", "Product type", 'TYPE'),
        ("--code", "Product code", 'CODE'),
        ("--source", "Product source", 'SOURCE'),
        ("--status", "Product status", 'STATUS'),
        ("--action", "Product action", 'ACTION'),
    )
    for flag, helptext, metavar in product_options:
        cli.add_argument(flag, help=helptext, metavar=metavar)

    # Preferred event options: (flag, type, help, metavar, dest).
    # type=None is argparse's default, i.e. the value stays a string.
    preferred_options = (
        ("--preferred-latitude", float, "Event latitude", 'LAT', 'lat'),
        ("--preferred-longitude", float, "Event longitude", 'LON', 'lon'),
        ("--preferred-eventid", None, "Event ID", 'ID', 'eventid'),
        ("--preferred-depth", float, "Event depth", 'DEPTH', 'depth'),
        ("--preferred-magnitude", float, "Event magnitude", 'MAG', 'magnitude'),
        ("--preferred-eventtime", None, "Event time", 'TIME', 'time'),
    )
    for flag, converter, helptext, metavar, dest in preferred_options:
        cli.add_argument(flag, type=converter, help=helptext,
                         metavar=metavar, dest=dest)

    # Product property options: (flag, dest, help)
    property_options = (
        ("--property-renotify", 'renotify',
         'If flag is set, any users previously notified of an event will be renotified.'),
        ("--property-release", 'release',
         'If flag is set, send non-pending email notifications if previous version status was pending.'),
        ("--property-force-email", 'force_email',
         'If flag is set, send email notifications to all appropriate users, ignoring email threshold.'),
    )
    for flag, dest, helptext in property_options:
        cli.add_argument(flag, dest=dest, help=helptext)

    # Arguments this script does not know about are ignored.
    parsed, _ignored = cli.parse_known_args()
    main(parsed)