Scenario 3 - Created the Energy Scenario (#781)
examples/energy - An example of Ingestion and Processing Jobs with a Streamlit Dashboard

Why?
The objective of this example is to ingest and process in VDK two CSV datasets providing the Natural Gas price and the Climate Extremes index in the US. The example also builds a Streamlit dashboard that shows the results.

What?
The change adds a new folder under the examples directory containing the complete example of the scenario.

How has it been tested?
The following manual tests have been run:

* `vdk run energy`
* `streamlit run dashboard.py`

What type of change are you making?
New Use Case

Signed-off-by: Angelica Lo Duca angelica.loduca [at] gmail.com
alod83 authored May 11, 2022
1 parent 21a9163 commit ba1806b
Showing 14 changed files with 343 additions and 0 deletions.
147 changes: 147 additions & 0 deletions examples/energy/README.md
@@ -0,0 +1,147 @@
# Energy
The objective of this scenario is to build an app that shows the relationship between Natural Gas prices in the U.S. and the temperature.
The app uses VDK to ingest and process the following two CSV datasets:
* [Natural Gas Prices from 1997 to 2020](https://datahub.io/core/natural-gas)
* [U.S. Climate Extremes Index from 1910 to 2020](https://www.ncdc.noaa.gov/extremes/cei/graph/ne/01-12/2)


This example provides the complete code to perform the following tasks:

* Data Ingestion from CSV
* Data Processing
* Data Publication in the form of an interactive app

The directory is organized as follows:

* energy - contains jobs
* dashboard.py - contains the code to generate the app

## Scenario
An American natural gas supplier has hired a team of data scientists to find out if there is a correlation between price trends and climatic conditions (minimum temperatures).
Analysts found two datasets:
* [Natural Gas Prices from 1997 to 2020](https://datahub.io/core/natural-gas)
* [U.S. Climate Extremes Index from 1910 to 2020](https://www.ncdc.noaa.gov/extremes/cei/graph/ne/01-12/2)


## Data Source

The Natural Gas Prices dataset contains 5,953 records provided by the U.S. Energy Information Administration (EIA). Each record reports the daily price of natural gas.
For each record, the following information is available:
* Date (provided on a daily basis)
* Price (in dollars)

The following table shows an excerpt of the dataset:
| Date       | Price |
| ---------- | ----- |
| 1997-01-07 | 3.82  |
| 1997-01-08 | 3.80  |
| 1997-01-09 | 3.61  |
| 1997-01-10 | 3.92  |
| 1997-01-13 | 4.00  |
| 1997-01-14 | 4.01  |
| 1997-01-15 | 4.34  |
| 1997-01-16 | 4.71  |
| 1997-01-17 | 3.91  |
| 1997-01-20 | 3.26  |
| 1997-01-21 | 2.99  |

The U.S. Climate Extremes Index dataset contains 112 records, with several indicators available for each. The data analysts selected the following fields:
* Date (provided on an annual basis)
* Percentage of minimum temperature much above normal
* Percentage of minimum temperature much below normal.

The following table shows an excerpt of the dataset:

| **Date** | Much Above Normal | Much Below Normal |
| -------- | ----------------- | ----------------- |
| **1910** | 0.00 | 3.90 |
| **1911** | 3.70 | 7.60 |
| **1912** | 0.00 | 52.80 |
| **1913** | 1.50 | 0.00 |
| **1914** | 0.00 | 79.40 |
| **1915** | 0.00 | 0.10 |
| **1916** | 0.00 | 18.40 |
| **1917** | 0.00 | 99.70 |
| **1918** | 0.00 | 45.40 |
| **1919** | 0.00 | 0.00 |

## Requirements

To run this example, you need:

* Versatile Data Kit
* Trino DB
* Versatile Data Kit plugin for Trino
* Streamlit

For more details on how to install Versatile Data Kit, Trino DB, Streamlit and Versatile Data Kit plugin for Trino, please refer to [this link](https://github.com/vmware/versatile-data-kit/tree/main/examples/life-expectancy).

### Other Requirements
This example also requires the following Python libraries, which are included in the `requirements.txt` file:
```
pandas
```

## Configuration
This example uses the same configuration as the [Life Expectancy](https://github.com/vmware/versatile-data-kit/tree/main/examples/life-expectancy) scenario; the only difference is the Trino schema set in the Versatile Data Kit configuration file:
```
trino_schema = energy
```

## Data Ingestion
Data Ingestion loads the two CSV files defined in the Data Source section into the database. It is performed through the following steps:

* delete the existing table (if any)
* create a new table
* ingest the table values directly from the CSV file.

Accessing the CSV files requires an active Internet connection.
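The ingestion jobs below all follow the same delete/create/ingest pattern. As a minimal, self-contained sketch of the Python ingestion step, with a stubbed `job_input` (the `FakeJobInput` class and the inline CSV are hypothetical stand-ins, for illustration only):

```python
import io

import pandas as pd


class FakeJobInput:
    """Hypothetical stand-in for VDK's IJobInput; collects ingested rows in memory."""

    def __init__(self):
        self.tables = {}

    def send_tabular_data_for_ingestion(self, rows, destination_table, column_names):
        self.tables[destination_table] = pd.DataFrame(list(rows), columns=column_names)


# Tiny in-memory stand-in for the remote daily.csv file.
CSV_TEXT = "Date,Price\n1997-01-07,3.82\n1997-01-08,3.80\n"


def run(job_input):
    # Read the CSV and send every row to the target table, as the real job does.
    df = pd.read_csv(io.StringIO(CSV_TEXT))
    job_input.send_tabular_data_for_ingestion(
        df.itertuples(index=False),
        destination_table="natural_gas_prices",
        column_names=df.columns.tolist(),
    )


job_input = FakeJobInput()
run(job_input)
print(job_input.tables["natural_gas_prices"].shape)  # (2, 2)
```

The real jobs replace the stub with the `IJobInput` instance that VDK passes to `run()` and read the CSV from its public URL.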

Jobs 01-03 are devoted to Data Ingestion of the Natural Gas Prices dataset. The contents of the CSV file are imported into a table named `natural_gas_prices`.

Jobs 04-06 are devoted to Data Ingestion of the U.S. Climate Extremes Index dataset. The contents of the CSV file are imported into a table named `climate_extremes_index`.

## Data Processing
Data Processing includes two steps:
* building an annual view of `natural_gas_prices` with the average natural gas price per year
* merging that view with `climate_extremes_index`. Values are normalized to the interval [0, 1] so they can be compared.

Jobs 07-08 are devoted to the first step. The output is stored in a table called `average_gas_price_by_year`.

The following table shows an example of the produced table:

| **Year** | **Price** |
| -------- | --------- |
| **2002** | 3.376     |
| **2010** | 4.370     |
| **2017** | 2.988     |
| **2018** | 3.153     |
| **2006** | 6.731     |
| **2011** | 3.996     |
| **1998** | 2.088     |
| **2008** | 8.863     |
| **2009** | 3.943     |
| **2012** | 2.754     |
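As a rough pandas sketch of what this first step computes (the sample rows are illustrative, not taken from the dataset):

```python
import pandas as pd

# Illustrative daily prices (not real dataset values).
daily = pd.DataFrame(
    {
        "Date": ["1997-01-07", "1997-01-08", "1998-01-05", "1998-01-06"],
        "Price": [3.82, 3.80, 2.00, 2.10],
    }
)

# Group by calendar year and average, mirroring the SQL
# extract(YEAR from CAST(Date AS date)) / avg(Price) in job 08.
daily["Year"] = pd.to_datetime(daily["Date"]).dt.year
average_by_year = daily.groupby("Year", as_index=False)["Price"].mean()
print(average_by_year)  # 1997 -> 3.81, 1998 -> 2.05
```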

Jobs 09-10 are devoted to the second step. The output is stored in a table called `merged_tables`.

The following table shows an example of the produced table:

| **Year** | **NormPrice** | **NormTemperatureMuchAboveNormal** | **NormTemperatureMuchBelowNormal** |
| -------- | ------------- | ---------------------------------- | ---------------------------------- |
| **2002** | 0.381         | 0.054                              | 0                                  |
| **2010** | 0.493         | 0.124                              | 0.147                              |
| **2017** | 0.337         | 0.821                              | 0                                  |
| **2018** | 0.356         | 0.488                              | 0                                  |
| **2006** | 0.760         | 0.434                              | 0                                  |
| **2011** | 0.451         | 0.160                              | 0.013                              |
| **1998** | 0.236         | 0.814                              | 0                                  |
| **2008** | 1             | 0.003                              | 0.04                               |
| **2009** | 0.445         | 0.007                              | 0                                  |
| **2012** | 0.311         | 0.887                              | 0                                  |
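As a rough pandas sketch of the join-and-normalize logic of this second step (sample values are illustrative, not taken from the dataset):

```python
import pandas as pd

# Illustrative yearly averages and climate index rows (not real dataset values).
prices = pd.DataFrame({"Year": [2002, 2008], "Price": [3.38, 8.86]})
climate = pd.DataFrame(
    {
        "Date": [2002, 2008],
        "MuchAboveNormal": [5.0, 0.3],
        "MuchBelowNormal": [0.0, 4.0],
    }
)

# Join on year, then divide each column by its maximum so every
# value falls in [0, 1], as the job 10 SQL does.
merged = prices.merge(climate, left_on="Year", right_on="Date")
merged["NormPrice"] = merged["Price"] / merged["Price"].max()
merged["NormTemperatureMuchAboveNormal"] = (
    merged["MuchAboveNormal"] / merged["MuchAboveNormal"].max()
)
merged["NormTemperatureMuchBelowNormal"] = (
    merged["MuchBelowNormal"] / merged["MuchBelowNormal"].max()
)
print(merged[["Year", "NormPrice"]])  # the max-price year normalizes to 1.0
```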

## Data Publication
The `dashboard.py` script builds a Streamlit app that plots the normalized natural gas price against the two temperature indices stored in the database. To run the app, simply run the following command:
```
streamlit run dashboard.py
```
64 changes: 64 additions & 0 deletions examples/energy/dashboard.py
@@ -0,0 +1,64 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import altair as alt
import pandas as pd
import streamlit as st
import trino

# connect to server
conn = trino.dbapi.connect(
host="localhost",
port=8080,
user="root",
catalog="mysql",
http_scheme="HTTP",
verify=True,
schema="energy",
)


# Design the UI
st.markdown("# Does the Natural Gas Price in the US depend on the temperature?")

st.markdown(
"""We consider two types of temperature:
* much above normal
* much below normal.
The following figure shows that if the temperature is much above normal,
the gas price decreases. This happened in 1998, 2012, 2015-2018.
A temperature much below normal seems to not affect the gas price."""
)

df = pd.read_sql_query(
"SELECT Year, NormPrice, NormTemperatureMuchAboveNormal, NormTemperatureMuchBelowNormal FROM merged_tables",
conn,
)


price_bar = (
alt.Chart(df)
.mark_bar(size=30)
.encode(x="Year:O", y="NormPrice:Q", color=alt.value("#8B0000"))
)

temp_list = ["MuchAboveNormal", "MuchBelowNormal"]
temp_type = st.selectbox("Select Temperature", temp_list, key="temp_type")

temp_line = (
alt.Chart(df)
.mark_line()
.encode(x="Year:O", y=f"NormTemperature{temp_type}:Q", color=alt.value("#ff9494"))
)

c = (price_bar + temp_line).properties(height=350, width=800)

st.altair_chart(c)
@@ -0,0 +1 @@
DROP TABLE IF EXISTS natural_gas_prices
4 changes: 4 additions & 0 deletions examples/energy/energy/02_create_table_natural_gas_prices.sql
@@ -0,0 +1,4 @@
CREATE TABLE natural_gas_prices (
Date varchar,
Price double
)
27 changes: 27 additions & 0 deletions examples/energy/energy/03_ingest_table_natural_gas_prices.py
@@ -0,0 +1,27 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

import pandas as pd
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
"""
Download datasets required by the scenario and put them in the data lake.
"""
log.info(f"Starting job step {__name__}")

# url of the Natural gas prices dataset
url = "https://datahub.io/core/natural-gas/r/daily.csv"

df = pd.read_csv(url)

job_input.send_tabular_data_for_ingestion(
df.itertuples(index=False),
destination_table="natural_gas_prices",
column_names=df.columns.tolist(),
)
@@ -0,0 +1 @@
DROP TABLE IF EXISTS climate_extremes_index
@@ -0,0 +1,5 @@
CREATE TABLE climate_extremes_index (
Date varchar,
MuchAboveNormal double,
MuchBelowNormal double
)
36 changes: 36 additions & 0 deletions examples/energy/energy/06_ingest_table_create_extremes_index.py
@@ -0,0 +1,36 @@
# Copyright 2021 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging

import numpy as np
import pandas as pd
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
"""
Download datasets required by the scenario and put them in the data lake.
"""
log.info(f"Starting job step {__name__}")

# url of the U.S. Climate Extremes Index (CEI) dataset
url = "https://www.ncdc.noaa.gov/extremes/cei/graph/us/01-12/2/data.csv"

dtypes = {
"Date": str,
"MuchAboveNormal": np.float64,
"MuchBelowNormal": np.float64,
}

df = pd.read_csv(url, skiprows=[0], dtype=dtypes)

df.columns = df.columns.str.replace(" ", "")

job_input.send_tabular_data_for_ingestion(
df.itertuples(index=False),
destination_table="climate_extremes_index",
column_names=df.columns.tolist(),
)
@@ -0,0 +1 @@
DROP TABLE IF EXISTS average_gas_price_by_year
@@ -0,0 +1,6 @@
CREATE TABLE average_gas_price_by_year AS
(SELECT extract(YEAR from CAST(Date AS date)) AS Year,
avg(Price) AS Price
FROM natural_gas_prices
GROUP BY extract(YEAR from CAST(Date AS date))
)
1 change: 1 addition & 0 deletions examples/energy/energy/09_delete_table_merged_tables.sql
@@ -0,0 +1 @@
DROP TABLE IF EXISTS merged_tables
11 changes: 11 additions & 0 deletions examples/energy/energy/10_create_table_merged_tables.sql
@@ -0,0 +1,11 @@
CREATE TABLE merged_tables AS
(SELECT Year,
(Price / (SELECT max(Price) FROM average_gas_price_by_year)) AS NormPrice,
(MuchAboveNormal / (SELECT max(MuchAboveNormal) FROM climate_extremes_index
WHERE CAST(climate_extremes_index.Date AS integer) IN (SELECT Year from average_gas_price_by_year)
)) AS NormTemperatureMuchAboveNormal,
(MuchBelowNormal / (SELECT max(MuchBelowNormal) FROM climate_extremes_index WHERE CAST(climate_extremes_index.Date AS integer) IN (SELECT Year from average_gas_price_by_year)
)) AS NormTemperatureMuchBelowNormal
FROM average_gas_price_by_year INNER JOIN climate_extremes_index
ON average_gas_price_by_year.Year = CAST(climate_extremes_index.Date AS integer)
)
34 changes: 34 additions & 0 deletions examples/energy/energy/config.ini
@@ -0,0 +1,34 @@
; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure

; This is the only file required to deploy a Data Job.
; Read more to understand what each option means:

; Information about the owner of the Data Job
[owner]

; Team is a way to group Data Jobs that belong to the same team.
team = my-team

; Configuration related to running data jobs
[job]
; For format see https://en.wikipedia.org/wiki/Cron
; The cron expression is evaluated in UTC time.
; If it is time for a new job run and the previous job run hasn’t finished yet,
; the cron job waits until the previous execution has finished.
schedule_cron = 11 23 5 8 1

[vdk]
; Key value pairs of any configuration options that can be passed to vdk.
; For possible options in your vdk installation execute command vdk config-help
db_default_type=TRINO
ingest_method_default = trino

trino_catalog = mysql
trino_use_ssl = false
trino_host = localhost
trino_port = 8080
trino_user = root
trino_schema = energy
trino_ssl_verify = false

ingester_wait_to_finish_after_every_send = true
5 changes: 5 additions & 0 deletions examples/energy/energy/requirements.txt
@@ -0,0 +1,5 @@
altair
pandas
streamlit
vdk-csv
vdk-ingest-file
