Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallel solve for sample_flow_at_points #1059

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 113 additions & 52 deletions floris/par_floris_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

from floris.core import State
from floris.floris_model import FlorisModel
from floris.type_dec import (
NDArrayFloat,
)


class ParFlorisModel(FlorisModel):
Expand Down Expand Up @@ -123,81 +126,111 @@ def run(self) -> None:
t0 = timerpc()
super().run()
t1 = timerpc()
elif self.interface == "multiprocessing":
self._print_timings(t0, t1, None, None)
else:
t0 = timerpc()
self.core.initialize_domain()
parallel_run_inputs = self._preprocessing()
t1 = timerpc()
if self.return_turbine_powers_only:
with self._PoolExecutor(self.max_workers) as p:
self._turbine_powers_split = p.starmap(
_parallel_run_powers_only,
if self.interface == "multiprocessing":
if self.return_turbine_powers_only:
with self._PoolExecutor(self.max_workers) as p:
self._turbine_powers_split = p.starmap(
_parallel_run_powers_only,
parallel_run_inputs
)
else:
with self._PoolExecutor(self.max_workers) as p:
self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs)
elif self.interface == "pathos":
if self.return_turbine_powers_only:
self._turbine_powers_split = self.pathos_pool.map(
_parallel_run_powers_only_map,
parallel_run_inputs
)
else:
with self._PoolExecutor(self.max_workers) as p:
self._fmodels_split = p.starmap(_parallel_run, parallel_run_inputs)
else:
self._fmodels_split = self.pathos_pool.map(
_parallel_run_map,
parallel_run_inputs
)
elif self.interface == "concurrent":
if self.return_turbine_powers_only:
with self._PoolExecutor(self.max_workers) as p:
self._turbine_powers_split = p.map(
_parallel_run_powers_only_map,
parallel_run_inputs
)
self._turbine_powers_split = list(self._turbine_powers_split)
else:
with self._PoolExecutor(self.max_workers) as p:
self._fmodels_split = p.map(
_parallel_run_map,
parallel_run_inputs
)
self._fmodels_split = list(self._fmodels_split)
t2 = timerpc()
self._postprocessing()
self.core.farm.finalize(self.core.grid.unsorted_indices)
self.core.state = State.USED
t3 = timerpc()
elif self.interface == "pathos":
self._print_timings(t0, t1, t2, t3)

def sample_flow_at_points(self, x: NDArrayFloat, y: NDArrayFloat, z: NDArrayFloat):
"""
Sample the flow field at specified points.

Args:
x: The x-coordinates of the points.
y: The y-coordinates of the points.
z: The z-coordinates of the points.

Returns:
NDArrayFloat: The wind speeds at the specified points.
"""
if self.return_turbine_powers_only:
raise NotImplementedError(
"Sampling flow at points is not supported when "
"return_turbine_powers_only is set to True on ParFlorisModel."
)

