From 3be35e0bf8670ed8edc44e9af8e90a7d40ca27c8 Mon Sep 17 00:00:00 2001 From: Gang Tao Date: Wed, 27 Dec 2023 14:48:48 -0800 Subject: [PATCH 1/5] add coinbase sample --- examples/coinbase/Makefile | 10 +++ examples/coinbase/README.md | 91 ++++++++++++++++++++++++++++ examples/coinbase/ddl.sql | 53 ++++++++++++++++ examples/coinbase/docker-compose.yml | 34 +++++++++++ examples/coinbase/pipeline.yaml | 22 +++++++ 5 files changed, 210 insertions(+) create mode 100644 examples/coinbase/Makefile create mode 100644 examples/coinbase/README.md create mode 100644 examples/coinbase/ddl.sql create mode 100644 examples/coinbase/docker-compose.yml create mode 100644 examples/coinbase/pipeline.yaml diff --git a/examples/coinbase/Makefile b/examples/coinbase/Makefile new file mode 100644 index 00000000000..1943410d72e --- /dev/null +++ b/examples/coinbase/Makefile @@ -0,0 +1,10 @@ + + +PWD = $(shell pwd) +NAME = test + +create: + yq -o=json $(PWD)/pipeline.yaml | curl http://localhost:4195/streams/$(NAME) -X POST -d @- + +delete: + curl http://localhost:4195/streams/$(NAME) -X DELETE diff --git a/examples/coinbase/README.md b/examples/coinbase/README.md new file mode 100644 index 00000000000..1eae140ed80 --- /dev/null +++ b/examples/coinbase/README.md @@ -0,0 +1,91 @@ +# Demo for Benthos data pipeline and Coinbase websocket data + + + +This docker compose file demonstrates how to ingest websocket data into proton by using benthos pipeline. + + + +## Start the stack + +Simply run `docker compose up` in this folder. Three docker containers in the stack: + +1. ghcr.io/timeplus-io/proton:latest, as the streaming database +2. jeffail/benthos:latest, a [benthos](https://www.benthos.dev/) service as the data pipeline +3. init container, create the tickers stream when proton database server is ready + +the ddl to create the stream is: + +```sql +CREATE STREAM IF NOT EXISTS tickers ( + best_ask float64, + best_ask_size float64, + best_bid float64, + best_bid_size float64, + high_24h float64, + last_size float64, + low_24h float64, + open_24h float64, + price float64, + product_id string, + sequence int, + side string, + time datetime, + trade_id int, + type string, + volume_24h float64, + volume_30d float64 +) +``` + +## Create a ingest data pipeline + +run command `make create` to create following benthos data pipeline, note you need install `jq` and `curl` to run this command + +``` +input: + label: coinbase + websocket: + url: wss://ws-feed.exchange.coinbase.com + open_message: '{"type": "subscribe","product_ids": ["ETH-USD","ETH-EUR"],"channels": ["ticker"]}' + open_message_type: text + +output: + http_client: + url: http://proton:8123/proton/v1/ingest/streams/tickers + verb: POST + headers: + Content-Type: application/json + batching: + count: 10 + period: 1000ms + processors: + - archive: + format: json_array + - mapping: | + root.columns = this.index(0).keys() + root.data = this.map_each( row -> root.columns.map_each( key -> row.get(key)) ) + +``` + +this pipeline will read data from coinbase websocket and then send the result to proton ingest api in a batch + + +## Query you crypto price data with SQL + +now you can run following query to get the OHLC of the crypto data + +```sql +SELECT + window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close, latest(volume_24h) as acc_vol +FROM + tumble(tickers, 60s) +WHERE + product_id != '' +GROUP BY + window_start, product_id +SETTINGS + seek_to = 'earliest' +``` + + diff --git a/examples/coinbase/ddl.sql b/examples/coinbase/ddl.sql new file mode 100644 index 00000000000..2ab60e0d3f4 --- /dev/null +++ b/examples/coinbase/ddl.sql @@ -0,0 +1,53 @@ +-- ddl creare stream +CREATE STREAM IF NOT EXISTS tickers ( + best_ask float64, + best_ask_size float64, + best_bid float64, + best_bid_size float64, + high_24h float64, + last_size float64, + low_24h float64, + open_24h float64, + price float64, + product_id string, + sequence int, + side string, + time datetime, + trade_id int, + type string, + volume_24h float64, + volume_30d float64 +) + +-- OHLC query +SELECT + window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close, latest(volume_24h) as acc_vol +FROM + tumble(tickers, 60s) +WHERE + product_id != '' +GROUP BY + window_start, product_id +SETTINGS + seek_to = 'earliest' + +-- OHLCV query +WITH ohlc AS + ( + SELECT + window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close, latest(volume_24h) AS acc_vol + FROM + tumble(tickers, 300s) + WHERE + product_id != '' + GROUP BY + window_start, product_id + SETTINGS + seek_to = 'earliest' + ) +SELECT + window_start, product_id, open, high, low, close, acc_vol, acc_vol - lag(acc_vol) AS vol +FROM + ohlc +PARTITION BY + product_id diff --git a/examples/coinbase/docker-compose.yml b/examples/coinbase/docker-compose.yml new file mode 100644 index 00000000000..548115067a7 --- /dev/null +++ b/examples/coinbase/docker-compose.yml @@ -0,0 +1,34 @@ +version: '3.7' +name: coinbase +services: + proton: + image: ghcr.io/timeplus-io/proton:latest + pull_policy: always + ports: + - "3218:3218" # HTTP Streaming + healthcheck: + test: ["CMD", "curl", "http://localhost:3218/proton/ping"] + interval: 2s + timeout: 10s + retries: 3 + start_period: 10s + + benthos: + image: jeffail/benthos:latest + pull_policy: always + command: streams + ports: + - "4195:4195" + + init-stream: + image: ghcr.io/timeplus-io/proton:latest + command: + - sh + - -c + - | + proton-client -h proton --query "CREATE STREAM IF NOT EXISTS tickers (best_ask float64, best_ask_size float64, best_bid float64, best_bid_size float64, high_24h float64, last_size float64, low_24h float64, open_24h float64, price float64, product_id string, sequence int, side string, time datetime, trade_id int, type string, volume_24h float64, volume_30d float64)" + depends_on: + proton: + condition: service_healthy + + diff --git a/examples/coinbase/pipeline.yaml b/examples/coinbase/pipeline.yaml new file mode 100644 index 00000000000..2d95b7044da --- /dev/null +++ b/examples/coinbase/pipeline.yaml @@ -0,0 +1,22 @@ +input: + label: coinbase + websocket: + url: wss://ws-feed.exchange.coinbase.com + open_message: '{"type": "subscribe","product_ids": ["ETH-USD","ETH-EUR"],"channels": ["ticker"]}' + open_message_type: text + +output: + http_client: + url: http://proton:8123/proton/v1/ingest/streams/tickers + verb: POST + headers: + Content-Type: application/json + batching: + count: 10 + period: 1000ms + processors: + - archive: + format: json_array + - mapping: | + root.columns = this.index(0).keys() + root.data = this.map_each( row -> root.columns.map_each( key -> row.get(key)) ) From c6642aae040ebba7ce4e04aa8653992a59282174 Mon Sep 17 00:00:00 2001 From: Gang Tao Date: Wed, 27 Dec 2023 19:10:22 -0800 Subject: [PATCH 2/5] update sql --- examples/coinbase/README.md | 4 +--- examples/coinbase/ddl.sql | 32 -------------------------------- 2 files changed, 1 insertion(+), 35 deletions(-) diff --git a/examples/coinbase/README.md b/examples/coinbase/README.md index 1eae140ed80..93e123f6027 100644 --- a/examples/coinbase/README.md +++ b/examples/coinbase/README.md @@ -81,11 +81,9 @@ SELECT FROM tumble(tickers, 60s) WHERE - product_id != '' + product_id != '' and _tp_time > earliest_ts() GROUP BY window_start, product_id -SETTINGS - seek_to = 'earliest' ``` diff --git a/examples/coinbase/ddl.sql b/examples/coinbase/ddl.sql index 2ab60e0d3f4..82633c22509 100644 --- a/examples/coinbase/ddl.sql +++ b/examples/coinbase/ddl.sql @@ -19,35 +19,3 @@ CREATE STREAM IF NOT EXISTS tickers ( volume_30d float64 ) --- OHLC query -SELECT - window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close, latest(volume_24h) as acc_vol -FROM - tumble(tickers, 60s) -WHERE - product_id != '' -GROUP BY - window_start, product_id -SETTINGS - seek_to = 'earliest' - --- OHLCV query -WITH ohlc AS - ( - SELECT - window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close, latest(volume_24h) AS acc_vol - FROM - tumble(tickers, 300s) - WHERE - product_id != '' - GROUP BY - window_start, product_id - SETTINGS - seek_to = 'earliest' - ) -SELECT - window_start, product_id, open, high, low, close, acc_vol, acc_vol - lag(acc_vol) AS vol -FROM - ohlc -PARTITION BY - product_id From 6baf2b0c102074e910218cea8536092217806a85 Mon Sep 17 00:00:00 2001 From: Gang Tao Date: Wed, 27 Dec 2023 19:50:50 -0800 Subject: [PATCH 3/5] update readme --- examples/coinbase/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/coinbase/README.md b/examples/coinbase/README.md index 93e123f6027..943dc5545e7 100644 --- a/examples/coinbase/README.md +++ b/examples/coinbase/README.md @@ -2,7 +2,7 @@ -This docker compose file demonstrates how to ingest websocket data into proton by using benthos pipeline. +This docker compose file demonstrates how to ingest WebSocket data into Proton by using Benthos pipeline. @@ -11,8 +11,8 @@ This docker compose file demonstrates how to ingest websocket data into proton b Simply run `docker compose up` in this folder. Three docker containers in the stack: 1. ghcr.io/timeplus-io/proton:latest, as the streaming database -2. jeffail/benthos:latest, a [benthos](https://www.benthos.dev/) service as the data pipeline -3. init container, create the tickers stream when proton database server is ready +2. jeffail/benthos:latest, a [Benthos](https://www.benthos.dev/) service as the data pipeline +3. init container, create the tickers stream when Proton database server is ready the ddl to create the stream is: @@ -40,7 +40,7 @@ CREATE STREAM IF NOT EXISTS tickers ( ## Create a ingest data pipeline -run command `make create` to create following benthos data pipeline, note you need install `jq` and `curl` to run this command +run command `make create` to create following Benthos data pipeline, note you need install `jq` and `curl` to run this command ``` input: @@ -77,7 +77,7 @@ now you can run following query to get the OHLC of the crypto data ```sql SELECT - window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close, latest(volume_24h) as acc_vol + window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close FROM tumble(tickers, 60s) WHERE From 952fd69c3d11824e6219e32592e27c6d96c4431d Mon Sep 17 00:00:00 2001 From: Gang Tao Date: Wed, 27 Dec 2023 19:53:08 -0800 Subject: [PATCH 4/5] update readme --- examples/coinbase/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/coinbase/README.md b/examples/coinbase/README.md index 943dc5545e7..f54949399a9 100644 --- a/examples/coinbase/README.md +++ b/examples/coinbase/README.md @@ -73,7 +73,7 @@ this pipeline will read data from coinbase websocket and then send the result to ## Query you crypto price data with SQL -now you can run following query to get the OHLC of the crypto data +now you can run following query to get the OHLC of the crypto data: ```sql SELECT From e618e9247381548722aef8fcf9e0c09e3d8d08fe Mon Sep 17 00:00:00 2001 From: Gang Tao Date: Tue, 2 Jan 2024 09:36:33 -0800 Subject: [PATCH 5/5] using decimal instead of float --- examples/coinbase/README.md | 22 +++++++++++----------- examples/coinbase/ddl.sql | 22 +++++++++++----------- examples/coinbase/docker-compose.yml | 2 +- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/examples/coinbase/README.md b/examples/coinbase/README.md index f54949399a9..04eedf103aa 100644 --- a/examples/coinbase/README.md +++ b/examples/coinbase/README.md @@ -18,23 +18,23 @@ the ddl to create the stream is: ```sql CREATE STREAM IF NOT EXISTS tickers ( - best_ask float64, - best_ask_size float64, - best_bid float64, - best_bid_size float64, - high_24h float64, - last_size float64, - low_24h float64, - open_24h float64, - price float64, + best_ask decimal(10,2), + best_ask_size decimal(10,8), + best_bid decimal(10,2), + best_bid_size decimal(10,8), + high_24h decimal(10,2), + last_size decimal(10,8), + low_24h decimal(10,2), + open_24h decimal(10,2), + price decimal(10,2), product_id string, sequence int, side string, time datetime, trade_id int, type string, - volume_24h float64, - volume_30d float64 + volume_24h decimal(20,8), + volume_30d decimal(20,8) ) ``` diff --git a/examples/coinbase/ddl.sql b/examples/coinbase/ddl.sql index 82633c22509..51f8a929fa5 100644 --- a/examples/coinbase/ddl.sql +++ b/examples/coinbase/ddl.sql @@ -1,21 +1,21 @@ -- ddl creare stream CREATE STREAM IF NOT EXISTS tickers ( - best_ask float64, - best_ask_size float64, - best_bid float64, - best_bid_size float64, - high_24h float64, - last_size float64, - low_24h float64, - open_24h float64, - price float64, + best_ask decimal(10,2), + best_ask_size decimal(10,8), + best_bid decimal(10,2), + best_bid_size decimal(10,8), + high_24h decimal(10,2), + last_size decimal(10,8), + low_24h decimal(10,2), + open_24h decimal(10,2), + price decimal(10,2), product_id string, sequence int, side string, time datetime, trade_id int, type string, - volume_24h float64, - volume_30d float64 + volume_24h decimal(20,8), + volume_30d decimal(20,8) ) diff --git a/examples/coinbase/docker-compose.yml b/examples/coinbase/docker-compose.yml index 548115067a7..3b05240fe74 100644 --- a/examples/coinbase/docker-compose.yml +++ b/examples/coinbase/docker-compose.yml @@ -26,7 +26,7 @@ services: - sh - -c - | - proton-client -h proton --query "CREATE STREAM IF NOT EXISTS tickers (best_ask float64, best_ask_size float64, best_bid float64, best_bid_size float64, high_24h float64, last_size float64, low_24h float64, open_24h float64, price float64, product_id string, sequence int, side string, time datetime, trade_id int, type string, volume_24h float64, volume_30d float64)" + proton-client -h proton --query "CREATE STREAM IF NOT EXISTS tickers (best_ask decimal(10,2), best_ask_size decimal(10,8), best_bid decimal(10,2), best_bid_size decimal(10,8), high_24h decimal(10,2), last_size decimal(10,8), low_24h decimal(10,2), open_24h decimal(10,2), price decimal(10,2), product_id string, sequence int, side string, time datetime, trade_id int, type string, volume_24h decimal(20,8), volume_30d decimal(20,8))" depends_on: proton: condition: service_healthy