Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clickhouse): distributed #105

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cli/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import os
import random
import shutil
import string
import sys
import time
from dataclasses import dataclass
Expand Down Expand Up @@ -419,3 +421,9 @@ def get_otel_env() -> dict:
env[url_var] = conf(OTEL_VALIDATORS)[f"L12N_{url_var}"]
env[auth_var] = conf(OTEL_VALIDATORS)[f"L12N_{auth_var}"]
return env


def rand_cluster_id(length: int = 6) -> str:
    """Return a random alphanumeric identifier for a cluster run.

    Characters are drawn uniformly from digits and ASCII letters.
    Uses ``random`` (not ``secrets``): the ID only disambiguates
    concurrent cluster runs and is not security sensitive.

    Args:
        length: Number of characters to generate. Defaults to 6,
            preserving the original behavior for existing callers.

    Returns:
        A string of ``length`` characters from [0-9a-zA-Z].
    """
    alphabet = string.digits + string.ascii_letters
    return "".join(random.choices(alphabet, k=length))
16 changes: 1 addition & 15 deletions cli/plugins/ballista.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@

import base64
import json
import random
import string
import time
from concurrent.futures import ThreadPoolExecutor

import core
from common import (
OTEL_VALIDATORS,
aws,
conf,
format_lambda_output,
get_otel_env,
rand_cluster_id,
terraform_output,
)
from invoke import Exit, task

VALIDATORS = OTEL_VALIDATORS


def rand_cluster_id() -> str:
return "".join(
random.choice(string.digits + string.ascii_letters) for _ in range(6)
)


@task(autoprint=True)
def lambda_example(c, json_output=False, month="01"):
"""CREATE EXTERNAL TABLE and find out SUM(trip_distance) GROUP_BY payment_type"""
Expand Down Expand Up @@ -55,7 +47,6 @@ def format_lambda_result(name, external_duration, lambda_res):

