Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ldo scenarios #132

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
273 changes: 273 additions & 0 deletions hhnk_threedi_tools/breaches/breaches.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
# %%
"""
Each class has a file as its attributes and a file as a new class.
self.base is the directory in which it is located
"""

# First-party imports
import os
from pathlib import Path

from hhnk_research_tools import Folder

FOLDER_STRUCTURE = """
Main Breaches object
├── 01_NetCDF
│ ├── aggregate_results_3di.nc
│ ├── gridadmin.h5
│ ├── gridadmin.sqlite
│ └── results_3di.nc
├── 02_JPEG
│ ├── overstroming.png
│ ├── breach.name.png
│ └── agg.png
├── 03_ssm
│ ├── max_flow_velocity.tif
│ └── max_raste_of_rise.tif
│ └── max_waterdepth.tif
│ └── max_waterlevel.tif
└── 04_wss
│ ├── dem_clip.vrt
│ ├── grid_raw.gpkg
│ ├── mask_flood.gpkg
│ └── max_wdepth_orig.tif
│ ├── new_grid.gpkg
│ ├── nodeid.tif
│ └── landuse_2021_clip.vrt

"""


class Breaches(Folder):
    # Raw f-string: the example Windows path contains backslashes (e.g. "\03")
    # that a normal string would interpret as escape sequences.
    __doc__ = rf"""
    --------------------------------------------------------------------------
    An object to ease the accessibility, creation and checks of folders and
    files in the breach folder structure.

    Usage as follows:
    - Access the class with the path to the main folder, e.g.:
      Breaches(r"E:\03.resultaten\Overstromingsberekeningen primaire doorbraken 2024\output\ROR PRI - dijktrajecten 13-8 en 13-9 - Stroom_ZUID_T10_T3000\ROR-PRI-UITDAMMERDIJK_8-T100")
    - Find your way through by using folder.show
    - Check if a file or folder exists using .exists
    - Show all (needed) files using .files
    - Show all (needed) layers using .layers
    - Return a path of a file using either str() or .path

    {FOLDER_STRUCTURE}
    """

    def __init__(self, base, create=True):
        """Initialize the breach folder structure under ``base``.

        Parameters
        ----------
        base : str or Path
            Path to the main breach result folder.
        create : bool, default True
            When True, missing subfolders (and their read_me.txt) are created.
        """
        super().__init__(base, create=create)

        # 01_NetCDF: raw 3Di result files (netcdf, gridadmin)
        self.netcdf = NetCDF(self.base, create=create)

        # 02_JPEG: generated graphs and maps
        self.jpeg = JPEG(self.base, create=create)

        # 03_SSM: rasters downloaded from Lizard
        self.ssm = SSM(self.base, create=create)

        # 04_WSS: waterdepth-raster calculation files
        self.wss = WSS(self.base, create=create)

    @property
    def structure(self):
        """Tree view of the subfolders with the attribute that accesses each."""
        return f"""
{self.space}Folders
{self.space}├── 01_NetCDF (.netcdf)
{self.space}├── 02_JPEG (.jpeg)
{self.space}├── 03_SSM (.ssm)
{self.space}└── 04_WSS (.wss)
"""

    @property
    def full_structure(self):
        """Print the full expected folder/file tree (returns None)."""
        return print(FOLDER_STRUCTURE)

    def to_file_dict(self):
        """Return a dictionary mapping file keys to their paths according to
        the set project structure.

        Values are ``path_if_exists`` — presumably None when the file is
        missing on disk (defined on the project's file objects; confirm).
        """
        return {
            # netcdf
            "aggregate_results_3di": self.netcdf.aggregate_results_3di.path_if_exists,
            # NOTE(review): the original contained two identical "gridadmin"
            # entries; a separate key for gridadmin.sqlite was probably
            # intended — confirm and add it explicitly.
            "gridadmin": self.netcdf.gridadmin.path_if_exists,
            "results_3di": self.netcdf.results_3di.path_if_exists,
            # jpeg
            "graph": self.jpeg.graph.path_if_exists,
            "agg_graph": self.jpeg.agg_graph.path_if_exists,
            "overstroming": self.jpeg.overstroming.path_if_exists,
            # wss
            "dem_clip": self.wss.dem_clip.path_if_exists,
            "grid_raw": self.wss.grid_raw.path_if_exists,
            "mask_flood": self.wss.mask_flood.path_if_exists,
            "max_wdepth_orig": self.wss.max_wdepth_orig.path_if_exists,
            "new_grid": self.wss.new_grid.path_if_exists,
            "nodeid": self.wss.nodeid.path_if_exists,
            "landuse_2021_clip": self.wss.landuse_2021_clip.path_if_exists,
            # ssm
            "max_flow_velocity_5m": self.ssm.max_flow_velocity_5m.path_if_exists,
            # Fixed: the original accessed `.sqlite_tests` here, a copy-paste
            # leftover that would raise AttributeError.
            "max_rate_of_rise_5m": self.ssm.max_rate_of_rise_5m.path_if_exists,
            "max_waterdepth_5m": self.ssm.max_waterdepth_5m.path_if_exists,
            "max_waterlevel_5m": self.ssm.max_waterlevel_5m.path_if_exists,
        }

    @classmethod
    def is_valid(cls, folderpath):
        """Check if the expected folder structure is present in ``folderpath``."""
        sub_folders = ["01_NetCDF", "02_JPEG", "03_SSM", "04_WSS"]
        return all(Path(folderpath).joinpath(i).exists() for i in sub_folders)


