Skip to content

Commit

Permalink
test: Use fake profile data instead of test_usa_tamu
Browse files Browse the repository at this point in the history
Fixes: #489
  • Loading branch information
chadvoegele authored and BainanXia committed Feb 18, 2022
1 parent 5b1500e commit 733c5b1
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 57 deletions.
112 changes: 55 additions & 57 deletions powersimdata/input/tests/test_transform_profile.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
from unittest.mock import patch

import numpy as np
import pytest
from numpy.testing import assert_almost_equal

from powersimdata.input.change_table import ChangeTable
from powersimdata.input.grid import Grid
from powersimdata.input.input_data import InputData
from powersimdata.input.transform_grid import TransformGrid
from powersimdata.input.transform_profile import TransformProfile
from powersimdata.tests.mock_input_data import MockInputData

interconnect = ["Western"]
param = {
"demand": "vJan2021",
"hydro": "vJan2021",
"solar": "vJan2021",
"wind": "vJan2021",
"n_zone_to_scale": 6,
"n_plant_to_scale": 50,
"n_plant_to_add": 100,
Expand Down Expand Up @@ -88,7 +86,7 @@ def get_change_table_for_new_plant_addition(base_grid, resource):
return ct.ct


def _check_plants_are_scaled(ct, base_grid, profile_info, raw_profile, resource):
def _check_plants_are_scaled(ct, base_grid, raw_profile, resource):
plant_id_type = get_plant_with_resource(base_grid, resource)

base_profile = (
Expand All @@ -98,7 +96,8 @@ def _check_plants_are_scaled(ct, base_grid, profile_info, raw_profile, resource)
tg = TransformGrid(base_grid, ct)
transformed_grid = tg.get_grid()

tp = TransformProfile(profile_info, transformed_grid, ct)
empty_scenario_info = {} # scenario_info not needed since input_data is mocked
tp = TransformProfile(empty_scenario_info, transformed_grid, ct)
transformed_profile = tp.get_profile(resource)

scaled_plant_id = []
Expand Down Expand Up @@ -134,7 +133,7 @@ def _check_plants_are_scaled(ct, base_grid, profile_info, raw_profile, resource)
return transformed_profile


def _check_new_plants_are_added(ct, base_grid, profile_info, raw_profile, resource):
def _check_new_plants_are_added(ct, base_grid, raw_profile, resource):
n_plant = param["n_plant_to_add"]
plant_id_type = (
base_grid.plant.isin(profile_type[resource]).query("type == True").index
Expand All @@ -146,7 +145,8 @@ def _check_new_plants_are_added(ct, base_grid, profile_info, raw_profile, resour
tg = TransformGrid(base_grid, ct)
transformed_grid = tg.get_grid()

tp = TransformProfile(profile_info, transformed_grid, ct)
empty_scenario_info = {} # scenario_info not needed since input_data is mocked
tp = TransformProfile(empty_scenario_info, transformed_grid, ct)
transformed_profile = tp.get_profile(resource)

assert not transformed_profile.equals(base_profile)
Expand All @@ -159,30 +159,24 @@ def _check_new_plants_are_added(ct, base_grid, profile_info, raw_profile, resour
return transformed_profile.drop(base_profile.columns, axis=1)


def _check_new_plants_are_not_scaled(base_grid, profile_info, raw_profile, resource):
def _check_new_plants_are_not_scaled(base_grid, raw_profile, resource):
ct_zone = get_change_table_for_zone_scaling(base_grid, resource)
ct_id = get_change_table_for_id_scaling(base_grid, resource)
ct_new = get_change_table_for_new_plant_addition(base_grid, resource)
ct = {**ct_zone, **ct_id[resource], **ct_new}
# profile of new plants
new_profile = _check_new_plants_are_added(
ct_new, base_grid, profile_info, raw_profile, resource
)
new_profile = _check_new_plants_are_added(ct_new, base_grid, raw_profile, resource)
# transformed profile
scaled_profile = _check_plants_are_scaled(
ct, base_grid, profile_info, raw_profile, resource
)
scaled_profile = _check_plants_are_scaled(ct, base_grid, raw_profile, resource)
# check that the profiles of new plants in the scaled profile are not scaled
assert new_profile.equals(scaled_profile[new_profile.columns])


def _check_profile_of_new_plants_are_produced_correctly(
base_grid, profile_info, raw_profile, resource
base_grid, raw_profile, resource
):
ct_new = get_change_table_for_new_plant_addition(base_grid, resource)
new_profile = _check_new_plants_are_added(
ct_new, base_grid, profile_info, raw_profile, resource
)
new_profile = _check_new_plants_are_added(ct_new, base_grid, raw_profile, resource)
neighbor_id = [d["plant_id_neighbor"] for d in ct_new["new_plant"]]
new_plant_pmax = [d["Pmax"] for d in ct_new["new_plant"]]

Expand All @@ -197,39 +191,42 @@ def base_grid():
return grid


def raw_profile(kind):
input_data = InputData()
grid_model = "test_usa_tamu"
profile_info = {
"grid_model": grid_model,
f"base_{kind}": param[kind],
}
profile = input_data.get_data(profile_info, kind)
return profile_info, profile
@pytest.fixture(scope="module")
def input_data(base_grid):
mock_input_data = MockInputData(base_grid)
return mock_input_data


@pytest.fixture(scope="module", autouse=True)
def mock_input_data_class(input_data):
with patch(
"powersimdata.input.transform_profile.InputData"
) as mock_input_data_class:
mock_input_data_class.return_value = input_data
yield


@pytest.fixture(scope="module")
def raw_hydro():
return raw_profile("hydro")
def raw_hydro(input_data):
return input_data.get_data({}, "hydro")


@pytest.fixture(scope="module")
def raw_wind():
return raw_profile("wind")
def raw_wind(input_data):
return input_data.get_data({}, "wind")


@pytest.fixture(scope="module")
def raw_solar():
return raw_profile("solar")
def raw_solar(input_data):
return input_data.get_data({}, "solar")


@pytest.fixture(scope="module")
def raw_demand():
return raw_profile("demand")
def raw_demand(input_data):
return input_data.get_data({}, "demand")


def test_demand_is_scaled(base_grid, raw_demand):
demand_info, raw_demand = raw_demand
base_demand = raw_demand[base_grid.id2zone.keys()]

n_zone = param["n_zone_to_scale"]
Expand All @@ -249,7 +246,8 @@ def test_demand_is_scaled(base_grid, raw_demand):
tg = TransformGrid(base_grid, ct.ct)
transformed_grid = tg.get_grid()

tp = TransformProfile(demand_info, transformed_grid, ct.ct)
empty_scenario_info = {} # scenario_info not needed since input_data is mocked
tp = TransformProfile(empty_scenario_info, transformed_grid, ct.ct)
transformed_profile = tp.get_profile("demand")
assert not base_demand.equals(transformed_profile)

Expand All @@ -265,89 +263,89 @@ def test_demand_is_scaled(base_grid, raw_demand):

def test_solar_is_scaled_by_zone(base_grid, raw_solar):
ct = get_change_table_for_zone_scaling(base_grid, "solar")
_check_plants_are_scaled(ct, base_grid, *raw_solar, "solar")
_check_plants_are_scaled(ct, base_grid, raw_solar, "solar")


def test_solar_is_scaled_by_id(base_grid, raw_solar):
ct = get_change_table_for_id_scaling(base_grid, "solar")
_check_plants_are_scaled(ct, base_grid, *raw_solar, "solar")
_check_plants_are_scaled(ct, base_grid, raw_solar, "solar")


def test_solar_is_scaled_by_zone_and_id(base_grid, raw_solar):
ct_zone = get_change_table_for_zone_scaling(base_grid, "solar")
ct_id = get_change_table_for_id_scaling(base_grid, "solar")
ct = {**ct_zone, **ct_id["solar"]}
_check_plants_are_scaled(ct, base_grid, *raw_solar, "solar")
_check_plants_are_scaled(ct, base_grid, raw_solar, "solar")


def test_wind_is_scaled_by_zone(base_grid, raw_wind):
ct = get_change_table_for_zone_scaling(base_grid, "wind")
_check_plants_are_scaled(ct, base_grid, *raw_wind, "wind")
_check_plants_are_scaled(ct, base_grid, raw_wind, "wind")


def test_wind_is_scaled_by_id(base_grid, raw_wind):
ct = get_change_table_for_id_scaling(base_grid, "wind")
_check_plants_are_scaled(ct, base_grid, *raw_wind, "wind")
_check_plants_are_scaled(ct, base_grid, raw_wind, "wind")


def test_wind_is_scaled_by_zone_and_id(base_grid, raw_wind):
ct_zone = get_change_table_for_zone_scaling(base_grid, "wind")
ct_id = get_change_table_for_id_scaling(base_grid, "wind")
ct = {**ct_zone, **ct_id["wind"]}
_check_plants_are_scaled(ct, base_grid, *raw_wind, "wind")
_check_plants_are_scaled(ct, base_grid, raw_wind, "wind")


def test_hydro_is_scaled_by_zone(base_grid, raw_hydro):
ct = get_change_table_for_zone_scaling(base_grid, "hydro")
_check_plants_are_scaled(ct, base_grid, *raw_hydro, "hydro")
_check_plants_are_scaled(ct, base_grid, raw_hydro, "hydro")


def test_hydro_is_scaled_by_id(base_grid, raw_hydro):
ct = get_change_table_for_id_scaling(base_grid, "hydro")
_check_plants_are_scaled(ct, base_grid, *raw_hydro, "hydro")
_check_plants_are_scaled(ct, base_grid, raw_hydro, "hydro")


def test_hydro_is_scaled_by_zone_and_id(base_grid, raw_hydro):
ct_zone = get_change_table_for_zone_scaling(base_grid, "hydro")
ct_id = get_change_table_for_id_scaling(base_grid, "hydro")
ct = {**ct_zone, **ct_id["hydro"]}
_check_plants_are_scaled(ct, base_grid, *raw_hydro, "hydro")
_check_plants_are_scaled(ct, base_grid, raw_hydro, "hydro")


def test_new_solar_are_added(base_grid, raw_solar):
ct = get_change_table_for_new_plant_addition(base_grid, "solar")
_ = _check_new_plants_are_added(ct, base_grid, *raw_solar, "solar")
_ = _check_new_plants_are_added(ct, base_grid, raw_solar, "solar")


def test_new_wind_are_added(base_grid, raw_wind):
ct = get_change_table_for_new_plant_addition(base_grid, "wind")
_ = _check_new_plants_are_added(ct, base_grid, *raw_wind, "wind")
_ = _check_new_plants_are_added(ct, base_grid, raw_wind, "wind")


def test_new_hydro_added(base_grid, raw_hydro):
ct = get_change_table_for_new_plant_addition(base_grid, "hydro")
_ = _check_new_plants_are_added(ct, base_grid, *raw_hydro, "hydro")
_ = _check_new_plants_are_added(ct, base_grid, raw_hydro, "hydro")


def test_new_solar_are_not_scaled(base_grid, raw_solar):
_check_new_plants_are_not_scaled(base_grid, *raw_solar, "solar")
_check_new_plants_are_not_scaled(base_grid, raw_solar, "solar")


def test_new_wind_are_not_scaled(base_grid, raw_wind):
_check_new_plants_are_not_scaled(base_grid, *raw_wind, "wind")
_check_new_plants_are_not_scaled(base_grid, raw_wind, "wind")


def test_new_hydro_are_not_scaled(base_grid, raw_hydro):
_check_new_plants_are_not_scaled(base_grid, *raw_hydro, "hydro")
_check_new_plants_are_not_scaled(base_grid, raw_hydro, "hydro")


def test_new_solar_profile(base_grid, raw_solar):
_check_profile_of_new_plants_are_produced_correctly(base_grid, *raw_solar, "solar")
_check_profile_of_new_plants_are_produced_correctly(base_grid, raw_solar, "solar")


def test_new_wind_profile(base_grid, raw_wind):
_check_profile_of_new_plants_are_produced_correctly(base_grid, *raw_wind, "wind")
_check_profile_of_new_plants_are_produced_correctly(base_grid, raw_wind, "wind")


def test_new_hydro_profile(base_grid, raw_hydro):
_check_profile_of_new_plants_are_produced_correctly(base_grid, *raw_hydro, "hydro")
_check_profile_of_new_plants_are_produced_correctly(base_grid, raw_hydro, "hydro")
113 changes: 113 additions & 0 deletions powersimdata/tests/mock_input_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import numpy as np
import pandas as pd

from powersimdata.input.grid import Grid


class MockInputData:
"""
MockInputData is a mock of powersimdata.input.input_data.InputData
that generates random profiles.
Exactly 3 of {`start_time`, `end_time`, `periods`, `freq`} must be specified. See <https://pandas.pydata.org/docs/reference/api/pandas.date_range.html>.
:param powersimdata.input.grid.Grid grid: instance of Grid object.
:param str start_time: when profiles begin.
:param str end_time: when profiles end.
:param int periods: number of times in profile.
:param str freq: frequency of times in profile.
:param int random_seed: used to initialize the random generator.
:raises ValueError: raised if `field_name` specified in `get_data()` is not specified by this mock
:return: (*powersimdata.tests.mock_input_data.MockInputData*)
"""

_RESOURCES = {
"wind": {"wind", "wind_offshore"},
"solar": {"solar"},
"hydro": {"hydro"},
}

def __init__(
self,
grid: Grid,
start_time="2016-01-01 00:00:00",
end_time=None,
periods=24,
freq="H",
random_seed=6669,
):
self._grid = grid
self._start_time = start_time
self._end_time = end_time
self._periods = periods
self._freq = freq
self._random = np.random.default_rng(seed=random_seed)

self._profiles = {
"demand": self._get_demand(),
**{
resource: self._get_resource_profile(resource)
for resource in self._RESOURCES.keys()
},
}

def get_data(self, scenario_info, field_name):
"""Returns fake profile data.
:param dict scenario_info: not used.
:param str field_name: Can be any of *'demand'*, *'hydro'*, *'solar'*, or *'wind'*.
:return: (*pandas.DataFrame*) -- fake profile data
"""
profile = self._profiles.get(field_name)

if profile is None:
raise ValueError(f"No profile specified for {field_name}!")

return profile

def _get_demand(self):
"""Returns fake demand data.
:return: (*pandas.DataFrame*) -- fake demand data
"""
zone_ids = set(self._grid.plant["zone_id"])
fake_demand_profile = self._create_fake_profile(zone_ids)
return fake_demand_profile

def _get_resource_profile(self, resource_type):
"""Returns fake data for given resource_type.
:param str resource_type: Can be any of *'hydro'*, *'solar'*, or *'wind'*.
:return: (*pandas.DataFrame*) -- fake data for resource
"""
plant_ids = self._get_plant_ids_for_type(resource_type)
fake_resource_profile = self._create_fake_profile(plant_ids)
return fake_resource_profile

def _get_plant_ids_for_type(self, resource_type):
"""Retrieves plant_ids for plants of `resource_type` from the grid.
:param str resource_type: Can be any of *'hydro'*, *'solar'*, or *'wind'*.
:return: (*list*) -- list of plant_ids
"""
plant_ids = list(
self._grid.plant.query("type in @self._RESOURCES[@resource_type]").index
)
return plant_ids

def _create_fake_profile(self, columns):
"""Generates a fake profile.
:param list columns: columns for the DataFrame
:return: (*pandas.DataFrame*) -- a fake profile
"""
times = pd.date_range(
start=self._start_time,
end=self._end_time,
periods=self._periods,
freq=self._freq,
)
index = pd.Index(times, name="UTC Time")
data = self._random.uniform(low=0, high=1, size=(len(times), len(columns)))
fake_profile = pd.DataFrame(data=data, index=index, columns=columns)
return fake_profile
Loading

0 comments on commit 733c5b1

Please sign in to comment.