Skip to content

Commit

Permalink
Update FormulaEnginePool to have multiple strongly-typed functions
Browse files Browse the repository at this point in the history
The previous design used a single dictionary `_engines` to store
engines of different types.  This is fixed now.

Signed-off-by: Sahas Subramanian <[email protected]>
  • Loading branch information
shsms committed Jun 5, 2023
1 parent 0058d88 commit c1fe897
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 34 deletions.
73 changes: 59 additions & 14 deletions src/frequenz/sdk/timeseries/_formula_engine/_formula_engine_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Type
from typing import TYPE_CHECKING, Type

from frequenz.channels import Sender

from ...actor import ChannelRegistry, ComponentMetricRequest
from ...microgrid.component import ComponentMetricId
from .._quantities import Quantity
from .._quantities import Current, Power, Quantity
from ._formula_generators._formula_generator import (
FormulaGenerator,
FormulaGeneratorConfig,
Expand Down Expand Up @@ -47,7 +47,9 @@ def __init__(
self._namespace = namespace
self._channel_registry = channel_registry
self._resampler_subscription_sender = resampler_subscription_sender
self._engines: dict[str, "FormulaEngine[Any]|FormulaEngine3Phase[Any]"] = {}
self._string_engines: dict[str, FormulaEngine[Quantity]] = {}
self._power_engines: dict[str, FormulaEngine[Power]] = {}
self._current_engines: dict[str, FormulaEngine3Phase[Current]] = {}

def from_string(
self,
Expand All @@ -68,8 +70,8 @@ def from_string(
A FormulaReceiver that streams values with the formulas applied.
"""
channel_key = formula + component_metric_id.value
if channel_key in self._engines:
return self._engines[channel_key] # type: ignore
if channel_key in self._string_engines:
return self._string_engines[channel_key]

builder = ResampledFormulaBuilder(
self._namespace,
Expand All @@ -80,16 +82,16 @@ def from_string(
Quantity,
)
formula_engine = builder.from_string(formula, nones_are_zeros)
self._engines[channel_key] = formula_engine
self._string_engines[channel_key] = formula_engine

return formula_engine

def from_generator(
def from_power_formula_generator(
self,
channel_key: str,
generator: "Type[FormulaGenerator[Any]]",
generator: Type[FormulaGenerator[Power]],
config: FormulaGeneratorConfig = FormulaGeneratorConfig(),
) -> FormulaEngine[Any] | FormulaEngine3Phase[Any]:
) -> FormulaEngine[Power]:
"""Get a receiver for a formula from a generator.
Args:
Expand All @@ -101,19 +103,62 @@ def from_generator(
A FormulaReceiver or a FormulaReceiver3Phase instance based on what the
FormulaGenerator returns.
"""
if channel_key in self._engines:
return self._engines[channel_key]
from ._formula_engine import ( # pylint: disable=import-outside-toplevel
FormulaEngine,
)

if channel_key in self._power_engines:
return self._power_engines[channel_key]

engine = generator(
self._namespace,
self._channel_registry,
self._resampler_subscription_sender,
config,
).generate()
assert isinstance(engine, FormulaEngine)
self._power_engines[channel_key] = engine
return engine

def from_3_phase_current_formula_generator(
self,
channel_key: str,
generator: "Type[FormulaGenerator[Current]]",
config: FormulaGeneratorConfig = FormulaGeneratorConfig(),
) -> FormulaEngine3Phase[Current]:
"""Get a receiver for a formula from a generator.
Args:
channel_key: A string to uniquely identify the formula.
generator: A formula generator.
config: config to initialize the formula generator with.
Returns:
A FormulaReceiver or a FormulaReceiver3Phase instance based on what the
FormulaGenerator returns.
"""
from ._formula_engine import ( # pylint: disable=import-outside-toplevel
FormulaEngine3Phase,
)

if channel_key in self._current_engines:
return self._current_engines[channel_key]

engine = generator(
self._namespace,
self._channel_registry,
self._resampler_subscription_sender,
config,
).generate()
self._engines[channel_key] = engine
assert isinstance(engine, FormulaEngine3Phase)
self._current_engines[channel_key] = engine
return engine

async def stop(self) -> None:
"""Stop all formula engines in the pool."""
for engine in self._engines.values():
await engine._stop() # pylint: disable=protected-access
for string_engine in self._string_engines.values():
await string_engine._stop() # pylint: disable=protected-access
for power_engine in self._power_engines.values():
await power_engine._stop() # pylint: disable=protected-access
for current_engine in self._current_engines.values():
await current_engine._stop() # pylint: disable=protected-access
10 changes: 5 additions & 5 deletions src/frequenz/sdk/timeseries/battery_pool/battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def power(self) -> FormulaEngine[Power]:
A FormulaEngine that will calculate and stream the total power of all
batteries in the pool.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"battery_pool_power",
BatteryPowerFormula,
FormulaGeneratorConfig(
Expand All @@ -280,7 +280,7 @@ def production_power(self) -> FormulaEngine[Power]:
A FormulaEngine that will calculate and stream the total production power of
all batteries in the pool.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"battery_pool_production_power",
BatteryPowerFormula,
FormulaGeneratorConfig(
Expand All @@ -304,10 +304,10 @@ def consumption_power(self) -> FormulaEngine[Power]:
method.
Returns:
A FormulaEngine that will calculate and stream the total consumption power of
all batteries in the pool.
A FormulaEngine that will calculate and stream the total consumption
power of all batteries in the pool.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"battery_pool_consumption_power",
BatteryPowerFormula,
FormulaGeneratorConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def current(self) -> FormulaEngine3Phase[Current]:
A FormulaEngine that will calculate and stream the total current of all EV
Chargers.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_3_phase_current_formula_generator(
"ev_charger_total_current",
EVChargerCurrentFormula,
FormulaGeneratorConfig(component_ids=self._component_ids),
Expand All @@ -147,7 +147,7 @@ def power(self) -> FormulaEngine[Power]:
A FormulaEngine that will calculate and stream the total power of all EV
Chargers.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"ev_charger_power",
EVChargerPowerFormula,
FormulaGeneratorConfig(
Expand All @@ -174,7 +174,7 @@ def production_power(self) -> FormulaEngine[Power]:
A FormulaEngine that will calculate and stream the production power of all
EV Chargers.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"ev_charger_production_power",
EVChargerPowerFormula,
FormulaGeneratorConfig(
Expand All @@ -201,7 +201,7 @@ def consumption_power(self) -> FormulaEngine[Power]:
A FormulaEngine that will calculate and stream the consumption power of all
EV Chargers.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"ev_charger_consumption_power",
EVChargerPowerFormula,
FormulaGeneratorConfig(
Expand Down
22 changes: 11 additions & 11 deletions src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def grid_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream grid power.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"grid_power",
GridPowerFormula,
)
Expand All @@ -184,7 +184,7 @@ def grid_consumption_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream grid consumption power.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"grid_consumption_power",
GridPowerFormula,
FormulaGeneratorConfig(formula_type=FormulaType.CONSUMPTION),
Expand All @@ -207,7 +207,7 @@ def grid_production_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream grid production power.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"grid_production_power",
GridPowerFormula,
FormulaGeneratorConfig(formula_type=FormulaType.PRODUCTION),
Expand All @@ -230,7 +230,7 @@ def grid_current(self) -> FormulaEngine3Phase[Current]:
Returns:
A FormulaEngine that will calculate and stream grid current.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_3_phase_current_formula_generator(
"grid_current",
GridCurrentFormula,
)
Expand All @@ -255,7 +255,7 @@ def consumer_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream consumer power.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"consumer_power",
ConsumerPowerFormula,
)
Expand All @@ -277,7 +277,7 @@ def pv_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream PV total power.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"pv_power",
PVPowerFormula,
FormulaGeneratorConfig(formula_type=FormulaType.PASSIVE_SIGN_CONVENTION),
Expand All @@ -300,7 +300,7 @@ def pv_production_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream PV power production.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"pv_production_power",
PVPowerFormula,
FormulaGeneratorConfig(formula_type=FormulaType.PRODUCTION),
Expand All @@ -323,7 +323,7 @@ def pv_consumption_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream PV power consumption.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"pv_consumption_power",
PVPowerFormula,
FormulaGeneratorConfig(formula_type=FormulaType.CONSUMPTION),
Expand All @@ -346,7 +346,7 @@ def chp_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream CHP power production.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"chp_power",
CHPPowerFormula,
FormulaGeneratorConfig(formula_type=FormulaType.PASSIVE_SIGN_CONVENTION),
Expand All @@ -369,7 +369,7 @@ def chp_production_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream CHP power production.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"chp_production_power",
CHPPowerFormula,
FormulaGeneratorConfig(
Expand All @@ -394,7 +394,7 @@ def chp_consumption_power(self) -> FormulaEngine[Power]:
Returns:
A FormulaEngine that will calculate and stream CHP power consumption.
"""
engine = self._formula_pool.from_generator(
engine = self._formula_pool.from_power_formula_generator(
"chp_consumption_power",
CHPPowerFormula,
FormulaGeneratorConfig(
Expand Down

0 comments on commit c1fe897

Please sign in to comment.