class NetCDF(Folder):
    """Folder 01_NetCDF with the raw 3Di result files
    (aggregate_results_3di, gridadmin, results_3di)."""

    def __init__(self, base, create):
        """Bind (and optionally create) the 01_NetCDF subfolder of ``base``."""
        super().__init__(os.path.join(base, "01_NetCDF"), create)

        if create:
            self.create_readme()

    def create_readme(self):
        """Write a read_me.txt listing the files expected in this folder."""
        # Fixed vs. original: "*..zip" typo and the missing results_3di.nc
        # entry (it is part of the expected folder structure).
        readme_txt = (
            "Expected files are:\n\n"
            "aggregate_results_3di (*.nc) named 'aggregate_results_3di.nc'\n"
            "gridadmin (*.h5) named 'gridadmin.h5'\n"
            "gridadmin (*.sqlite) named 'gridadmin.sqlite'\n"
            "results_3di (*.nc) named 'results_3di.nc'\n"
            "log_files (*.zip) named 'log_files.zip'\n"
        )
        with open(os.path.join(self.base, "read_me.txt"), mode="w") as f:
            f.write(readme_txt)

    @property
    def structure(self):
        """Tree view of the expected files in this folder."""
        return f"""
{self.space}01_NetCDF
{self.space}├── aggregate_results_3di
{self.space}├── gridadmin
{self.space}└── results_3di

"""


class JPEG(Folder):
    """Folder 02_JPEG holding the images created for the final product:
    the breach graph, its aggregation, and any generated maps.
    """

    def __init__(self, base, create):
        """Bind (and optionally create) the 02_JPEG subfolder of ``base``."""
        super().__init__(os.path.join(base, "02_JPEG"), create)
        if create:
            self.create_readme()

    def create_readme(self):
        """Write a read_me.txt describing this folder."""
        text = (
            "This folder is the default folder where the images and maps "
            "are stored. The inner structure of these result folders "
            "is automatically generated"
        )
        readme_path = os.path.join(self.base, "read_me.txt")
        with open(readme_path, mode="w") as f:
            f.write(text)

    def __repr__(self):
        file_names = list(self.files.keys())
        return f"""{self.name} @ {self.base}
Folders:\t{self.structure}
Files:\t{file_names}
"""


class SSM(Folder):
    """Folder 03_SSM in which rasters downloaded from Lizard are saved:
    max_flow_velocity, max_rate_of_rise, max_waterdepth, max_waterlevel.
    """

    def __init__(self, base, create):
        """Bind (and optionally create) the 03_SSM subfolder of ``base``."""
        super().__init__(os.path.join(base, "03_SSM"), create)

        if create:
            self.create_readme()

    def create_readme(self):
        """Write a read_me.txt describing the expected content."""
        # Fixed vs. original: missing separator after "are:" (the implicit
        # concatenation glued words together) and the "waterlelvel" typo.
        readme_txt = (
            "Expected files are: tif files.\n"
            "In this folder we are going to store the rasters downloaded from lizard "
            "which include: max_flow_velocity, max_rate_of_rise, max_waterdepth, max_waterlevel"
        )
        with open(os.path.join(self.base, "read_me.txt"), mode="w") as f:
            f.write(readme_txt)


# Waterdepth (WSS) output
class WSS(Folder):
    """Folder 04_WSS with the files used to calculate the waterdepth raster,
    plus the resulting waterdepth raster itself.
    """

    def __init__(self, base, create):
        """Bind (and optionally create) the 04_WSS subfolder of ``base``."""
        super().__init__(os.path.join(base, "04_WSS"), create)

        if create:
            self.create_readme()

    def create_readme(self):
        """Write a read_me.txt describing this folder."""
        # Fixed vs. original: the implicit string concatenation produced
        # "...waterdepth rasterare stored..." (missing separator).
        readme_txt = (
            "This folder is the default folder where all the files to calculate "
            "the waterdepth raster, including the waterdepth raster itself, "
            "are stored. The inner structure of these result folders "
            "is automatically generated"
        )
        with open(os.path.join(self.base, "read_me.txt"), mode="w") as f:
            f.write(readme_txt)


# %%
103 changes: 103 additions & 0 deletions hhnk_threedi_tools/breaches/delete_scenarios.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# %%
import pandas as pd
import requests

# Path to the metadata Excel file still needs to be filled in.
# NOTE(review): pd.read_excel() with no arguments raises TypeError at import
# time — this script cannot run as committed.
# TODO: supply the spreadsheet path, e.g. pd.read_excel(r"path/to/metadata.xlsx");
# the frame must contain an "id_delete" column (read further below).
csv_location = pd.read_excel()


