diff --git a/hisim/json_generator.py b/hisim/json_generator.py index c134bbe0b..e42e08ffd 100644 --- a/hisim/json_generator.py +++ b/hisim/json_generator.py @@ -42,7 +42,7 @@ class ConfigFile(JSONWizard): component_entries: List[ComponentEntry] = field(default_factory=list) my_simulation_parameters: Optional[SimulationParameters] = None my_module_config: Optional[Dict[str, Any]] = None - pyam_data_information: Optional[Dict[str, Any]] = None + scenario_data_information: Optional[Dict[str, Any]] = None class JsonConfigurationGenerator: @@ -65,11 +65,11 @@ def set_module_config(self, my_module_config_path: str) -> None: config_dict = json.load(openfile) self.config_file.my_module_config = config_dict - def set_pyam_data_information_dict( - self, pyam_data_information_dict: Dict[str, Any] + def set_scenario_data_information_dict( + self, scenario_data_information_dict: Dict[str, Any] ) -> None: - """Sets some pyam information concerning the data.""" - self.config_file.pyam_data_information = pyam_data_information_dict + """Sets some scenario information concerning the result data.""" + self.config_file.scenario_data_information = scenario_data_information_dict def add_component(self, config: Type[ConfigBase]) -> ComponentEntry: """Adds a component and returns a component entry.""" diff --git a/hisim/postprocessing/compute_kpis.py b/hisim/postprocessing/compute_kpis.py index c400aa76f..1bea9fce1 100644 --- a/hisim/postprocessing/compute_kpis.py +++ b/hisim/postprocessing/compute_kpis.py @@ -14,14 +14,14 @@ from hisim.simulationparameters import SimulationParameters from hisim.utils import HISIMPATH from hisim.component_wrapper import ComponentWrapper -from hisim.components import generic_hot_water_storage_modular + from hisim import log from hisim.postprocessing.investment_cost_co2 import compute_investment_cost def building_temperature_control_and_heating_load( results: pd.DataFrame, seconds_per_timestep: int, components: List[ComponentWrapper] -) -> Tuple[Any, Any, float, float, float, float, float]: +) -> Tuple[Any, Any, float, float, float, float, float, float]: """Check the building indoor air temperature. 
Check for all timesteps and count the @@ -45,6 +45,11 @@ def building_temperature_control_and_heating_load( heating_load_in_watt = getattr( wrapped_component.my_component, "my_building_information" ).max_thermal_building_demand_in_watt + # get specific heating load + scaled_conditioned_floor_area_in_m2 = getattr( + wrapped_component.my_component, "my_building_information" + ).scaled_conditioned_floor_area_in_m2 + specific_heating_load_in_watt_per_m2 = heating_load_in_watt / scaled_conditioned_floor_area_in_m2 break for column in results.columns: @@ -99,6 +104,7 @@ def building_temperature_control_and_heating_load( min_temperature_reached_in_celsius, max_temperature_reached_in_celsius, heating_load_in_watt, + specific_heating_load_in_watt_per_m2 ) @@ -243,6 +249,7 @@ def compute_consumption_production( in output.postprocessing_flag ): consumption_ids.append(index) + elif InandOutputType.CHARGE_DISCHARGE in output.postprocessing_flag: if ComponentType.BATTERY in output.postprocessing_flag: battery_charge_discharge_ids.append(index) @@ -271,93 +278,6 @@ def compute_consumption_production( return postprocessing_results -def compute_hot_water_storage_losses_and_cycles( - components: List[ComponentWrapper], - all_outputs: List, - results: pd.DataFrame, - timeresolution: int, -) -> Tuple[float, float, float, float, float, float]: - """Computes hot water storage losses and cycles.""" - - # initialize columns consumption, production, battery_charge, battery_discharge, storage - charge_sum_dhw = 0.0 - charge_sum_buffer = 0.0 - discharge_sum_dhw = 0.0 - discharge_sum_buffer = 0.0 - cycle_buffer = None - cycle_dhw = None - - # get cycle of water storages - for elem in components: - if isinstance( - elem.my_component, generic_hot_water_storage_modular.HotWaterStorage - ): - use = elem.my_component.use - if use == ComponentType.BUFFER: - cycle_buffer = elem.my_component.config.energy_full_cycle - elif use == ComponentType.BOILER: - cycle_dhw = elem.my_component.config.energy_full_cycle - - for index, output in enumerate(all_outputs): - if output.postprocessing_flag is not None: - if InandOutputType.CHARGE in output.postprocessing_flag: - if InandOutputType.WATER_HEATING in output.postprocessing_flag: - charge_sum_dhw = charge_sum_dhw + compute_energy_from_power( - power_timeseries=results.iloc[:, index], - timeresolution=timeresolution, - ) - elif InandOutputType.HEATING in output.postprocessing_flag: - charge_sum_buffer = charge_sum_buffer + compute_energy_from_power( - power_timeseries=results.iloc[:, index], - timeresolution=timeresolution, - ) - elif InandOutputType.DISCHARGE in output.postprocessing_flag: - if ComponentType.BOILER in output.postprocessing_flag: - discharge_sum_dhw = discharge_sum_dhw + compute_energy_from_power( - power_timeseries=results.iloc[:, index], - timeresolution=timeresolution, - ) - elif ComponentType.BUFFER in output.postprocessing_flag: - discharge_sum_buffer = ( - discharge_sum_buffer - + compute_energy_from_power( - power_timeseries=results.iloc[:, index], - timeresolution=timeresolution, - ) - ) - else: - continue - if cycle_dhw is not None: - cycles_dhw = charge_sum_dhw / cycle_dhw - else: - cycles_dhw = 0 - log.error( - "Energy of full cycle must be defined in config of modular hot water storage to compute the number of cycles. 
" - ) - storage_loss_dhw = charge_sum_dhw - discharge_sum_dhw - if cycle_buffer is not None: - cycles_buffer = charge_sum_buffer / cycle_buffer - else: - cycles_buffer = 0 - log.error( - "Energy of full cycle must be defined in config of modular hot water storage to compute the number of cycles. " - ) - storage_loss_buffer = charge_sum_buffer - discharge_sum_buffer - if cycle_buffer == 0: - building_heating = charge_sum_buffer - else: - building_heating = discharge_sum_buffer - - return ( - cycles_dhw, - storage_loss_dhw, - discharge_sum_dhw, - cycles_buffer, - storage_loss_buffer, - building_heating, - ) - - def compute_self_consumption_and_injection( postprocessing_results: pd.DataFrame, ) -> Tuple[pd.Series, pd.Series]: @@ -605,20 +525,6 @@ def compute_kpis( co2 = co2 + fuel_co2 price = price + fuel_price - # ( - # cycles_dhw, - # loss_dhw, - # use_dhw, - # cycles_buffer, - # loss_buffer, - # use_heating, - # ) = compute_hot_water_storage_losses_and_cycles( - # components=components, - # all_outputs=all_outputs, - # results=results, - # timeresolution=simulation_parameters.seconds_per_timestep, - # ) - # compute cost and co2 for investment/installation: # Todo: function compute_investment_cost does not include all components, use capex and opex-results instead investment_cost, co2_footprint = compute_investment_cost(components=components) @@ -664,6 +570,7 @@ def compute_kpis( min_temperature_reached_in_celsius, max_temperature_reached_in_celsius, heating_load_in_watt, + specific_heating_load_in_watt_per_m2 ) = building_temperature_control_and_heating_load( results=results, seconds_per_timestep=simulation_parameters.seconds_per_timestep, @@ -693,14 +600,6 @@ def compute_kpis( table.append(["Self-consumption:", f"{self_consumption_sum:4.0f}", "kWh"]) table.append(["Injection:", f"{injection_sum:4.0f}", "kWh"]) table.append(["Battery losses:", f"{battery_losses:4.0f}", "kWh"]) - # table.append(["DHW storage heat loss:", f"{loss_dhw:4.0f}", "kWh"]) - # table.append(["DHW storage heat cycles:", f"{cycles_dhw:4.0f}", "Cycles"]) - # table.append(["DHW energy provided:", f"{use_dhw:4.0f}", "kWh"]) - # table.append(["Buffer storage heat loss:", f"{loss_buffer:4.0f}", "kWh"]) - # table.append(["Buffer storage heat cycles:", f"{cycles_buffer:4.0f}", "Cycles"]) - # table.append(["Heating energy provided:", f"{use_heating:4.0f}", "kWh"]) - # table.append(["Hydrogen system losses:", f"{h2_system_losses:4.0f}", "kWh"]) - # table.append(["Hydrogen storage content:", f"{0:4.0f}", "kWh"]) table.append(["Autarky rate:", f"{autarky_rate:3.1f}", "%"]) table.append(["Self-consumption rate:", f"{self_consumption_rate:3.1f}", "%"]) table.append(["Cost for energy use:", f"{price:3.0f}", "EUR"]) @@ -793,6 +692,9 @@ def compute_kpis( table.append( ["Building heating load:", f"{(heating_load_in_watt):3.0f}", "W"] ) + table.append( + ["Specific heating load:", f"{(specific_heating_load_in_watt_per_m2):3.0f}", "W"] + ) table.append(["Number of heat pump cycles:", f"{number_of_cycles:3.0f}", "-"]) table.append(["Seasonal performance factor of heat pump:", f"{spf:3.0f}", "-"]) diff --git a/hisim/postprocessing/postprocessing_main.py b/hisim/postprocessing/postprocessing_main.py index 75165c5b9..15cda5fa0 100644 --- a/hisim/postprocessing/postprocessing_main.py +++ b/hisim/postprocessing/postprocessing_main.py @@ -44,7 +44,7 @@ def __init__(self): self.dirname: str self.chapter_counter: int = 1 self.figure_counter: int = 1 - self.pyam_data_folder: str = "" + self.result_data_folder_for_scenario_evaluation: str = "" 
         self.model: str = "HiSim"
         self.scenario: str = ""
         self.region: str = ""
@@ -289,13 +289,13 @@ def run(self, ppdt: PostProcessingDataTransfer) -> None:  # noqa: MC0001
             + f"{duration:1.2f}s."
         )
 
-        # Write Outputs to pyam.IAMDataframe format for scenario evaluation
+        # Write outputs in a format suited for scenario evaluation (format inspired by the pyam package)
         if (
-            PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+            PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
            in ppdt.post_processing_options
         ):
-            log.information("Prepare results for scenario evaluation with pyam.")
-            self.prepare_results_for_scenario_evaluation_with_pyam(ppdt)
+            log.information("Prepare results for scenario evaluation.")
+            self.prepare_results_for_scenario_evaluation(ppdt)
 
         # Open file explorer
         if (
@@ -755,20 +755,20 @@ def export_sankeys(self):
         """
         pass  # noqa: unnecessary-pass
 
-    def prepare_results_for_scenario_evaluation_with_pyam(
+    def prepare_results_for_scenario_evaluation(
         self, ppdt: PostProcessingDataTransfer
     ) -> None:
-        """Prepare the results for the scenario evaluation with pyam."""
+        """Prepare the results for the scenario evaluation."""
 
-        # create pyam data foler
-        self.pyam_data_folder = os.path.join(
-            ppdt.simulation_parameters.result_directory, "pyam_data"
+        # create result data folder
+        self.result_data_folder_for_scenario_evaluation = os.path.join(
+            ppdt.simulation_parameters.result_directory, "result_data_for_scenario_evaluation"
         )
-        if os.path.exists(self.pyam_data_folder) is False:
-            os.makedirs(self.pyam_data_folder)
+        if not os.path.exists(self.result_data_folder_for_scenario_evaluation):
+            os.makedirs(self.result_data_folder_for_scenario_evaluation)
         else:
             log.information(
-                "This pyam_data path exists already: " + self.pyam_data_folder
+                "This result data path already exists: " + self.result_data_folder_for_scenario_evaluation
             )
 
         # --------------------------------------------------------------------------------------------------------------------------------------------------------------
@@ -794,7 +794,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
             "year": [],
             "value": [],
         }
-        # set pyam model name
+        # set model name
         self.model = "".join(["HiSim_", ppdt.module_filename])
 
-        # set pyam scenario name
+        # set scenario name
@@ -807,7 +807,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
         else:
             self.scenario = ""
 
-        # set pyam region
+        # set region
         if SingletonSimRepository().exist_entry(key=SingletonDictKeyEnum.LOCATION):
             self.region = SingletonSimRepository().get_entry(
                 key=SingletonDictKeyEnum.LOCATION
@@ -815,7 +815,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
         else:
             self.region = ""
 
-        # set pyam year or timeseries
+        # set year or timeseries
         self.year = ppdt.simulation_parameters.year
         timeseries_hourly = ppdt.results_hourly.index
         timeseries_daily = ppdt.results_daily.index
@@ -825,7 +825,15 @@ def prepare_results_for_scenario_evaluation_with_pyam(
             PostProcessingOptions.COMPUTE_AND_WRITE_KPIS_TO_REPORT
             in ppdt.post_processing_options
         ):
-            self.write_kpis_in_pyam_dict(
+            self.write_kpis_in_dict(
+                ppdt=ppdt, simple_dict_cumulative_data=simple_dict_hourly_data
+            )
+            self.write_kpis_in_dict(
+                ppdt=ppdt, simple_dict_cumulative_data=simple_dict_daily_data
+            )
+            self.write_kpis_in_dict(
+                ppdt=ppdt, simple_dict_cumulative_data=simple_dict_monthly_data
+            )
+            self.write_kpis_in_dict(
                 ppdt=ppdt, simple_dict_cumulative_data=simple_dict_cumulative_data
             )
@@ -838,7 +846,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
             )
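+            # each time resolution (hourly, daily, monthly, yearly) is written to its own csv file,
+            # with the columns model, scenario, region, variable, unit, time (or year) and value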
             self.write_filename_and_save_to_csv(
                 dataframe=dataframe_hourly_data,
-                folder=self.pyam_data_folder,
+                folder=self.result_data_folder_for_scenario_evaluation,
                 module_filename=ppdt.module_filename,
                 simulation_duration=ppdt.simulation_parameters.duration.days,
                 simulation_year=ppdt.simulation_parameters.year,
@@ -853,7 +861,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
             )
             self.write_filename_and_save_to_csv(
                 dataframe=dataframe_daily_data,
-                folder=self.pyam_data_folder,
+                folder=self.result_data_folder_for_scenario_evaluation,
                 module_filename=ppdt.module_filename,
                 simulation_duration=ppdt.simulation_parameters.duration.days,
                 simulation_year=ppdt.simulation_parameters.year,
@@ -868,7 +876,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
             )
             self.write_filename_and_save_to_csv(
                 dataframe=dataframe_monthly_data,
-                folder=self.pyam_data_folder,
+                folder=self.result_data_folder_for_scenario_evaluation,
                 module_filename=ppdt.module_filename,
                 simulation_duration=ppdt.simulation_parameters.duration.days,
                 simulation_year=ppdt.simulation_parameters.year,
@@ -899,7 +907,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
         simple_df_yearly_data = pd.DataFrame(simple_dict_cumulative_data)
         self.write_filename_and_save_to_csv(
             dataframe=simple_df_yearly_data,
-            folder=self.pyam_data_folder,
+            folder=self.result_data_folder_for_scenario_evaluation,
             module_filename=ppdt.module_filename,
             time_resolution_of_data="yearly",
             simulation_duration=ppdt.simulation_parameters.duration.days,
@@ -908,7 +916,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
 
         # --------------------------------------------------------------------------------------------------------------------------------------------------------------
-        # create dictionary with all import pyam information
+        # create dictionary with all important data information
         data_information_dict = {
             "model": self.model,
             "scenario": self.scenario,
@@ -926,8 +934,8 @@ def prepare_results_for_scenario_evaluation_with_pyam(
         json_generator_config.set_module_config(
             my_module_config_path=ppdt.my_module_config_path
         )
-        json_generator_config.set_pyam_data_information_dict(
-            pyam_data_information_dict=data_information_dict
+        json_generator_config.set_scenario_data_information_dict(
+            scenario_data_information_dict=data_information_dict
         )
         for component in ppdt.wrapped_components:
             json_generator_config.add_component(config=component.my_component.config)
@@ -935,7 +943,7 @@ def prepare_results_for_scenario_evaluation_with_pyam(
         # save the json config
         json_generator_config.save_to_json(
             filename=os.path.join(
-                self.pyam_data_folder, "data_information_for_pyam.json"
+                self.result_data_folder_for_scenario_evaluation, "data_information_for_scenario_evaluation.json"
             )
         )
 
@@ -953,12 +961,12 @@ def write_component_configurations_to_json(
             )
         )
 
-    def write_kpis_in_pyam_dict(
+    def write_kpis_in_dict(
         self,
         ppdt: PostProcessingDataTransfer,
         simple_dict_cumulative_data: Dict[str, Any],
     ) -> None:
-        """Write kpis in pyam dictionary."""
+        """Write kpis in dictionary."""
 
         kpi_compute_return = compute_kpis(
             components=ppdt.wrapped_components,
@@ -978,7 +986,10 @@ def write_kpis_in_pyam_dict(
         simple_dict_cumulative_data["region"].append(self.region)
         simple_dict_cumulative_data["variable"].append(variable_name)
         simple_dict_cumulative_data["unit"].append(variable_unit)
-        simple_dict_cumulative_data["year"].append(self.year)
+        try:
+            simple_dict_cumulative_data["year"].append(self.year)
+        except KeyError:  # the hourly/daily/monthly dicts carry a "time" column instead of "year"
+            simple_dict_cumulative_data["time"].append(self.year)
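+        # the "value" entry completes one row of the later csv table for this KPI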
simple_dict_cumulative_data["value"].append(variable_value) def get_variable_name_and_unit_from_ppdt_results_column( diff --git a/hisim/postprocessing/pyam_data_processing.py b/hisim/postprocessing/pyam_data_processing.py deleted file mode 100644 index 1deab028b..000000000 --- a/hisim/postprocessing/pyam_data_processing.py +++ /dev/null @@ -1,1169 +0,0 @@ -"""Data Processing and Plotting for Scenario Comparison with Pyam.""" - - -import glob -import datetime -import os -from typing import Dict, Any, Tuple, Optional, List -import string -import copy -import warnings -import numpy as np -import pyam -import pandas as pd -import matplotlib.pyplot as plt - -# import plotly -# from html2image import Html2Image -from ordered_set import OrderedSet -import seaborn as sns - -from hisim.postprocessing.pyam_data_collection import ( - PyamDataTypeEnum, - PyamDataProcessingModeEnum, -) -from hisim.postprocessing.chartbase import ChartFontsAndSize -from hisim import log - -# TODO: debugging needed - - -class PyAmChartGenerator: - - """PyamChartGenerator class.""" - - def __init__( - self, - simulation_duration_to_check: str, - data_processing_mode: Any, - time_resolution_of_data_set: Any, - variables_to_check: Optional[List[str]] = None, - dict_of_scenarios_to_check: Optional[Dict[str, List[str]]] = None, - ) -> None: - """Initialize the class.""" - - warnings.filterwarnings("ignore") - - self.datetime_string = datetime.datetime.now().strftime("%Y%m%d_%H%M") - self.show_plot_legend: bool = True - if data_processing_mode == PyamDataProcessingModeEnum.PROCESS_ALL_DATA: - - data_path_strip = "data_with_all_parameters" - result_path_strip = "results_for_all_parameters" - self.show_plot_legend = False - - elif ( - data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_CODES - ): - data_path_strip = "data_with_different_building_codes" - result_path_strip = "results_different_building_codes" - - elif ( - data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_SIZES - ): - data_path_strip = "data_with_different_conditioned_floor_area_in_m2s" - result_path_strip = "results_different_conditioned_floor_area_in_m2s" - - elif ( - data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_AZIMUTH_ANGLES - ): - data_path_strip = "data_with_different_pv_azimuths" - result_path_strip = "results_different_pv_azimuths" - - elif ( - data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_TILT_ANGLES - ): - data_path_strip = "data_with_different_pv_tilts" - result_path_strip = "results_different_pv_tilts" - - elif ( - data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_SHARE_OF_MAXIMUM_PV - ): - data_path_strip = "data_with_different_share_of_maximum_pv_powers" - result_path_strip = "results_different_share_of_maximum_pv_powers" - - elif ( - data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_NUMBER_OF_DWELLINGS - ): - data_path_strip = "data_with_different_number_of_dwellings_per_buildings" - result_path_strip = "results_different_number_of_dwellings_per_buildings" - - else: - raise ValueError("PyamDataProcessingMode not known.") - - self.folder_path = os.path.join( - os.pardir, - os.pardir, - "system_setups", - "results_for_scenario_comparison", - "data", - data_path_strip, - ) - self.result_folder = os.path.join( - os.pardir, - os.pardir, - "system_setups", - "results_for_scenario_comparison", - "results", - result_path_strip, - ) - log.information(f"Data folder path: 
{self.folder_path}") - self.hisim_chartbase = ChartFontsAndSize() - self.hisim_chartbase.figsize = (10, 6) - self.hisim_chartbase.dpi = 100 - - if variables_to_check != [] and variables_to_check is not None: - # read data, sort data according to scenarios if wanted, and create pandas dataframe - ( - pandas_dataframe, - key_for_scenario_one, - key_for_current_scenario, - variables_to_check, - ) = self.get_dataframe_and_create_pandas_dataframe_for_all_data( - folder_path=self.folder_path, - time_resolution_of_data_set=time_resolution_of_data_set, - dict_of_scenarios_to_check=dict_of_scenarios_to_check, - variables_to_check=variables_to_check, - ) - log.information("key for scenario one " + key_for_scenario_one) - log.information("key for current scenario " + key_for_current_scenario) - - self.make_plots_with_specific_kind_of_data( - time_resolution_of_data_set=time_resolution_of_data_set, - pyam_dataframe=pandas_dataframe, - simulation_duration_key=simulation_duration_to_check, - variables_to_check=variables_to_check, - ) - - else: - log.information( - "Variable list for data is not given and will not be plotted or anaylzed." - ) - - def get_dataframe_and_create_pandas_dataframe_for_all_data( - self, - folder_path: str, - time_resolution_of_data_set: Any, - dict_of_scenarios_to_check: Optional[Dict[str, List[str]]], - variables_to_check: List[str], - ) -> Tuple[pd.DataFrame, str, str, List[str]]: - """Get csv data and create pyam dataframes.""" - - if time_resolution_of_data_set == PyamDataTypeEnum.HOURLY: - kind_of_data_set = "hourly" - elif time_resolution_of_data_set == PyamDataTypeEnum.YEARLY: - kind_of_data_set = "yearly" - elif time_resolution_of_data_set == PyamDataTypeEnum.DAILY: - kind_of_data_set = "daily" - elif time_resolution_of_data_set == PyamDataTypeEnum.MONTHLY: - kind_of_data_set = "monthly" - else: - raise ValueError( - "This kind of data was not found in the pyamdaacollectorenum class." - ) - log.information( - f"Read csv files and create one big pyam dataframe for {kind_of_data_set} data." 
- ) - - for file in glob.glob( - os.path.join(folder_path, "**", f"*{kind_of_data_set}*.csv") - ): - - file_df = pd.read_csv(filepath_or_buffer=file) - - # if scenario values are no strings, transform them - file_df["scenario"] = file_df["scenario"].transform(str) - key_for_scenario_one = "" - key_for_current_scenario = "" - - # make rel electricity calculation before sorting and renaming - - if "ElectricityMeter|Electricity|ElectricityFromGrid" in variables_to_check: - print("relative electricity demand will be calculated.") - - file_df = self.calculate_relative_electricity_demand(dataframe=file_df) - variables_to_check.append("Relative Electricity Demand") - - if dict_of_scenarios_to_check is not None and dict_of_scenarios_to_check != {}: - - ( - file_df, - key_for_scenario_one, - key_for_current_scenario, - ) = self.check_if_scenario_exists_and_filter_dataframe_for_scenarios_dict( - data_frame=file_df, - dict_of_scenarios_to_check=dict_of_scenarios_to_check, - ) - - return ( - file_df, - key_for_scenario_one, - key_for_current_scenario, - variables_to_check, - ) - - def make_plots_with_specific_kind_of_data( - self, - time_resolution_of_data_set: Any, - pyam_dataframe: pd.DataFrame, - simulation_duration_key: str, - variables_to_check: List[str], - ) -> None: - """Make plots for different kind of data.""" - - log.information(f"Simulation duration: {simulation_duration_key} days.") - - if pyam_dataframe.empty: - raise ValueError("Pyam dataframe is empty.") - - sub_results_folder = f"simulation_duration_of_{simulation_duration_key}_days" - sub_sub_results_folder = ( - f"pyam_results_{time_resolution_of_data_set.value}_{self.datetime_string}" - ) - - self.path_for_plots = os.path.join( - self.result_folder, sub_results_folder, sub_sub_results_folder - ) - - for variable_to_check in variables_to_check: - log.information("Check variable " + str(variable_to_check)) - - # prepare path for plots - self.path_addition = "".join( - [ - x - for x in variable_to_check - if x in string.ascii_letters or x.isspace() or x == "2" - ] - ) - - self.plot_path_complete = os.path.join( - self.path_for_plots, self.path_addition - ) - if os.path.exists(self.plot_path_complete) is False: - os.makedirs(self.plot_path_complete) - - # filter the dataframe according to variable - filtered_data = self.filter_pandas_dataframe( - dataframe=pyam_dataframe, variable_to_check=variable_to_check - ) - # get unit of variable - try: - unit = filtered_data.unit.values[0] - except Exception: - if "Temperature deviation" in variable_to_check: - unit = "°C*h" - else: - unit = "-" - - if time_resolution_of_data_set == PyamDataTypeEnum.YEARLY: - kind_of_data_set = "yearly" - log.information( - f"Yearly Data Processing for Simulation Duration of {simulation_duration_key} Days:" - ) - # get statistical data - self.get_statistics_of_data_and_write_to_excel( - filtered_data=filtered_data, - path_to_save=self.plot_path_complete, - kind_of_data_set=kind_of_data_set, - ) - - # try: - # self.make_box_plot_for_pandas_dataframe( - # filtered_data=filtered_data, title=self.path_addition, - # ) - - # except Exception: - # log.information("Boxplot went wrong") - - self.make_bar_plot_for_pandas_dataframe( - filtered_data=filtered_data, title=self.path_addition, unit=unit - ) - self.make_histogram_plot_for_pandas_dataframe( - filtered_data=filtered_data, title=self.path_addition, unit=unit - ) - - elif time_resolution_of_data_set in ( - PyamDataTypeEnum.HOURLY, - PyamDataTypeEnum.DAILY, - PyamDataTypeEnum.MONTHLY, - ): - - if 
time_resolution_of_data_set == PyamDataTypeEnum.HOURLY: - kind_of_data_set = "hourly" - line_plot_marker_size = 2 - elif time_resolution_of_data_set == PyamDataTypeEnum.DAILY: - kind_of_data_set = "daily" - line_plot_marker_size = 3 - elif time_resolution_of_data_set == PyamDataTypeEnum.MONTHLY: - kind_of_data_set = "monthly" - line_plot_marker_size = 5 - - log.information( - f"{kind_of_data_set} Data Processing for Simulation Duration of {simulation_duration_key} Days:" - ) - # get statistical data - self.get_statistics_of_data_and_write_to_excel( - filtered_data=filtered_data, - path_to_save=self.plot_path_complete, - kind_of_data_set=kind_of_data_set, - ) - - try: - self.make_line_plot_for_pandas_dataframe( - filtered_data=filtered_data, - title=self.path_addition, - line_plot_marker_size=line_plot_marker_size, - ) - - # so far only working when scenarios_to_check are set - # self.make_line_plot_with_filling_for_pyam_dataframe( - # filtered_data=filtered_data, - # comparison_mode=comparion_mode, - # title=self.path_addition, - # ) - - except Exception: - log.information(f"{variable_to_check} could not be plotted.") - - else: - raise ValueError( - "This kind of data was not found in the pyamdatacollectorenum class." - ) - - def make_line_plot_for_pandas_dataframe( - self, filtered_data: pd.DataFrame, title: str, line_plot_marker_size: int - ) -> None: - """Make line plot.""" - log.information("Make line plot with data.") - - fig, a_x = plt.subplots( - figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi - ) - x_data = list(OrderedSet(list(filtered_data.time))) - if filtered_data.time.values[0] is not None: - year = filtered_data.time.values[0].split("-")[0] - else: - raise ValueError( - "year could not be determined because time value of filtered data was None." 
- ) - - x_data_transformed = np.asarray(x_data, dtype="datetime64[D]") - - for scenario in list(OrderedSet(list(filtered_data.scenario))): - filtered_data_per_scenario = filtered_data.loc[ - filtered_data["scenario"] == scenario - ] - mean_values_aggregated_according_to_scenarios = [] - for time_value in x_data: - - mean_value_per_scenario_per_timestep = np.mean( - filtered_data_per_scenario.loc[ - filtered_data_per_scenario["time"] == time_value - ]["value"] - ) - - mean_values_aggregated_according_to_scenarios.append( - mean_value_per_scenario_per_timestep - ) - - y_data = mean_values_aggregated_according_to_scenarios - - plt.plot( - x_data_transformed, - y_data, - "-o", - markersize=line_plot_marker_size, - label=scenario, - ) - - y_tick_labels, unit, y_tick_locations = self.set_axis_scale( - a_x, x_or_y="y", unit=filtered_data.unit.values[0] - ) - plt.yticks( - ticks=y_tick_locations, - labels=y_tick_labels, - fontsize=self.hisim_chartbase.fontsize_ticks, - ) - - plt.ylabel( - ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.xlabel( - xlabel=year, fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) - plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) - a_x.tick_params(axis="x", labelrotation=45) - plt.legend(bbox_to_anchor=(1, 1), loc="upper left") - - fig.savefig( - os.path.join(self.plot_path_complete, "line_plot.png"), bbox_inches="tight" - ) - plt.close() - - def make_line_plot_with_filling_for_pyam_dataframe( - self, filtered_data: pyam.IamDataFrame, comparison_mode: str, title: str, - ) -> None: - """Make line plot with filling.""" - log.information("Make line plot with filling.") - - fig, a_x = plt.subplots( - figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi - ) - - filtered_data.plot( - ax=a_x, color=comparison_mode, title=title, fill_between=True, - ) - - y_tick_labels, unit, y_tick_locations = self.set_axis_scale( - a_x, x_or_y="y", unit=filtered_data.unit[0] - ) - plt.yticks( - ticks=y_tick_locations, - labels=y_tick_labels, - fontsize=self.hisim_chartbase.fontsize_ticks, - ) - plt.ylabel( - ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.xlabel( - xlabel="Time", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) - plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) - - fig.savefig(os.path.join(self.plot_path_complete, "line_plot_with_filling.png")) - plt.close() - - def make_bar_plot_for_pandas_dataframe( - self, - filtered_data: pd.DataFrame, - title: str, - unit: str, - alternative_bar_labels: Optional[List[str]] = None, - ) -> None: - """Make bar plot.""" - log.information("Make bar plot.") - - fig, a_x = plt.subplots( - figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi - ) - - y_data = [] - bar_labels = [] - - for scenario in list(OrderedSet(list(filtered_data.scenario))): - filtered_data_per_scenario = filtered_data.loc[ - filtered_data["scenario"] == scenario - ] - - mean_value_per_scenario = np.mean(filtered_data_per_scenario.value.values) - - y_data.append(mean_value_per_scenario) - bar_labels.append(scenario) - - # choose bar labels - if alternative_bar_labels is not None: - bar_labels = alternative_bar_labels - - x_data = np.arange(0, len(y_data) * 2, step=2) - - cmap = plt.get_cmap("viridis") - colors = [cmap(i) for i in np.linspace(0, 1, len(x_data))] - a_x.bar(x_data, y_data, label=bar_labels, color=colors) - - 
y_tick_labels, unit, y_tick_locations = self.set_axis_scale( - a_x, x_or_y="y", unit=unit, - ) - plt.yticks( - ticks=y_tick_locations, - labels=y_tick_labels, - fontsize=self.hisim_chartbase.fontsize_ticks, - ) - plt.ylabel( - ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.xlabel( - xlabel=filtered_data.year.values[0], - fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) - plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) - - if self.show_plot_legend: - plt.legend(bbox_to_anchor=(1, 1), loc="upper left") - - a_x.xaxis.set_tick_params(labelbottom=False) - a_x.set_xticks([]) - plt.tight_layout() - fig.savefig(os.path.join(self.plot_path_complete, "bar_plot.png")) - plt.close() - - def make_box_plot_for_pandas_dataframe( - self, - filtered_data: pd.DataFrame, - title: str, - scenario_set: Optional[List[str]] = None, - ) -> None: - """Make box plot.""" - log.information("Make box plot.") - - fig, a_x = plt.subplots( - figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi - ) - if scenario_set is None: - scenario_set = list(OrderedSet(filtered_data.scenario)) - - sns.boxplot(data=filtered_data, x="scenario", y="value") - y_tick_labels, unit, y_tick_locations = self.set_axis_scale( - a_x, x_or_y="y", unit=filtered_data.unit.values[0] - ) - plt.yticks( - ticks=y_tick_locations, - labels=y_tick_labels, - fontsize=self.hisim_chartbase.fontsize_ticks, - ) - plt.ylabel( - ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.xlabel( - xlabel=filtered_data.year.values[0], - fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) - plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) - a_x.xaxis.set_tick_params(labelbottom=False) - a_x.set_xticks([]) - print(self.show_plot_legend) - if self.show_plot_legend: - plt.legend(scenario_set, bbox_to_anchor=(1, 1), loc="upper left") - - fig.savefig( - os.path.join(self.plot_path_complete, "box_plot.png"), bbox_inches="tight" - ) - plt.close() - - def make_histogram_plot_for_pandas_dataframe( - self, - filtered_data: pd.DataFrame, - title: str, - unit: str, - scenario_set: Optional[List[str]] = None, - ) -> None: - """Make histogram plot.""" - log.information("Make histogram plot.") - - fig, a_x = plt.subplots( # pylint: disable=unused-variable - figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi - ) - if scenario_set is None: - scenario_set = list(OrderedSet(filtered_data.scenario)) - - plt.hist(x=np.array(filtered_data.value.values), bins="auto") - - if max(filtered_data.value.values) != 0: - x_tick_locations = range( - 0, - int(max(filtered_data.value.values)), - round(int(max(filtered_data.value.values) / 10), 0), - ) - else: - x_tick_locations = None - plt.xticks( - ticks=x_tick_locations, - rotation=45, - fontsize=self.hisim_chartbase.fontsize_ticks, - rotation_mode="anchor", - ha="right", - ) - - plt.ylabel( - ylabel="Count", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.xlabel( - xlabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, - ) - plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) - plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) - - # plt.legend(scenario_set, bbox_to_anchor=(1, 1), loc="upper left") - fig.savefig( - os.path.join(self.plot_path_complete, "histogram_plot.png"), - bbox_inches="tight", - ) - plt.close() - - # def make_sankey_plot_for_pyam_dataframe( 
- # self, - # pyam_dataframe: pyam.IamDataFrame, - # filter_model: Optional[str], - # filter_scenario: Optional[str], - # filter_variables: Optional[str], - # filter_region: Optional[str], - # filter_unit: Optional[str], - # filter_year: Optional[str], - # ) -> None: - # """Make sankey plot.""" - # log.information("Make sankey plot.") - - # filtered_data = self.filter_pyam_dataframe( - # pyam_dataframe=pyam_dataframe, - # filter_model=filter_model, - # filter_scenario=filter_scenario, - # filter_region=filter_region, - # filter_variables=filter_variables, - # filter_unit=filter_unit, - # filter_year=filter_year, - # ) - - # sankey_mapping = { - # "ElectrcityGridBaseLoad|Electricity|ElectricityOutput": ( - # "PV", - # "Occupancy", - # ), - # "PVSystemw-|Electricity|ElectricityOutput": ("PV", "Grid"), - # "Occupancy|Electricity|ElectricityOutput": ("Grid", "Occupancy"), - # } - # fig = filtered_data.plot.sankey(mapping=sankey_mapping) - - # # save figure as html first - # plotly.offline.plot( - # fig, - # filename=os.path.join(self.plot_path_complete, "sankey_plot.html"), - # auto_open=False, - # ) - - # # convert html file to png - # hti = Html2Image() - # with open( - # os.path.join(self.plot_path_complete, "sankey_plot.html"), encoding="utf8", - # ) as file: - # hti.screenshot( - # file.read(), save_as="sankey_plot.png", - # ) - - # # change directory of sankey output file - # try: - # os.rename( - # "sankey_plot.png", - # os.path.join(self.plot_path_complete, "sankey_plot.png"), - # ) - # except Exception as exc: - # raise Exception("Cannot save current sankey. Try again.") from exc - - def set_axis_scale( - self, a_x: Any, x_or_y: Any, unit: Any - ) -> Tuple[float, str, Any]: - """Get axis and unit and scale it properly.""" - - if x_or_y == "x": - tick_values = a_x.get_xticks() - elif x_or_y == "y": - tick_values = a_x.get_yticks() - else: - raise ValueError("x_or_y must be either 'x' or 'y'") - - max_ticks = max(tick_values) - min_ticks = min(tick_values) - - max_scale = max(abs(max_ticks), abs(min_ticks)) - - new_tick_values = tick_values - scale = "" - - if unit not in ["-", "%"]: - - if max_scale >= 1e12: - new_tick_values = tick_values * 1e-12 - scale = "T" - elif 1e9 <= max_scale < 1e12: - new_tick_values = tick_values * 1e-9 - scale = "G" - elif 1e6 <= max_scale < 1e9: - new_tick_values = tick_values * 1e-6 - scale = "M" - elif 1e3 <= max_scale < 1e6: - new_tick_values = tick_values * 1e-3 - scale = "k" - elif -1e3 <= max_scale < 1e3: - new_tick_values = tick_values - scale = "" - - tick_locations = tick_values - tick_labels = np.round(new_tick_values, 1) - unit = f"{scale}{unit}" - - # if k already in unit, remove k and replace with "M" - if unit in ["kkWh", "kkW"]: - unit = "M" + unit[2:] - elif unit in ["kkg", "kkg/s"]: - unit = "t" + unit[3:] - - return tick_labels, unit, tick_locations - - def filter_pandas_dataframe( - self, dataframe: pd.DataFrame, variable_to_check: str - ) -> pd.DataFrame: - """Filter pandas dataframe according to variable.""" - filtered_dataframe = dataframe.loc[dataframe["variable"] == variable_to_check] - if filtered_dataframe.empty: - print( - f"The dataframe contains the following variables: {set(list(dataframe.variable))}" - ) - raise ValueError( - f"The filtered dataframe is empty. The dataframe did not contain the variable {variable_to_check}. Check the list above." 
- ) - return filtered_dataframe - - def decide_for_scenario_or_variable_comparison( - self, filtered_data: pyam.IamDataFrame - ) -> str: - """Decide for each plot what will be compared, different scenarios or different variales.""" - - if len(filtered_data.scenario) == 1 and len(filtered_data.variable) > 1: - comparison_mode = "variable" - elif len(filtered_data.scenario) > 1 and len(filtered_data.variable) == 1: - comparison_mode = "scenario" - else: - raise ValueError( - f"No comparison mode could be determined. There are {len(filtered_data.scenario)} scenarios and {len(filtered_data.variable)} variables filtered." - ) - - return comparison_mode - - def get_statistics_of_data_and_write_to_excel( - self, filtered_data: pd.DataFrame, path_to_save: str, kind_of_data_set: str, - ) -> None: - """Use pandas describe method to get statistical values of certain data.""" - # create a excel writer object - with pd.ExcelWriter( # pylint: disable=abstract-class-instantiated - path=os.path.join(path_to_save, f"{kind_of_data_set}_statistics.xlsx"), - mode="w", - ) as writer: - - filtered_data.to_excel(excel_writer=writer, sheet_name="filtered data") - statistical_data = filtered_data.describe() - - statistical_data.to_excel(excel_writer=writer, sheet_name="statistics") - - def check_if_scenario_exists_and_filter_dataframe_for_scenarios( - self, - data_frame: pd.DataFrame, - dict_of_scenarios_to_check: Dict[str, List[str]], - ) -> pd.DataFrame: - """Check if scenario exists and filter dataframe for scenario.""" - for (list_of_scenarios_to_check,) in dict_of_scenarios_to_check.values(): - aggregated_scenario_dict: Dict = { - key: [] for key in list_of_scenarios_to_check - } - - for given_scenario in data_frame["scenario"]: - # string comparison - - for scenario_to_check in list_of_scenarios_to_check: - if ( - scenario_to_check in given_scenario - and given_scenario - not in aggregated_scenario_dict[scenario_to_check] - ): - aggregated_scenario_dict[scenario_to_check].append( - given_scenario - ) - # raise error if dict is empty - for ( - key_scenario_to_check, - given_scenario, - ) in aggregated_scenario_dict.items(): - if given_scenario == []: - raise ValueError( - f"Scenarios containing {key_scenario_to_check} were not found in the pyam dataframe." 
- ) - - concat_df = pd.DataFrame() - # only take rows from dataframe which are in selected scenarios - for ( - key_scenario_to_check, - given_scenario, - ) in aggregated_scenario_dict.items(): - - df_filtered_for_specific_scenarios = data_frame.loc[ - data_frame["scenario"].isin(given_scenario) - ] - df_filtered_for_specific_scenarios["scenario"] = [ - key_scenario_to_check - ] * len(df_filtered_for_specific_scenarios["scenario"]) - concat_df = pd.concat([concat_df, df_filtered_for_specific_scenarios]) - concat_df["scenario_0"] = data_frame["scenario"] - - return concat_df - - def check_if_scenario_exists_and_filter_dataframe_for_scenarios_dict( - self, - data_frame: pd.DataFrame, - dict_of_scenarios_to_check: Dict[str, List[str]], - ) -> Tuple[pd.DataFrame, str, str]: - """Check if scenario exists and filter dataframe for scenario.""" - - concat_df = data_frame - filter_level_index = 0 - for ( - scenario_to_check_key, - list_of_scenarios_to_check, - ) in dict_of_scenarios_to_check.items(): - - concat_df = self.aggregate_all_values_for_one_scenario( - dataframe=concat_df, - list_of_scenarios_to_check=list_of_scenarios_to_check, - column_name_to_check=scenario_to_check_key, - # filter_level_index=filter_level_index, - ) - - filter_level_index = filter_level_index + 1 - key_for_scenario_one = "" - key_for_current_scenario = "" - # rename scenario with all scenario filter levels - for index in concat_df.index: - # if even more filter levels need to add condition! - if filter_level_index == 2: - current_scenario_value = concat_df["scenario"][index] - scenario_value_one = concat_df["scenario_1"][index] - # scenario zero is original scenario that will be overwritten - key_for_scenario_one = list(dict_of_scenarios_to_check.keys())[0] - key_for_current_scenario = list(dict_of_scenarios_to_check.keys())[1] - # concat_df.iloc[index, concat_df.columns.get_loc("scenario")] = f"{scenario_value_one}_{current_scenario_value}" - concat_df.loc[ - index, "scenario" - ] = f"{scenario_value_one}_{current_scenario_value}" - elif filter_level_index == 1: - key_for_scenario_one = list(dict_of_scenarios_to_check.keys())[0] - key_for_current_scenario = "" - return concat_df, key_for_scenario_one, key_for_current_scenario - - def aggregate_all_values_for_one_scenario( - self, - dataframe: pd.DataFrame, - list_of_scenarios_to_check: List, - column_name_to_check: str, - # filter_level_index: int, - ) -> pd.DataFrame: - """Check for one scenario.""" - - aggregated_scenario_dict: Dict = {key: [] for key in list_of_scenarios_to_check} - print("aggregated scenario dict", aggregated_scenario_dict) - for scenario_to_check in list_of_scenarios_to_check: - print("scenario to check", scenario_to_check) - for value in dataframe[column_name_to_check].values: - if ( - isinstance(scenario_to_check, str) - and scenario_to_check in value - and value not in aggregated_scenario_dict[scenario_to_check] - ): - aggregated_scenario_dict[scenario_to_check].append(value) - elif ( - isinstance(scenario_to_check, (float, int)) - and scenario_to_check == value - and value not in aggregated_scenario_dict[scenario_to_check] - ): - - aggregated_scenario_dict[scenario_to_check].append(value) - - concat_df = pd.DataFrame() - # only take rows from dataframe which are in selected scenarios - for ( - key_scenario_to_check, - given_list_of_values, - ) in aggregated_scenario_dict.items(): - - df_filtered_for_specific_scenarios = dataframe.loc[ - dataframe[column_name_to_check].isin(given_list_of_values) - ] - - df_filtered_for_specific_scenarios.loc[ - 
:, "scenario" - ] = key_scenario_to_check - - concat_df = pd.concat( - [concat_df, df_filtered_for_specific_scenarios], ignore_index=True - ) - print(dataframe.loc[:, "scenario"]) - # concat_df[f"scenario_{filter_level_index}"] = dataframe.loc[:, "scenario"] - - del df_filtered_for_specific_scenarios - - return concat_df - - def calculate_relative_electricity_demand( - self, dataframe: pd.DataFrame - ) -> pd.DataFrame: - """Calculate relative electricity demand.""" - - # look for ElectricityMeter|Electricity|ElectrcityFromGrid output - if ( - "ElectricityMeter|Electricity|ElectricityFromGrid" - not in dataframe.variable.values - ): - raise ValueError( - "ElectricityMeter|Electricity|ElectricityFromGrid was not found in variables." - ) - - # filter again just to be shure - filtered_data = dataframe.loc[ - dataframe.variable == "ElectricityMeter|Electricity|ElectricityFromGrid" - ] - - if "share_of_maximum_pv_power" not in filtered_data.columns: - raise ValueError( - "share_of_maximum_pv_power was not found in dataframe columns" - ) - # sort df accrofing to share of pv - filtered_data = filtered_data.sort_values("share_of_maximum_pv_power") - - # iterate over all scenarios - for scenario in list(set(filtered_data.scenario.values)): - - if "share_of_maximum_pv_power" not in scenario: - - df_for_one_scenario = filtered_data.loc[ - filtered_data.scenario == scenario - ] - - df_for_one_scenario_and_for_share_zero = df_for_one_scenario.loc[ - df_for_one_scenario.share_of_maximum_pv_power == 0 - ] - - reference_value_for_electricity_demand = ( - df_for_one_scenario_and_for_share_zero.value.values - ) - relative_electricity_demand = [0] * len( - reference_value_for_electricity_demand - ) - - # get reference value (when share of pv power is zero) - for share_of_maximum_pv_power in list( - set(df_for_one_scenario.share_of_maximum_pv_power.values) - ): - - if share_of_maximum_pv_power != 0: - - df_for_one_scenario_and_for_one_share = df_for_one_scenario.loc[ - df_for_one_scenario.share_of_maximum_pv_power - == share_of_maximum_pv_power - ] - - value_for_electricity_demand = ( - df_for_one_scenario_and_for_one_share.value.values - ) - - # calculate reference electricity demand for each scenario and share of pv power - relative_electricity_demand = ( - value_for_electricity_demand - / reference_value_for_electricity_demand - * 100 - ) - - new_df_only_with_relative_electricity_demand = copy.deepcopy( - df_for_one_scenario_and_for_one_share - ) - new_df_only_with_relative_electricity_demand.loc[ - :, "variable" - ] = "Relative Electricity Demand" - new_df_only_with_relative_electricity_demand.loc[ - :, "unit" - ] = "%" - new_df_only_with_relative_electricity_demand.loc[ - :, "value" - ] = relative_electricity_demand - - del df_for_one_scenario_and_for_one_share - - dataframe = pd.concat( - [dataframe, new_df_only_with_relative_electricity_demand] - ) - del dataframe["Unnamed: 0"] - del new_df_only_with_relative_electricity_demand - - else: - - df_for_one_scenario = filtered_data.loc[ - filtered_data.scenario == scenario - ] - share_of_maximum_pv_power = df_for_one_scenario[ - "share_of_maximum_pv_power" - ].values[0] - - if share_of_maximum_pv_power == 0: - - relative_electricity_demand = [0] * len(df_for_one_scenario) - - else: - - value_for_electricity_demand = df_for_one_scenario.value.values - df_for_one_scenario_and_for_share_zero = filtered_data.loc[ - filtered_data.share_of_maximum_pv_power == 0 - ] - reference_value_for_electricity_demand = ( - df_for_one_scenario_and_for_share_zero.value.values - 
) - - # calculate reference electricity demand for each scenario and share of pv power - relative_electricity_demand = ( - 1 - - ( - ( - reference_value_for_electricity_demand - - value_for_electricity_demand - ) - / reference_value_for_electricity_demand - ) - ) * 100 - - new_df_only_with_relative_electricity_demand = copy.deepcopy( - df_for_one_scenario - ) - new_df_only_with_relative_electricity_demand.loc[ - :, "variable" - ] = "Relative Electricity Demand" - new_df_only_with_relative_electricity_demand.loc[:, "unit"] = "%" - new_df_only_with_relative_electricity_demand.loc[ - :, "value" - ] = relative_electricity_demand - - del df_for_one_scenario - - dataframe = pd.concat( - [dataframe, new_df_only_with_relative_electricity_demand] - ) - - del dataframe["Unnamed: 0"] - del new_df_only_with_relative_electricity_demand - - return dataframe - - -class FilterClass: - - """Class for setting filters on the data for processing.""" - - def __init__(self): - """Initialize the class.""" - - ( - self.kpi_data, - self.electricity_data, - self.occuancy_consumption, - self.heating_demand, - ) = self.get_variables_to_check() - ( - self.building_type, - self.building_refurbishment_state, - self.building_age, - self.pv_share, - ) = self.get_scenarios_to_check() - - def get_variables_to_check(self): - """Get specific variables to check for the scenario evaluation.""" - - # system_setups for variables to check (check names of your variables before your evaluation, if they are correct) - # kpi data has no time series, so only choose when you analyze yearly data - kpi_data = [ - "Production", - "Investment costs for equipment per simulated period", - "CO2 footprint for equipment per simulated period", - "System operational costs for simulated period", - "System operational emissions for simulated period", - "Total costs for simulated period", - "Total emissions for simulated period", - "Temperature deviation of building indoor air temperature being below set temperature 19.0 °C", - "Minimum building indoor air temperature reached", - "Temperature deviation of building indoor air temperature being above set temperature 24.0 °C", - "Maximum building indoor air temperature reached", - "Number of heat pump cycles", - "Total energy from electricity grid", - "Total energy to electricity grid", - ] - - electricity_data = [ - # "L2EMSElectricityController|Electricity|ElectricityToOrFromGrid", - # "PVSystem_w0|Electricity|ElectricityOutput", # check if pv was used or not - "ElectricityMeter|Electricity|ElectricityToGrid", - "ElectricityMeter|Electricity|ElectricityFromGrid", - "ElectricityMeter|Electricity|ElectricityAvailable", - "ElectricityMeter|Electricity|ElectricityConsumption", - "ElectricityMeter|Electricity|ElectricityProduction", - ] - - occuancy_consumption = [ - "Occupancy|Electricity|ElectricityOutput", - "Occupancy|WarmWater|WaterConsumption", - ] - - heating_demand = [ - "AdvancedHeatPumpHPLib|Heating|ThermalOutputPower", - # "HeatDistributionSystem|Heating|ThermalOutputPower", - # "Building|Heating|TheoreticalThermalBuildingDemand", - "Building|Temperature|TemperatureIndoorAir", - ] - - return kpi_data, electricity_data, occuancy_consumption, heating_demand - - def get_scenarios_to_check(self): - """Get scenarios to check for scenario evaluation.""" - - ( - building_type, - building_refurbishment_state, - building_age, - ) = self.get_building_properties_to_check() - - pv_share = self.get_pv_properties_to_check() - - return building_type, building_refurbishment_state, building_age, pv_share - - def 
get_building_properties_to_check(self): - """Get building properties.""" - - # system_setups for scenarios to filter - building_type = [ - "DE.N.SFH", - "DE.N.TH", - "DE.N.MFH", - "DE.N.AB", - ] - - building_refurbishment_state = [ - "001.001", - "001.002", - "001.003", - ] - - building_age = [ - "01.Gen", - "02.Gen", - "03.Gen", - "04.Gen", - "05.Gen", - "06.Gen", - "07.Gen", - "08.Gen", - "09.Gen", - "10.Gen", - "11.Gen", - "12.Gen", - ] - - return building_type, building_refurbishment_state, building_age - - def get_pv_properties_to_check(self): - """Get pv properties.""" - - # system_setups for scenarios to filter - pv_share = [0, 0.25, 0.5, 1] - - return pv_share diff --git a/hisim/postprocessing/scenario_evaluation/__init__.py b/hisim/postprocessing/scenario_evaluation/__init__.py new file mode 100644 index 000000000..00de565ea --- /dev/null +++ b/hisim/postprocessing/scenario_evaluation/__init__.py @@ -0,0 +1 @@ +"""Init module.""" diff --git a/hisim/postprocessing/pyam_data_collection.py b/hisim/postprocessing/scenario_evaluation/result_data_collection.py similarity index 79% rename from hisim/postprocessing/pyam_data_collection.py rename to hisim/postprocessing/scenario_evaluation/result_data_collection.py index a32646bab..01d6500d4 100644 --- a/hisim/postprocessing/pyam_data_collection.py +++ b/hisim/postprocessing/scenario_evaluation/result_data_collection.py @@ -1,4 +1,4 @@ -"""Data Collection for Scenario Comparison with Pyam.""" +"""Data Collection for Scenario Comparison.""" # clean import glob import datetime @@ -15,9 +15,9 @@ from hisim import log -class PyamDataCollector: +class ResultDataCollection: - """PyamDataCollector class which collects and concatenate the pyam data from the system_setups/results.""" + """ResultDataCollection class which collects and concatenate the result data from the system_setups/results.""" def __init__( self, @@ -25,72 +25,68 @@ def __init__( simulation_duration_to_check: str, time_resolution_of_data_set: Any, folder_from_which_data_will_be_collected: str = os.path.join( - os.pardir, os.pardir, "system_setups", "results" + os.pardir, os.pardir, os.pardir, "system_setups", "results" ), path_to_default_config: Optional[str] = None, ) -> None: """Initialize the class.""" result_folder = folder_from_which_data_will_be_collected - self.pyam_data_folder = os.path.join( - os.pardir, - os.pardir, - "system_setups", - "results_for_scenario_comparison", - "data", + self.result_data_folder = os.path.join( + result_folder, os.pardir, "results_for_scenario_comparison", "data", ) # in each system_setups/results folder should be one system setup that was executed with the default config - self.path_of_pyam_results_executed_with_default_config: str = "" + self.path_of_scenario_data_executed_with_default_config: str = "" log.information(f"Checking results from folder: {result_folder}") - list_with_pyam_data_folders = self.get_only_useful_data( + list_with_result_data_folders = self.get_only_useful_data( result_path=result_folder ) - if data_processing_mode == PyamDataProcessingModeEnum.PROCESS_ALL_DATA: + if data_processing_mode == ResultDataProcessingModeEnum.PROCESS_ALL_DATA: parameter_key = None elif ( data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_SIZES + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_SIZES ): parameter_key = "conditioned_floor_area_in_m2" elif ( data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_CODES + == 
ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_CODES ): parameter_key = "building_code" elif ( data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_AZIMUTH_ANGLES + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_AZIMUTH_ANGLES ): parameter_key = "pv_azimuth" elif ( data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_TILT_ANGLES + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_TILT_ANGLES ): parameter_key = "pv_tilt" elif ( data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_SHARE_OF_MAXIMUM_PV + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_SHARE_OF_MAXIMUM_PV ): parameter_key = "share_of_maximum_pv_power" elif ( data_processing_mode - == PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_NUMBER_OF_DWELLINGS + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_NUMBER_OF_DWELLINGS ): parameter_key = "number_of_dwellings_per_building" else: raise ValueError( - "Analysis mode is not part of the PyamDataProcessingModeEnum class." + "Analysis mode is not part of the ResultDataProcessingModeEnum class." ) log.information(f"Data Collection Mode is {data_processing_mode}") @@ -100,11 +96,11 @@ def __init__( if path_to_default_config is None: list_with_parameter_key_values = None - list_with_csv_files = list_with_pyam_data_folders + list_with_csv_files = list_with_result_data_folders list_with_module_config_dicts = None else: - # path to default config is given (which means there should be also a module config dict in the json file in the pyam folder which has read the config) + # path to default config is given (which means there should be also a module config dict in the json file in the result folder which has read the config) default_config_dict = self.get_default_config( path_to_default_config=path_to_default_config @@ -114,8 +110,8 @@ def __init__( list_with_csv_files, list_with_parameter_key_values, list_with_module_config_dicts, - ) = self.go_through_all_pyam_data_folders_and_collect_file_paths_according_to_parameters( - list_with_pyam_data_folders=list_with_pyam_data_folders, + ) = self.go_through_all_result_data_folders_and_collect_file_paths_according_to_parameters( + list_with_result_data_folders=list_with_result_data_folders, default_config_dict=default_config_dict, parameter_key=parameter_key, ) @@ -150,37 +146,37 @@ def get_only_useful_data(self, result_path: str) -> List[str]: # go through result path and if the dirs do not contain finished.flag ask for deletion self.clean_result_directory_from_unfinished_results(result_path=result_path) - # get result folders with pyam data folder - list_with_all_paths_to_check = self.get_list_of_all_relevant_pyam_data_folders( + # get result folders with result data folder + list_with_all_paths_to_check = self.get_list_of_all_relevant_scenario_data_folders( result_path=result_path ) print( - "len of list with all paths to containing pyam data ", + "len of list with all paths to containing result data ", len(list_with_all_paths_to_check), ) # filter out results that had buildings that were too hot or too cold list_with_all_paths_to_check_after_filtering = self.filter_results_that_failed_to_heat_or_cool_building_sufficiently( - list_of_result_path_that_contain_pyam_data=list_with_all_paths_to_check + list_of_result_path_that_contain_scenario_data=list_with_all_paths_to_check ) print( "len of list with all paths after filtering ", len(list_with_all_paths_to_check), ) # check if duplicates are existing and ask for deletion - 
list_with_pyam_data_folders = self.go_through_all_pyam_data_folders_and_check_if_module_configs_are_double_somewhere( - list_of_pyam_folder_paths_to_check=list_with_all_paths_to_check_after_filtering + list_with_result_data_folders = self.go_through_all_scenario_data_folders_and_check_if_module_configs_are_double_somewhere( + list_of_result_folder_paths_to_check=list_with_all_paths_to_check_after_filtering ) print( "len of list with all paths after double checking for duplicates ", - len(list_with_pyam_data_folders), + len(list_with_result_data_folders), ) - return list_with_pyam_data_folders + return list_with_result_data_folders def clean_result_directory_from_unfinished_results(self, result_path: str) -> None: """When a result folder does not contain the finished_flag, it will be removed from the system_setups/result folder.""" list_of_unfinished_folders = [] with open( - os.path.join(self.pyam_data_folder, "failed_simualtions.txt"), + os.path.join(self.result_data_folder, "failed_simualtions.txt"), "a", encoding="utf-8", ) as file: @@ -216,13 +212,13 @@ def clean_result_directory_from_unfinished_results(self, result_path: str) -> No # print("The answer must be yes or no.") def filter_results_that_failed_to_heat_or_cool_building_sufficiently( - self, list_of_result_path_that_contain_pyam_data: List[str] + self, list_of_result_path_that_contain_scenario_data: List[str] ) -> List[str]: """When a result shows too high or too low building temperatures, it will be filtered and removed from further analysis.""" list_of_unsuccessful_folders = [] with open( os.path.join( - self.pyam_data_folder, + self.result_data_folder, "succeeded_simulations_that_showed_too_high_or_too_low_building_temps.txt", ), "a", @@ -241,17 +237,17 @@ def filter_results_that_failed_to_heat_or_cool_building_sufficiently( "temp deviation below set heating [°C*h]," "temp deviation above set cooling [°C*h], folder \n" ) - for folder in list_of_result_path_that_contain_pyam_data: - pyam_data_information = os.path.join( - folder, "data_information_for_pyam.json" + for folder in list_of_result_path_that_contain_scenario_data: + scenario_data_information = os.path.join( + folder, "data_information_for_scenario_evaluation.json" ) main_folder = os.path.normpath(folder + os.sep + os.pardir) webtool_kpis_file = os.path.join(main_folder, "webtool_kpis.json") # get set temperatures used in the simulation - if os.path.exists(pyam_data_information): + if os.path.exists(scenario_data_information): with open( - pyam_data_information, "r", encoding="utf-8" + scenario_data_information, "r", encoding="utf-8" ) as data_info_file: json_file = json.load(data_info_file) component_entries = json_file["componentEntries"] @@ -270,7 +266,7 @@ def filter_results_that_failed_to_heat_or_cool_building_sufficiently( break else: raise FileNotFoundError( - f"The file {pyam_data_information} could not be found. " + f"The file {scenario_data_information} could not be found. " ) # open the webtool kpis and check if building got too hot or too cold @@ -359,7 +355,7 @@ def filter_results_that_failed_to_heat_or_cool_building_sufficiently( ) if answer.upper() in ["N", "NO"]: for folder in list_of_unsuccessful_folders: - list_of_result_path_that_contain_pyam_data.remove(folder) + list_of_result_path_that_contain_scenario_data.remove(folder) print( "The folders with too low or too high building temperatures will be discarded from the further analysis." 
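For orientation, the acceptance test behind this temperature filter can be sketched in isolation. The KPI names below are the ones listed later in FilterClass; the flat json layout of webtool_kpis.json and the 100 °C*h tolerance are assumptions for illustration only, not the exact values used by the code above:

```python
import json
from typing import Tuple

# KPI names as they appear in FilterClass further below; the flat json layout
# of webtool_kpis.json and the tolerance value are assumptions for illustration.
KPI_BELOW = "Temperature deviation of building indoor air temperature being below set temperature 19.0 °C"
KPI_ABOVE = "Temperature deviation of building indoor air temperature being above set temperature 24.0 °C"
MAX_DEVIATION_IN_CELSIUS_HOURS = 100.0  # assumed tolerance


def building_temperatures_acceptable(path_to_webtool_kpis: str) -> Tuple[bool, float, float]:
    """Check whether a run stayed within the temperature-deviation tolerance."""
    with open(path_to_webtool_kpis, "r", encoding="utf-8") as file:
        kpis = json.load(file)
    deviation_below = float(kpis[KPI_BELOW])
    deviation_above = float(kpis[KPI_ABOVE])
    is_ok = (
        deviation_below <= MAX_DEVIATION_IN_CELSIUS_HOURS
        and deviation_above <= MAX_DEVIATION_IN_CELSIUS_HOURS
    )
    return is_ok, deviation_below, deviation_above
```

Runs failing this kind of check are the ones collected in list_of_unsuccessful_folders and offered for removal.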
) @@ -370,21 +366,21 @@ def filter_results_that_failed_to_heat_or_cool_building_sufficiently( else: print("The answer must be yes or no.") - return list_of_result_path_that_contain_pyam_data + return list_of_result_path_that_contain_scenario_data - def get_list_of_all_relevant_pyam_data_folders(self, result_path: str) -> List[str]: - """Get a list of all pyam data folders which you want to analyze.""" + def get_list_of_all_relevant_scenario_data_folders(self, result_path: str) -> List[str]: + """Get a list of all scenario data folders which you want to analyze.""" # choose which path to check - path_to_check = os.path.join(result_path, "**", "pyam_data") + path_to_check = os.path.join(result_path, "**", "result_data_for_scenario_evaluation") list_of_paths_first_order = list(glob.glob(path_to_check)) - # if in these paths no pyam data folder can be found check in subfolders for it - path_to_check = os.path.join(result_path, "**", "**", "pyam_data") # type: ignore + # if no result data folder can be found in these paths, check their subfolders + path_to_check = os.path.join(result_path, "**", "**", "result_data_for_scenario_evaluation") # type: ignore list_of_paths_second_order = list(glob.glob(path_to_check)) - path_to_check = os.path.join(result_path, "**", "**", "**", "pyam_data") # type: ignore + path_to_check = os.path.join(result_path, "**", "**", "**", "result_data_for_scenario_evaluation") # type: ignore list_of_paths_third_order = list(glob.glob(path_to_check)) list_with_all_paths_to_check = ( @@ -399,21 +395,21 @@ def import_data_from_file( self, paths_to_check: List[str], analyze_yearly_or_hourly_data: Any ) -> List: """Import data from result files.""" - log.information("Importing pyam_data from csv files.") + log.information("Importing result_data_for_scenario_evaluation from csv files.") all_csv_files = [] - if analyze_yearly_or_hourly_data == PyamDataTypeEnum.HOURLY: + if analyze_yearly_or_hourly_data == ResultDataTypeEnum.HOURLY: kind_of_data_set = "hourly" - elif analyze_yearly_or_hourly_data == PyamDataTypeEnum.YEARLY: + elif analyze_yearly_or_hourly_data == ResultDataTypeEnum.YEARLY: kind_of_data_set = "yearly" - elif analyze_yearly_or_hourly_data == PyamDataTypeEnum.DAILY: + elif analyze_yearly_or_hourly_data == ResultDataTypeEnum.DAILY: kind_of_data_set = "daily" - elif analyze_yearly_or_hourly_data == PyamDataTypeEnum.MONTHLY: + elif analyze_yearly_or_hourly_data == ResultDataTypeEnum.MONTHLY: kind_of_data_set = "monthly" else: raise ValueError( - "analyze_yearly_or_hourly_data was not found in the pyamdatacollectorenum class." + "analyze_yearly_or_hourly_data was not found in the datacollectorenum class."
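The three glob calls above probe for result_data_for_scenario_evaluation folders at three fixed depths. As a possible simplification (not what the diff does), a single recursive search covers all depths at once:

```python
import glob
import os
from typing import List


def find_scenario_data_folders(result_path: str) -> List[str]:
    """Collect result_data_for_scenario_evaluation folders at any depth below result_path."""
    pattern = os.path.join(result_path, "**", "result_data_for_scenario_evaluation")
    # recursive=True lets "**" match any number of intermediate directories,
    # covering the first-, second- and third-order searches in one call.
    return list(glob.glob(pattern, recursive=True))
```

Note that the ordering of the returned paths may differ from concatenating the three fixed-depth lists.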
) for folder in paths_to_check: # type: ignore @@ -436,7 +432,6 @@ def make_dictionaries_with_simulation_duration_keys( # open file config and check if they have wanted simulation duration for file in all_csv_files: - parent_folder = os.path.abspath(os.path.join(file, os.pardir)) # type: ignore for file1 in os.listdir(parent_folder): if ".json" in file1: @@ -444,13 +439,16 @@ with open( os.path.join(parent_folder, file1), "r", encoding="utf-8" ) as openfile: json_file = json.load(openfile) - simulation_duration = json_file["pyamDataInformation"].get( + simulation_duration = json_file["scenarioDataInformation"].get( "duration in days" ) if int(simulation_duration_to_check) == int( simulation_duration ): dict_of_csv_data[f"{simulation_duration}"].append(file) + else: + raise ValueError(f"The simulation_duration_to_check of {simulation_duration_to_check} is different " + f"from the simulation duration of {simulation_duration} found in the scenario data information json in the result folders.") # raise error if dict is empty if bool(dict_of_csv_data) is False: @@ -523,9 +521,9 @@ def read_csv_and_generate_pandas_dataframe( list_with_parameter_key_values: Optional[List[Any]] = None, list_with_module_config_dicts: Optional[List[Any]] = None, ) -> None: - """Read the csv files and generate the pyam dataframe.""" + """Read the csv files and generate the result dataframe.""" log.information( - f"Read csv files and generate pyam dataframes for {time_resolution_of_data_set}." + f"Read csv files and generate result dataframes for {time_resolution_of_data_set}." ) appended_dataframe = pd.DataFrame() @@ -591,55 +589,55 @@ def read_csv_and_generate_pandas_dataframe( dataframe=appended_dataframe ) - filename = self.store_pyam_data_with_the_right_name_and_in_the_right_path( - pyam_data_folder=self.pyam_data_folder, + filename = self.store_scenario_data_with_the_right_name_and_in_the_right_path( + result_data_folder=self.result_data_folder, simulation_duration_key=simulation_duration_key, time_resolution_of_data_set=time_resolution_of_data_set, parameter_key=parameter_key, ) appended_dataframe.to_csv(filename) - def store_pyam_data_with_the_right_name_and_in_the_right_path( + def store_scenario_data_with_the_right_name_and_in_the_right_path( self, - pyam_data_folder: str, + result_data_folder: str, simulation_duration_key: str, time_resolution_of_data_set: Any, parameter_key: Optional[str] = None, ) -> str: - """Store csv files in the pyam data folder with the right filename and path.""" + """Store csv files in the result data folder with the right filename and path.""" - if time_resolution_of_data_set == PyamDataTypeEnum.HOURLY: + if time_resolution_of_data_set == ResultDataTypeEnum.HOURLY: kind_of_data_set = "hourly" - elif time_resolution_of_data_set == PyamDataTypeEnum.YEARLY: + elif time_resolution_of_data_set == ResultDataTypeEnum.YEARLY: kind_of_data_set = "yearly" - elif time_resolution_of_data_set == PyamDataTypeEnum.DAILY: + elif time_resolution_of_data_set == ResultDataTypeEnum.DAILY: kind_of_data_set = "daily" - elif time_resolution_of_data_set == PyamDataTypeEnum.MONTHLY: + elif time_resolution_of_data_set == ResultDataTypeEnum.MONTHLY: kind_of_data_set = "monthly" else: raise ValueError( - "This kind of data was not found in the pyamdatacollectorenum class." + "This kind of data was not found in the datacollectorenum class."
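Condensed, the duration bookkeeping above amounts to grouping each csv path by the "duration in days" entry of the json sitting next to it. A sketch under the same json layout, using a defaultdict for brevity:

```python
import json
import os
from collections import defaultdict
from typing import Dict, List


def group_csv_files_by_duration(all_csv_files: List[str]) -> Dict[str, List[str]]:
    """Group csv paths by the 'duration in days' stored in the json next to them."""
    dict_of_csv_data: Dict[str, List[str]] = defaultdict(list)
    for csv_file in all_csv_files:
        parent_folder = os.path.abspath(os.path.join(csv_file, os.pardir))
        for name in os.listdir(parent_folder):
            if name.endswith(".json"):
                with open(os.path.join(parent_folder, name), "r", encoding="utf-8") as openfile:
                    json_file = json.load(openfile)
                duration = json_file["scenarioDataInformation"].get("duration in days")
                dict_of_csv_data[str(duration)].append(csv_file)
    return dict_of_csv_data
```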
) if parameter_key is not None: path_for_file = os.path.join( - pyam_data_folder, + result_data_folder, f"data_with_different_{parameter_key}s", f"simulation_duration_of_{simulation_duration_key}_days", ) else: path_for_file = os.path.join( - pyam_data_folder, + result_data_folder, "data_with_all_parameters", f"simulation_duration_of_{simulation_duration_key}_days", ) if os.path.exists(path_for_file) is False: os.makedirs(path_for_file) - log.information(f"Saving pyam dataframe in {path_for_file} folder") + log.information(f"Saving result dataframe in {path_for_file} folder") filename = os.path.join( path_for_file, - f"pyam_dataframe_for_{simulation_duration_key}_days_{kind_of_data_set}_data.csv", + f"result_dataframe_for_{simulation_duration_key}_days_{kind_of_data_set}_data.csv", ) return filename @@ -656,21 +654,21 @@ def get_default_config(self, path_to_default_config: Optional[str]) -> Any: return default_config_dict - def read_pyam_data_json_config_and_compare_to_default_config( + def read_scenario_data_json_config_and_compare_to_default_config( self, default_config_dict: Dict[str, Any], - path_to_pyam_data_folder: str, + path_to_scenario_data_folder: str, list_with_csv_files: List[Any], list_with_parameter_key_values: List[Any], list_with_module_configs: List[Any], parameter_key: str, ) -> tuple[List[Any], List[Any], List[Any]]: - """Read json config in pyam_data folder and compare with default config.""" + """Read json config in result_data_for_scenario_evaluation folder and compare with default config.""" - for file in os.listdir(path_to_pyam_data_folder): + for file in os.listdir(path_to_scenario_data_folder): if ".json" in file: - with open(os.path.join(path_to_pyam_data_folder, file), "r", encoding="utf-8") as openfile: # type: ignore + with open(os.path.join(path_to_scenario_data_folder, file), "r", encoding="utf-8") as openfile: # type: ignore config_dict = json.load(openfile) my_module_config_dict = config_dict["myModuleConfig"] scenario_name = config_dict["systemName"] @@ -687,7 +685,8 @@ def read_pyam_data_json_config_and_compare_to_default_config( # check if module config and default config have any keys in common if len(set(default_config_dict).intersection(my_module_config_dict)) == 0: raise KeyError( - f"The module config of the folder {path_to_pyam_data_folder} should contain the keys of the default config, otherwise their values cannot be compared." + f"The module config of the folder {path_to_scenario_data_folder} should contain the keys of the default config, " + "otherwise their values cannot be compared."
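The key-intersection check above and the item-wise subset test just below can be seen in isolation with made-up configs (both dictionaries here are hypothetical):

```python
# Hypothetical configs; only the relationship between them matters here.
default_config_dict = {"pv_azimuth": 180.0, "pv_tilt": 30.0}
my_module_config_dict = {"pv_azimuth": 180.0, "pv_tilt": 30.0, "extra_key": 1}

# Comparable at all? The configs must share at least one key.
assert len(set(default_config_dict).intersection(my_module_config_dict)) > 0

# Is this the run that used the default values? Every default key/value
# pair must appear unchanged in the module config.
is_default_run = all(
    item in my_module_config_dict.items() for item in default_config_dict.items()
)
print(is_default_run)  # True for the values above
```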
) # check if there is a module config which is equal to default config if all( item in my_module_config_dict.items() for item in default_config_dict.items() ): - self.path_of_pyam_results_executed_with_default_config = ( - path_to_pyam_data_folder + self.path_of_scenario_data_executed_with_default_config = ( + path_to_scenario_data_folder ) # for each parameter different than the default config parameter, get the respective path to the folder @@ -704,16 +703,16 @@ def read_pyam_data_json_config_and_compare_to_default_config( # if my_module_config_dict[parameter_key] != default_config_dict[parameter_key]: - list_with_csv_files.append(path_to_pyam_data_folder) + list_with_csv_files.append(path_to_scenario_data_folder) list_with_parameter_key_values.append(my_module_config_dict[parameter_key]) list_with_module_configs.append(my_module_config_dict) # add to each item in the dict also the default system setup if the default system setup exists - if self.path_of_pyam_results_executed_with_default_config != "": + if self.path_of_scenario_data_executed_with_default_config != "": list_with_csv_files.append( - self.path_of_pyam_results_executed_with_default_config + self.path_of_scenario_data_executed_with_default_config ) list_with_parameter_key_values.append(default_config_dict[parameter_key]) @@ -728,36 +727,37 @@ def read_pyam_data_json_config_and_compare_to_default_config( def read_module_config_if_exist_and_write_in_dataframe( self, default_config_dict: Dict[str, Any], - path_to_pyam_data_folder: str, + path_to_scenario_data_folder: str, list_with_module_configs: List[Any], list_with_csv_files: List[Any], ) -> Tuple[List, List]: """Read module config if possible and write to dataframe.""" - for file in os.listdir(path_to_pyam_data_folder): + for file in os.listdir(path_to_scenario_data_folder): if ".json" in file: - with open(os.path.join(path_to_pyam_data_folder, file), "r", encoding="utf-8") as openfile: # type: ignore + with open(os.path.join(path_to_scenario_data_folder, file), "r", encoding="utf-8") as openfile: # type: ignore config_dict = json.load(openfile) my_module_config_dict = config_dict["myModuleConfig"] # check if module config and default config have any keys in common if len(set(default_config_dict).intersection(my_module_config_dict)) == 0: raise KeyError( - f"The module config of the folder {path_to_pyam_data_folder} should contain the keys of the default config, otherwise their values cannot be compared." + f"The module config of the folder {path_to_scenario_data_folder} should contain the keys of the default config, " + "otherwise their values cannot be compared."
) list_with_module_configs.append(my_module_config_dict) - list_with_csv_files.append(path_to_pyam_data_folder) + list_with_csv_files.append(path_to_scenario_data_folder) return ( list_with_module_configs, list_with_csv_files, ) - def go_through_all_pyam_data_folders_and_collect_file_paths_according_to_parameters( + def go_through_all_result_data_folders_and_collect_file_paths_according_to_parameters( self, - list_with_pyam_data_folders: List[str], + list_with_result_data_folders: List[str], default_config_dict: Dict[str, Any], parameter_key: Optional[str], ) -> tuple[List[Any], List[Any], List[Any]]: @@ -767,7 +767,7 @@ def go_through_all_pyam_data_folders_and_collect_file_paths_according_to_paramet list_with_csv_files: List = [] list_with_parameter_key_values: List = [] - for folder in list_with_pyam_data_folders: # type: ignore + for folder in list_with_result_data_folders: # type: ignore if parameter_key is None: ( @@ -775,7 +775,7 @@ def go_through_all_pyam_data_folders_and_collect_file_paths_according_to_paramet list_with_module_configs, list_with_csv_files, ) = self.read_module_config_if_exist_and_write_in_dataframe( default_config_dict=default_config_dict, - path_to_pyam_data_folder=folder, + path_to_scenario_data_folder=folder, list_with_module_configs=list_with_module_configs, list_with_csv_files=list_with_csv_files, ) @@ -787,9 +787,9 @@ def go_through_all_pyam_data_folders_and_collect_file_paths_according_to_paramet list_with_csv_files, list_with_parameter_key_values, list_with_module_configs, - ) = self.read_pyam_data_json_config_and_compare_to_default_config( + ) = self.read_scenario_data_json_config_and_compare_to_default_config( default_config_dict=default_config_dict, - path_to_pyam_data_folder=folder, + path_to_scenario_data_folder=folder, list_with_csv_files=list_with_csv_files, list_with_parameter_key_values=list_with_parameter_key_values, list_with_module_configs=list_with_module_configs, @@ -815,14 +815,15 @@ def check_for_duplicates_in_dict( return indices_of_duplicates - def go_through_all_pyam_data_folders_and_check_if_module_configs_are_double_somewhere( - self, list_of_pyam_folder_paths_to_check: List[str] + def go_through_all_scenario_data_folders_and_check_if_module_configs_are_double_somewhere( + self, list_of_result_folder_paths_to_check: List[str] ) -> List[Any]: - """Go through all pyam folders and remove the system_setups that are duplicated.""" + """Go through all result folders and remove the system_setups that are duplicated.""" list_of_all_module_configs = [] - list_of_pyam_folders_which_have_only_unique_configs = [] - for folder in list_of_pyam_folder_paths_to_check: + list_of_result_folders_which_have_only_unique_configs = [] + for folder in list_of_result_folder_paths_to_check: + for file in os.listdir(folder): if ".json" in file: with open(os.path.join(folder, file), "r", encoding="utf-8") as openfile: # type: ignore @@ -831,16 +832,16 @@ def go_through_all_pyam_data_folders_and_check_if_module_configs_are_double_some my_module_config_dict.update( { "duration in days": config_dict[ - "pyamDataInformation" + "scenarioDataInformation" ].get("duration in days") } ) my_module_config_dict.update( - {"model": config_dict["pyamDataInformation"].get("model")} + {"model": config_dict["scenarioDataInformation"].get("model")} ) my_module_config_dict.update( { - "model": config_dict["pyamDataInformation"].get( + "scenario": config_dict["scenarioDataInformation"].get( "scenario" ) } ) @@ -849,13 +850,13 @@ def go_through_all_pyam_data_folders_and_check_if_module_configs_are_double_some #
prevent adding modules with the same module config and the same simulation duration twice if my_module_config_dict not in list_of_all_module_configs: list_of_all_module_configs.append(my_module_config_dict) - list_of_pyam_folders_which_have_only_unique_configs.append( + list_of_result_folders_which_have_only_unique_configs.append( os.path.join(folder) ) # get folders with duplicates list_with_duplicates = [] - if folder not in list_of_pyam_folders_which_have_only_unique_configs: + if folder not in list_of_result_folders_which_have_only_unique_configs: whole_parent_folder = os.path.abspath(os.path.join(folder, os.pardir)) list_with_duplicates.append(whole_parent_folder) @@ -874,12 +875,12 @@ def go_through_all_pyam_data_folders_and_check_if_module_configs_are_double_some else: print("The answer must be yes or no.") - return list_of_pyam_folders_which_have_only_unique_configs + return list_of_result_folders_which_have_only_unique_configs -class PyamDataTypeEnum(enum.Enum): +class ResultDataTypeEnum(enum.Enum): - """PyamDataTypeEnum class. + """ResultDataTypeEnum class. Here it is defined what kind of data you want to collect. """ @@ -890,9 +891,9 @@ class PyamDataTypeEnum(enum.Enum): YEARLY = "yearly" -class PyamDataProcessingModeEnum(enum.Enum): +class ResultDataProcessingModeEnum(enum.Enum): - """PyamDataProcessingModeEnum class. + """ResultDataProcessingModeEnum class. Here it is defined what kind of data processing you want to do. """ diff --git a/hisim/postprocessing/scenario_evaluation/result_data_plotting.py b/hisim/postprocessing/scenario_evaluation/result_data_plotting.py new file mode 100644 index 000000000..78e97cf63 --- /dev/null +++ b/hisim/postprocessing/scenario_evaluation/result_data_plotting.py @@ -0,0 +1,753 @@ +"""Data Processing and Plotting for Scenario Comparison.""" + + +import datetime +import os +from typing import Dict, Any, Tuple, Optional, List +import string +import warnings +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt + +# import plotly +# from html2image import Html2Image +from ordered_set import OrderedSet +import seaborn as sns + +from hisim.postprocessing.scenario_evaluation.result_data_collection import ( + ResultDataTypeEnum, + ResultDataProcessingModeEnum, +) +from hisim.postprocessing.scenario_evaluation.result_data_processing import ( + ScenarioDataProcessing, +) +from hisim.postprocessing.chartbase import ChartFontsAndSize +from hisim import log + + +class ScenarioChartGeneration: + + """ScenarioChartGeneration class.""" + + def __init__( + self, + simulation_duration_to_check: str, + data_processing_mode: Any, + time_resolution_of_data_set: Any, + variables_to_check: Optional[List[str]] = None, + dict_of_scenarios_to_check: Optional[Dict[str, List[str]]] = None, + folder_from_which_data_will_be_collected: str = os.path.join( + os.pardir, os.pardir, os.pardir, "system_setups", "results" + ), + ) -> None: + """Initialize the class.""" + + warnings.filterwarnings("ignore") + + self.datetime_string = datetime.datetime.now().strftime("%Y%m%d_%H%M") + self.show_plot_legend: bool = True + + if data_processing_mode == ResultDataProcessingModeEnum.PROCESS_ALL_DATA: + + data_path_strip = "data_with_all_parameters" + result_path_strip = "results_for_all_parameters" + self.show_plot_legend = False + + elif ( + data_processing_mode + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_CODES + ): + data_path_strip = "data_with_different_building_codes" + result_path_strip = "results_different_building_codes" + + elif (
data_processing_mode + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_SIZES + ): + data_path_strip = "data_with_different_conditioned_floor_area_in_m2s" + result_path_strip = "results_different_conditioned_floor_area_in_m2s" + + elif ( + data_processing_mode + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_AZIMUTH_ANGLES + ): + data_path_strip = "data_with_different_pv_azimuths" + result_path_strip = "results_different_pv_azimuths" + + elif ( + data_processing_mode + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_TILT_ANGLES + ): + data_path_strip = "data_with_different_pv_tilts" + result_path_strip = "results_different_pv_tilts" + + elif ( + data_processing_mode + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_SHARE_OF_MAXIMUM_PV + ): + data_path_strip = "data_with_different_share_of_maximum_pv_powers" + result_path_strip = "results_different_share_of_maximum_pv_powers" + + elif ( + data_processing_mode + == ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_NUMBER_OF_DWELLINGS + ): + data_path_strip = "data_with_different_number_of_dwellings_per_buildings" + result_path_strip = "results_different_number_of_dwellings_per_buildings" + + else: + raise ValueError("DataProcessingMode not known.") + + self.data_folder_path = os.path.join( + folder_from_which_data_will_be_collected, + os.pardir, + "results_for_scenario_comparison", + "data", + data_path_strip, + ) + + self.result_folder = os.path.join( + folder_from_which_data_will_be_collected, + os.pardir, + "results_for_scenario_comparison", + "results", + result_path_strip, + ) + + self.hisim_chartbase = ChartFontsAndSize() + self.hisim_chartbase.figsize = (10, 6) + self.hisim_chartbase.dpi = 100 + + if variables_to_check != [] and variables_to_check is not None: + # read data, sort data according to scenarios if wanted, and create pandas dataframe + ( + pandas_dataframe, + key_for_scenario_one, + key_for_current_scenario, + variables_to_check, + ) = ScenarioDataProcessing.get_dataframe_and_create_pandas_dataframe_for_all_data( + data_folder_path=self.data_folder_path, + time_resolution_of_data_set=time_resolution_of_data_set, + dict_of_scenarios_to_check=dict_of_scenarios_to_check, + variables_to_check=variables_to_check, + ) + + log.information("key for scenario one " + key_for_scenario_one) + log.information("key for current scenario " + key_for_current_scenario) + + self.make_plots_with_specific_kind_of_data( + time_resolution_of_data_set=time_resolution_of_data_set, + pandas_dataframe=pandas_dataframe, + simulation_duration_key=simulation_duration_to_check, + variables_to_check=variables_to_check, + ) + + else: + log.information( + "Variable list for data is not given, so data will not be plotted or analyzed."
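As a design note, the mode-to-folder mapping above (which mirrors the elif chain in ResultDataCollection) could also be expressed as a lookup table. A sketch with three of the modes, reusing the enum and strings from this diff; the remaining entries would follow the same pattern:

```python
from typing import Tuple

from hisim.postprocessing.scenario_evaluation.result_data_collection import (
    ResultDataProcessingModeEnum,
)

# Lookup table equivalent to the elif chain above (three of the modes shown).
PATH_STRIPS = {
    ResultDataProcessingModeEnum.PROCESS_ALL_DATA: (
        "data_with_all_parameters",
        "results_for_all_parameters",
    ),
    ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_CODES: (
        "data_with_different_building_codes",
        "results_different_building_codes",
    ),
    ResultDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_PV_TILT_ANGLES: (
        "data_with_different_pv_tilts",
        "results_different_pv_tilts",
    ),
}


def get_path_strips(data_processing_mode: ResultDataProcessingModeEnum) -> Tuple[str, str]:
    """Return (data_path_strip, result_path_strip) for a processing mode."""
    try:
        return PATH_STRIPS[data_processing_mode]
    except KeyError as exc:
        raise ValueError("DataProcessingMode not known.") from exc
```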
+ ) + + def make_plots_with_specific_kind_of_data( + self, + time_resolution_of_data_set: Any, + pandas_dataframe: pd.DataFrame, + simulation_duration_key: str, + variables_to_check: List[str], + ) -> None: + """Make plots for different kind of data.""" + + log.information(f"Simulation duration: {simulation_duration_key} days.") + + if pandas_dataframe.empty: + raise ValueError("Dataframe is empty.") + + sub_results_folder = f"simulation_duration_of_{simulation_duration_key}_days" + sub_sub_results_folder = ( + f"scenario_comparison_{time_resolution_of_data_set.value}_{self.datetime_string}" + ) + + self.path_for_plots = os.path.join( + self.result_folder, sub_results_folder, sub_sub_results_folder + ) + + for variable_to_check in variables_to_check: + log.information("Check variable " + str(variable_to_check)) + + # prepare path for plots + self.path_addition = "".join( + [ + x + for x in variable_to_check + if x in string.ascii_letters or x.isspace() or x == "2" + ] + ) + + self.plot_path_complete = os.path.join( + self.path_for_plots, self.path_addition + ) + if os.path.exists(self.plot_path_complete) is False: + os.makedirs(self.plot_path_complete) + + # filter the dataframe according to variable + filtered_data = ScenarioDataProcessing.filter_pandas_dataframe( + dataframe=pandas_dataframe, variable_to_check=variable_to_check + ) + # get unit of variable + try: + unit = filtered_data.unit.values[0] + except Exception: + if "Temperature deviation" in variable_to_check: + unit = "°C*h" + else: + unit = "-" + + if time_resolution_of_data_set == ResultDataTypeEnum.YEARLY: + kind_of_data_set = "yearly" + + # get statistical data + ScenarioDataProcessing.get_statistics_of_data_and_write_to_excel( + filtered_data=filtered_data, + path_to_save=self.plot_path_complete, + kind_of_data_set=kind_of_data_set, + ) + + try: + self.make_box_plot_for_pandas_dataframe( + filtered_data=filtered_data, title=self.path_addition, + ) + except Exception: + log.information( + f"{variable_to_check} could not be plotted as box plot." + ) + + try: + self.make_bar_plot_for_pandas_dataframe( + filtered_data=filtered_data, title=self.path_addition, unit=unit + ) + except Exception: + log.information( + f"{variable_to_check} could not be plotted as bar plot." + ) + + try: + self.make_scatter_plot_for_pandas_dataframe( + full_pandas_dataframe=pandas_dataframe, + filtered_data=filtered_data, + y_data_variable=self.path_addition, + ) + except Exception: + log.information( + f"{variable_to_check} could not be plotted as scatter plot." + ) + + try: + self.make_histogram_plot_for_pandas_dataframe( + filtered_data=filtered_data, title=self.path_addition, unit=unit + ) + except Exception: + log.information( + f"{variable_to_check} could not be plotted as histogram." 
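The path_addition built above strips a variable name down to letters, spaces, and the digit 2 (so names like "CO2 footprint" stay readable) before using it as a folder name. A quick worked example:

```python
import string

variable_to_check = "ElectricityMeter|Electricity|ElectricityFromGrid"
path_addition = "".join(
    x for x in variable_to_check
    if x in string.ascii_letters or x.isspace() or x == "2"
)
print(path_addition)  # ElectricityMeterElectricityElectricityFromGrid
```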
+ ) + + elif time_resolution_of_data_set in ( + ResultDataTypeEnum.HOURLY, + ResultDataTypeEnum.DAILY, + ResultDataTypeEnum.MONTHLY, + ): + + if time_resolution_of_data_set == ResultDataTypeEnum.HOURLY: + kind_of_data_set = "hourly" + line_plot_marker_size = 2 + elif time_resolution_of_data_set == ResultDataTypeEnum.DAILY: + kind_of_data_set = "daily" + line_plot_marker_size = 3 + elif time_resolution_of_data_set == ResultDataTypeEnum.MONTHLY: + kind_of_data_set = "monthly" + line_plot_marker_size = 5 + + # get statistical data + ScenarioDataProcessing.get_statistics_of_data_and_write_to_excel( + filtered_data=filtered_data, + path_to_save=self.plot_path_complete, + kind_of_data_set=kind_of_data_set, + ) + + try: + self.make_line_plot_for_pandas_dataframe( + filtered_data=filtered_data, + title=self.path_addition, + line_plot_marker_size=line_plot_marker_size, + ) + except Exception: + log.information( + f"{variable_to_check} could not be plotted as line plot." + ) + try: + + self.make_box_plot_for_pandas_dataframe( + filtered_data=filtered_data, title=self.path_addition + ) + + except Exception: + log.information( + f"{variable_to_check} could not be plotted as box plot." + ) + + else: + raise ValueError( + "This kind of data was not found in the datacollectorenum class." + ) + + def make_line_plot_for_pandas_dataframe( + self, filtered_data: pd.DataFrame, title: str, line_plot_marker_size: int + ) -> None: + """Make line plot.""" + log.information("Make line plot with data.") + + fig, a_x = plt.subplots( + figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi + ) + x_data = list(OrderedSet(list(filtered_data.time))) + if filtered_data.time.values[0] is not None: + year = filtered_data.time.values[0].split("-")[0] + else: + raise ValueError( + "year could not be determined because time value of filtered data was None." 
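The year label and the x axis of the line plot both come from the dataframe's time strings; in isolation, the conversion to day-resolution numpy datetimes used just below looks like this (the sample timestamps are made up):

```python
import numpy as np

# Hypothetical timestamps as they might appear in the "time" column.
x_data = ["2021-01-01T00:00:00", "2021-01-01T01:00:00", "2021-01-02T00:00:00"]

year = x_data[0].split("-")[0]  # "2021", used as the x-axis label
x_data_transformed = np.asarray(x_data, dtype="datetime64[D]")
print(x_data_transformed)  # ['2021-01-01' '2021-01-01' '2021-01-02']
```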
+ ) + + x_data_transformed = np.asarray(x_data, dtype="datetime64[D]") + + for scenario in list(OrderedSet(list(filtered_data.scenario))): + filtered_data_per_scenario = filtered_data.loc[ + filtered_data["scenario"] == scenario + ] + mean_values_aggregated_according_to_scenarios = [] + for time_value in x_data: + + mean_value_per_scenario_per_timestep = np.mean( + filtered_data_per_scenario.loc[ + filtered_data_per_scenario["time"] == time_value + ]["value"] + ) + + mean_values_aggregated_according_to_scenarios.append( + mean_value_per_scenario_per_timestep + ) + + y_data = mean_values_aggregated_according_to_scenarios + + plt.plot( + x_data_transformed, + y_data, + "-o", + markersize=line_plot_marker_size, + label=scenario, + ) + + y_tick_labels, unit, y_tick_locations = self.set_axis_scale( + a_x, x_or_y="y", unit=filtered_data.unit.values[0] + ) + plt.yticks( + ticks=y_tick_locations, + labels=y_tick_labels, + fontsize=self.hisim_chartbase.fontsize_ticks, + ) + + plt.ylabel( + ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.xlabel( + xlabel=year, fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) + plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) + a_x.tick_params(axis="x", labelrotation=45) + if self.show_plot_legend: + plt.legend(bbox_to_anchor=(1, 1), loc="upper left") + + fig.savefig( + os.path.join(self.plot_path_complete, "line_plot.png"), bbox_inches="tight" + ) + plt.close() + + def make_bar_plot_for_pandas_dataframe( + self, + filtered_data: pd.DataFrame, + title: str, + unit: str, + alternative_bar_labels: Optional[List[str]] = None, + ) -> None: + """Make bar plot.""" + log.information("Make bar plot.") + + fig, a_x = plt.subplots( + figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi + ) + + y_data = [] + bar_labels = [] + + for scenario in list(OrderedSet(list(filtered_data.scenario))): + filtered_data_per_scenario = filtered_data.loc[ + filtered_data["scenario"] == scenario + ] + + mean_value_per_scenario = np.mean(filtered_data_per_scenario.value.values) + + y_data.append(mean_value_per_scenario) + bar_labels.append(scenario) + + # choose bar labels + if alternative_bar_labels is not None: + bar_labels = alternative_bar_labels + + x_data = np.arange(0, len(y_data) * 2, step=2) + + cmap = plt.get_cmap("viridis") + colors = [cmap(i) for i in np.linspace(0, 1, len(x_data))] + a_x.bar(x_data, y_data, label=bar_labels, color=colors) + + y_tick_labels, unit, y_tick_locations = self.set_axis_scale( + a_x, x_or_y="y", unit=unit, + ) + plt.yticks( + ticks=y_tick_locations, + labels=y_tick_labels, + fontsize=self.hisim_chartbase.fontsize_ticks, + ) + plt.ylabel( + ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.xlabel( + xlabel=filtered_data.year.values[0], + fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) + plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) + + if self.show_plot_legend: + plt.legend(bbox_to_anchor=(1, 1), loc="upper left") + + a_x.xaxis.set_tick_params(labelbottom=False) + a_x.set_xticks([]) + plt.tight_layout() + fig.savefig(os.path.join(self.plot_path_complete, "bar_plot.png")) + plt.close() + + def make_box_plot_for_pandas_dataframe( + self, + filtered_data: pd.DataFrame, + title: str, + scenario_set: Optional[List[str]] = None, + ) -> None: + """Make box plot.""" + log.information("Make box plot.") + + fig, a_x = plt.subplots( 
+ figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi + ) + if scenario_set is None: + scenario_set = list(OrderedSet(filtered_data.scenario)) + + sns.boxplot(data=filtered_data, x="scenario", y="value") + + y_tick_labels, unit, y_tick_locations = self.set_axis_scale( + a_x, x_or_y="y", unit=filtered_data.unit.values[0] + ) + plt.yticks( + ticks=y_tick_locations, + labels=y_tick_labels, + fontsize=self.hisim_chartbase.fontsize_ticks, + ) + plt.ylabel( + ylabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, + ) + try: + # this works for yearly data + plt.xlabel( + xlabel=filtered_data.year.values[0], + fontsize=self.hisim_chartbase.fontsize_label, + ) + except Exception: + # take year from time column + year = filtered_data.time.values[0].split("-")[0] + plt.xlabel( + xlabel=year, fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) + plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) + a_x.xaxis.set_tick_params(labelbottom=False) + a_x.set_xticks([]) + if self.show_plot_legend: + plt.legend(scenario_set, bbox_to_anchor=(1, 1), loc="upper left") + + fig.savefig( + os.path.join(self.plot_path_complete, "box_plot.png"), bbox_inches="tight" + ) + plt.close() + + def make_histogram_plot_for_pandas_dataframe( + self, + filtered_data: pd.DataFrame, + title: str, + unit: str, + scenario_set: Optional[List[str]] = None, + ) -> None: + """Make histogram plot.""" + log.information("Make histogram plot.") + + fig, a_x = plt.subplots( # pylint: disable=unused-variable + figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi + ) + if scenario_set is None: + scenario_set = list(OrderedSet(filtered_data.scenario)) + + plt.hist(x=np.array(filtered_data.value.values), bins="auto") + + plt.ylabel( + ylabel="Count", fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.xlabel( + xlabel=f"{unit}", fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.title(label=title, fontsize=self.hisim_chartbase.fontsize_title) + plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) + + fig.savefig( + os.path.join(self.plot_path_complete, "histogram_plot.png"), + bbox_inches="tight", + ) + plt.close() + + def make_scatter_plot_for_pandas_dataframe( + self, + full_pandas_dataframe: pd.DataFrame, + filtered_data: pd.DataFrame, + y_data_variable: str, + x_data_variable: str = "Specific heating load", + ) -> None: + """Make scatter plot.""" + log.information("Make scatter plot with data.") + + fig, a_x = plt.subplots( + figsize=self.hisim_chartbase.figsize, dpi=self.hisim_chartbase.dpi + ) + + # iterate over all scenarios + x_data_mean_value_list_for_all_scenarios = [] + y_data_mean_value_list_for_all_scenarios = [] + for scenario in list(OrderedSet(list(full_pandas_dataframe.scenario))): + + full_data_per_scenario = full_pandas_dataframe.loc[ + full_pandas_dataframe["scenario"] == scenario + ] + filtered_data_per_scenario = filtered_data.loc[ + filtered_data["scenario"] == scenario + ] + + # get x_data_list by filtering the df according to x_data_variable and then by taking values from "value" column + x_data_list = list( + full_data_per_scenario.loc[ + full_data_per_scenario["variable"] == x_data_variable + ]["value"].values + ) + x_data_unit = full_data_per_scenario.loc[ + full_data_per_scenario["variable"] == x_data_variable + ]["unit"].values[0] + + # if x_data_list has more than 1 value (because more values for this scenario exist), then take mean value + if len(x_data_list) > 1: + # for each
scenario take the mean value + x_data_mean_value_per_scenario = np.mean(x_data_list) + elif len(x_data_list) == 1: + x_data_mean_value_per_scenario = x_data_list[0] + else: + raise ValueError( + "The x_data_list is empty. Probably the full dataframe did not contain the x_data_variable in the column variable." + ) + + # append to x_data_mean_value_list + x_data_mean_value_list_for_all_scenarios.append( + x_data_mean_value_per_scenario + ) + + # get y values from filtered data per scenario (already filtered according to variable to check and scenario) + y_data_list = list(filtered_data_per_scenario["value"].values) + y_data_unit = filtered_data_per_scenario["unit"].values[0] + # if y_data_list has more than 1 value (because more values for this scenario exist), then take mean value + if len(y_data_list) > 1: + # for each scenario take the mean value + y_data_mean_value_per_scenario = np.mean(y_data_list) + elif len(y_data_list) == 1: + y_data_mean_value_per_scenario = y_data_list[0] + else: + raise ValueError( + "The y_data_list is empty. Something went wrong with the filtering in the functions before." + ) + + # append to y_data_mean_value_list + y_data_mean_value_list_for_all_scenarios.append( + y_data_mean_value_per_scenario + ) + + # identify marker size according to data length + data_length = len(x_data_mean_value_list_for_all_scenarios) + if data_length < 10: + scatter_plot_marker_size = 20 + elif 10 <= data_length < 50: + scatter_plot_marker_size = 16 + elif 50 <= data_length < 100: + scatter_plot_marker_size = 8 + elif 100 <= data_length < 300: + scatter_plot_marker_size = 6 + elif 300 <= data_length < 500: + scatter_plot_marker_size = 4 + elif 500 <= data_length < 1000: + scatter_plot_marker_size = 2 + else: + scatter_plot_marker_size = 1 + + # make scatter plot + plt.scatter( + x_data_mean_value_list_for_all_scenarios, + y_data_mean_value_list_for_all_scenarios, + s=scatter_plot_marker_size, + ) + + y_tick_labels, y_unit, y_tick_locations = self.set_axis_scale( + a_x, x_or_y="y", unit=y_data_unit + ) + plt.yticks( + ticks=y_tick_locations, + labels=y_tick_labels, + fontsize=self.hisim_chartbase.fontsize_ticks, + ) + + plt.ylabel( + ylabel=f"{y_data_variable} [{y_unit}]", + fontsize=self.hisim_chartbase.fontsize_label, + ) + plt.xlabel( + xlabel=f"{x_data_variable} [{x_data_unit}]", + fontsize=self.hisim_chartbase.fontsize_label, + ) + + plt.tick_params(labelsize=self.hisim_chartbase.fontsize_ticks) + a_x.tick_params(axis="x", labelrotation=45) + if self.show_plot_legend: + plt.legend(bbox_to_anchor=(1, 1), loc="upper left") + + fig.savefig( + os.path.join(self.plot_path_complete, "scatter_plot.png"), + bbox_inches="tight", + ) + plt.close() + + def set_axis_scale( + self, a_x: Any, x_or_y: Any, unit: Any + ) -> Tuple[float, str, Any]: + """Get axis and unit and scale it properly.""" + + if x_or_y == "x": + tick_values = a_x.get_xticks() + elif x_or_y == "y": + tick_values = a_x.get_yticks() + else: + raise ValueError("x_or_y must be either 'x' or 'y'") + + max_ticks = max(tick_values) + min_ticks = min(tick_values) + + max_scale = max(abs(max_ticks), abs(min_ticks)) + + new_tick_values = tick_values + scale = "" + + if unit not in ["-", "%"]: + + if max_scale >= 1e12: + new_tick_values = tick_values * 1e-12 + scale = "T" + elif 1e9 <= max_scale < 1e12: + new_tick_values = tick_values * 1e-9 + scale = "G" + elif 1e6 <= max_scale < 1e9: + new_tick_values = tick_values * 1e-6 + scale = "M" + elif 1e3 <= max_scale < 1e6: + new_tick_values = tick_values * 1e-3 + scale = "k" + elif -1e3 <=
max_scale < 1e3: + new_tick_values = tick_values + scale = "" + + tick_locations = tick_values + tick_labels = np.round(new_tick_values, 1) + unit = f"{scale}{unit}" + + # if k already in unit, remove k and replace with "M" + if unit in ["kkWh", "kkW"]: + unit = "M" + unit[2:] + elif unit in ["kkg", "kkg/s"]: + unit = "t" + unit[3:] + + return tick_labels, unit, tick_locations + + # def make_sankey_plot_for_pyam_dataframe( + # self, + # pyam_dataframe: pyam.IamDataFrame, + # filter_model: Optional[str], + # filter_scenario: Optional[str], + # filter_variables: Optional[str], + # filter_region: Optional[str], + # filter_unit: Optional[str], + # filter_year: Optional[str], + # ) -> None: + # """Make sankey plot.""" + # log.information("Make sankey plot.") + + # filtered_data = self.filter_pyam_dataframe( + # pyam_dataframe=pyam_dataframe, + # filter_model=filter_model, + # filter_scenario=filter_scenario, + # filter_region=filter_region, + # filter_variables=filter_variables, + # filter_unit=filter_unit, + # filter_year=filter_year, + # ) + + # sankey_mapping = { + # "ElectrcityGridBaseLoad|Electricity|ElectricityOutput": ( + # "PV", + # "Occupancy", + # ), + # "PVSystemw-|Electricity|ElectricityOutput": ("PV", "Grid"), + # "Occupancy|Electricity|ElectricityOutput": ("Grid", "Occupancy"), + # } + # fig = filtered_data.plot.sankey(mapping=sankey_mapping) + + # # save figure as html first + # plotly.offline.plot( + # fig, + # filename=os.path.join(self.plot_path_complete, "sankey_plot.html"), + # auto_open=False, + # ) + + # # convert html file to png + # hti = Html2Image() + # with open( + # os.path.join(self.plot_path_complete, "sankey_plot.html"), encoding="utf8", + # ) as file: + # hti.screenshot( + # file.read(), save_as="sankey_plot.png", + # ) + + # # change directory of sankey output file + # try: + # os.rename( + # "sankey_plot.png", + # os.path.join(self.plot_path_complete, "sankey_plot.png"), + # ) + # except Exception as exc: + # raise Exception("Cannot save current sankey. 
Try again.") from exc diff --git a/hisim/postprocessing/scenario_evaluation/result_data_processing.py b/hisim/postprocessing/scenario_evaluation/result_data_processing.py new file mode 100644 index 000000000..c0c426f11 --- /dev/null +++ b/hisim/postprocessing/scenario_evaluation/result_data_processing.py @@ -0,0 +1,542 @@ +"""Result Data Processing and Plotting for Scenario Comparison.""" + + +import glob +import os +from typing import Dict, Any, Tuple, Optional, List +import copy +import pandas as pd + +from hisim.postprocessing.scenario_evaluation.result_data_collection import ( + ResultDataTypeEnum, +) +from hisim import log + + +class ScenarioDataProcessing: + + """ScenarioDataProcessing class.""" + + @staticmethod + def get_dataframe_and_create_pandas_dataframe_for_all_data( + data_folder_path: str, + time_resolution_of_data_set: Any, + dict_of_scenarios_to_check: Optional[Dict[str, List[str]]], + variables_to_check: List[str], + ) -> Tuple[pd.DataFrame, str, str, List[str]]: + """Get csv data and create dataframes with the filtered and procesed scenario data.""" + + if time_resolution_of_data_set == ResultDataTypeEnum.HOURLY: + kind_of_data_set = "hourly" + elif time_resolution_of_data_set == ResultDataTypeEnum.YEARLY: + kind_of_data_set = "yearly" + elif time_resolution_of_data_set == ResultDataTypeEnum.DAILY: + kind_of_data_set = "daily" + elif time_resolution_of_data_set == ResultDataTypeEnum.MONTHLY: + kind_of_data_set = "monthly" + else: + raise ValueError( + "This kind of data was not found in the datacollectorenum class." + ) + log.information( + f"Read csv files and create one big dataframe for {kind_of_data_set} data." + ) + + for file in glob.glob( + os.path.join(data_folder_path, "**", f"*{kind_of_data_set}*.csv") + ): + + file_df = pd.read_csv(filepath_or_buffer=file) + + # if scenario values are no strings, transform them + file_df["scenario"] = file_df["scenario"].transform(str) + key_for_scenario_one = "" + key_for_current_scenario = "" + + # # make rel electricity calculation before sorting and renaming + + # if "ElectricityMeter|Electricity|ElectricityFromGrid" in variables_to_check: + # print("relative electricity demand will be calculated.") + + # file_df = self.calculate_relative_electricity_demand(dataframe=file_df) + # variables_to_check.append("Relative Electricity Demand") + + if dict_of_scenarios_to_check is not None and dict_of_scenarios_to_check != {}: + + ( + file_df, + key_for_scenario_one, + key_for_current_scenario, + ) = ScenarioDataProcessing.check_if_scenario_exists_and_filter_dataframe_for_scenarios_dict( + data_frame=file_df, + dict_of_scenarios_to_check=dict_of_scenarios_to_check, + ) + + return ( + file_df, + key_for_scenario_one, + key_for_current_scenario, + variables_to_check, + ) + + @staticmethod + def filter_pandas_dataframe( + dataframe: pd.DataFrame, variable_to_check: str + ) -> pd.DataFrame: + """Filter pandas dataframe according to variable.""" + filtered_dataframe = dataframe.loc[dataframe["variable"] == variable_to_check] + if filtered_dataframe.empty: + print( + f"The dataframe contains the following variables: {set(list(dataframe.variable))}" + ) + # raise ValueError( + print( + f"The filtered dataframe is empty. The dataframe did not contain the variable {variable_to_check}. Check the list above." 
+ ) + return filtered_dataframe + + @staticmethod + def get_statistics_of_data_and_write_to_excel( + filtered_data: pd.DataFrame, path_to_save: str, kind_of_data_set: str, + ) -> None: + """Use pandas describe method to get statistical values of certain data.""" + # create an Excel writer object + with pd.ExcelWriter( # pylint: disable=abstract-class-instantiated + path=os.path.join(path_to_save, f"{kind_of_data_set}_statistics.xlsx"), + mode="w", + ) as writer: + + filtered_data.to_excel(excel_writer=writer, sheet_name="filtered data") + statistical_data = filtered_data.describe() + + statistical_data.to_excel(excel_writer=writer, sheet_name="statistics") + + @staticmethod + def check_if_scenario_exists_and_filter_dataframe_for_scenarios( + data_frame: pd.DataFrame, dict_of_scenarios_to_check: Dict[str, List[str]], + ) -> pd.DataFrame: + """Check if scenario exists and filter dataframe for scenario.""" + for list_of_scenarios_to_check in dict_of_scenarios_to_check.values(): + aggregated_scenario_dict: Dict = { + key: [] for key in list_of_scenarios_to_check + } + + for given_scenario in data_frame["scenario"]: + # string comparison + + for scenario_to_check in list_of_scenarios_to_check: + if ( + scenario_to_check in given_scenario + and given_scenario + not in aggregated_scenario_dict[scenario_to_check] + ): + aggregated_scenario_dict[scenario_to_check].append( + given_scenario + ) + # raise error if dict is empty + for ( + key_scenario_to_check, + given_scenario, + ) in aggregated_scenario_dict.items(): + if given_scenario == []: + raise ValueError( + f"Scenarios containing {key_scenario_to_check} were not found in the dataframe." + ) + + concat_df = pd.DataFrame() + # only take rows from dataframe which are in selected scenarios + for ( + key_scenario_to_check, + given_scenario, + ) in aggregated_scenario_dict.items(): + + df_filtered_for_specific_scenarios = data_frame.loc[ + data_frame["scenario"].isin(given_scenario) + ] + df_filtered_for_specific_scenarios["scenario"] = [ + key_scenario_to_check + ] * len(df_filtered_for_specific_scenarios["scenario"]) + concat_df = pd.concat([concat_df, df_filtered_for_specific_scenarios]) + concat_df["scenario_0"] = data_frame["scenario"] + + return concat_df + + @staticmethod + def aggregate_all_values_for_one_scenario( + dataframe: pd.DataFrame, + list_of_scenarios_to_check: List, + column_name_to_check: str, + # filter_level_index: int, + ) -> pd.DataFrame: + """Check for one scenario.""" + + aggregated_scenario_dict: Dict = {key: [] for key in list_of_scenarios_to_check} + print("aggregated scenario dict", aggregated_scenario_dict) + for scenario_to_check in list_of_scenarios_to_check: + print("scenario to check", scenario_to_check) + for value in dataframe[column_name_to_check].values: + if ( + isinstance(scenario_to_check, str) + and scenario_to_check in value + and value not in aggregated_scenario_dict[scenario_to_check] + ): + aggregated_scenario_dict[scenario_to_check].append(value) + elif ( + isinstance(scenario_to_check, (float, int)) + and scenario_to_check == value + and value not in aggregated_scenario_dict[scenario_to_check] + ): + + aggregated_scenario_dict[scenario_to_check].append(value) + + concat_df = pd.DataFrame() + # only take rows from dataframe which are in selected scenarios + for ( + key_scenario_to_check, + given_list_of_values, + ) in aggregated_scenario_dict.items(): + + df_filtered_for_specific_scenarios = dataframe.loc[ + dataframe[column_name_to_check].isin(given_list_of_values) + ] + + 
df_filtered_for_specific_scenarios.loc[ + :, "scenario" + ] = key_scenario_to_check + + concat_df = pd.concat( + [concat_df, df_filtered_for_specific_scenarios], ignore_index=True + ) + + # concat_df[f"scenario_{filter_level_index}"] = dataframe.loc[:, "scenario"] + + del df_filtered_for_specific_scenarios + + return concat_df + + @staticmethod + def check_if_scenario_exists_and_filter_dataframe_for_scenarios_dict( + data_frame: pd.DataFrame, dict_of_scenarios_to_check: Dict[str, List[str]], + ) -> Tuple[pd.DataFrame, str, str]: + """Check if scenario exists and filter dataframe for scenario.""" + + concat_df = data_frame + filter_level_index = 0 + for ( + scenario_to_check_key, + list_of_scenarios_to_check, + ) in dict_of_scenarios_to_check.items(): + + concat_df = ScenarioDataProcessing.aggregate_all_values_for_one_scenario( + dataframe=concat_df, + list_of_scenarios_to_check=list_of_scenarios_to_check, + column_name_to_check=scenario_to_check_key, + # filter_level_index=filter_level_index, + ) + + filter_level_index = filter_level_index + 1 + key_for_scenario_one = "" + key_for_current_scenario = "" + # rename scenario with all scenario filter levels + for index in concat_df.index: + # if there are even more filter levels, another condition needs to be added! + if filter_level_index == 2: + current_scenario_value = concat_df["scenario"][index] + scenario_value_one = concat_df["scenario_1"][index] + # scenario zero is original scenario that will be overwritten + key_for_scenario_one = list(dict_of_scenarios_to_check.keys())[0] + key_for_current_scenario = list(dict_of_scenarios_to_check.keys())[1] + # concat_df.iloc[index, concat_df.columns.get_loc("scenario")] = f"{scenario_value_one}_{current_scenario_value}" + concat_df.loc[ + index, "scenario" + ] = f"{scenario_value_one}_{current_scenario_value}" + elif filter_level_index == 1: + key_for_scenario_one = list(dict_of_scenarios_to_check.keys())[0] + key_for_current_scenario = "" + return concat_df, key_for_scenario_one, key_for_current_scenario + + @staticmethod + def calculate_relative_electricity_demand(dataframe: pd.DataFrame) -> pd.DataFrame: + """Calculate relative electricity demand.""" + + # look for ElectricityMeter|Electricity|ElectricityFromGrid output + if ( + "ElectricityMeter|Electricity|ElectricityFromGrid" + not in dataframe.variable.values + ): + raise ValueError( + "ElectricityMeter|Electricity|ElectricityFromGrid was not found in variables."
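Numerically, the relative electricity demand computed below is just the grid draw of a run divided by the grid draw of the matching run with share_of_maximum_pv_power == 0, times 100. A one-step check with invented numbers:

```python
import numpy as np

# Invented yearly grid draw for one scenario: 4000 kWh without PV (share 0)
# and 3000 kWh with half of the maximum PV power installed.
reference_value_for_electricity_demand = np.array([4000.0])
value_for_electricity_demand = np.array([3000.0])

relative_electricity_demand = (
    value_for_electricity_demand / reference_value_for_electricity_demand * 100
)
print(relative_electricity_demand)  # [75.]
```

The second formula in the code, (1 - (reference - value) / reference) * 100, is algebraically identical to this one.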
+ ) + + # filter again just to be sure + filtered_data = dataframe.loc[ + dataframe.variable == "ElectricityMeter|Electricity|ElectricityFromGrid" + ] + + if "share_of_maximum_pv_power" not in filtered_data.columns: + raise ValueError( + "share_of_maximum_pv_power was not found in dataframe columns" + ) + # sort df according to share of pv + filtered_data = filtered_data.sort_values("share_of_maximum_pv_power") + + # iterate over all scenarios + for scenario in list(set(filtered_data.scenario.values)): + + if "share_of_maximum_pv_power" not in scenario: + + df_for_one_scenario = filtered_data.loc[ + filtered_data.scenario == scenario + ] + + df_for_one_scenario_and_for_share_zero = df_for_one_scenario.loc[ + df_for_one_scenario.share_of_maximum_pv_power == 0 + ] + + reference_value_for_electricity_demand = ( + df_for_one_scenario_and_for_share_zero.value.values + ) + relative_electricity_demand = [0] * len( + reference_value_for_electricity_demand + ) + + # get reference value (when share of pv power is zero) + for share_of_maximum_pv_power in list( + set(df_for_one_scenario.share_of_maximum_pv_power.values) + ): + + if share_of_maximum_pv_power != 0: + + df_for_one_scenario_and_for_one_share = df_for_one_scenario.loc[ + df_for_one_scenario.share_of_maximum_pv_power + == share_of_maximum_pv_power + ] + + value_for_electricity_demand = ( + df_for_one_scenario_and_for_one_share.value.values + ) + + # calculate relative electricity demand for each scenario and share of pv power + relative_electricity_demand = ( + value_for_electricity_demand + / reference_value_for_electricity_demand + * 100 + ) + + new_df_only_with_relative_electricity_demand = copy.deepcopy( + df_for_one_scenario_and_for_one_share + ) + new_df_only_with_relative_electricity_demand.loc[ + :, "variable" + ] = "Relative Electricity Demand" + new_df_only_with_relative_electricity_demand.loc[ + :, "unit" + ] = "%" + new_df_only_with_relative_electricity_demand.loc[ + :, "value" + ] = relative_electricity_demand + + del df_for_one_scenario_and_for_one_share + + dataframe = pd.concat( + [dataframe, new_df_only_with_relative_electricity_demand] + ) + del dataframe["Unnamed: 0"] + del new_df_only_with_relative_electricity_demand + + else: + + df_for_one_scenario = filtered_data.loc[ + filtered_data.scenario == scenario + ] + share_of_maximum_pv_power = df_for_one_scenario[ + "share_of_maximum_pv_power" + ].values[0] + + if share_of_maximum_pv_power == 0: + + relative_electricity_demand = [0] * len(df_for_one_scenario) + + else: + + value_for_electricity_demand = df_for_one_scenario.value.values + df_for_one_scenario_and_for_share_zero = filtered_data.loc[ + filtered_data.share_of_maximum_pv_power == 0 + ] + reference_value_for_electricity_demand = ( + df_for_one_scenario_and_for_share_zero.value.values + ) + + # calculate relative electricity demand for each scenario and share of pv power + relative_electricity_demand = ( + 1 + - ( + ( + reference_value_for_electricity_demand + - value_for_electricity_demand + ) + / reference_value_for_electricity_demand + ) + ) * 100 + + new_df_only_with_relative_electricity_demand = copy.deepcopy( + df_for_one_scenario + ) + new_df_only_with_relative_electricity_demand.loc[ + :, "variable" + ] = "Relative Electricity Demand" + new_df_only_with_relative_electricity_demand.loc[:, "unit"] = "%" + new_df_only_with_relative_electricity_demand.loc[ + :, "value" + ] = relative_electricity_demand + + del df_for_one_scenario + + dataframe = pd.concat( + [dataframe,
new_df_only_with_relative_electricity_demand] + ) + + del dataframe["Unnamed: 0"] + del new_df_only_with_relative_electricity_demand + + return dataframe + + +class FilterClass: + + """Class for setting filters on the data for processing.""" + + def __init__(self): + """Initialize the class.""" + + ( + self.kpi_data, + self.electricity_data, + self.occupancy_consumption, + self.heating_demand, + self.variables_for_debugging_purposes, + ) = self.get_variables_to_check() + ( + self.building_type, + self.building_refurbishment_state, + self.building_age, + self.pv_share, + ) = self.get_scenarios_to_check() + + def get_variables_to_check(self): + """Get specific variables to check for the scenario evaluation.""" + + # system_setups for variables to check (check that the names of your variables are correct before your evaluation) + # kpi data has no time series, so choose it only when you analyze yearly data + kpi_data = [ + "Production", + "Consumption", + "Self-consumption", + "Self-consumption rate", + "Autarky rate", + "Investment costs for equipment per simulated period", + "CO2 footprint for equipment per simulated period", + "System operational costs for simulated period", + "System operational emissions for simulated period", + "Total costs for simulated period", + "Total emissions for simulated period", + "Temperature deviation of building indoor air temperature being below set temperature 19.0 °C", + "Minimum building indoor air temperature reached", + "Temperature deviation of building indoor air temperature being above set temperature 24.0 °C", + "Maximum building indoor air temperature reached", + "Building heating load", + "Specific heating load", + "Number of heat pump cycles", + "Seasonal performance factor of heat pump", + "Total energy from electricity grid", + "Total energy to electricity grid", + ] + + electricity_data = [ + "ElectricityMeter|Electricity|ElectricityToGrid", + "ElectricityMeter|Electricity|ElectricityFromGrid", + "ElectricityMeter|Electricity|ElectricityAvailable", + # if you analyze a house with an ems, the production and consumption values of the electricity meter are not representative + # use the ems production and consumption or the kpi values instead if needed + # "ElectricityMeter|Electricity|ElectricityConsumption", + # "ElectricityMeter|Electricity|ElectricityProduction", + ] + + occupancy_consumption = [ + "Occupancy|Electricity|ElectricityOutput", + "Occupancy|WarmWater|WaterConsumption", + ] + + heating_demand = [ + "AdvancedHeatPumpHPLib|Heating|ThermalOutputPower", + "Building|Temperature|TemperatureIndoorAir", + ] + variables_for_debugging_purposes = [ + "AdvancedHeatPumpHPLib|Heating|ThermalOutputPower", + "Building|Temperature|TemperatureIndoorAir", + "AdvancedHeatPumpHPLib|Any|COP", + "Battery_w1|Any|StateOfCharge", + ] + + return ( + kpi_data, + electricity_data, + occupancy_consumption, + heating_demand, + variables_for_debugging_purposes, + ) + + def get_scenarios_to_check(self): + """Get scenarios to check for scenario evaluation.""" + + ( + building_type, + building_refurbishment_state, + building_age, + ) = self.get_building_properties_to_check() + + pv_share = self.get_pv_properties_to_check() + + return building_type, building_refurbishment_state, building_age, pv_share + + def get_building_properties_to_check(self): + """Get building properties.""" + + # system_setups for scenarios to filter + building_type = [ + "DE.N.SFH", + "DE.N.TH", + "DE.N.MFH", + "DE.N.AB", + ] + + building_refurbishment_state = [ + "001.001", + "001.002", + "001.003",
diff --git a/hisim/postprocessing/pyam_analysis_complete.py b/hisim/postprocessing/scenario_evaluation/scenario_analysis_complete.py
similarity index 74%
rename from hisim/postprocessing/pyam_analysis_complete.py
rename to hisim/postprocessing/scenario_evaluation/scenario_analysis_complete.py
index fb42a62e8..86720602c 100644
--- a/hisim/postprocessing/pyam_analysis_complete.py
+++ b/hisim/postprocessing/scenario_evaluation/scenario_analysis_complete.py
@@ -1,14 +1,18 @@
-"""Data Collection for Scenario Comparison with Pyam."""
+"""Data Collection for Scenario Comparison."""
 # clean
 import time
 import os
 from typing import Any, List, Optional, Dict
-from hisim.postprocessing import pyam_data_collection, pyam_data_processing
+from hisim.postprocessing.scenario_evaluation import (
+    result_data_collection,
+    result_data_processing,
+    result_data_plotting,
+)
 
 
-class PyamDataAnalysis:
+class ScenarioAnalysis:
 
-    """PyamDataAnalysis class which executes pyam data collection and processing."""
+    """ScenarioAnalysis class which executes result data collection, processing and plotting."""
 
     def __init__(
         self,
@@ -22,28 +26,29 @@ def __init__(
     ) -> None:
         """Initialize the class."""
 
-        pyam_data_collection.PyamDataCollector(
+        result_data_collection.ResultDataCollection(
             data_processing_mode=data_processing_mode,
             folder_from_which_data_will_be_collected=folder_from_which_data_will_be_collected,
             path_to_default_config=path_to_default_config,
             time_resolution_of_data_set=time_resolution_of_data_set,
             simulation_duration_to_check=simulation_duration_to_check,
         )
-        pyam_data_processing.PyAmChartGenerator(
+        result_data_plotting.ScenarioChartGeneration(
             simulation_duration_to_check=simulation_duration_to_check,
             time_resolution_of_data_set=time_resolution_of_data_set,
             data_processing_mode=data_processing_mode,
             variables_to_check=variables_to_check,
             dict_of_scenarios_to_check=dict_with_scenarios_to_check,
+            folder_from_which_data_will_be_collected=folder_from_which_data_will_be_collected,
         )
 
 
 def main():
-    """Main function to execute the pyam data analysis."""
+    """Main function to execute the scenario analysis."""
 
-    # Inputs for pyam analysis
+    # Inputs for scenario analysis
     # -------------------------------------------------------------------------------------------------------------------------------------
-    time_resolution_of_data_set = pyam_data_collection.PyamDataTypeEnum.YEARLY
+    time_resolution_of_data_set = result_data_collection.ResultDataTypeEnum.YEARLY
 
     cluster_storage_path = "/fast/home/k-rieck/"
     # cluster_storage_path = "/storage_cluster/projects/2024-k-rieck-hisim-mass-simulations/hisim_results/results/"
@@ -64,11 +69,14 @@ def main():
     simulation_duration_to_check = str(365)
 
     data_processing_mode = (
-        pyam_data_collection.PyamDataProcessingModeEnum.PROCESS_FOR_DIFFERENT_BUILDING_CODES
+        result_data_collection.ResultDataProcessingModeEnum.PROCESS_ALL_DATA
     )
 
-    filterclass = pyam_data_processing.FilterClass()
-    list_with_variables_to_check = filterclass.electricity_data + filterclass.kpi_data
+    filterclass = result_data_processing.FilterClass()
+    # list_with_variables_to_check = (
+    #     filterclass.variables_for_debugging_purposes + filterclass.kpi_data
+    # )
+    list_with_variables_to_check = filterclass.electricity_data
 
     # TODO: filtering on several scenario parameters at once (e.g. pv share and building code together) is not working yet and needs to be fixed
     # dict_with_scenarios_to_check = {"share_of_maximum_pv_power": filterclass.pv_share,"building_code": ["DE.N.SFH.05.Gen.ReEx.001.002"]}
@@ -80,15 +88,15 @@ def main():
     # "DE.N.AB",
     # ]
     # }
-    dict_with_scenarios_to_check = {
-        "building_code": filterclass.building_refurbishment_state
-    }
+    # dict_with_scenarios_to_check = {
+    #     "building_code": filterclass.building_age
+    # }
     # dict_with_scenarios_to_check = {"share_of_maximum_pv_power": filterclass.pv_share}
-    # dict_with_scenarios_to_check = None
+    dict_with_scenarios_to_check = None
 
     # -------------------------------------------------------------------------------------------------------------------------------------
-    PyamDataAnalysis(
+    ScenarioAnalysis(
         folder_from_which_data_will_be_collected=folder_from_which_data_will_be_collected,
         time_resolution_of_data_set=time_resolution_of_data_set,
         path_to_default_config=path_to_default_config,
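The renamed entry point can also be driven directly. A hedged sketch: the two paths are placeholders, and the keyword names of the last four arguments are inferred from the __init__ body above rather than shown explicitly in this patch:

from hisim.postprocessing.scenario_evaluation import (
    result_data_collection,
    result_data_processing,
)
from hisim.postprocessing.scenario_evaluation.scenario_analysis_complete import (
    ScenarioAnalysis,
)

filterclass = result_data_processing.FilterClass()
ScenarioAnalysis(
    folder_from_which_data_will_be_collected="/path/to/results",  # placeholder path
    time_resolution_of_data_set=result_data_collection.ResultDataTypeEnum.YEARLY,
    path_to_default_config="/path/to/default_config.json",  # placeholder path
    simulation_duration_to_check=str(365),
    data_processing_mode=result_data_collection.ResultDataProcessingModeEnum.PROCESS_ALL_DATA,
    variables_to_check=filterclass.electricity_data,
    dict_with_scenarios_to_check=None,
)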
diff --git a/hisim/postprocessingoptions.py b/hisim/postprocessingoptions.py
index e99d3c428..0967256b7 100644
--- a/hisim/postprocessingoptions.py
+++ b/hisim/postprocessingoptions.py
@@ -27,6 +27,6 @@ class PostProcessingOptions(IntEnum):
     COMPUTE_OPEX = 18
     COMPUTE_CAPEX = 19
     COMPUTE_AND_WRITE_KPIS_TO_REPORT = 20
-    PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM = 21
+    PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION = 21
     MAKE_RESULT_JSON_WITH_KPI_FOR_WEBTOOL = 22
     WRITE_COMPONENT_CONFIGS_TO_JSON = 23
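Callers enable the renamed option on their simulation parameters exactly as before, as the system setup diffs below show. A minimal sketch; the one_day_only convenience constructor is an assumption and not part of this patch:

from hisim.postprocessingoptions import PostProcessingOptions
from hisim.simulationparameters import SimulationParameters

# assumed convenience constructor; system setups build their parameters analogously
my_simulation_parameters = SimulationParameters.one_day_only(
    year=2021, seconds_per_timestep=60
)
my_simulation_parameters.post_processing_options.append(
    PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
)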
diff --git a/obsolete/obsolete_compute_kpis.py b/obsolete/obsolete_compute_kpis.py
new file mode 100644
index 000000000..aebb2190a
--- /dev/null
+++ b/obsolete/obsolete_compute_kpis.py
@@ -0,0 +1,116 @@
+# clean
+
+"""Postprocessing option computes overall consumption, production, self-consumption and injection as well as self-consumption rate and autarky rate."""
+
+import os
+from typing import List, Tuple, Union, Any
+from pathlib import Path
+
+import pandas as pd
+
+from hisim.component import ComponentOutput
+from hisim.loadtypes import ComponentType, InandOutputType, LoadTypes
+from hisim.modular_household.interface_configs.kpi_config import KPIConfig
+from hisim.simulationparameters import SimulationParameters
+from hisim.utils import HISIMPATH
+from hisim.component_wrapper import ComponentWrapper
+
+from hisim import log
+from hisim.postprocessing.investment_cost_co2 import compute_investment_cost
+
+from hisim.components import generic_hot_water_storage_modular
+
+
+def compute_energy_from_power(
+    power_timeseries: pd.Series, timeresolution: int
+) -> float:
+    """Computes the energy in kWh from a power time series."""
+    if power_timeseries.empty:
+        return 0.0
+    return float(power_timeseries.sum() * timeresolution / 3.6e6)
+
+
+def compute_hot_water_storage_losses_and_cycles(
+    components: List[ComponentWrapper],
+    all_outputs: List,
+    results: pd.DataFrame,
+    timeresolution: int,
+) -> Tuple[float, float, float, float, float, float]:
+    """Computes hot water storage losses and cycles."""
+
+    # initialize columns consumption, production, battery_charge, battery_discharge, storage
+    charge_sum_dhw = 0.0
+    charge_sum_buffer = 0.0
+    discharge_sum_dhw = 0.0
+    discharge_sum_buffer = 0.0
+    cycle_buffer = None
+    cycle_dhw = None
+
+    # get cycle of water storages
+    for elem in components:
+        if isinstance(
+            elem.my_component, generic_hot_water_storage_modular.HotWaterStorage
+        ):
+            use = elem.my_component.use
+            if use == ComponentType.BUFFER:
+                cycle_buffer = elem.my_component.config.energy_full_cycle
+            elif use == ComponentType.BOILER:
+                cycle_dhw = elem.my_component.config.energy_full_cycle
+
+    for index, output in enumerate(all_outputs):
+        if output.postprocessing_flag is not None:
+            if InandOutputType.CHARGE in output.postprocessing_flag:
+                if InandOutputType.WATER_HEATING in output.postprocessing_flag:
+                    charge_sum_dhw = charge_sum_dhw + compute_energy_from_power(
+                        power_timeseries=results.iloc[:, index],
+                        timeresolution=timeresolution,
+                    )
+                elif InandOutputType.HEATING in output.postprocessing_flag:
+                    charge_sum_buffer = charge_sum_buffer + compute_energy_from_power(
+                        power_timeseries=results.iloc[:, index],
+                        timeresolution=timeresolution,
+                    )
+            elif InandOutputType.DISCHARGE in output.postprocessing_flag:
+                if ComponentType.BOILER in output.postprocessing_flag:
+                    discharge_sum_dhw = discharge_sum_dhw + compute_energy_from_power(
+                        power_timeseries=results.iloc[:, index],
+                        timeresolution=timeresolution,
+                    )
+                elif ComponentType.BUFFER in output.postprocessing_flag:
+                    discharge_sum_buffer = (
+                        discharge_sum_buffer
+                        + compute_energy_from_power(
+                            power_timeseries=results.iloc[:, index],
+                            timeresolution=timeresolution,
+                        )
+                    )
+        else:
+            continue
+    if cycle_dhw is not None:
+        cycles_dhw = charge_sum_dhw / cycle_dhw
+    else:
+        cycles_dhw = 0
+        log.error(
+            "Energy of a full cycle must be defined in the config of the modular hot water storage to compute the number of cycles."
+        )
+    storage_loss_dhw = charge_sum_dhw - discharge_sum_dhw
+    if cycle_buffer is not None:
+        cycles_buffer = charge_sum_buffer / cycle_buffer
+    else:
+        cycles_buffer = 0
+        log.error(
+            "Energy of a full cycle must be defined in the config of the modular hot water storage to compute the number of cycles."
+        )
+    storage_loss_buffer = charge_sum_buffer - discharge_sum_buffer
+    if cycle_buffer == 0:
+        building_heating = charge_sum_buffer
+    else:
+        building_heating = discharge_sum_buffer
+
+    return (
+        cycles_dhw,
+        storage_loss_dhw,
+        discharge_sum_dhw,
+        cycles_buffer,
+        storage_loss_buffer,
+        building_heating,
+    )
diff --git a/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_pem.py b/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_pem.py
index a45067232..21d75b49a 100644
--- a/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_pem.py
+++ b/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_pem.py
@@ -101,7 +101,7 @@ def setup_function(
         PostProcessingOptions.COMPUTE_OPEX
     )
     my_simulation_parameters.post_processing_options.append(
-        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
     )
 
     """
diff --git a/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_rsoc.py b/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_rsoc.py
index 5f251e892..60a91ec71 100644
--- a/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_rsoc.py
+++ b/system_setups/decentralized_energy_netw_pv_bat_hydro_system_hp_rsoc.py
@@ -96,7 +96,7 @@ def setup_function(
         PostProcessingOptions.COMPUTE_OPEX
     )
     my_simulation_parameters.post_processing_options.append(
-        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
    )
 
     my_advanced_battery_config_1 = (
diff --git a/system_setups/household_cluster_advanced_hp_pv_battery_ems.py b/system_setups/household_cluster_advanced_hp_pv_battery_ems.py
index 4cb8f8651..721eb3399 100644
--- a/system_setups/household_cluster_advanced_hp_pv_battery_ems.py
+++ b/system_setups/household_cluster_advanced_hp_pv_battery_ems.py
@@ -97,7 +97,7 @@ def setup_function(
         year=year, seconds_per_timestep=seconds_per_timestep
     )
     my_simulation_parameters.post_processing_options.append(
-        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
     )
     my_simulation_parameters.post_processing_options.append(
         PostProcessingOptions.COMPUTE_OPEX
diff --git a/system_setups/household_cluster_reference_advanced_hp.py b/system_setups/household_cluster_reference_advanced_hp.py
index 64923e3f0..d6e5bac18 100644
--- a/system_setups/household_cluster_reference_advanced_hp.py
+++ b/system_setups/household_cluster_reference_advanced_hp.py
@@ -123,7 +123,7 @@ def setup_function(
         year=year, seconds_per_timestep=seconds_per_timestep
     )
     my_simulation_parameters.post_processing_options.append(
-        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
     )
     my_simulation_parameters.post_processing_options.append(
         PostProcessingOptions.COMPUTE_OPEX
diff --git a/system_setups/power_to_x_transformation_battery_electrolyzer_grid.py b/system_setups/power_to_x_transformation_battery_electrolyzer_grid.py
index b3823c855..b44757e75 100644
--- a/system_setups/power_to_x_transformation_battery_electrolyzer_grid.py
+++ b/system_setups/power_to_x_transformation_battery_electrolyzer_grid.py
@@ -102,7 +102,7 @@ def setup_function(
         PostProcessingOptions.MAKE_NETWORK_CHARTS
     )
     my_simulation_parameters.post_processing_options.append(
-        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
     )
     my_simulation_parameters.post_processing_options.append(
         PostProcessingOptions.GENERATE_PDF_REPORT
diff --git a/system_setups/power_to_x_transformation_battery_electrolyzer_no_grid.py b/system_setups/power_to_x_transformation_battery_electrolyzer_no_grid.py
index f35d24d70..ef4edd6a9 100644
--- a/system_setups/power_to_x_transformation_battery_electrolyzer_no_grid.py
+++ b/system_setups/power_to_x_transformation_battery_electrolyzer_no_grid.py
@@ -101,7 +101,7 @@ def setup_function(
         PostProcessingOptions.MAKE_NETWORK_CHARTS
     )
     my_simulation_parameters.post_processing_options.append(
-        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
    )
     my_simulation_parameters.post_processing_options.append(
         PostProcessingOptions.GENERATE_PDF_REPORT
diff --git a/tests/test_house_with_pyam_postprocessingoption.py b/tests/test_house_with_pyam_postprocessingoption.py
index de7a0d1a9..b024de87a 100644
--- a/tests/test_house_with_pyam_postprocessingoption.py
+++ b/tests/test_house_with_pyam_postprocessingoption.py
@@ -66,7 +66,7 @@ def test_house_with_pyam(
         year=year, seconds_per_timestep=seconds_per_timestep
     )
     my_simulation_parameters.post_processing_options.append(
-        postprocessingoptions.PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION_WITH_PYAM
+        postprocessingoptions.PostProcessingOptions.PREPARE_OUTPUTS_FOR_SCENARIO_EVALUATION
     )
 
     # this part is copied from hisim_main
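For reference, the unit conversion in compute_energy_from_power from the relocated obsolete/obsolete_compute_kpis.py above can be sanity-checked in isolation: summing power samples in W, multiplying by the time step in seconds, and dividing by 3.6e6 yields kWh. A self-contained check:

import pandas as pd

# one hour of a constant 1 kW draw, sampled every 60 s
power_in_watt = pd.Series([1000.0] * 60)
timeresolution_in_seconds = 60
# summed W*s over the hour: 60 * 1000 * 60 = 3.6e6 Ws = 1.0 kWh
energy_in_kwh = float(power_in_watt.sum() * timeresolution_in_seconds / 3.6e6)
assert energy_in_kwh == 1.0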