distunroller set last step periodically #1725
Conversation
# One episode finishes; move to the next worker.
# We need to make sure a whole episode is always sent to the same
# worker so that the temporal information is preserved.
exp = alf.nest.set_field(
In the case of a single trainer worker, we don't need to change the step type to LAST.
If there are multiple unrollers, we still need to set LAST. But it is not straightforward for an unroller to know whether any other unroller exists, except via the trainer. So for simplicity, we always set LAST here.
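For reference, a minimal sketch of what "always set LAST" looks like on the unroller side. `alf.nest.set_field` and `StepType` come from ALF (as in the diff above); the field path and the `_current_worker`/`_num_workers` members are illustrative assumptions, not the actual implementation:

```python
import torch
import alf
from alf.data_structures import StepType

def _finish_episode_and_rotate(self, exp):
    # Mark the last experience of the (possibly truncated) episode as LAST
    # so the trainer always sees an explicit episode boundary.
    # NOTE: the field path "time_step.step_type" is assumed for illustration.
    exp = alf.nest.set_field(
        exp, "time_step.step_type",
        torch.full_like(exp.time_step.step_type, StepType.LAST))
    # Round-robin to the next trainer worker; a whole episode always goes
    # to the same worker so that temporal information is preserved.
    self._current_worker = (self._current_worker + 1) % self._num_workers
    return exp
```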
if self._exp_socket is None:
    self._exp_socket, _ = create_zmq_socket(zmq.ROUTER, '*',
                                            self._port, self._id)

try:
We should send only on a LAST step or when the episode length is reached.
Right now we always send on a per-exp basis instead of waiting for a long trajectory; the trainer is responsible for maintaining trajectory integrity. The reason is latency: sending a very long trajectory could take a long time (especially with images) and block the unroller.
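A rough sketch of this per-exp sending path, assuming the `create_zmq_socket` helper from the diff above; the message framing and member names are illustrative, not the actual code:

```python
import pickle
import zmq

def _send_exp(self, exp_params, trainer_id: bytes):
    # Lazily create the ROUTER socket, as in the diff above.
    if self._exp_socket is None:
        self._exp_socket, _ = create_zmq_socket(zmq.ROUTER, '*',
                                                self._port, self._id)
    # Send each experience immediately instead of accumulating a long
    # trajectory; a large payload (e.g. with images) would otherwise block
    # the unroller.  The trainer reassembles the trajectory on its side.
    self._exp_socket.send_multipart(
        [trainer_id, pickle.dumps(exp_params)])
```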
self._num_earliest_frames_ignored = self._core_alg._num_earliest_frames_ignored

# We always test tensor sharing among processes, because
# we rely on undocumented features of PyTorch
Explain what the undocumented feature is.
added explanation
alf/utils/common_test.py
process.join()

# numpy array should not be modified
assert np.allclose(m.y, np.zeros([2]))
Should use exact equality instead of allclose.
updated
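For completeness, a self-contained sketch of this kind of sharing test (not the actual alf/utils/common_test.py code), assuming the behavior being exercised is that CPU tensors shared through torch.multiprocessing are visible across processes, while plain numpy attributes are merely copied:

```python
import numpy as np
import torch
import torch.multiprocessing as mp

class M:
    def __init__(self):
        self.x = torch.zeros([2]).share_memory_()  # placed in shared memory
        self.y = np.zeros([2])                     # copied into the subprocess

def _worker(m):
    m.x.add_(1.)   # visible to the parent via shared memory
    m.y += 1.      # only modifies the subprocess's own copy

if __name__ == "__main__":
    m = M()
    process = mp.Process(target=_worker, args=(m,))
    process.start()
    process.join()
    # shared tensor was modified by the subprocess
    assert torch.equal(m.x, torch.ones([2]))
    # numpy array should not be modified (exact equality, per the review)
    assert np.array_equal(m.y, np.zeros([2]))
```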
# Add the temp exp buffer to the replay buffer
-for exp_params in unroller_exps_buffer[unroller_id]:
+for i, exp_params in enumerate(
+        unroller_exps_buffer[unroller_id]):
If the batch size of the replay buffer is 1, env_id has to be 0 on the next line.
This is true under the current assumption. But since exp_params always contains env_id, we can just use it. Do you mean we should assert that it's equal to 0?
We can just set it to 0 here?
updated
# Add the temp exp buffer to the replay buffer
for exp_params in unroller_exps_buffer[unroller_id]:
-    replay_buffer.add_batch(exp_params, exp_params.env_id)
+    env_id = torch.zeros([1], dtype=torch.int32)
device="cpu"
Should change exp_params.env_id instead, since exp_params.env_id will be stored in the replay buffer and we don't want inconsistency: exp_params.env_id.zero_()
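Put together, the suggested shape of this loop would be roughly as follows (an assumed final form, for illustration only):

```python
# The replay buffer batch size is 1, so the stored env_id must be 0.
# Zero it in place so the value kept inside the replay buffer stays
# consistent with the index passed to add_batch.
for exp_params in unroller_exps_buffer[unroller_id]:
    exp_params.env_id.zero_()
    replay_buffer.add_batch(exp_params, exp_params.env_id)
```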
Force-pushed from 48969cd to 53f6e4d.
This PR lets the DistributedUnroller truncate the experience stream on its own. The stream is truncated when either the predefined max episode length is reached or the env returns a LAST step.
After the stream is truncated, the unroller switches to sending exps to a different trainer worker (if available).
This PR depends on PR #1723, which fixes a ReplayBuffer sharing issue among processes. I've added a minimal test to the init of DistributedTrainer to make sure that a ReplayBuffer can be correctly shared with a subprocess.
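A condensed sketch of the truncation rule described above; `_max_episode_length`, `_episode_length`, and `_finish_episode_and_rotate` are illustrative names, not the actual API:

```python
from alf.data_structures import StepType

def _maybe_truncate(self, exp):
    self._episode_length += 1
    # Truncate if the env itself ended the episode...
    env_says_last = bool((exp.time_step.step_type == StepType.LAST).all())
    # ...or if the predefined max episode length has been reached.
    if env_says_last or self._episode_length >= self._max_episode_length:
        # Force a LAST step and switch to the next trainer worker
        # (see the _finish_episode_and_rotate sketch earlier in the thread).
        exp = self._finish_episode_and_rotate(exp)
        self._episode_length = 0
    return exp
```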