Skip to content

Commit

Permalink
Better support for multiple simultaneous trains on a StimJim
Browse files Browse the repository at this point in the history
This fix does depend on modified StimJim firmware
  • Loading branch information
theonlydvr committed Jan 10, 2025
1 parent 5bf39c1 commit 92a6d7c
Showing 1 changed file with 62 additions and 62 deletions.
124 changes: 62 additions & 62 deletions pybehave/Components/StimJim.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ def parametrize(self, pnum: int, outs: list[int], per: int, dur: int, amps: np.n
stimulus += "{}".format(durs[i])
self.write(stimulus)

def update_parameters(self, per: int, amps: np.ndarray, durs: list[int]):
stimulus = "F{}".format(per)
def update_parameters(self, per: int, amps: np.ndarray, durs: list[int], utype: str = "F"):
stimulus = "{}{}".format(utype, per)
for i in range(amps.shape[1]):
stimulus += "; "
for j in range(amps.shape[0]):
Expand All @@ -44,71 +44,71 @@ def update_parameters(self, per: int, amps: np.ndarray, durs: list[int]):
self.write(stimulus)

def start(self, pnum: int, stype: str = "T") -> None:
self.write("{}{}".format(stype, pnum))
self.state = "{}{}".format(stype, pnum)
self.write(self.state)

def update(self, value: Union[bytes, str, int]) -> bool:
def update(self, value: Union[bytes, str]) -> bool:
if value is not None:
if isinstance(value, int):
self.state = value
if isinstance(value, str):
if value.startswith("T") or value.startswith("U"):
self.state = value
else:
if isinstance(value, str):
pass
else:
dec_val = value.decode('utf-8')
segs = dec_val.split('\n')
segs[0] = self.in_buffer + segs[0]
self.in_buffer = ''
if not dec_val.endswith('\n'):
self.in_buffer = segs[-1]
del segs[-1]
self.commands = []
for line in segs:
if len(line) > 0:
if 'Parameters' in line:
if self.cur_command is not None:
self.commands.append(self.cur_command)
self.cur_command = {"command": "P", "id": int(line.split("[")[1].split("]")[0])}
elif self.cur_command is not None and self.cur_command["command"] == "P":
l_segs = re.split(" +", line)
if "mode" in line:
if "mode" in self.cur_command:
self.cur_command["mode"].append(int(l_segs[2]))
else:
self.cur_command["mode"] = [int(l_segs[2])]
elif "period:" in line:
self.cur_command["period"] = int(l_segs[2])
elif "duration:" in line:
self.cur_command["duration"] = int(l_segs[2])
elif len(l_segs) > 1 and l_segs[1].isnumeric():
if "stages" in self.cur_command:
self.cur_command["stages"].append([l_segs[2], l_segs[4], l_segs[5][:-1]])
else:
self.cur_command["stages"] = [[l_segs[2], l_segs[4], l_segs[5][:-1]]]
elif line[0] == '-':
self.commands.append(self.cur_command)
self.configs[self.cur_command["id"]] = self.cur_command
self.cur_command = None
elif 'Started' in line:
self.state = int(line[line.rindex(' ')+1:-1])
if self.cur_command is not None:
self.commands.append(self.cur_command)
self.cur_command = None
elif 'complete' in line:
if self.cur_command is not None:
self.commands.append(self.cur_command)
self.cur_command = {"command": "C", "id": self.state}
self.state = None
l_segs = line.split(" ")
self.cur_command["n_pulse"] = int(l_segs[3])
elif self.cur_command is not None and self.cur_command["command"] == "C" and "Stage" in line:
l_segs = re.split(" +", line)
dec_val = value.decode('utf-8')
segs = dec_val.split('\n')
segs[0] = self.in_buffer + segs[0]
self.in_buffer = ''
if not dec_val.endswith('\n'):
self.in_buffer = segs[-1]
del segs[-1]
self.commands = []
for line in segs:
if len(line) > 0:
if 'Parameters' in line:
if self.cur_command is not None:
self.commands.append(self.cur_command)
self.cur_command = {"command": "P", "id": int(line.split("[")[1].split("]")[0])}
elif self.cur_command is not None and self.cur_command["command"] == "P":
l_segs = re.split(" +", line)
if "mode" in line:
if "mode" in self.cur_command:
self.cur_command["mode"].append(int(l_segs[2]))
else:
self.cur_command["mode"] = [int(l_segs[2])]
elif "period:" in line:
self.cur_command["period"] = int(l_segs[2])
elif "duration:" in line:
self.cur_command["duration"] = int(l_segs[2])
elif len(l_segs) > 1 and l_segs[1].isnumeric():
if "stages" in self.cur_command:
self.cur_command["stages"].append([l_segs[2][:-1], l_segs[3][:-1]])
self.cur_command["stages"].append([l_segs[2], l_segs[4], l_segs[5][:-1]])
else:
self.cur_command["stages"] = [[l_segs[2][:-1], l_segs[3][:-1]]]
if len(self.cur_command["stages"]) == len(self.configs[self.cur_command["id"]]["stages"]):
self.commands.append(self.cur_command)
self.cur_command = None
self.cur_command["stages"] = [[l_segs[2], l_segs[4], l_segs[5][:-1]]]
elif line[0] == '-':
self.commands.append(self.cur_command)
self.configs[self.cur_command["id"]] = self.cur_command
self.cur_command = None
elif 'Started' in line:
# self.state = int(line[line.rindex(' ')+1:-1])
if self.cur_command is not None:
self.commands.append(self.cur_command)
self.cur_command = None
elif 'complete' in line:
if self.cur_command is not None:
self.commands.append(self.cur_command)
if self.state is not None and line.startswith(self.state[0]):
self.cur_command = {"command": "C", "id": int(self.state[1:])}
# self.state = None
l_segs = line.split(" ")
self.cur_command["n_pulse"] = int(l_segs[4])
elif self.cur_command is not None and self.cur_command["command"] == "C" and "Stage" in line:
l_segs = re.split(" +", line)
if "stages" in self.cur_command:
self.cur_command["stages"].append([l_segs[2][:-1], l_segs[3][:-1]])
else:
self.cur_command["stages"] = [[l_segs[2][:-1], l_segs[3][:-1]]]
if len(self.cur_command["stages"]) == len(self.configs[self.cur_command["id"]]["stages"]):
self.commands.append(self.cur_command)
self.cur_command = None
return len(self.commands) > 0

@staticmethod
Expand Down

0 comments on commit 92a6d7c

Please sign in to comment.