-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Merge QuanuX-Annex-GCP-Tract2 Phase 3B Complete #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
17fdf07
bab86c3
dadd4af
6dbb346
b309955
fdc2673
c253b7a
c2986ff
edfe06e
19a23e4
d21de3d
3de1e12
f32836b
0a7df15
0756d2e
302592f
4f7f795
d5ee65f
b065d4b
56ba395
221565f
1a9be15
94b446a
2ac82fe
35594f6
c67f506
ee77610
c4d6fb4
c92abb0
deda777
bf2fbda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| import argparse | ||
| import logging | ||
| from google.cloud import bigquery | ||
|
|
||
| # Set up logging matching QuanuX-Annex patterns | ||
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | ||
| logger = logging.getLogger("quanux.gcp.bigquery") | ||
|
|
||
| def register_external_table(project_id: str, dataset_id: str, table_id: str, gcs_uri: str): | ||
| """ | ||
| Registers a BigQuery External Table against a GCS bucket containing Parquet files. | ||
| This exposes the historical query surface to the Python modeling tier without moving data. | ||
| """ | ||
| try: | ||
| client = bigquery.Client(project=project_id) | ||
|
|
||
| # Ensure dataset exists | ||
| dataset_ref = client.dataset(dataset_id) | ||
| try: | ||
| client.get_dataset(dataset_ref) | ||
| except Exception: | ||
| logger.info(f"Dataset {dataset_id} not found. Creating it.") | ||
| dataset = bigquery.Dataset(dataset_ref) | ||
| dataset.location = "US" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| client.create_dataset(dataset) | ||
|
|
||
| table_ref = dataset_ref.table(table_id) | ||
|
|
||
| # Configure the external data source | ||
| external_config = bigquery.ExternalConfig("PARQUET") | ||
| external_config.source_uris = [gcs_uri] | ||
| external_config.autodetect = True # Enable Parquet schema auto-detection | ||
|
|
||
| table = bigquery.Table(table_ref) | ||
| table.external_data_configuration = external_config | ||
|
|
||
| table = client.create_table(table, exists_ok=True) | ||
|
|
||
| logger.info(f"Successfully registered external table {project_id}.{dataset_id}.{table_id} pointing to {gcs_uri}") | ||
|
|
||
| except Exception as e: | ||
| logger.error(f"Failed to register BigQuery external table: {e}") | ||
| raise | ||
|
|
||
| if __name__ == "__main__": | ||
| parser = argparse.ArgumentParser(description="QuanuX GCP BigQuery External Table Setup") | ||
| parser.add_argument("--project", required=True, help="GCP Project ID") | ||
| parser.add_argument("--dataset", default="quanux_historical", help="BigQuery Dataset Name") | ||
| parser.add_argument("--table", default="market_ticks", help="External Table Name") | ||
| parser.add_argument("--uri", required=True, help="GCS URI (e.g. gs://quanux-historical-lake/ingestion/*.parquet)") | ||
|
|
||
| args = parser.parse_args() | ||
| register_external_table(args.project, args.dataset, args.table, args.uri) | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,146 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import time | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Optional | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from google.cloud import storage | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import pyarrow as pa | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import pyarrow.parquet as pq | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import struct | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from nats.aio.client import Client as NATS | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from nats.js.errors import NotFoundError | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Set up logging matching QuanuX-Annex patterns | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger = logging.getLogger("quanux.gcp.ingestion") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class GCPIngestionPipeline: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__(self, memory_limit_mb: int = 500, bucket_name: str = "quanux-historical-lake"): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.memory_limit_bytes = memory_limit_mb * 1024 * 1024 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.bucket_name = bucket_name | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_batch = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_batch_size = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # We define a PyArrow schema that matches the quanux.schema.MarketTick FlatBuffer | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.schema = pa.schema([ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('timestamp_ns', pa.int64()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('instrument_id', pa.uint32()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('bid_price', pa.float64()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('ask_price', pa.float64()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('bid_size', pa.uint32()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('ask_size', pa.uint32()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ('level', pa.uint8()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.gcs_client = storage.Client() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.bucket = self.gcs_client.bucket(self.bucket_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning(f"Failed to initialize GCS client: {e}. Will run in dry-run mode.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.gcs_client = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.bucket = None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def start(self): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Starts the NATS JetStream listener and begins batching.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"Starting GCP Ingestion Pipeline. Memory limit: {self.memory_limit_bytes / (1024*1024)} MB") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.nc = NATS() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| nats_url = os.environ.get("QUANUX_NATS_URL", "nats://127.0.0.1:4222") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self.nc.connect(nats_url) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.js = self.nc.jetstream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.sub = await self.js.subscribe("QUANUX.MARKET.TICK", cb=self._on_message) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Listening on JetStream subject QUANUX.MARKET.TICK") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.error(f"Failed to subscribe to JetStream: {e}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Keep alive | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| while True: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await asyncio.sleep(5) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def _on_message(self, msg): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Callback for incoming JetStream messages.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Struct format: < Q I d d I I B (37 bytes) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # uint64_t timestamp_ns, uint32_t instrument_id, double bid_price, double ask_price, uint32_t bid_size, uint32_t ask_size, uint8_t level | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| unpacked = struct.unpack("<QIddIIB", msg.data) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data_row = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'timestamp_ns': unpacked[0], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'instrument_id': unpacked[1], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'bid_price': unpacked[2], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'ask_price': unpacked[3], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'bid_size': unpacked[4], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'ask_size': unpacked[5], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| 'level': unpacked[6] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Strict incremental byte model: each canonical struct adds exactly 37 primitive bytes. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Because our columns are pure primitives, PyArrow Table.nbytes scales exactly at 37 bytes per row. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| next_batch_size = self.current_batch_size + 37 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Mathematical boundary check: if the NEXT row would breach the exact limit, flush the current batch first. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # This mathematically guarantees the materialized table will be <= self.memory_limit_bytes, zero overshoot. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if next_batch_size > self.memory_limit_bytes: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"Strict memory ceiling reached. PyArrow Bytes: {self.current_batch_size} / Ceiling: {self.memory_limit_bytes}. Triggering zero-overshoot flush.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await self._flush_and_upload() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_batch.append(data_row) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_batch_size += 37 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except struct.error: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.error("Failed to unpack MarketTick struct - invalid payload size.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.error(f"Error processing message: {e}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async def _flush_and_upload(self, table=None): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Flushes the current batch to Arrow/Parquet and uploads to GCS.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not self.current_batch: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if table is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"Building Arrow Table with {len(self.current_batch)} rows...") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| arrays = [pa.array([row[col_name] for row in self.current_batch]) for col_name in self.schema.names] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| table = pa.Table.from_arrays(arrays, schema=self.schema) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Runtime Arrow Footprint Validation | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # We mathematically fail closed if the materialized schema overhead ever deviates from the 37-byte invariant per row. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| expected_bytes = table.num_rows * 37 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if table.nbytes != expected_bytes: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.critical(f"Arrow structural invariant violated! Expected exact {expected_bytes} bytes, observed {table.nbytes} bytes.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError(f"Arrow memory footprint invariant breached. Halting pipeline to protect boundaries.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Write to temporary parquet file | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| timestamp = int(time.time()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| filename = f"market_ticks_{timestamp}.parquet" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local_path = f"/tmp/{filename}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pq.write_table(table, local_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"Wrote Parquet file: {local_path} (Size: {os.path.getsize(local_path)} bytes)") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if self.bucket: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Asynchronous GCS Upload | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| blob = self.bucket.blob(f"ingestion/{filename}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Run blocking upload in an executor | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| loop = asyncio.get_running_loop() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await loop.run_in_executor(None, blob.upload_from_filename, local_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"Uploaded {filename} to GCS bucket {self.bucket_name}") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info(f"Dry-run: Would have uploaded {filename} to GCS.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Clean up | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| os.remove(local_path) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+116
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Writing to a temporary file on disk and then reading it back for upload is inefficient and can have security implications with
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Reset batch | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_batch = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.current_batch_size = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Batch reset. Resuming JetStream consumption.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if __name__ == "__main__": | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pipeline = GCPIngestionPipeline(memory_limit_mb=500) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| asyncio.run(pipeline.start()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| except KeyboardInterrupt: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.info("Pipeline stopped by Operator.") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Catching a broad
Exceptioncan hide unrelated errors. It's better to catch the specific exception thatget_datasetraises when a dataset is not found, which isgoogle.api_core.exceptions.NotFound. Please change this line toexcept NotFound:and addfrom google.api_core.exceptions import NotFoundat the top of the file.