-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
57 lines (43 loc) · 2.29 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from atlasbuggy import Orchestrator, run
from naboris.hardware_interface import HardwareInterface
from naboris.cli import NaborisCLI
from naboris.soundfiles import Sounds
from naboris.basic_guidance import BasicGuidance
from naboris.odometry import Odometry
from naboris.website_client import CommandClient
from naboris.website_client import CameraClient
from naboris.website_client import WebsiteConnection
from naboris.picamera import PiCamera
# Module-level switch forwarded as ``write=`` to ``set_default`` in
# NaborisOrchestrator.__init__. Presumably controls whether log output is
# written to disk -- TODO confirm against atlasbuggy.
log = True
class NaborisOrchestrator(Orchestrator):
    """Orchestrator that assembles the Naboris nodes and links them together.

    Everything happens in ``__init__``: defaults are set, each node is
    constructed, the nodes are registered through ``add_nodes`` and the
    links between them are declared through ``subscribe``.
    """

    def __init__(self, event_loop):
        """Build every node, register it and declare the subscriptions.

        Args:
            event_loop: Forwarded unchanged to ``Orchestrator.__init__``.
        """
        # Defaults are applied before the base-class initialiser runs.
        # NOTE(review): level=20 presumably corresponds to logging.INFO -- confirm.
        self.set_default(write=log, level=20)
        super().__init__(event_loop)

        # --- node construction -------------------------------------------
        self.hardware = HardwareInterface()
        self.cli = NaborisCLI(enabled=True)

        sound_names = (
            "humming", "curiousity", "nothing", "confusion", "concern", "sleepy", "vibrating",
        )
        self.sounds = Sounds("sounds", "/home/pi/Music/Bastion/", sound_names, enabled=False)

        # One connection object is shared by both website clients below.
        website = WebsiteConnection("10.76.76.1", user="robot", password="naboris")

        self.odometry = Odometry()
        self.guidance = BasicGuidance(enabled=False)
        self.camera_client = CameraClient(website, 720, 480, enabled=False)
        self.command_client = CommandClient(website)
        self.picamera = PiCamera(enabled=False)

        # --- registration ------------------------------------------------
        self.add_nodes(
            self.hardware,
            self.cli,
            self.sounds,
            self.guidance,
            self.odometry,
            self.camera_client,
            self.command_client,
            self.picamera,
        )

        # --- subscriptions -----------------------------------------------
        # Each row is the positional argument list of one ``subscribe`` call,
        # issued in the order listed. The first two items are nodes and the
        # third is a tag attribute read from the second node; the optional
        # fourth item is a service attribute read from the first node.
        # NOTE(review): argument roles (producer, consumer, tag, service) are
        # inferred from the attribute names -- confirm against atlasbuggy.
        hardware = self.hardware
        cli = self.cli
        guidance = self.guidance
        odometry = self.odometry
        links = (
            (self.sounds, cli, cli.sounds_tag),
            (hardware, cli, cli.hardware_tag),
            (guidance, cli, cli.guidance_tag),
            (self.command_client, cli, cli.command_source_tag),
            (odometry, guidance, guidance.odometry_tag),
            (hardware, guidance, guidance.hardware_tag),
            (hardware, odometry, odometry.encoder_tag, hardware.encoder_service),
            (hardware, odometry, odometry.bno055_tag, hardware.bno055_service),
        )
        for link in links:
            self.subscribe(*link)
# async def loop(self):
# while True:
# print(self.hardware.left_tick, self.hardware.right_tick)
# await asyncio.sleep(0.1)
# Entry point: the orchestrator class itself (not an instance) is handed to
# atlasbuggy's ``run``. Presumably ``run`` creates the event loop and
# instantiates the class with it -- TODO confirm against atlasbuggy.
run(NaborisOrchestrator)