if self.interface is None:
t0 = timerpc()
self.core.initialize_domain()
parallel_run_inputs = self._preprocessing()
sampled_wind_speeds = super().sample_flow_at_points(x, y, z)
t1 = timerpc()
if self.return_turbine_powers_only:
self._turbine_powers_split = self.pathos_pool.map(
_parallel_run_powers_only_map,
parallel_run_inputs
)
else:
self._fmodels_split = self.pathos_pool.map(
_parallel_run_map,
parallel_run_inputs
)
t2 = timerpc()
self._postprocessing()
self.core.farm.finalize(self.core.grid.unsorted_indices)
self.core.state = State.USED
t3 = timerpc()
elif self.interface == "concurrent":
self._print_timings(t0, t1, None, None)
else:
t0 = timerpc()
self.core.initialize_domain()
parallel_run_inputs = self._preprocessing()
parallel_sample_flow_at_points_inputs = [
(fmodel_dict, control_setpoints, x, y, z)
for fmodel_dict, control_setpoints in parallel_run_inputs
]
t1 = timerpc()
if self.return_turbine_powers_only:
if self.interface == "multiprocessing":
with self._PoolExecutor(self.max_workers) as p:
self._turbine_powers_split = p.map(
_parallel_run_powers_only_map,
parallel_run_inputs
sampled_wind_speeds_p = p.starmap(
_parallel_sample_flow_at_points,
parallel_sample_flow_at_points_inputs
)
self._turbine_powers_split = list(self._turbine_powers_split)
else:
elif self.interface == "pathos":
sampled_wind_speeds_p = self.pathos_pool.map(
_parallel_sample_flow_at_points_map,
parallel_sample_flow_at_points_inputs
)
elif self.interface == "concurrent":
with self._PoolExecutor(self.max_workers) as p:
self._fmodels_split = p.map(
_parallel_run_map,
parallel_run_inputs
sampled_wind_speeds_p = p.map(
_parallel_sample_flow_at_points_map,
parallel_sample_flow_at_points_inputs
)
self._fmodels_split = list(self._fmodels_split)
sampled_wind_speeds_p = list(sampled_wind_speeds_p)
t2 = timerpc()
self._postprocessing()
self.core.farm.finalize(self.core.grid.unsorted_indices)
self.core.state = State.USED
sampled_wind_speeds = np.concatenate(sampled_wind_speeds_p, axis=0)
t3 = timerpc()
if self.print_timings:
print("===============================================================================")
if self.interface is None:
print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s")
else:
print(
"Total time spent for parallel calculation "
f"({self.max_workers} workers): {t3-t0:.3f} s"
)
print(f" Time spent in parallel preprocessing: {t1-t0:.3f} s")
print(f" Time spent in parallel loop execution: {t2-t1:.3f} s.")
print(f" Time spent in parallel postprocessing: {t3-t2:.3f} s")
self._print_timings(t0, t1, t2, t3)

return sampled_wind_speeds

def _preprocessing(self):
"""
Expand Down Expand Up @@ -278,6 +311,23 @@ def _postprocessing(self):
axis=0
)

def _print_timings(self, t0, t1, t2, t3):
"""
Print the timings for the parallel execution.
"""
if self.print_timings:
print("===============================================================================")
if self.interface is None:
print(f"Total time spent for serial calculation (interface=None): {t1 - t0:.3f} s")
else:
print(
"Total time spent for parallel calculation "
f"({self.max_workers} workers): {t3-t0:.3f} s"
)
print(f" Time spent in parallel preprocessing: {t1-t0:.3f} s")
print(f" Time spent in parallel loop execution: {t2-t1:.3f} s.")
print(f" Time spent in parallel postprocessing: {t3-t2:.3f} s")

def _get_turbine_powers(self):
"""
Calculates the power at each turbine in the wind farm.
Expand Down Expand Up @@ -364,3 +414,14 @@ def _parallel_run_powers_only_map(x):
Wrapper for unpacking inputs to _parallel_run_powers_only() for use with map().
"""
return _parallel_run_powers_only(*x)

def _parallel_sample_flow_at_points(fmodel_dict, set_kwargs, x, y, z):
fmodel = FlorisModel(fmodel_dict)
fmodel.set(**set_kwargs)
return fmodel.sample_flow_at_points(x, y, z)

def _parallel_sample_flow_at_points_map(x):
"""
Wrapper for unpacking inputs to _parallel_sample_flow_at_points() for use with map().
"""
return _parallel_sample_flow_at_points(*x)
26 changes: 26 additions & 0 deletions tests/par_floris_model_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,29 @@ def test_control_setpoints(sample_inputs_fixture):

assert powers_fmodel.shape == powers_pfmodel.shape
assert np.allclose(powers_fmodel, powers_pfmodel)

def test_sample_flow_at_points(sample_inputs_fixture):

sample_inputs_fixture.core["wake"]["model_strings"]["velocity_model"] = VELOCITY_MODEL
sample_inputs_fixture.core["wake"]["model_strings"]["deflection_model"] = DEFLECTION_MODEL

fmodel = FlorisModel(sample_inputs_fixture.core)

wind_speeds = np.array([8.0, 8.0, 9.0])
wind_directions = np.array([270.0, 270.0, 270.0])
fmodel.set(
wind_directions=wind_speeds.flatten(),
wind_speeds=wind_directions.flatten(),
turbulence_intensities=0.06 * np.ones_like(wind_speeds.flatten()),
)

x_test = np.array([500.0, 750.0, 1000.0, 1250.0, 1500.0])
y_test = np.array([0.0, 0.0, 0.0, 0.0, 0.0])
z_test = np.array([90.0, 90.0, 90.0, 90.0, 90.0])

ws_base = fmodel.sample_flow_at_points(x_test, y_test, z_test)

for interface in ["multiprocessing", "pathos", "concurrent"]:
pfmodel = ParFlorisModel(fmodel, max_workers=2, interface=interface)
ws_test = pfmodel.sample_flow_at_points(x_test, y_test, z_test)
assert np.allclose(ws_base, ws_test)