Monte Carlo Benchmarking Engine
High-performance SIMD Monte Carlo engine (AVX2/NEON) with custom memory allocators and perf logging.
 
Loading...
Searching...
No Matches
setup.py
Go to the documentation of this file.
1#!/usr/bin/env python3
2# ===========================================
3# setup.py
4# ===========================================
5
6
33
34
35import argparse
36import subprocess
37import os
38import shutil
39import time
40from pathlib import Path
41from clickhouse_driver import Client
42import polars as pl
43
44from pipeline.schema_to_clickhouse import generate_clickhouse_table
45from pipeline.schema import SCHEMA
46from pipeline.utils import safe_vector_cast
47from scripts.config import *
48
49
def log(msg: str):
    """!Writes an informational message to stdout.

    The message is emitted on a single line, prefixed with the "[INFO] " tag.

    @param msg The text to print.
    @return None
    """
    line = f"[INFO] {msg}"
    print(line)
57
def err(msg: str):
    """!Writes an error message to stdout.

    The message is emitted on a single line, prefixed with the "[ERROR] " tag.
    Note that the output goes to stdout, not stderr.

    @param msg The text to print.
    @return None
    """
    line = f"[ERROR] {msg}"
    print(line)
63
def run_command(cmd: str):
    """!Logs a shell command and then executes it.

    The command string is handed to the system shell unchanged, so callers
    must only pass trusted, fully-formed command lines.

    @param cmd The command string to run.
    @throws subprocess.CalledProcessError if the command exits with a non-zero status.
    """
    announcement = f"Running command: {cmd}"
    log(announcement)
    # check=True converts a non-zero exit status into CalledProcessError.
    subprocess.run(
        cmd,
        shell=True,
        check=True,
    )
71
def wait_for_clickhouse(max_attempts: int = 30, delay: float = 2.0) -> Client:
    """!Waits for the ClickHouse server to become ready, retrying on failure.

    Each attempt opens a client using the CLICKHOUSE_* settings and issues
    "SELECT 1". Failed attempts are logged and followed by a pause, except
    after the final attempt, where the function raises immediately.

    @param max_attempts Maximum number of connection attempts (default 30).
    @param delay Seconds to wait between failed attempts (default 2).
    @return A connected ClickHouse Client instance.
    @throws RuntimeError if ClickHouse doesn't respond after all attempts; the
            last connection error is attached as the exception's cause.
    """
    log("Waiting for ClickHouse...")

    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            log(f"Connecting to ClickHouse at {CLICKHOUSE_HOST}:{CLICKHOUSE_TCP_PORT} as user '{CLICKHOUSE_USER}'")
            client = Client(host=CLICKHOUSE_HOST, port=CLICKHOUSE_TCP_PORT, user=CLICKHOUSE_USER, password=CLICKHOUSE_PASSWORD)
            client.execute("SELECT 1")
        except Exception as e:
            # Broad catch is deliberate: any failure here just means "not ready yet".
            last_error = e
            log(f"Attempt {attempt}/{max_attempts} failed: {e}")
            # No point sleeping after the final attempt; we are about to give up.
            if attempt < max_attempts:
                time.sleep(delay)
        else:
            log("ClickHouse is ready.")
            return client

    # Chain the last failure so the root cause is visible in the traceback.
    raise RuntimeError(f"ClickHouse did not start after {max_attempts} attempts.") from last_error
92
def setup_clickhouse(client: Client):
    """!Ensures the benchmark database and its table exist in ClickHouse.

    Creates the "benchmark" database if it is missing, then executes the
    table statement produced by generate_clickhouse_table().

    @param client The connected ClickHouse client.
    """
    log("Setting up ClickHouse database and table.")

    # The database must exist before the table statement is run.
    client.execute("CREATE DATABASE IF NOT EXISTS benchmark")

    table_statement = generate_clickhouse_table()
    client.execute(table_statement)

    log("Schema loaded into ClickHouse.")