def run_executor(
lambda_name: str,
bucket_name: str,
seed_ip: str,
virtual_ip: str,
scheduler_ip: str,
Expand All @@ -78,7 +69,6 @@ def run_executor(
Payload=json.dumps(
{
"role": "executor",
"bucket_name": bucket_name,
"timeout_sec": 40,
"scheduler_ip": scheduler_ip,
"env": env,
Expand All @@ -93,7 +83,6 @@ def run_executor(

def run_scheduler(
lambda_name: str,
bucket_name: str,
seed_ip: str,
virtual_ip: str,
query: str,
Expand All @@ -116,7 +105,6 @@ def run_scheduler(
Payload=json.dumps(
{
"role": "scheduler",
"bucket_name": bucket_name,
"timeout_sec": 38,
"query": base64.b64encode(query.encode()).decode(),
"env": env,
Expand Down Expand Up @@ -149,7 +137,6 @@ def distributed(c, seed, dataset=10):
scheduler_fut = ex.submit(
run_scheduler,
lambda_name,
bucket_name,
seed,
"172.28.0.1",
sql,
Expand All @@ -162,7 +149,6 @@ def distributed(c, seed, dataset=10):
ex.submit(
run_executor,
lambda_name,
bucket_name,
seed,
f"172.28.0.{i+2}",
"172.28.0.1",
Expand Down
10 changes: 1 addition & 9 deletions cli/plugins/chappy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import base64
import json
import random
import string
import time
import uuid
from collections import namedtuple
Expand All @@ -17,23 +15,17 @@
REPOROOT,
FargateService,
aws,
conf,
format_lambda_output,
get_otel_env,
terraform_output,
wait_deployment,
rand_cluster_id,
)
from invoke import Context, Exit, task

VALIDATORS = OTEL_VALIDATORS


def rand_cluster_id() -> str:
return "".join(
random.choice(string.digits + string.ascii_letters) for _ in range(6)
)


def service_outputs(c: Context) -> tuple[str, str, str]:
with ThreadPoolExecutor() as ex:
cluster_fut = ex.submit(terraform_output, c, "chappy", "fargate_cluster_name")
Expand Down
102 changes: 100 additions & 2 deletions cli/plugins/clickhouse.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
"""Clickhouse on AWS Lambda"""

import base64
import json
import time
from concurrent.futures import ThreadPoolExecutor

import core
from common import AWS_REGION
from invoke import task
from common import (
AWS_REGION,
OTEL_VALIDATORS,
aws,
format_lambda_output,
get_otel_env,
rand_cluster_id,
terraform_output,
)
from invoke import Exit, task

VALIDATORS = OTEL_VALIDATORS


@task(autoprint=True)
Expand All @@ -15,3 +30,86 @@ def lambda_example(c, json_output=False, month="01"):
if not json_output:
print(sql)
return core.run_lambda(c, "clickhouse", sql, json_output=json_output)


def format_lambda_result(name, external_duration, lambda_res):
    """Format the outcome of one Lambda invocation as a printable report.

    Args:
        name: Label identifying the invoked node, e.g. "NODE-1".
        external_duration: Invocation duration in seconds as measured by
            the caller (includes network and scheduling overhead).
        lambda_res: boto3 ``invoke`` response dict whose "Payload" has
            already been read and decoded to ``str``.

    Returns:
        The multi-line report as a single string.

    Raises:
        Exit: if the response carries a "FunctionError"; the raw payload
            is surfaced as the error message.
    """
    result = []
    # NOTE: plain strings here (no f-prefix) — these separators contain
    # no placeholders (ruff F541); text is unchanged.
    result.append("==============================")
    result.append(f"RESULTS FOR {name}")
    result.append(f"EXTERNAL_DURATION: {external_duration}")
    if "FunctionError" in lambda_res:
        raise Exit(message=lambda_res["Payload"], code=1)
    result.append("== PAYLOAD ==")
    result.append(format_lambda_output(lambda_res["Payload"], False))
    result.append("==============================")
    return "\n".join(result)


def run_node(
    lambda_name: str,
    seed_ip: str,
    virtual_ip: str,
    query: str,
    nodes: int,
    cluster_id: str,
):
    """Synchronously invoke one Clickhouse Lambda node and time the call.

    The node joins the Chappy cluster identified by ``cluster_id``; an
    empty ``query`` means the node only participates in the cluster.

    Returns:
        Tuple ``(invoke_response, elapsed_seconds)`` where the response's
        "Payload" stream has been read and decoded to ``str`` in place.
    """
    started = time.time()
    environment = {
        "CHAPPY_CLUSTER_SIZE": nodes,
        "CHAPPY_SEED_HOSTNAME": seed_ip,
        "CHAPPY_SEED_PORT": 8000,
        "CHAPPY_CLUSTER_ID": cluster_id,
        "CHAPPY_VIRTUAL_IP": virtual_ip,
        "RUST_LOG": "info,chappy_perforator=debug,chappy=trace,rustls=error",
        "RUST_BACKTRACE": "1",
        **get_otel_env(),
    }
    payload = {
        "timeout_sec": 38,
        # the handler expects the SQL base64-encoded inside the JSON payload
        "query": base64.b64encode(query.encode()).decode(),
        "env": environment,
    }
    response = aws("lambda").invoke(
        FunctionName=lambda_name,
        Payload=json.dumps(payload).encode(),
        InvocationType="RequestResponse",
        LogType="None",
    )
    response["Payload"] = response["Payload"].read().decode()
    return (response, time.time() - started)


@task
def distributed(c, seed, dataset=10, nodes=5):
    """CREATE EXTERNAL TABLE and find out stored page data by url_host_registered_domain"""
    bucket_name = core.bucket_name(c)
    core.load_commoncrawl_index(c, dataset)
    # Fresh ID per run so concurrent clusters don't collide on the seed.
    cluster_id = rand_cluster_id()
    sql = f"""
    SELECT url_host_registered_domain, sum(warc_record_length) AS stored_bytes
    FROM s3Cluster('cloudfuse_cluster', 'https://{bucket_name}.s3.{AWS_REGION()}.amazonaws.com/commoncrawl/index/n{dataset}/crawl=CC-MAIN-2023-14/subset=warc/*', 'Parquet')
    GROUP BY url_host_registered_domain
    ORDER BY sum(warc_record_length) DESC
    LIMIT 10;
    """

    lambda_name = terraform_output(c, "clickhouse", "distributed_lambda_name")
    # Extra workers beyond `nodes` leave headroom for bookkeeping tasks.
    with ThreadPoolExecutor(max_workers=nodes + 4) as ex:
        node_futs = []
        # Virtual IPs are 172.28.0.1..172.28.0.N; only the last node is
        # given the query, the others join the cluster with an empty one.
        for i in range(1, nodes + 1):
            node_futs.append(
                ex.submit(
                    run_node,
                    lambda_name,
                    seed,
                    f"172.28.0.{i}",
                    sql if i == nodes else "",
                    nodes,
                    cluster_id,
                )
            )

        # Block on each node in order and print its report (NODE-1..NODE-N).
        for node_nb, fut in enumerate(node_futs, start=1):
            node_res, node_duration = fut.result()
            print(format_lambda_result(f"NODE-{node_nb}", node_duration, node_res))
15 changes: 7 additions & 8 deletions docker/ballista/distributed-handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,19 +167,18 @@ def handle_event(event) -> dict[str, Any]:
global IS_COLD_START
is_cold_start = IS_COLD_START
IS_COLD_START = False
if not is_cold_start:
raise Exception(f"Only cold starts supported")

logging.info(f"role :{event['role']}")
timeout_sec = float(event["timeout_sec"])

if is_cold_start:
if event["role"] == "scheduler":
srv_proc = init_scheduler()
elif event["role"] == "executor":
srv_proc = init_executor(event["scheduler_ip"])
else:
raise Exception(f'Unknown role {event["role"]}')
if event["role"] == "scheduler":
srv_proc = init_scheduler()
elif event["role"] == "executor":
srv_proc = init_executor(event["scheduler_ip"])
else:
raise Exception(f"Only cold starts supported")
raise Exception(f'Unknown role {event["role"]}')

init_duration = time.time() - start

Expand Down
15 changes: 1 addition & 14 deletions docker/ballista/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,7 @@ services:
context: ../..
dockerfile: docker/ballista/distributed.Dockerfile
image: cloudfuse-io/l12n:ballista-distributed
cap_drop:
- ALL
read_only: true
volumes:
- ballista-tmp:/tmp
entrypoint:
- python3
- distributed-handler.py
environment:
- AWS_ACCESS_KEY_ID=$LAMBDA_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY=$LAMBDA_SECRET_ACCESS_KEY
- AWS_SESSION_TOKEN=$LAMBDA_SESSION_TOKEN
- AWS_DEFAULT_REGION=$L12N_AWS_REGION
- DATA_BUCKET_NAME
# unfortunately local tests of distributed images is not supported yet

volumes:
ballista-tmp:
Loading