-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrefresh.py
303 lines (256 loc) · 10.4 KB
/
refresh.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
import requests
import os
import pandas as pd
import logging
import time
import datetime
import json
import msal
from sshtunnel import SSHTunnelForwarder
from dotenv import load_dotenv
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import signal
import atexit
load_dotenv()


def _require_env(name):
    """Return environment variable *name*, failing fast when it is unset.

    The values read through this helper are concatenated into URLs below, so
    an unset variable would otherwise surface as an unhelpful ``TypeError``
    about adding ``NoneType`` and ``str``.

    Raises:
        RuntimeError: if the variable is not present in the environment.
    """
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f"Required environment variable {name} is not set")
    return value


# String variables = Replace with your own
tenant_id = _require_env("TENANT_ID")
authority_url = _require_env("AUTHORITY_URL") + tenant_id
client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")
scope = [os.getenv("SCOPE")]

# https://learn.microsoft.com/en-us/power-bi/connect-data/asynchronous-refresh
# https://api.powerbi.com/v1.0/myorg/groups/{groupId}/datasets/{datasetId}/refreshes
workspace_id = _require_env("WORKSPACE_ID")  # = group_id
dataset_id = _require_env("DATASET_ID")
url = (
    "https://api.powerbi.com/v1.0/myorg/groups/"
    f"{workspace_id}/datasets/{dataset_id}/refreshes"
)

# Forwards localhost:3309 to the replicated database's port 3306 via the SSH host.
# The tunnel is only configured here; it is opened later by start_ssh_tunnel().
ssh_tunnel = SSHTunnelForwarder(
    "54.79.64.88",
    ssh_username="bm",
    ssh_pkey=os.getenv("PRIVATE_KEY_PATH"),
    remote_bind_address=("replicated-db-api.okebet.com.au", 3306),
    local_bind_address=("localhost", 3309),
)
def cleanup_tunnel():
    """Stop the module-level SSH tunnel if it is currently active.

    Errors raised while stopping are logged instead of propagated, so this is
    safe to call from exit hooks.
    """
    tunnel_running = ssh_tunnel and ssh_tunnel.is_active
    if not tunnel_running:
        return

    logging.info("Cleaning up SSH tunnel...")
    try:
        ssh_tunnel.stop()
        logging.info("SSH tunnel cleaned up successfully")
    except Exception as e:
        logging.error(f"Error cleaning up SSH tunnel: {e}")
# Register cleanup handlers
atexit.register(cleanup_tunnel)


def _terminate_on_signal(signum, frame):
    """Close the SSH tunnel and then terminate the process.

    Installing a Python-level handler replaces the default action for the
    signal, so the handler itself must end the process; otherwise SIGTERM and
    Ctrl-C would merely close the tunnel while the script kept running.
    Exits with the conventional ``128 + signal number`` status.
    """
    cleanup_tunnel()
    raise SystemExit(128 + signum)


signal.signal(signal.SIGTERM, _terminate_on_signal)
signal.signal(signal.SIGINT, _terminate_on_signal)
def configure_logging():
    """Set up file + console logging and log the run configuration.

    Creates the ``logs`` directory if needed, prunes it to the 10 entries that
    sort last by name (log file names start with a timestamp, so these are the
    newest), then opens a new timestamped log file and mirrors INFO-level
    records to the console. Finally the module-level settings are logged.

    The client secret is deliberately NOT written to the log: the log file is
    kept on disk and its content is included in the status e-mail, so only
    whether the secret is configured is recorded.
    """
    # Ensure the logs directory exists
    os.makedirs("logs", exist_ok=True)

    # Keep only the 10 most recent entries (oldest sort first by name)
    existing_logs = sorted(os.listdir("logs"))
    for stale_log in existing_logs[:-10]:
        os.remove(os.path.join("logs", stale_log))

    # Configure logging
    logging.basicConfig(
        filename=f"logs/refresh_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.log",
        filemode="w",
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d_%H-%M-%S",
        level=logging.INFO,
    )

    # Create a console handler and set its level to INFO
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Create a formatter and add it to the console handler
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    console_handler.setFormatter(formatter)

    # Add the console handler to the root logger
    logging.getLogger().addHandler(console_handler)

    secret_state = "<redacted>" if client_secret else "<not set>"

    logging.info("-------------DETAILS--------------------")
    logging.info(f"Initiated at : {datetime.datetime.now()}")
    logging.info(f"tenant_id = {tenant_id}")
    logging.info(f"authority_url = {authority_url}")
    logging.info(f"client_id = {client_id}")
    logging.info(f"client_secret = {secret_state}")
    logging.info(f"scope = {scope}")
    logging.info(f"workspace_id = {workspace_id}")
    logging.info(f"dataset_id = {dataset_id}")
    logging.info(f"base_url = {url}")
    logging.info(f"SSH Tunnel details: \n{ssh_tunnel}")
    logging.info("-----------------------------------------")
