-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathenvironment_utils.py
executable file
·155 lines (131 loc) · 6.55 KB
/
environment_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from omni.isaac.occupancy_map import _occupancy_map
from omni.isaac.occupancy_map.scripts.utils import update_location, compute_coordinates, generate_image
import utils.misc_utils
from utils.misc_utils import *
class environment:
def __init__(self, config, rng, local_file_prefix, meters_per_unit=0.01):
self.get_environment(config, rng, local_file_prefix)
self.meters_per_unit = meters_per_unit
def set_meters_per_unit(self, meters_per_unit):
self.meters_per_unit = meters_per_unit
def get_environment(self, config, rng: np.random.default_rng, local_file_prefix: str):
"""
Used to load and process the environment.
config: the configuration processed by the main algorithm
rng: global rng
local_file_prefix: necessary to access the local storage from isaacsim
"""
self.env_usd_export_folder = config["env_path"].get()
if config["fix_env"].get() != "":
self.env_name = config["fix_env"].get()
else:
self.env_name = rng.choice([f for f in os.listdir(self.env_usd_export_folder) if not f.startswith('.')])
self.env_path = local_file_prefix + os.path.join(self.env_usd_export_folder, self.env_name, self.env_name + ".usd")
if config["use_stl"].get():
self.env_stl_path = os.path.join(self.env_usd_export_folder, self.env_name, self.env_name + ".stl")
self.env_mesh = trimesh.load(os.path.join(self.env_usd_export_folder, self.env_name, self.env_name + ".stl"))
else:
self.env_stl_path = None
self.env_mesh = None
if config["use_npy"].get():
self.env_info = np.load(os.path.join(self.env_usd_export_folder, self.env_name, self.env_name + ".npy"),
allow_pickle=True)
self.env_info = self.env_info.tolist()
else:
self.env_info = [0, 0, 0, 0, 0, 0, np.array([[-1000, -1000], [-1000, 1000], [1000, 1000], [1000, -1000]])]
self.env_limits = self.env_info[0:6]
self.shifts = [(self.env_limits[0] + self.env_limits[3]) / 2, (self.env_limits[1] + self.env_limits[4]) / 2,
self.env_limits[2]]
self.env_limits_shifted = [self.env_limits[i] - self.shifts[i % 3] for i, _ in enumerate(self.env_limits)]
self.area_polygon = get_area(self.env_info[6])
self.env_polygon = [Point(i[0], i[1], 0) for i in self.env_info[-1]]
def generate_map(self, out_path: str, zlim=[0, 1], cell_size = 0.05, origin=[0, 0, 0]):
"""
WARNING: HACK! ALL UNKNWON ARE WHITE!
Generates a map for the environment and save it to the out_path location in the disk.
First it searches for a non colliding location.
Then it creates a map of the environment.
We ovverride the unknown color to be "white" (i.e. free) as the system map unknown unreachable areas.
out_path: the folder where to save the map
z_limit: height to consider for projection
cell_size: size of a single cell in the map (cm)
origin: computed origin. Must be a free cell
"""
bound = int(
max(abs(self.env_limits_shifted[0]) + abs(self.env_limits_shifted[3]),
abs(self.env_limits_shifted[1]) + abs(self.env_limits_shifted[4])) / self.meters_per_unit * 1.5)
_om = _occupancy_map.acquire_occupancy_map_interface()
lower_bound = [-bound, -bound, zlim[0]/ self.meters_per_unit]
lower_bound = np.array(lower_bound) - np.array(origin) / self.meters_per_unit
upper_bound = [bound, bound, zlim[1]/ self.meters_per_unit *.8]
upper_bound = np.array(upper_bound) - np.array(origin) / self.meters_per_unit
center = np.array(origin) / self.meters_per_unit
center[2] += 0.1 / self.meters_per_unit # 10 cm above the floor
update_location(_om, center, lower_bound, upper_bound)
_om.set_cell_size(cell_size/self.meters_per_unit)
_om.generate()
image_buffer = generate_image(_om, [0, 0, 0, 255], [255, 255, 255, 255], [255, 255, 255, 255])
dims = _om.get_dimensions()
_im = Image.frombytes("RGBA", (dims.x, dims.y), bytes(image_buffer))
image_width = _im.width
image_height = _im.height
size = [0, 0, 0]
size[0] = image_width * cell_size
size[1] = image_height * cell_size
scale_to_meters = 1.0 / self.meters_per_unit
default_image_name = os.path.join(out_path, "map.png")
top_left, top_right, bottom_left, bottom_right, image_coords = compute_coordinates(_om, cell_size)
ros_yaml_file_text = "image: " + default_image_name
ros_yaml_file_text += f"\nresolution: {float(cell_size / scale_to_meters)}"
ros_yaml_file_text += (
f"\norigin: [{float(bottom_left[0] / scale_to_meters)}, {float(bottom_left[1] / scale_to_meters)}, 0.0000]"
)
ros_yaml_file_text += "\nnegate: 0"
ros_yaml_file_text += f"\noccupied_thresh: {0.65}"
ros_yaml_file_text += "\nfree_thresh: 0.196"
_im.save(default_image_name)
with open(default_image_name[:-3] + "yaml", 'w') as f:
f.write(ros_yaml_file_text)
center = lower_bound
center[2] = -100000000.0
update_location(_om, center, [0, 0, 0], [0, 0, 0])
_om.generate()
# disable_extension('omni.isaac.occupancy_map')
def load_and_center(self, prim_path: str = "/World/home", correct_paths_req: bool = True):
"""
Load the environment from the usd path env_path
Center it wrt the world coordinate frames
The environment is loaded at prim_path
prim_path: path that the environment should have in the prim tree
"""
stage = omni.usd.get_context().get_stage()
print("loading environment {}".format(self.env_name))
# from omni.isaac.core.utils.nucleus import find_nucleus_server
# result, nucleus_server = find_nucleus_server()
res, _ = omni.kit.commands.execute('CreateReferenceCommand',
usd_context=omni.usd.get_context(),
path_to=prim_path,
asset_path=self.env_path,
# asset_path= nucleus_server + "/Isaac/Environments/Simple_Warehouse/warehouse.usd",
instanceable=True)
if res:
clear_properties(prim_path)
print("Correcting paths...")
if correct_paths_req:
try:
correct_paths(prim_path)
except:
print("Failed to correct paths for {}".format(prim_path))
time.sleep(10)
# center the home in the middle of the environment
set_translate(stage.GetPrimAtPath(prim_path), list(- np.array(self.shifts) / self.meters_per_unit))
for child in stage.GetPrimAtPath(prim_path).GetAllChildren():
if "xform" == child.GetTypeName().lower():
clear_properties(str(child.GetPath()))
if "floor" not in str(child.GetPath()).lower():
myold = child.GetProperty('xformOp:translate').Get()
myold = [myold[0], myold[1], myold[2] - 0.04]
set_translate(child, list(np.array(myold)))
return prim_path
else:
raise Exception("Failed to load environment {}".format(self.env_name))