Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add coinbase sample #451

Merged
merged 10 commits into from
Jan 3, 2024
10 changes: 10 additions & 0 deletions examples/coinbase/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@


PWD = $(shell pwd)
NAME = test

create:
yq -o=json $(PWD)/pipeline.yaml | curl http://localhost:4195/streams/$(NAME) -X POST -d @-

delete:
curl http://localhost:4195/streams/$(NAME) -X DELETE
89 changes: 89 additions & 0 deletions examples/coinbase/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Demo for Benthos data pipeline and Coinbase websocket data



This docker compose file demonstrates how to ingest WebSocket data into Proton by using Benthos pipeline.



## Start the stack

Simply run `docker compose up` in this folder. Three docker containers in the stack:

1. ghcr.io/timeplus-io/proton:latest, as the streaming database
2. jeffail/benthos:latest, a [Benthos](https://www.benthos.dev/) service as the data pipeline
3. init container, create the tickers stream when Proton database server is ready

the ddl to create the stream is:

```sql
CREATE STREAM IF NOT EXISTS tickers (
best_ask decimal(10,2),
best_ask_size decimal(10,8),
best_bid decimal(10,2),
best_bid_size decimal(10,8),
high_24h decimal(10,2),
last_size decimal(10,8),
low_24h decimal(10,2),
open_24h decimal(10,2),
price decimal(10,2),
product_id string,
sequence int,
side string,
time datetime,
trade_id int,
type string,
volume_24h decimal(20,8),
volume_30d decimal(20,8)
)
```

## Create a ingest data pipeline

run command `make create` to create following Benthos data pipeline, note you need install `jq` and `curl` to run this command

```
input:
label: coinbase
websocket:
url: wss://ws-feed.exchange.coinbase.com
open_message: '{"type": "subscribe","product_ids": ["ETH-USD","ETH-EUR"],"channels": ["ticker"]}'
open_message_type: text

output:
http_client:
url: http://proton:8123/proton/v1/ingest/streams/tickers
verb: POST
headers:
Content-Type: application/json
batching:
count: 10
period: 1000ms
processors:
- archive:
format: json_array
- mapping: |
root.columns = this.index(0).keys()
root.data = this.map_each( row -> root.columns.map_each( key -> row.get(key)) )

```

this pipeline will read data from coinbase websocket and then send the result to proton ingest api in a batch


## Query you crypto price data with SQL

now you can run following query to get the OHLC of the crypto data:

```sql
SELECT
window_start, product_id, earliest(price) AS open, max(price) AS high, min(price) AS low, latest(price) AS close
FROM
tumble(tickers, 60s)
WHERE
product_id != '' and _tp_time > earliest_ts()
GROUP BY
window_start, product_id
```


21 changes: 21 additions & 0 deletions examples/coinbase/ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- ddl creare stream
CREATE STREAM IF NOT EXISTS tickers (
best_ask decimal(10,2),
best_ask_size decimal(10,8),
best_bid decimal(10,2),
best_bid_size decimal(10,8),
high_24h decimal(10,2),
last_size decimal(10,8),
low_24h decimal(10,2),
open_24h decimal(10,2),
price decimal(10,2),
product_id string,
sequence int,
side string,
time datetime,
trade_id int,
type string,
volume_24h decimal(20,8),
volume_30d decimal(20,8)
)

34 changes: 34 additions & 0 deletions examples/coinbase/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
version: '3.7'
name: coinbase
services:
proton:
image: ghcr.io/timeplus-io/proton:latest
pull_policy: always
ports:
- "3218:3218" # HTTP Streaming
healthcheck:
test: ["CMD", "curl", "http://localhost:3218/proton/ping"]
interval: 2s
timeout: 10s
retries: 3
start_period: 10s

benthos:
image: jeffail/benthos:latest
pull_policy: always
command: streams
ports:
- "4195:4195"

init-stream:
image: ghcr.io/timeplus-io/proton:latest
command:
- sh
- -c
- |
proton-client -h proton --query "CREATE STREAM IF NOT EXISTS tickers (best_ask decimal(10,2), best_ask_size decimal(10,8), best_bid decimal(10,2), best_bid_size decimal(10,8), high_24h decimal(10,2), last_size decimal(10,8), low_24h decimal(10,2), open_24h decimal(10,2), price decimal(10,2), product_id string, sequence int, side string, time datetime, trade_id int, type string, volume_24h decimal(20,8), volume_30d decimal(20,8))"
depends_on:
proton:
condition: service_healthy


22 changes: 22 additions & 0 deletions examples/coinbase/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
input:
label: coinbase
websocket:
url: wss://ws-feed.exchange.coinbase.com
open_message: '{"type": "subscribe","product_ids": ["ETH-USD","ETH-EUR"],"channels": ["ticker"]}'
open_message_type: text

output:
http_client:
url: http://proton:8123/proton/v1/ingest/streams/tickers
verb: POST
headers:
Content-Type: application/json
batching:
count: 10
period: 1000ms
processors:
- archive:
format: json_array
- mapping: |
root.columns = this.index(0).keys()
root.data = this.map_each( row -> root.columns.map_each( key -> row.get(key)) )