def _read_latest_log():
    """Return the text of the most recently created file in ``logs``.

    Falls back to the placeholder "No log file found" when the directory is
    missing or empty.
    """
    try:
        log_names = os.listdir("logs")
    except OSError:
        return "No log file found"
    if not log_names:
        return "No log file found"

    latest_log = max(
        log_names, key=lambda name: os.path.getctime(os.path.join("logs", name))
    )
    # errors="replace" so one undecodable byte cannot block the notification.
    with open(os.path.join("logs", latest_log), "r", errors="replace") as f:
        return f.read()


def send_status_email(status, duration=None, error=None):
    """E-mail the refresh outcome, with the latest log file embedded.

    Sender credentials and recipients come from the ``SMTP_EMAIL``,
    ``SMTP_PASSWORD`` and ``RECIPIENT_EMAILS`` (comma-separated) environment
    variables. This is best-effort: a missing recipient list or any SMTP
    failure is logged and the function returns without raising.

    Args:
        status: Short status label used in the subject and heading.
        duration: Optional refresh duration, rendered with ``str()``.
        error: Optional error description, shown in red.

    Returns:
        None.
    """
    # Local import: the module's import block does not provide html.escape.
    from html import escape

    # Get email settings from env
    sender_email = os.getenv("SMTP_EMAIL")
    sender_password = os.getenv("SMTP_PASSWORD")
    recipient_emails = [
        address.strip()
        for address in os.getenv("RECIPIENT_EMAILS", "").split(",")
        if address.strip()
    ]
    if not recipient_emails:
        logging.error("Failed to send email: RECIPIENT_EMAILS is not set")
        return

    # Get latest log file content
    log_content = _read_latest_log()

    # Create message
    msg = MIMEMultipart("alternative")
    msg["Subject"] = f"OkeBet PowerBI Refresh {status}"
    msg["From"] = sender_email
    msg["To"] = ", ".join(recipient_emails)

    # Create HTML content. Everything interpolated is escaped: log lines and
    # error text can contain '<', '>' or '&', which would corrupt the markup.
    powerbi_url = "https://app.powerbi.com/Redirect?action=OpenApp&appId=69ae4888-532a-4c15-81a7-e883d8029a78&ctid=31c159be-8f05-4809-abc1-7ab3f75a37ee"
    finished_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    duration_html = (
        f"<p>Duration: {escape(str(duration))}</p>" if duration is not None else ""
    )
    error_html = (
        f"<p style='color:red'>Error: {escape(str(error))}</p>" if error else ""
    )
    html_body = f"""
<html>
<body>
<h2>PowerBI Refresh Status: {escape(str(status))}</h2>
<p>The refresh completed at {finished_at}</p>
{duration_html}
{error_html}
<p><a href="{escape(powerbi_url)}">View PowerBI Dashboard</a></p>
<hr>
<h3>Log Output:</h3>
<pre style="background-color: #f5f5f5; padding: 10px; font-family: monospace; white-space: pre-wrap;">
{escape(log_content)}
</pre>
</body>
</html>
"""
    msg.attach(MIMEText(html_body, "html"))

    # Send email
    try:
        # The timeout keeps an unreachable mail server from hanging the script.
        with smtplib.SMTP("smtp.gmail.com", 587, timeout=30) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(msg)
        logging.info(f"Status email sent to {recipient_emails}")
    except Exception as e:
        logging.error(f"Failed to send email: {str(e)}")
