-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathagent.py
153 lines (123 loc) · 4.26 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
import os
import logging
import configparser
import argparse
import traceback
import asyncio
from systemd import journal
from log import ColorLogHandler
from glonax import client as gclient
from glonax.message import Instance
from glonax_agent import GlonaxAgent
from glonax_agent.system import System
from glonax_agent.models import GpsTelemetry
# Application name; used as the systemd journal identifier and in the
# startup log line.
APP_NAME = "glonax-agent"

# Instance ID taken from the environment (None when the variable is unset);
# serves as the default for the --instance command-line option.
instance_id = os.environ.get("GLONAX_INSTANCE_ID")

# Root logger; its level and handler are configured in the __main__ section.
logger = logging.getLogger()

# Shared configuration object; empty until a config file is read at startup.
config = configparser.ConfigParser()
async def gps_server():
    """Stream GPS reports and log them, reconnecting until cancelled.

    Opens a client with ``gps.client.open()`` and iterates its results. Each
    ``TPV`` report is converted into a ``GpsTelemetry`` record and logged at
    DEBUG level; other result types are ignored. When the connection ends or
    an error occurs, the error is logged and a new connection is attempted
    after a one-second pause.

    Raises:
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled, whether that happens while connected or during the
            retry pause, so the owning task group observes the cancellation.
    """
    # Function-scope imports: the gps package is only needed by this task.
    from gps import client
    from gps.schemas import TPV

    try:
        while True:
            try:
                async with await client.open() as c:
                    async for result in c:
                        if not isinstance(result, TPV):
                            continue

                        location = GpsTelemetry(
                            # instance=str(glonax_agent.instance.id),
                            mode=result.mode,
                            lat=result.lat,
                            lon=result.lon,
                            alt=result.alt,
                            speed=result.speed,
                            heading=result.track,
                        )
                        logger.debug(location)
                        # glonax_agent.location_service.feed(location)
            except ConnectionError as e:
                logger.debug(f"GPS connection error: {e}")
                logger.error("GPS is not running")
            except Exception:
                # Boundary handler: keep the agent alive and retry, but record
                # the full traceback. CancelledError is a BaseException and is
                # therefore not caught here.
                logger.critical(f"Unknown error: {traceback.format_exc()}")

            logger.info("Reconnecting to GPS...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # One handler covers cancellation both while connected and while
        # sleeping; re-raise so the cancellation is not swallowed.
        logger.info("GPS handler cancelled")
        raise
async def glonax_server():
    """Keep a session open to the glonax daemon, reconnecting until cancelled.

    Reads the UNIX socket path from ``config["glonax"]["unix_socket"]`` and
    repeatedly opens a session with ``gclient.open_session``. Connection
    errors and unexpected exceptions are logged, and a new attempt is made
    after a one-second pause.

    NOTE(review): the message loop is still commented out, so the session
    context exits right after the "connected" log line and the loop
    reconnects after the pause.

    Raises:
        KeyError: If the configuration has no ``glonax`` section or no
            ``unix_socket`` option.
        asyncio.CancelledError: Re-raised after logging when the task is
            cancelled, whether that happens while connected or during the
            retry pause, so the owning task group observes the cancellation.
    """
    logger.info("Starting glonax task")

    path = config["glonax"]["unix_socket"]
    # Constant across reconnects, so it is built once outside the loop.
    user_agent = "glonax-agent/1.0"

    try:
        while True:
            try:
                logger.info(f"Connecting to glonax at {path}")

                # `session` is only referenced by the disabled loop below.
                async with await gclient.open_session(
                    path, user_agent=user_agent
                ) as session:
                    logger.info(f"Glonax connected to {path}")

                    # async for message in session:
                    #     glonax_agent.machine_service.feed(message)
            except ConnectionError as e:
                logger.debug(f"Glonax connection error: {e}")
                logger.error("Glonax is not running")
            except Exception:
                # Boundary handler: keep the agent alive and retry, but record
                # the full traceback. CancelledError is a BaseException and is
                # therefore not caught here.
                logger.critical(f"Unknown error: {traceback.format_exc()}")

            logger.info("Reconnecting to glonax...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # One handler covers cancellation both while connected and while
        # sleeping; re-raise so the cancellation is not swallowed.
        logger.info("Glonax task cancelled")
        raise
async def main():
    """Run the glonax and GPS workers concurrently in one task group.

    The coroutine waits until both workers have finished. If it is cancelled
    while waiting, the cancellation is logged as a graceful shutdown and is
    not propagated to the caller.
    """
    logger.info(f"Starting {APP_NAME}")

    # Started in this order: glonax first, then GPS.
    workers = (glonax_server, gps_server)

    try:
        async with asyncio.TaskGroup() as group:
            for worker in workers:
                group.create_task(worker())
    except asyncio.CancelledError:
        logger.info("Agent is gracefully shutting down")
if __name__ == "__main__":
    # Script entry point: parse the command line, configure logging, load the
    # configuration file and run the agent until it exits.
    parser = argparse.ArgumentParser(
        description="Glonax agent for the Laixer Edge platform"
    )
    parser.add_argument(
        "-l",
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level",
    )
    parser.add_argument(
        "--log-systemd",
        action="store_true",
        help="Enable logging to systemd journal",
    )
    parser.add_argument(
        "-c",
        "--config",
        default="config.ini",
        help="Specify the configuration file to use",
    )
    # Defaults to the GLONAX_INSTANCE_ID environment variable, which may be
    # unset (None).
    parser.add_argument(
        "-i",
        "--instance",
        default=instance_id,
        help="Specify the instance ID to use",
    )
    # parser.add_argument(
    #     "-s",
    #     "--socket",
    #     default=UNIX_SOCKET,
    #     help="Specify the UNIX socket path to use",
    # )

    args = parser.parse_args()

    # `choices` already limits the value to valid upper-case level names,
    # which Logger.setLevel accepts directly.
    logger.setLevel(args.log_level)

    if args.log_systemd:
        logger.addHandler(journal.JournaldLogHandler(identifier=APP_NAME))
    else:
        logger.addHandler(ColorLogHandler())

    # ConfigParser.read() silently skips unreadable files and returns the list
    # of files it parsed. Without a config the glonax task cannot find its
    # socket path, so fail early with a clear message instead.
    if not config.read(args.config):
        parser.error(f"unable to read configuration file: {args.config}")

    # configparser only accepts string values; assigning None raises
    # TypeError. Override the instance ID only when one was actually supplied
    # (command line or environment), otherwise keep whatever the file has.
    if args.instance is not None:
        config["DEFAULT"]["instance_id"] = args.instance
    # config["glonax"]["unix_socket"] = args.socket

    asyncio.run(main())