This repository has been archived by the owner on Dec 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathsecrets_client.py
69 lines (55 loc) · 2.25 KB
/
secrets_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import boto3
from botocore.exceptions import ClientError
import json
from sentry_sdk import capture_exception
import datetime
class STSCredentials:
    """Cache temporary credentials obtained by assuming a role through STS.

    Credentials are fetched lazily by :meth:`get_credentials` and reused
    until shortly before the expiry reported in the ``assume_role``
    response, at which point the next call requests a fresh set.

    Attributes set by the constructor:
        role_arn: ARN passed as ``RoleArn`` to ``assume_role``.
        session_name: value passed as ``RoleSessionName``.
        credentials: the ``Credentials`` entry of the last ``assume_role``
            response, or ``None`` when nothing is cached.
        expiration: timezone-aware UTC datetime after which the cached
            credentials are refreshed, or ``None`` when nothing is cached.
    """

    # Lifetime requested for every assumed-role session, in seconds.
    SESSION_DURATION_SECONDS = 900
    # Refresh this long before the reported expiry so that callers never
    # receive credentials that are about to lapse.
    REFRESH_MARGIN = datetime.timedelta(minutes=5)

    def __init__(self, role_arn):
        """Create an empty cache for ``role_arn``; no request is made yet.

        :param str role_arn: ARN of the role to assume
        """
        self.role_arn = role_arn
        self.session_name = "my_session"
        self.credentials = None
        self.expiration = None

    def get_credentials(self, sts_client):
        """Return cached credentials, assuming the role again when needed.

        :param sts_client: object exposing ``assume_role(RoleArn=...,
            RoleSessionName=..., DurationSeconds=...)`` -- presumably a
            boto3 STS client (``boto3.client('sts')``)
        :return: the ``Credentials`` entry of the ``assume_role`` response
        """
        if self._needs_refresh():
            response = sts_client.assume_role(
                RoleArn=self.role_arn,
                RoleSessionName=self.session_name,
                DurationSeconds=self.SESSION_DURATION_SECONDS,
            )
            self.credentials = response['Credentials']
            reported_expiry = self._to_utc(self.credentials['Expiration'])
            self.expiration = reported_expiry - self.REFRESH_MARGIN
        return self.credentials

    def set_arn(self, role_arn):
        """Switch to another role.

        Credentials cached for a different role are discarded so that the
        next :meth:`get_credentials` call cannot hand out the previous
        role's credentials.

        :param str role_arn: ARN of the role to assume from now on
        """
        if role_arn != self.role_arn:
            self.credentials = None
            self.expiration = None
        self.role_arn = role_arn

    def _needs_refresh(self):
        """Tell whether there is no usable cached credential set."""
        # Credentials without a known expiry cannot be trusted, so they
        # are treated as expired instead of being compared against nothing.
        if self.credentials is None or self.expiration is None:
            return True
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        return now_utc >= self._to_utc(self.expiration)

    @staticmethod
    def _to_utc(value):
        """Convert a datetime (or its ISO-8601 text) to an aware UTC datetime.

        Naive values are interpreted as already being in UTC.
        """
        if not isinstance(value, datetime.datetime):
            value = datetime.datetime.fromisoformat(str(value))
        if value.tzinfo is None:
            return value.replace(tzinfo=datetime.timezone.utc)
        return value.astimezone(datetime.timezone.utc)
def get_secret(secret_name, region_name, credentials):
    """Fetch a secret from AWS Secrets Manager using temporary credentials.

    The secret's ``SecretString`` is parsed as JSON and the entry stored
    under the key equal to ``secret_name`` is returned.

    :param str secret_name: secret id to look up; also the key read from
        the decoded JSON document
    :param str region_name: region the Secrets Manager client is created for
    :param credentials: mapping providing ``AccessKeyId``,
        ``SecretAccessKey`` and ``SessionToken``
    :raises ClientError: re-raised after being reported through
        ``capture_exception``
    :return: value found under ``secret_name`` in the decoded secret
    """
    try:
        client = boto3.client(
            'secretsmanager',
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken'],
            region_name=region_name,
        )
        response = client.get_secret_value(SecretId=secret_name)
    except ClientError as error:
        # Report the failure to Sentry, then let the caller handle it.
        capture_exception(error)
        raise
    else:
        payload = json.loads(response['SecretString'])
        return payload[secret_name]