101
def load_db_to_clickhouse(client: Client, db_path: Path):
    """!Replaces the contents of benchmark.performance with data from a Parquet file.

    The file is read with polars, cast through safe_vector_cast() using the
    shared SCHEMA, and converted to a list of row dictionaries. When the file
    yields at least one row, the table is truncated and the rows are inserted.
    When it yields no rows, the function returns without touching the table.

    @param client The connected ClickHouse client.
    @param db_path Path to the Parquet file to load.
    @throws FileNotFoundError if the file does not exist.
    @throws Exception if truncation or insertion fails (logged, then re-raised).
    """
    if not db_path.exists():
        raise FileNotFoundError(f"{db_path} not found")

    log(f"Loading data from: {db_path}")
    frame = safe_vector_cast(pl.read_parquet(db_path), SCHEMA)
    rows = frame.to_dicts()

    if not rows:
        log("No records to insert.")
        return

    try:
        # Old data is wiped first; a failed insert therefore leaves the table empty.
        client.execute("TRUNCATE TABLE benchmark.performance")
        client.execute("INSERT INTO benchmark.performance VALUES", rows)
    except Exception as exc:
        err(f"Error inserting records into ClickHouse: {exc}")
        raise
132
133
def main():
    """!CLI entrypoint. Parses arguments and coordinates Docker, schema setup, and data loading.

    Steps, in order:
      1. Create the local "db", "samples" and "db/logs" directories.
      2. With --docker-compose, start the containers and pause briefly.
      3. Wait for ClickHouse and obtain a client.
      4. With --setup-clickhouse or --docker-compose, create the database and table.
      5. With --load-from-sample, copy SAMPLE_PATH over DB_PATH and load it;
         otherwise, with --load-from-db, load the existing DB_PATH.
      6. After either load, restart the "grafana" container.

    If both --load-from-sample and --load-from-db are given, the sample wins;
    this is now reported instead of being silently ignored.
    """
    parser = argparse.ArgumentParser(description="Setup and load benchmark data into ClickHouse")
    parser.add_argument("--load-from-sample", action="store_true", help="Setup and restore from db_sample.parquet")
    parser.add_argument("--load-from-db", action="store_true", help="Setup and use existing db.parquet")
    parser.add_argument("--docker-compose", action="store_true", help="Use Docker Compose to start ClickHouse & Grafana")
    parser.add_argument("--setup-clickhouse", action="store_true", help="Setup ClickHouse database and table")

    args = parser.parse_args()

    if args.load_from_sample and args.load_from_db:
        # The if/elif below only ever honours the sample; tell the user so.
        log("Both --load-from-sample and --load-from-db were given; --load-from-sample takes precedence.")

    for directory in ("db", "samples", "db/logs"):
        os.makedirs(directory, exist_ok=True)

    if args.docker_compose:
        log("Setting up docker instance to be ready for ClickHouse database and Grafana.")
        run_command("docker-compose up -d")
        time.sleep(5)  # Give the containers a head start before polling ClickHouse

    client = wait_for_clickhouse()

    if args.setup_clickhouse or args.docker_compose:
        setup_clickhouse(client)

    if args.load_from_sample:
        log(f"Loading from sample data: {SAMPLE_PATH}")
        # The sample overwrites the working database file before loading.
        shutil.copy(SAMPLE_PATH, DB_PATH)
        load_db_to_clickhouse(client, DB_PATH)

    elif args.load_from_db:
        log(f"Loading from existing data: {DB_PATH}")
        load_db_to_clickhouse(client, DB_PATH)

    if args.load_from_sample or args.load_from_db:
        # Restart the Grafana container after the table contents have changed.
        run_command("docker restart grafana")
170
171
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
setup_clickhouse(Client client)
Creates the ClickHouse database and performance table if they don't exist.
Definition setup.py:93
main()
CLI entrypoint.
Definition setup.py:134
Client wait_for_clickhouse()
Waits for ClickHouse server to become ready, retries for up to 30 attempts.
Definition setup.py:72
err(str msg)
Prints an error message to stdout.
Definition setup.py:58
run_command(str cmd)
Executes a shell command with logging.
Definition setup.py:64
load_db_to_clickhouse(Client client, Path db_path)
Wipes previous data and loads data from a Parquet file into ClickHouse.
Definition setup.py:102
log(str msg)
Prints an info message to stdout.
Definition setup.py:50