forked from cubewise-code/tm1py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMessageLogService.py
155 lines (129 loc) · 6.6 KB
/
MessageLogService.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
from warnings import warn

from TM1py.Objects.Process import Process
from TM1py.Services.ObjectService import ObjectService
from TM1py.Services.RestService import RestService
from TM1py.Utils import (
    verify_version, deprecated_in_version, odata_track_changes_header, require_ops_admin, require_data_admin,
    format_url, CaseAndSpaceInsensitiveDict, CaseAndSpaceInsensitiveSet, utc_localize_time)
class MessageLogService(ObjectService):
    """ Service to read entries from the TM1 message log and to write entries to it.

    Message logs were removed from TM1 as of version 12.0.0; the reading methods
    are flagged as deprecated for that version.
    """

    # Start of the delta link that is searched for in tracked message log responses
    _DELTA_LINK_MARKER = "MessageLogEntries/!delta('"
    # Layout of timestamps in OData filter clauses; the literal 'Z' claims UTC
    _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

    def __init__(self, rest: RestService):
        """
        :param rest: instance of RestService, used for all requests of this service
        """
        super().__init__(rest)
        if verify_version(required_version="12.0.0", version=rest.version):
            # warn only due to use in Monitoring Service
            warn("Message Logs are not available in this version of TM1, removed as of 12.0.0", DeprecationWarning,
                 2)
        # relative url of the next delta request. Set by initialize_delta_requests
        self.last_delta_request = None

    @classmethod
    def _extract_delta_link(cls, response_text: str) -> str:
        """ Cut the delta link out of the raw body of a tracked message log response

        :param response_text: raw response body
        :return: text from the last occurrence of the delta marker up to (excluding) the last two characters
        """
        # the last two characters are dropped: presumably the closing quote and brace of the JSON body
        return response_text[response_text.rfind(cls._DELTA_LINK_MARKER):-2]

    @classmethod
    def _to_utc_timestamp(cls, timestamp: datetime) -> str:
        """ Format a datetime as UTC timestamp for an OData filter clause

        :param timestamp: of type datetime. If it doesn't have tz information, UTC is assumed.
        :return: timestamp in UTC, formatted like 2024-01-31T12:00:00Z
        """
        if not timestamp.tzinfo:
            timestamp = utc_localize_time(timestamp)
        elif timestamp.utcoffset() is not None:
            # the format appends a literal 'Z', so aware datetimes must be shifted to UTC first
            timestamp = timestamp.astimezone(timezone.utc)
        return timestamp.strftime(cls._TIMESTAMP_FORMAT)

    @deprecated_in_version(version="12.0.0")
    @odata_track_changes_header
    def initialize_delta_requests(self, filter=None, **kwargs):
        """ Send the initial TailMessageLog request and remember the delta link from its response

        :param filter: optional OData filter expression. It is appended to the url as is
        :param kwargs: passed on to the GET request
        """
        url = "/TailMessageLog()"
        if filter:
            url += "?$filter={}".format(filter)
        response = self._rest.GET(url=url, **kwargs)
        # Read the next delta-request-url from the response
        self.last_delta_request = self._extract_delta_link(response.text)

    @deprecated_in_version(version="12.0.0")
    @odata_track_changes_header
    def execute_delta_request(self, **kwargs) -> List[Dict]:
        """ Request the remembered delta link and remember the follow-up delta link

        :param kwargs: passed on to the GET request
        :return: content of 'value' in the JSON response
        :raises RuntimeError: if no delta link is available
        """
        if not self.last_delta_request:
            raise RuntimeError(
                "No delta request available. Call 'initialize_delta_requests' first")
        response = self._rest.GET(
            url="/" + self.last_delta_request, **kwargs)
        self.last_delta_request = self._extract_delta_link(response.text)
        return response.json()['value']

    @deprecated_in_version(version="12.0.0")
    @require_ops_admin
    def get_entries(self, reverse: bool = True, since: datetime = None,
                    until: datetime = None, top: int = None, logger: str = None,
                    level: str = None, msg_contains: Iterable = None, msg_contains_operator: str = 'and',
                    **kwargs) -> List[Dict]:
        """ Query the message log entries, optionally filtered

        :param reverse: Boolean. Order by TimeStamp descending if True, else ascending
        :param since: of type datetime. If it doesn't have tz information, UTC is assumed.
            If it has tz information, it is converted to UTC.
        :param until: of type datetime. If it doesn't have tz information, UTC is assumed.
            If it has tz information, it is converted to UTC.
        :param top: Integer
        :param logger: string, eg TM1.Server, TM1.Chore, TM1.Mdx.Interface, TM1.Process
        :param level: string, ERROR, WARNING, INFO, DEBUG, UNKNOWN.
            Any other value is ignored with a warning
        :param msg_contains: string or iterable of strings, find substring in log message;
            multiple substrings are combined with 'msg_contains_operator'
        :param msg_contains_operator: 'and' or 'or'
        :param kwargs: passed on to the GET request
        :return: content of 'value' in the JSON response
        :raises ValueError: if 'msg_contains_operator' is neither 'and' nor 'or'
        """
        msg_contains_operator = msg_contains_operator.strip().lower()
        if msg_contains_operator not in ("and", "or"):
            raise ValueError(
                "'msg_contains_operator' must be either 'AND' or 'OR'")

        order = 'desc' if reverse else 'asc'
        url = '/MessageLogEntries?$orderby=TimeStamp {}'.format(order)

        log_filters = []
        if since:
            log_filters.append(format_url(
                "TimeStamp ge {}", self._to_utc_timestamp(since)))
        if until:
            log_filters.append(format_url(
                "TimeStamp le {}", self._to_utc_timestamp(until)))
        if logger:
            log_filters.append(format_url("Logger eq '{}'", logger))
        if level:
            level_dict = CaseAndSpaceInsensitiveDict(
                {'ERROR': 1, 'WARNING': 2, 'INFO': 3, 'DEBUG': 4, 'UNKNOWN': 5})
            level_index = level_dict.get(level)
            if level_index:
                log_filters.append("Level eq {}".format(level_index))
            else:
                # keep ignoring unknown levels, but no longer silently
                warn(f"Level '{level}' is not a known message log level and is ignored as a filter", stacklevel=2)
        if msg_contains:
            if isinstance(msg_contains, str):
                log_filters.append(format_url(
                    "contains(toupper(Message),toupper('{}'))", msg_contains))
            else:
                msg_filters = [format_url("contains(toupper(Message),toupper('{}'))", wildcard)
                               for wildcard in msg_contains]
                # an iterable without items (e.g. an exhausted generator) must not produce an empty '()' clause
                if msg_filters:
                    log_filters.append("({})".format(
                        f" {msg_contains_operator} ".join(msg_filters)))

        # only add the $filter option if there is something to filter on
        if log_filters:
            url += "&$filter={}".format(" and ".join(log_filters))
        if top:
            url += '&$top={}'.format(top)

        response = self._rest.GET(url, **kwargs)
        return response.json()['value']

    @require_data_admin
    def create_entry(self, level: str, message: str, **kwargs) -> None:
        """ Write an entry to the message log by executing an unbound process with a LogOutput statement

        :param level: string, FATAL, ERROR, WARN, INFO, DEBUG
        :param message: string. Single quotes in the message are escaped
        :param kwargs: passed on to the process execution
        :return:
        :raises ValueError: if level is not one of the valid levels
        :raises RuntimeError: if the process execution is not successful
        """
        valid_levels = CaseAndSpaceInsensitiveSet(
            {'FATAL', 'ERROR', 'WARN', 'INFO', 'DEBUG'})
        if level not in valid_levels:
            raise ValueError(f"Invalid level: '{level}'")

        # imported locally, presumably to avoid a circular import -- TODO confirm
        from TM1py.Services import ProcessService

        # double the single quotes, so the message can not end the string literal in the TI statement
        escaped_message = str(message).replace("'", "''")

        process_service = ProcessService(self._rest)
        process = Process(
            name="", prolog_procedure="LogOutput('{}', '{}');".format(level, escaped_message))
        success, status, _ = process_service.execute_process_with_return(
            process, **kwargs)

        if not success:
            raise RuntimeError(
                f"Failed to write to TM1 Message Log through unbound process. Status: '{status}'")

    @deprecated_in_version(version="12.0.0")
    @require_ops_admin
    def get_last_process_message(self, process_name: str, **kwargs) -> Optional[str]:
        """ Get the latest message log entry for a process

        :param process_name: name of the process
        :param kwargs: passed on to the GET request
        :return: String - the message, for instance: "Ausführung normal beendet, verstrichene Zeit 0.03 Sekunden".
            None if there is no matching entry
        """
        # newest entry first, so that the first item of the response is the latest message
        url = format_url(
            "/MessageLog()?$orderby=TimeStamp desc&$filter=Logger eq 'TM1.Process' and contains(Message, '{}')",
            process_name)
        response = self._rest.GET(url=url, **kwargs)
        entries = response.json()['value']
        if not entries:
            return None
        return entries[0]['Message']