# JSON helper, used in case there is a JSON API response to read.
def extract_ids(json_data):
    """Return the ``"id"`` of every item in ``json_data["items"]`` that has one.

    Parameters
    ----------
    json_data : dict
        API response containing an ``"items"`` list of dicts.

    Returns
    -------
    list
        The ids, in their original order; items without an ``"id"`` key
        are skipped.
    """
    return [item["id"] for item in json_data["items"] if "id" in item]


# %%
# --- Smoke-test the API connection --------------------------------------
# Headers copied from the swagger website.
# NOTE(review): the X-CSRFToken below is hard-coded and session-specific;
# it will expire — replace it before running.
health = "https://www.overstromingsinformatie.nl/auth/health/"
headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "X-CSRFToken": "29TVZT8M1YqAYO8xtU4jqeGYSA6zijTNarMpXLanXDacq0lGcrUsEESrEL17E79r",
}
response_health = requests.get(url=health, headers=headers)
print(response_health.json())

# %%
# Fetch the JSON Web Key Set (JWKS) the service publishes for token verification.
well_know = "https://www.overstromingsinformatie.nl/auth/.well-known/jwks.json"
well_know_response = requests.get(url=well_know, headers=headers)
print(well_know_response.json())
# %%
# FOR ADMINISTRATION PERMISSION USE THE FOLLOWING. Otherwise you will get a
# permission error the moment you try to upload the excel file.
# NOTE(review): `parameters` is defined but never sent in any request below —
# presumably meant for an API-key creation call; confirm or remove.
parameters = {
    "scope": "admin",
    "name": "Juan_Test_12",
    "expiry_date": "2024-12-13T06:54:04.597Z",
    "revoked": False,
}


# Copy here the API key generated on the website.
# WARNING(review): a real-looking API key is committed in source control below —
# it should be revoked and loaded from an environment variable / secrets store.
api_key_10_07_24 = "lask2hq6.JhTTsbYLI0j5FNF20JQNpubBaYpByIx0"
# %%
# Check tenants.
# NOTE(review): `tenants` holds a placeholder string, not a URL — this GET
# will fail until the tenants endpoint URL is filled in.
tenants = "Place here the API key"
response_tenants = requests.get(url=tenants, headers=headers, auth=("__key__", api_key_10_07_24))
print(response_tenants.json())

# %%
# Get a token pair for tenant 4 (response contains "refresh" and, presumably,
# "access" — confirm against the API docs).
token_url = "https://www.overstromingsinformatie.nl/auth/v1/token/"
response_5 = requests.post(url=token_url, json={"tenant": 4}, auth=("__key__", api_key_10_07_24))
print(response_5.json())
refresh = response_5.json()["refresh"]

# Exchange the refresh token for a fresh access token.
access = response_5.json()  # NOTE(review): unused below — looks like leftover debugging.
refresh_url = "https://www.overstromingsinformatie.nl/auth/v1/token/refresh/"
data_refresh = {"refresh": response_5.json()["refresh"]}
response_refresh = requests.post(url=refresh_url, json=data_refresh, auth=("__key__", api_key_10_07_24))
response_refresh = response_refresh.json()
# NOTE(review): despite its name this is the *access* token taken from the
# refresh response; consider renaming to `access_token`.
refresh_token = response_refresh["access"]
print(response_refresh)


# Collect the ids of incomplete private scenarios (first page, max 100).
file_import_url = "https://www.overstromingsinformatie.nl/api/v1/scenarios?mode=private&limit=100&offset=0&order_by=id&status=incomplete"
headers_excel = {
    "accept": "application/json",
    "authorization": f"Bearer {refresh_token}",
    # 'Content-type':'application/zip',
}
response_incomplete = requests.get(url=file_import_url, headers=headers_excel)

# Get json file from the link to be extracted
# %%

# The ids to delete are taken from the spreadsheet column "id_delete",
# not from the API response above (the extract_ids route is commented out).
# id_scenarios = extract_ids(response_incomplete.json())
id_scenarios = csv_location["id_delete"].values

# %%
# Delete each scenario by id. DESTRUCTIVE: there is no confirmation step and
# failed deletes are not checked (response status is never inspected).
# id_scenarios = []
for id_scenario in id_scenarios:
    file_import_url = f"https://www.overstromingsinformatie.nl/api/v1/scenarios/{id_scenario}"
    # file_import_url = f'https://ldo.staging.lizard.net/api/v1/excel-imports/{id_excel}/files/{zip_name}/upload'
    # NOTE(review): headers_excel is identical every iteration — could be
    # hoisted out of the loop.
    headers_excel = {
        "accept": "application/json",
        "authorization": f"Bearer {refresh_token}",
        # 'Content-type':'application/zip',
    }

    response = requests.delete(url=file_import_url, headers=headers_excel)
    print(file_import_url)
# %%


# %%
Loading
Loading