def start_ssh_tunnel(max_retries=3, retry_delay=10):
    """Start the module-level SSH tunnel, retrying on failure.

    Before each attempt an already-active tunnel is stopped so the start is
    clean. After a failed attempt (an exception, or the tunnel reporting
    inactive after ``start()``) the function waits ``retry_delay`` seconds
    before the next attempt.

    Args:
        max_retries: Maximum number of start attempts.
        retry_delay: Seconds to wait between attempts.

    Returns:
        True once the tunnel reports active, False if every attempt failed.
    """
    for attempt in range(max_retries):
        try:
            # Clean up any existing tunnel first
            if ssh_tunnel.is_active:
                logging.info("Stopping existing SSH tunnel...")
                ssh_tunnel.stop()
                time.sleep(2)  # Wait for cleanup

            ssh_tunnel.start()
            if ssh_tunnel.is_active:
                logging.info(
                    f"SSH tunnel successfully started on attempt {attempt + 1}"
                )
                return True
            logging.warning(f"SSH tunnel failed to start on attempt {attempt + 1}")
        except Exception as e:
            logging.error(f"Failed to start SSH tunnel on attempt {attempt + 1}: {e}")
            try:
                ssh_tunnel.stop()  # Ensure cleanup on error
            except Exception as stop_error:
                # Best-effort cleanup: record the problem but keep retrying.
                # (Not a bare except, so Ctrl-C / SystemExit still propagate.)
                logging.warning(
                    f"Error stopping SSH tunnel after failed start: {stop_error}"
                )

        if attempt < max_retries - 1:
            logging.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)

    logging.error("All SSH tunnel connection attempts failed")
    return False
def check_ssh_connection():
    """Report whether the SSH tunnel is usable, reconnecting if it dropped.

    Returns:
        True if the tunnel is active or was successfully restarted; False if
        the restart failed or any exception occurred during the check.
    """
    try:
        if ssh_tunnel.is_active:
            return True

        logging.warning("SSH tunnel became inactive, attempting to reconnect...")
        reconnected = start_ssh_tunnel()
        if not reconnected:
            return False

        time.sleep(5)  # Give connection time to stabilize
        return True
    except Exception as e:
        logging.error(f"SSH connection check failed: {e}")
        return False
def get_token_for_client(scope):
    """Acquire an access token using the client-credentials flow.

    Builds an MSAL confidential client from the module-level ``client_id``,
    ``authority_url`` and ``client_secret`` and requests a token for *scope*.

    Args:
        scope: List of scope strings passed to MSAL as ``scopes``.

    Returns:
        The access token string, or None when MSAL's result contains no
        ``access_token`` (the reported error is logged in that case).
    """
    app = msal.ConfidentialClientApplication(
        client_id, authority=authority_url, client_credential=client_secret
    )
    result = app.acquire_token_for_client(scopes=scope)

    if "access_token" in result:
        logging.info("Token acquired successfully.")
        return result["access_token"]

    # Extra positional args to logging calls are only rendered through
    # %-placeholders in the message, so each value needs its own %s.
    logging.error(
        "Error in get_token_for_client: %s - %s",
        result.get("error"),
        result.get("error_description"),
    )
    return None
configure_logging()

# Abort before triggering anything if the prerequisites are missing. The
# polling loop already treats a lost tunnel as fatal, so a tunnel that never
# came up is handled the same way here.
# NOTE(review): assumes the refresh itself needs the tunnel - confirm.
if not start_ssh_tunnel():
    send_status_email("Failed", error="SSH tunnel could not be started")
    raise SystemExit(1)

access_token = get_token_for_client(scope)
if not access_token:
    # Without a token the request would be sent as "Bearer None".
    logging.error("No access token acquired; refresh was not requested")
    send_status_email("Failed", error="Could not acquire an access token")
    raise SystemExit(1)

