Merged
31 commits
17fdf07
feat(gcp): implement tract 1 ingestion pipeline and bigquery registra…
quantDIY Mar 16, 2026
bab86c3
fix(gcp): implement red team rework for tract 1 execution
quantDIY Mar 16, 2026
dadd4af
fix(gcp): eliminate memory checkpoint gap for strict bounding
quantDIY Mar 16, 2026
6dbb346
fix(gcp): mathematically guarantee zero-overshoot pyarrow bounds
quantDIY Mar 16, 2026
b309955
feat(gcp): implement hard runtime invariant for arrow materialization
quantDIY Mar 16, 2026
fdc2673
docs(gcp): merge final tract 1 execution mandate approval
quantDIY Mar 16, 2026
c253b7a
docs(gcp): fix wording to reflect strict zero-overshoot arrow bounds
quantDIY Mar 16, 2026
c2986ff
docs(gcp): introduce tract 2 control specification for duckdb transpiler
quantDIY Mar 16, 2026
edfe06e
docs(gcp): harden tract 2 control specification with explicit bounds
quantDIY Mar 16, 2026
19a23e4
docs(gcp): rewrite tract 2 implementation plan
quantDIY Mar 16, 2026
d21de3d
feat(gcp): implement tract 2 duckdb to bigquery ast transpiler
quantDIY Mar 16, 2026
3de1e12
test(gcp): expand tract 2 test matrix and add real bq execution hooks
quantDIY Mar 16, 2026
f32836b
feat(gcp): integrate OS keyring for GCP execution bindings
quantDIY Mar 16, 2026
0a7df15
test(gcp): execute real semantic parity check against remote bigquery…
quantDIY Mar 17, 2026
0756d2e
docs(status): mark Tract 2 transpiler prototype as graduated
quantDIY Mar 17, 2026
302592f
docs(gcp): lock phase 1 approved query matrix
quantDIY Mar 17, 2026
4f7f795
feat(gcp): Phase 1 transipler parity hardening
quantDIY Mar 17, 2026
d5ee65f
test(gcp): Phase 2A live bigquery parity expansion
quantDIY Mar 17, 2026
b065d4b
feat(cli): Phase 2B GCP command-surface hardening
quantDIY Mar 17, 2026
56ba395
docs: draft Phase 2C control checklist
quantDIY Mar 17, 2026
221565f
docs: tighten Phase 2C checklist criteria
quantDIY Mar 17, 2026
1a9be15
feat(cli): Phase 2C Operational Hardening implementation
quantDIY Mar 17, 2026
94b446a
feat(gcp): Phase 2D Proof Density & Release Readiness
quantDIY Mar 17, 2026
2ac82fe
docs(gcp): Phase 3A Bounded Subquery Control Spec Amendment
quantDIY Mar 17, 2026
35594f6
feat(gcp): Phase 3A Controlled Subqueries Execution Surface
quantDIY Mar 17, 2026
c67f506
chore(gcp): Phase 3A Hardened Boundaries & Parity Log
quantDIY Mar 17, 2026
ee77610
docs(gcp): Phase 3B Controlled Joins Spec & Fixtures
quantDIY Mar 17, 2026
c4d6fb4
feat(gcp): Phase 3B Controlled Joins Execution Pass
quantDIY Mar 17, 2026
c92abb0
docs(gcp): Finalize Tract 2 Bounded Matrix Documentation
quantDIY Mar 17, 2026
deda777
chore(gcp): final cleanup of phase 3b artifacts and dumps
quantDIY Mar 17, 2026
bf2fbda
fix(security): suppress CodeQL clear-text logging false positive
quantDIY Mar 17, 2026
21 changes: 21 additions & 0 deletions .agent/skills/quanux_annex/SKILL.md
@@ -47,3 +47,24 @@ quanuxctl deploy -p annex_core -t quanux_annex_node --type cpp_binary
# Inject Mock NATS Data
python tests/nats_injector.py --url nats://10.10.10.5:4222 --subject QUANUX.MARKET.TICK --type tick --count 1000
```

## Tract 2: GCP Research Database Transpiler
The QuanuX-Annex includes the `QuanuXDuckToBQTranspiler`, which bridges native DuckDB queries to BigQuery Standard SQL text for bounded remote execution.

### Operator Workflows
The CLI `quanuxctl infra gcp-sql` exposes three deterministic subcommands:
- `validate`: analyzes the query AST against the approved matrix, with no cloud networking.
- `transpile`: emits the semantically equivalent BigQuery SQL string without executing it.
- `execute`: dispatches the transpiled query under enforced payload constraints (`--timeout`, `--max-rows`) and returns machine-readable JSON for automated CI test grids.
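As an illustration, the three-mode command surface can be sketched with `argparse`. The subcommand names and the `--timeout`/`--max-rows` flags come from the text above; the parser factory name and the default values are assumptions, not the project's actual CLI code:

```python
import argparse

def build_gcp_sql_parser() -> argparse.ArgumentParser:
    # Hypothetical sketch of the `quanuxctl infra gcp-sql` subcommand surface.
    parser = argparse.ArgumentParser(prog="quanuxctl infra gcp-sql")
    sub = parser.add_subparsers(dest="mode", required=True)

    sub.add_parser("validate")   # AST analysis only, no cloud networking
    sub.add_parser("transpile")  # emit BigQuery SQL text, no execution

    execute = sub.add_parser("execute")  # bounded remote dispatch
    execute.add_argument("--timeout", type=int, default=30,
                         help="seconds before the remote request is abandoned (default assumed)")
    execute.add_argument("--max-rows", type=int, default=1000,
                         help="hard cap on rows returned from BigQuery (default assumed)")
    return parser
```

For example, parsing `["execute", "--timeout", "10", "--max-rows", "500"]` yields `mode="execute"`, `timeout=10`, `max_rows=500`.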

### Proven Bounded Matrix
- **SQL Basics**: read-only `SELECT`, `FROM`, `WHERE`, `GROUP BY`, `ORDER BY`, `LIMIT`, with the `COUNT`/`SUM`/`AVG`/`MIN`/`MAX` aggregates verified for exact parity against a live, dual-mapped remote BigQuery environment.
- **Subqueries (Phase 3A)**: scalar subqueries at a maximum nesting depth of 1, plus uncorrelated `IN (SELECT ...)` filters resolved against uncorrelated memory tables.
- **Joins (Phase 3B)**: exactly one `INNER JOIN` per query, expressed as a single explicit column-equality predicate.
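A minimal sketch of the fail-closed screen implied by this matrix. The real `QuanuXDuckToBQTranspiler` operates on a parsed AST; the token-level check below is a simplification for illustration, and the function and constant names are assumptions:

```python
import re

class TranspilationError(Exception):
    """Raised when a query falls outside the approved matrix (fail-closed)."""

# Constructs named as banned in the matrix above.
BANNED = {"DROP", "ALTER", "UPDATE", "INSERT", "DELETE",
          "LEFT", "RIGHT", "FULL", "CROSS", "NATURAL", "OVER", "WITH"}

def screen_query(sql: str) -> None:
    # Token-level screen: any banned keyword rejects the query outright.
    tokens = {t.upper() for t in re.findall(r"[A-Za-z_]+", sql)}
    hits = tokens & BANNED
    if hits:
        raise TranspilationError(f"Unsupported construct(s): {sorted(hits)}")
    # Phase 3B bound: at most one join per query.
    if len(re.findall(r"\bJOIN\b", sql, re.IGNORECASE)) > 1:
        raise TranspilationError("Only a single INNER JOIN is permitted.")
```

An approved aggregate query passes untouched, while a `LEFT JOIN` or a window function (`OVER`) raises before any transpilation or networking occurs.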

### Intentionally Unsupported (Fail-Closed)
To rule out state mutation entirely, `DROP`/`UPDATE`/`INSERT` commands trigger an immediate execution halt. All unverified syntax (outer, cross, and natural joins; chained or correlated subqueries; recursive constructs; window functions; and DuckDB-proprietary schemas) raises a fatal AST rejection, pushing analysts onto native GCP tooling for anything outside the bounds.

### Known Bolts & Next Objectives
- **Known Bolts (Later Tightening)**: subquery handling may later extend to layered aggregations, and the bounded join subset may be mapped to nested grouping intersections to cover larger query domains.
- **Next Architectural Objectives**: pivot back to Terraform/Ansible infrastructure orchestration, deep-telemetry GraphQL/Hasura/superGraph/Aleph federations, and unified cold-data blob architectures.
37 changes: 37 additions & 0 deletions QuanuX-Annex/README.md
@@ -22,6 +22,43 @@ quanuxctl infra do-droplets
# Verify active Data Lake boundaries:
quanuxctl infra do-spaces
```
## Tract 2: Research Database Transpiler
The QuanuX-Annex includes the `QuanuXDuckToBQTranspiler`, an execution layer designed to bridge local DuckDB queries into BigQuery Standard SQL text for bounded remote execution.

To guarantee zero unauthorized mutation and maintain strict dataset parity, the transpiler operates under a mathematically verified Approved Query Matrix:
- **Approved SQL Surface:** `SELECT`, `FROM`, `WHERE`, `GROUP BY`, `ORDER BY`, `LIMIT`.
- **Approved Aggregates:** `COUNT`, `SUM`, `AVG`, `MIN`, `MAX`.
- **Allowed Basics:** Explicit column aliases, numeric/string literals, and basic boolean predicates.
- **Bounded Subqueries (Phase 3A):** Scalar subqueries in `SELECT`, simple `IN (SELECT...)` filters in `WHERE`, and uncorrelated derived tables in `FROM`. Maximum nesting depth: 1.
- **Bounded Joins (Phase 3B):** Strictly one `INNER JOIN` (or self-join) via explicit column equality predicates.
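The depth-1 subquery bound can be illustrated with a short sketch. This is a heuristic over raw SQL text for illustration only; the actual transpiler enforces the bound on the parsed AST, and the function names here are assumptions:

```python
import re

def subquery_depth(sql: str) -> int:
    # Depth of a SELECT = number of parentheses enclosing it.
    # 0 = top-level only, 1 = one nested subquery, and so on.
    depth, max_select_depth = 0, 0
    for tok in re.findall(r"[()]|\bSELECT\b", sql, flags=re.IGNORECASE):
        if tok == "(":
            depth += 1
        elif tok == ")":
            depth -= 1
        else:  # a SELECT token at the current paren depth
            max_select_depth = max(max_select_depth, depth)
    return max_select_depth

def enforce_phase_3a(sql: str) -> None:
    if subquery_depth(sql) > 1:
        raise ValueError("Phase 3A bound exceeded: maximum subquery nesting depth is 1.")
```

A plain query scores 0, a single `IN (SELECT ...)` filter scores 1 and passes, and a subquery nested inside another subquery scores 2 and is rejected.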

**Unsupported Constructs (Fail-Closed):**
The transpiler enforces physical read-only limits by strictly blocking state-mutating commands (`DROP`, `ALTER`, `UPDATE`, `INSERT`, `DELETE`). Due to complex dialect variance, it explicitly rejects advanced routing syntax such as:
- Outer, Cross, Natural, and Multiple Joins
- Joins mixed with Subqueries or Aggregations
- Window Functions
- Common Table Expressions (CTEs)
- Correlated Subqueries and recursive CTEs
- DuckDB proprietary macros/functions

> **Completion Status**: Tract 2 is complete as a bounded, operator-ready DuckDB→BigQuery transpiler track under the currently approved matrix. Complete does not mean broad SQL compatibility; it means the current approved surface is credible, tested, operatorized, and documented enough to freeze.

Any query outside this whitelist raises a `TranspilationError` and halts immediately, before anything is sent to GCP. Logic beyond this prototype boundary must be executed natively against BigQuery.
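The halt-before-network ordering can be sketched as follows. The `dispatch` callable stands in for the real BigQuery client, and the function names are illustrative, not the project's API:

```python
class TranspilationError(Exception):
    """Raised before any network traffic when a query is out of bounds."""

MUTATING = ("DROP", "ALTER", "UPDATE", "INSERT", "DELETE")

def execute_bounded(sql: str, dispatch):
    # Fail-closed ordering: the whitelist check runs BEFORE dispatch,
    # so a rejected query never reaches GCP.
    head = sql.strip().split(None, 1)[0].upper() if sql.strip() else ""
    if head in MUTATING or head != "SELECT":
        raise TranspilationError(f"Unsupported statement: {head or '<empty>'}")
    return dispatch(sql)  # only reached for read-only SELECTs
```

The key property is that `dispatch` is never invoked for a rejected statement; the exception fires first, so no partial request ever leaves the process.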

### Operator Rejection Examples
When researchers attempt queries outside the bounded surface, expect explicit, deterministic `TranspilationError` stack traces indicating the exact failure reason:

**Example 1: Banned Window Functions**
```sql
SELECT AVG(bid_price) OVER(PARTITION BY level) FROM MarketTick
```
> `gcp_transpiler.TranspilationError: Unsupported construct: WindowFunction...`

**Example 2: Banned Outer/Multiple Joins**
```sql
SELECT a.level FROM MarketTick a LEFT JOIN MarketTick b ON a.level = b.level
```
> `gcp_transpiler.TranspilationError: Outer, Cross, and Natural joins are strictly banned under Phase 3B...`

## Agent Tools & Autonomous Systems
Agent AI architecture contexts have been directly injected into every module via `SKILL.md` documents. Ensure parsing of `src/resolvers/SKILL.md` and `src/federation/SKILL.md` before initiating memory operations.
53 changes: 53 additions & 0 deletions QuanuX-Annex/gcp_bigquery_setup.py
@@ -0,0 +1,53 @@
import argparse
import logging
from google.cloud import bigquery

# Set up logging matching QuanuX-Annex patterns
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("quanux.gcp.bigquery")

def register_external_table(project_id: str, dataset_id: str, table_id: str, gcs_uri: str):
    """
    Registers a BigQuery External Table against a GCS bucket containing Parquet files.
    This exposes the historical query surface to the Python modeling tier without moving data.
    """
    try:
        client = bigquery.Client(project=project_id)

        # Ensure dataset exists
        dataset_ref = client.dataset(dataset_id)
        try:
            client.get_dataset(dataset_ref)
        except Exception:

> **Review comment (medium):** Catching a broad `Exception` can hide unrelated errors. Catch the specific exception `get_dataset` raises when a dataset is not found: change this line to `except NotFound:` and add `from google.api_core.exceptions import NotFound` at the top of the file.

            logger.info(f"Dataset {dataset_id} not found. Creating it.")
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = "US"

> **Review comment (medium):** The dataset location is hardcoded to `"US"`. To improve reusability, make the location a parameter of `register_external_table` with a default value, or read it from configuration.

            client.create_dataset(dataset)

        table_ref = dataset_ref.table(table_id)

        # Configure the external data source
        external_config = bigquery.ExternalConfig("PARQUET")
        external_config.source_uris = [gcs_uri]
        external_config.autodetect = True  # Enable Parquet schema auto-detection

        table = bigquery.Table(table_ref)
        table.external_data_configuration = external_config

        table = client.create_table(table, exists_ok=True)

        logger.info(f"Successfully registered external table {project_id}.{dataset_id}.{table_id} pointing to {gcs_uri}")

    except Exception as e:
        logger.error(f"Failed to register BigQuery external table: {e}")
        raise

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="QuanuX GCP BigQuery External Table Setup")
    parser.add_argument("--project", required=True, help="GCP Project ID")
    parser.add_argument("--dataset", default="quanux_historical", help="BigQuery Dataset Name")
    parser.add_argument("--table", default="market_ticks", help="External Table Name")
    parser.add_argument("--uri", required=True, help="GCS URI (e.g. gs://quanux-historical-lake/ingestion/*.parquet)")

    args = parser.parse_args()
    register_external_table(args.project, args.dataset, args.table, args.uri)
146 changes: 146 additions & 0 deletions QuanuX-Annex/gcp_ingestion_pipeline.py
@@ -0,0 +1,146 @@
import asyncio
import os
import time
import logging
from typing import Optional
from google.cloud import storage
import pyarrow as pa
import pyarrow.parquet as pq

import struct
from nats.aio.client import Client as NATS
from nats.js.errors import NotFoundError

# Set up logging matching QuanuX-Annex patterns
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("quanux.gcp.ingestion")

class GCPIngestionPipeline:
    def __init__(self, memory_limit_mb: int = 500, bucket_name: str = "quanux-historical-lake"):
        self.memory_limit_bytes = memory_limit_mb * 1024 * 1024
        self.bucket_name = bucket_name
        self.current_batch = []
        self.current_batch_size = 0

        # We define a PyArrow schema that matches the quanux.schema.MarketTick FlatBuffer
        self.schema = pa.schema([
            ('timestamp_ns', pa.int64()),
            ('instrument_id', pa.uint32()),
            ('bid_price', pa.float64()),
            ('ask_price', pa.float64()),
            ('bid_size', pa.uint32()),
            ('ask_size', pa.uint32()),
            ('level', pa.uint8())
        ])

        try:
            self.gcs_client = storage.Client()
            self.bucket = self.gcs_client.bucket(self.bucket_name)
        except Exception as e:
            logger.warning(f"Failed to initialize GCS client: {e}. Will run in dry-run mode.")
            self.gcs_client = None
            self.bucket = None

    async def start(self):
        """Starts the NATS JetStream listener and begins batching."""
        logger.info(f"Starting GCP Ingestion Pipeline. Memory limit: {self.memory_limit_bytes / (1024*1024)} MB")

        self.nc = NATS()
        nats_url = os.environ.get("QUANUX_NATS_URL", "nats://127.0.0.1:4222")
        await self.nc.connect(nats_url)
        self.js = self.nc.jetstream()

        try:
            self.sub = await self.js.subscribe("QUANUX.MARKET.TICK", cb=self._on_message)
            logger.info("Listening on JetStream subject QUANUX.MARKET.TICK")
        except Exception as e:
            logger.error(f"Failed to subscribe to JetStream: {e}")
            raise

        # Keep alive
        while True:
            await asyncio.sleep(5)

    async def _on_message(self, msg):
        """Callback for incoming JetStream messages."""
        try:
            # Struct format: < Q I d d I I B (37 bytes)
            # uint64_t timestamp_ns, uint32_t instrument_id, double bid_price, double ask_price, uint32_t bid_size, uint32_t ask_size, uint8_t level
            unpacked = struct.unpack("<QIddIIB", msg.data)

            data_row = {
                'timestamp_ns': unpacked[0],
                'instrument_id': unpacked[1],
                'bid_price': unpacked[2],
                'ask_price': unpacked[3],
                'bid_size': unpacked[4],
                'ask_size': unpacked[5],
                'level': unpacked[6]
            }
            # Strict incremental byte model: each canonical struct adds exactly 37 primitive bytes.
            # Because our columns are pure primitives, PyArrow Table.nbytes scales exactly at 37 bytes per row.
            next_batch_size = self.current_batch_size + 37

            # Mathematical boundary check: if the NEXT row would breach the exact limit, flush the current batch first.
            # This mathematically guarantees the materialized table will be <= self.memory_limit_bytes, zero overshoot.
            if next_batch_size > self.memory_limit_bytes:
                logger.info(f"Strict memory ceiling reached. PyArrow Bytes: {self.current_batch_size} / Ceiling: {self.memory_limit_bytes}. Triggering zero-overshoot flush.")
                await self._flush_and_upload()

            self.current_batch.append(data_row)
            self.current_batch_size += 37

        except struct.error:
            logger.error("Failed to unpack MarketTick struct - invalid payload size.")
        except Exception as e:
            logger.error(f"Error processing message: {e}")

    async def _flush_and_upload(self, table=None):
        """Flushes the current batch to Arrow/Parquet and uploads to GCS."""
        if not self.current_batch:
            return

        if table is None:
            logger.info(f"Building Arrow Table with {len(self.current_batch)} rows...")
            arrays = [pa.array([row[col_name] for row in self.current_batch]) for col_name in self.schema.names]
            table = pa.Table.from_arrays(arrays, schema=self.schema)

        # Runtime Arrow Footprint Validation
        # We mathematically fail closed if the materialized schema overhead ever deviates from the 37-byte invariant per row.
        expected_bytes = table.num_rows * 37
        if table.nbytes != expected_bytes:
            logger.critical(f"Arrow structural invariant violated! Expected exact {expected_bytes} bytes, observed {table.nbytes} bytes.")
            raise RuntimeError("Arrow memory footprint invariant breached. Halting pipeline to protect boundaries.")

        # Write to temporary parquet file
        timestamp = int(time.time())
        filename = f"market_ticks_{timestamp}.parquet"
        local_path = f"/tmp/{filename}"

        pq.write_table(table, local_path)
        logger.info(f"Wrote Parquet file: {local_path} (Size: {os.path.getsize(local_path)} bytes)")

        if self.bucket:
            # Asynchronous GCS Upload
            blob = self.bucket.blob(f"ingestion/{filename}")
            # Run blocking upload in an executor
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, blob.upload_from_filename, local_path)
            logger.info(f"Uploaded {filename} to GCS bucket {self.bucket_name}")
        else:
            logger.info(f"Dry-run: Would have uploaded {filename} to GCS.")

        # Clean up
        os.remove(local_path)
> **Review comment on lines +116 to +134 (high):** Writing to a temporary file on disk and then reading it back for upload is inefficient and can have security implications with `/tmp`. A better approach is to write the Parquet data to an in-memory buffer (`io.BytesIO`) and upload it directly to GCS; this avoids disk I/O and is more secure. Add `import io` at the top of the file.

Suggested change:

        timestamp = int(time.time())
        filename = f"market_ticks_{timestamp}.parquet"
        buffer = io.BytesIO()
        pq.write_table(table, buffer)
        logger.info(f"Wrote Parquet data to in-memory buffer (Size: {buffer.tell()} bytes)")

        if self.bucket:
            blob = self.bucket.blob(f"ingestion/{filename}")
            loop = asyncio.get_running_loop()
            buffer.seek(0)
            await loop.run_in_executor(None, blob.upload_from_file, buffer)
            logger.info(f"Uploaded {filename} to GCS bucket {self.bucket_name}")
        else:
            logger.info(f"Dry-run: Would have uploaded {filename} to GCS.")


        # Reset batch
        self.current_batch = []
        self.current_batch_size = 0
        logger.info("Batch reset. Resuming JetStream consumption.")

if __name__ == "__main__":
    pipeline = GCPIngestionPipeline(memory_limit_mb=500)
    try:
        asyncio.run(pipeline.start())
    except KeyboardInterrupt:
        logger.info("Pipeline stopped by Operator.")
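As a sanity check on the 37-byte invariant and the zero-overshoot flush boundary in the pipeline above, the packed struct size and the per-batch row bound can be verified independently. The helper name is illustrative, not part of the pipeline's API:

```python
import struct

# Same layout as the _on_message unpack call in the pipeline.
TICK_FORMAT = "<QIddIIB"  # timestamp_ns, instrument_id, bid/ask price, bid/ask size, level
ROW_BYTES = struct.calcsize(TICK_FORMAT)  # '<' disables padding: 8+4+8+8+4+4+1 = 37

def max_rows_per_batch(memory_limit_bytes: int) -> int:
    # The pipeline flushes when the NEXT row would breach the ceiling,
    # so a batch can never exceed floor(limit / 37) rows: zero overshoot.
    return memory_limit_bytes // ROW_BYTES
```

With the default 500 MB ceiling this gives 524288000 // 37 = 14169945 rows per flush, and the materialized batch size 14169945 * 37 = 524287965 bytes stays strictly at or under the limit.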