header = {"Content-Type": "application/json", "Authorization": f"Bearer {access_token}"}

# Make the POST request with the parameters in the body
params = {"type": "full"}
params_json = json.dumps(params)
# requests has no default timeout; without one a stalled connection hangs forever.
api_call = requests.post(url=url, headers=header, data=params_json, timeout=30)
logging.info(f"Started refresh at : {datetime.datetime.now()}")
logging.info(f"Status code: {api_call.status_code}")
logging.info(
    "API headers:\n" + "\n".join(f"{k}: {v}" for k, v in api_call.headers.items())
)

# If the service rejected the request, no new refresh was started; polling
# would then report the outcome of an earlier refresh instead.
if not api_call.ok:
    logging.error(f"Refresh request rejected: {api_call.text}")
    send_status_email(
        "Failed",
        error=f"Refresh request rejected with HTTP {api_call.status_code}",
    )
    raise SystemExit(1)
# Get status of current api call to see if it's in progress or failed
def get_current_status():
    """Return the ``status`` of the first entry in the dataset's refresh history.

    Sends a GET to the module-level refreshes ``url`` using ``header`` and
    reads the ``value`` list from the JSON response.
    NOTE(review): assumes the API lists the most recent refresh first - confirm.

    Returns:
        The status value of the first history entry (e.g. "Completed",
        "Unknown", "Failed", "Disabled"), or None if that entry has no
        ``status`` field.

    Raises:
        requests.HTTPError: if the service answers with an error status.
        LookupError: if the refresh history is empty.
    """
    # requests has no default timeout; bound the wait so polling cannot hang.
    response = requests.get(url=url, headers=header, timeout=30)
    # Surface HTTP failures directly instead of as a KeyError on "value".
    response.raise_for_status()

    refreshes = response.json()["value"]
    if not refreshes:
        raise LookupError("Refresh history is empty; no status available")
    return refreshes[0].get("status")
# Poll the refresh status every 30 seconds until it reaches a final state,
# logging each observation. Do it for a maximum of 1 hour.
start_time = datetime.datetime.now()
timeout = 60
max_time = start_time + datetime.timedelta(minutes=timeout)

try:
    while datetime.datetime.now() <= max_time:
        if not check_ssh_connection():
            logging.error("Cannot maintain SSH connection")
            send_status_email("Failed", error="SSH connection lost and reconnection failed")
            break

        current_status = get_current_status()

        if current_status == "Completed":
            duration = datetime.datetime.now() - start_time
            logging.info(f"Refresh completed. Status: {current_status}")
            logging.info(f"Time to refresh: {duration}")
            send_status_email("Completed", duration=duration)
            break
        elif current_status == "Unknown":
            logging.info(
                f"Refresh in progress... (Status Unknown = In progress) Status: {current_status}"
            )
        elif current_status == "Failed":
            logging.error(f"Refresh failed. Status: {current_status}")
            send_status_email("Failed", error="Refresh failed")
            break
        elif current_status == "Disabled":
            logging.error(f"Refresh disabled. Status: {current_status}")
            send_status_email("Failed", error="Refresh disabled")
            break
        else:
            logging.error(f"Unknown status. Status: {current_status}")
            send_status_email("Failed", error=f"Unknown status: {current_status}")
            break

        # Sleep before checking the status again
        time.sleep(30)
    else:
        # Handle timeout case. The else clause runs only when the loop ended
        # because the deadline passed, never after a break, so a run that
        # finishes right at the deadline cannot also be reported as timed out.
        logging.error(f"Refresh timed out after {timeout} minutes.")
        send_status_email("Failed", error=f"Refresh timed out after {timeout} minutes.")
except Exception as e:
    # An unexpected failure while polling should still produce a notification.
    logging.exception("Unexpected error while monitoring refresh")
    send_status_email("Failed", error=f"Unexpected error while monitoring refresh: {e}")
    raise
finally:
    # Always stop SSH tunnel
    ssh_tunnel.stop()