diff --git a/Data-Gen/create_join_tables.py b/Data-Gen/create_join_tables.py new file mode 100644 index 0000000..b4bb0ce --- /dev/null +++ b/Data-Gen/create_join_tables.py @@ -0,0 +1,476 @@ +#!/usr/bin/env python3 +import pandas as pd +from pyspark.sql import SparkSession, Row +import pyspark.sql.functions as F +from pyspark.sql.types import ( + StructType, StructField, StringType, IntegerType, LongType, + FloatType, DoubleType, BooleanType, DateType, TimestampType, ArrayType +) + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("CreateJoinTables") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "128g") + .config("spark.memory.fraction", "0.8") + .config("spark.sql.shuffle.partitions", "800") + .config("spark.local.dir", "/scratch/prestouser/spark-tmp") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .getOrCreate() +) + +target_size = 1 * 1024**4 # 1 TB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1TB" + +# ------------------------------------- +# Step 1. Read all sheets from the Excel file. +# ------------------------------------- +excel_path = "../datagen_schema.xlsx" # update this path as necessary + +# Read every sheet into a dictionary: keys are sheet names, values are DataFrames. +sheets = pd.read_excel(excel_path, sheet_name=None) +# sheets = spark.read. +sheet_names = list(sheets.keys()) +print("Found sheets:", sheet_names) + +table_a = spark.read.parquet(PATH_PREFIX) +# ------------------------------------- +# Step 2. Process the tables overview (first sheet) +# ------------------------------------- +# Assumption: The first sheet (e.g. "Tables") lists the table names and approximate row counts. +tables_overview_df = sheets[sheet_names[0]] +# Adjust these column names if your Excel file uses different names. +table_names = tables_overview_df["masked_table_id"].tolist() +approx_row_counts = tables_overview_df["num_rows_approx"].tolist() + +print("Tables and approximate row counts:") +for tbl, cnt in zip(table_names, approx_row_counts): + print(f" {tbl}: ~{cnt} rows") + +# ------------------------------------- +# Step 3. Read each table's metadata (columns, types, etc.) +# ------------------------------------- +# Here we assume that the sheet name for each table is the same as the table name. +table_metadata = {} +for tbl in table_names: + if tbl in sheets: + meta_df = sheets[tbl] + table_metadata[tbl] = meta_df + print(f"Loaded metadata for table '{tbl}'.") + else: + print(f"Warning: No metadata sheet found for table '{tbl}'.") + +# ------------------------------------- +# Step 4. Define a mapping from your Excel type names to Spark types. +# ------------------------------------- +spark_type_mapping = { + "StringType()": StringType(), + "StringType": StringType(), + "IntegerType()": IntegerType(), + "IntegerType()": IntegerType(), + "LongType()": LongType(), + "FloatType()": FloatType(), + "DoubleType()": DoubleType(), + "BooleanType()": BooleanType(), + "BooleanType()": BooleanType(), + "DateType()": DateType(), + "TimestampType()": TimestampType(), + "ArrayType(IntegerType(), True)": ArrayType(IntegerType(), True), + "ArrayType(StringType(), True)": ArrayType(StringType(), True) +} + +def create_schema(meta_df): + """ + Create a Spark schema (StructType) from the metadata DataFrame. + For numerical types, if "min" and "max" are provided, they are stored in the field metadata. + This version ensures that the type from the spreadsheet is used (if it matches). + """ + fields = [] + # Ensure that the range columns exist in the DataFrame. + has_range = ("min" in meta_df.columns) and ("max" in meta_df.columns) + + for idx, row in meta_df.iterrows(): + col_name = row["masked_column_name"] + # Convert the Type from the spreadsheet to a lower-case string. + type_str = str(row["spark_data_type"]).strip() if pd.notna(row["spark_data_type"]) else "string" + spark_type = spark_type_mapping.get(type_str) + + if spark_type is None: + # If the type is not recognized, warn and default to StringType. + print(f"Warning: Unrecognized type '{row['spark_data_type']}' for column '{col_name}'. Using StringType.") + spark_type = StringType() + + md = {} + # For numerical types, if min and max values are provided, store them in metadata. + if isinstance(spark_type, (IntegerType, LongType, FloatType, DoubleType)) and has_range: + if pd.notna(row["min"]) and pd.notna(row["max"]): + md["min"] = row["min"] + md["max"] = row["max"] + + fields.append(StructField(col_name, spark_type, True, metadata=md)) + + return StructType(fields) + +# Create a dictionary of schemas for each table. +schemas = {} +for tbl, meta_df in table_metadata.items(): + schema = create_schema(meta_df) + schemas[tbl] = schema + print(f"Schema for table '{tbl}': {schema}") + + +# ------------------------------------- +# Step 5. Process join information. +# ------------------------------------- +# Assumption: The final sheet (last sheet) is named "Joins" and holds the join definitions. +join_info_df = sheets[sheet_names[1]] +joins = [] +# Here we assume join_info_df has columns: "LeftTable", "LeftColumn", "RightTable", "RightColumn", and optionally "JoinType" +for idx, row in join_info_df.iterrows(): + join_detail = { + "left_table": row["table1"], + "right_table": row["table2"], + "join_method": row["join_method"], + "left_column": row["column1"], + "right_column": row["column2"] + } + joins.append(join_detail) + +# ======================================== +# PART 2: Generate random data for each table and register as temp views +# ======================================== + +def generate_random_dataframe(schema, num_rows): + """ + Simpler version with basic array generation + """ + df = spark.range(num_rows) + + for field in schema.fields: + col_name = field.name + dt = field.dataType + md = field.metadata or {} + + if isinstance(dt, (IntegerType, LongType)): + min_val = md.get("min", 1) + max_val = md.get("max", 1000) + expr = (F.rand() * (float(max_val) - float(min_val)) + float(min_val)) + if isinstance(dt, IntegerType): + df = df.withColumn(col_name, expr.cast("int")) + else: + df = df.withColumn(col_name, expr.cast("long")) + + elif isinstance(dt, (FloatType, DoubleType)): + min_val = md.get("min", 0.0) + max_val = md.get("max", 1000.0) + expr = (F.rand() * (float(max_val) - float(min_val)) + float(min_val)) + if isinstance(dt, FloatType): + df = df.withColumn(col_name, expr.cast("float")) + else: + df = df.withColumn(col_name, expr.cast("double")) + + elif isinstance(dt, BooleanType): + df = df.withColumn(col_name, F.rand() > 0.5) + + elif isinstance(dt, DateType): + df = df.withColumn(col_name, F.expr("date_add('2000-01-01', cast(rand() * 9000 as int))")) + + elif isinstance(dt, TimestampType): + df = df.withColumn(col_name, F.expr("to_timestamp(date_add('2000-01-01', cast(rand() * 9000 as int)))")) + + elif isinstance(dt, StringType): + df = df.withColumn(col_name, + F.concat(F.lit("str_"), + F.abs(F.hash(F.col("id"), F.rand())).cast("string"))) + + elif isinstance(dt, ArrayType): + # Simpler array generation - fixed size arrays + element_type = dt.elementType + + if isinstance(element_type, IntegerType): + # Create array of 3 random integers + df = df.withColumn(col_name, + F.array( + (F.rand() * 100).cast("int"), + (F.rand() * 100).cast("int"), + (F.rand() * 100).cast("int") + )) + + elif isinstance(element_type, LongType): + # Create array of 3 random longs + df = df.withColumn(col_name, + F.array( + (F.rand() * 1000).cast("long"), + (F.rand() * 1000).cast("long"), + (F.rand() * 1000).cast("long") + )) + + elif isinstance(element_type, StringType): + # Create array of 3 random strings + df = df.withColumn(col_name, + F.array( + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")), + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")), + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")) + )) + + else: + # Default to empty array for unsupported types + df = df.withColumn(col_name, F.array()) + + else: + df = df.withColumn(col_name, F.lit(None)) + + return df.drop("id") + +# Create and register a DataFrame for each table using the distributed random data generation. +# NOTE: THIS WAS SCALED DOWN FOR TESTING PURPOSES. UNCOMMENT LINE 74 AND COMMENT OUT LINES 68-73 FOR REAL TESTING +dfs = {} +for tbl, count in zip(table_names, approx_row_counts): + if tbl != 'table_a': + schema = schemas[tbl] + if tbl == 'table_c': + num_rows = 21000000 + else: + num_rows = int(count) + # num_rows = int(count) + df = generate_random_dataframe(schema, num_rows) + dfs[tbl] = df + print(f"Created DataFrame for table '{tbl}' with {num_rows} random rows.") + +table_b = dfs['table_b'] +table_c = dfs['table_c'] +table_d = dfs['table_d'] +table_e = dfs['table_e'] + + +# ========================= +# CONFIGURATION - Set your desired match percentages here +# ========================= +MATCH_PERCENTAGE_A = 0.001 # 0.1% of table_a rows will match +MATCH_PERCENTAGE_C = 0.01 # 1% of table_c rows will match +MATCH_PERCENTAGE_D = 0.01 # 1% of table_d rows will match +MATCH_PERCENTAGE_E = 0.01 # 1% of table_e rows will match + +print("=" * 60) +print("FORCING TABLES TO MATCH TABLE_B VALUES") +print(f"Match percentages: A={MATCH_PERCENTAGE_A*100}%, C={MATCH_PERCENTAGE_C*100}%, D={MATCH_PERCENTAGE_D*100}%, E={MATCH_PERCENTAGE_E*100}%") +print("=" * 60) + +# ========================= +# Force table_a (4 columns: col_a, col_c, col_b, col_d) +# ========================= +print("\n1. Processing table_a...") +table_a_combos_list = ( + table_b + .select("col_b_8", "col_b_3", "col_b_9", "col_b_1") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_3").isNotNull() & + F.col("col_b_9").isNotNull() & + F.col("col_b_1").isNotNull() + ) + .collect() +) + +combo_count_a = len(table_a_combos_list) +print(f" Distinct combinations for table_a: {combo_count_a}") + +combos_a_with_id = [ + Row( + new_col_a=combo['col_b_8'], + new_col_c=combo['col_b_3'], + new_col_b=combo['col_b_9'], + new_col_d=combo['col_b_1'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_a_combos_list) +] + +table_a_combos_df = spark.createDataFrame(combos_a_with_id) + +# Add a random number to each row to decide if it should be forced +table_a_forced = ( + table_a + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_A) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_a) + 1) + .otherwise(F.lit(None)) + ) + # Left join to preserve all rows + .join( + F.broadcast(table_a_combos_df), + "combo_id", + "left" + ) + # For forced rows, use new values; for others, keep original + .withColumn("col_a", F.coalesce("new_col_a", "col_a")) + .withColumn("col_c", F.coalesce("new_col_c", "col_c")) + .withColumn("col_b", F.coalesce("new_col_b", "col_b")) + .withColumn("col_d", F.coalesce("new_col_d", "col_d")) + .drop("should_force", "combo_id", "new_col_a", "new_col_c", "new_col_b", "new_col_d") + .select(*table_a.columns) +) + +table_a_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_a_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +print(f" ✓ table_a_forced created ({MATCH_PERCENTAGE_A*100}% forced)") + +# ========================= +# Force table_c (3 columns: col_c_10, col_c_9, col_c_11) +# ========================= +print("\n2. Processing table_c...") +table_c_combos_list = ( + table_b + .select("col_b_8", "col_b_9", "col_b_3") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_9").isNotNull() & + F.col("col_b_3").isNotNull() + ) + .collect() +) + +combo_count_c = len(table_c_combos_list) +print(f" Distinct combinations for table_c: {combo_count_c}") + +combos_c_with_id = [ + Row( + new_col_c_10=combo['col_b_8'], + new_col_c_9=combo['col_b_9'], + new_col_c_11=combo['col_b_3'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_c_combos_list) +] + +table_c_combos_df = spark.createDataFrame(combos_c_with_id) + +table_c_forced = ( + table_c + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_C) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_c) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_c_combos_df), + "combo_id", + "left" + ) + .withColumn("col_c_10", F.coalesce("new_col_c_10", "col_c_10")) + .withColumn("col_c_9", F.coalesce("new_col_c_9", "col_c_9")) + .withColumn("col_c_11", F.coalesce("new_col_c_11", "col_c_11")) + .drop("should_force", "combo_id", "new_col_c_10", "new_col_c_9", "new_col_c_11") + .select(*table_c.columns) +) + +table_c_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_c_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +print(f" ✓ table_c_forced created ({MATCH_PERCENTAGE_C*100}% forced)") + +# ========================= +# Force table_d (2 columns: col_d_0, col_d_1) +# ========================= +print("\n3. Processing table_d...") +table_d_combos_list = ( + table_b + .select("col_b_8", "col_b_9") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_9").isNotNull() + ) + .collect() +) + +combo_count_d = len(table_d_combos_list) +print(f" Distinct combinations for table_d: {combo_count_d}") + +combos_d_with_id = [ + Row( + new_col_d_0=combo['col_b_8'], + new_col_d_1=combo['col_b_9'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_d_combos_list) +] + +table_d_combos_df = spark.createDataFrame(combos_d_with_id) + +table_d_forced = ( + table_d + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_D) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_d) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_d_combos_df), + "combo_id", + "left" + ) + .withColumn("col_d_0", F.coalesce("new_col_d_0", "col_d_0")) + .withColumn("col_d_1", F.coalesce("new_col_d_1", "col_d_1")) + .drop("should_force", "combo_id", "new_col_d_0", "new_col_d_1") + .select(*table_d.columns) +) + +table_d_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_d_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +print(f" ✓ table_d_forced created ({MATCH_PERCENTAGE_D*100}% forced)") + +# ========================= +# Force table_e (1 column: col_e_0) +# ========================= +print("\n4. Processing table_e...") +table_e_values_list = ( + table_b + .select("col_b_8") + .distinct() + .filter(F.col("col_b_8").isNotNull()) + .collect() +) + +value_count_e = len(table_e_values_list) +print(f" Distinct values for table_e: {value_count_e}") + +values_e_with_id = [ + Row( + new_col_e_0=value['col_b_8'], + value_id=idx + 1 + ) + for idx, value in enumerate(table_e_values_list) +] + +table_e_values_df = spark.createDataFrame(values_e_with_id) + +table_e_forced = ( + table_e + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_E) + .withColumn("value_id", + F.when(F.col("should_force"), F.floor(F.rand() * value_count_e) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_e_values_df), + "value_id", + "left" + ) + .withColumn("col_e_0", F.coalesce("new_col_e_0", "col_e_0")) + .drop("should_force", "value_id", "new_col_e_0") + .select(*table_e.columns) +) + +table_e_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_e") +table_e_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") +print(f" ✓ table_e_forced created ({MATCH_PERCENTAGE_E*100}% forced)") + +table_b.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_table_b") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") diff --git a/Data-Gen/new_datagen_faster.py b/Data-Gen/new_datagen_faster.py new file mode 100644 index 0000000..c00508d --- /dev/null +++ b/Data-Gen/new_datagen_faster.py @@ -0,0 +1,265 @@ +import random +import subprocess + +import networkx as nx +import numpy as np +import pandas as pd +from pyspark.sql import SparkSession, Row + +target_size = 1 * 1024**4 # 1 TB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1TB" +directory_path = PATH_PREFIX + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("LargeGraph") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "128g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .config("spark.memory.fraction", "0.8") + .config("spark.sql.shuffle.partitions", "800") + .config("spark.local.dir", "/scratch/prestouser/spark-tmp") + .getOrCreate() +) +def create_synthetic_distribution(params, plot=True): + + slope = params.get('slope', -2) + min_degree = params.get('min_degree', 1) + max_degree = params.get('max_degree', 200_000) + max_prob = params.get('max_prob', 0.5) + + degrees = np.arange(min_degree, max_degree + 1, dtype=float) + + A = max_prob / (min_degree ** slope) + + y_values = A * degrees ** slope + + degrees_int = degrees.astype(int) + + decay_dict = dict(zip(degrees_int, y_values)) + + return decay_dict + +params = { + 'slope': -2, + 'intercpet': 0.8, + 'r_squared': 0.98, + 'max_degree': 200_000, + 'min_degree': 1, + 'max_prob': 0.5, + 'degree_range': list(np.arange(1, 200_000)) +} + +target_distribution = create_synthetic_distribution(params, 200_000) + +def get_disk_usage(path): + import platform + try: + if platform.system() == "Darwin": + result = subprocess.run( + ['du', '-sk', path], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + size_in_bytes = int(result.stdout.split()[0]) * 1024 + else: + result = subprocess.run( + ['du', '-sb', path], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + size_in_bytes = int(result.stdout.split()[0]) + return size_in_bytes + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get disk usage: {e.stderr.strip()}") + +def random_node(): + return int(np.random.randint(1_000_000, 10_000_000_000)) + +def random_feature(): + return int(np.random.randint(1, 70000)) # cast to native int + +def random_col_e(): + return str(np.random.choice(['col_e_A', 'col_e_B'])) # cast to native str + +num_graphs = spark.sparkContext.defaultParallelism # number of cores available + +def configuration_model_with_distribution(n, degree_distribution,seed): + """ + Generate a graph with a specific degree distribution + """ + degrees = [] + remaining_nodes = n + + for degree, prob in sorted(degree_distribution.items()): + if remaining_nodes <= 0: + break + count = min(int(n * prob + 0.5), remaining_nodes) + if count > 0: + degrees.extend([int(degree)] * count) + remaining_nodes -= count + + if remaining_nodes > 0: + min_degree = min(degree_distribution.keys()) + degrees.extend([min_degree] * remaining_nodes) + + if len(degrees) < 2: + degrees = [1, 1] + + if sum(degrees) % 2 != 0: + degrees[0] += 1 + + try: + g = nx.configuration_model(degrees, seed=seed) + g = nx.Graph(g) + + if g.number_of_edges() == 0: + raise nx.NetworkXError("Generated graph has no edges") + + return g + except Exception as e: + print(f"Error generating graph: {e}") + return nx.barabasi_albert_graph(n, 2) + +def generate_graph_partition(pdf_iterator): + """ + Generator function that yeilds edges for each partition. + pdf_iterator yields pandas DataFrames (one per partition) + """ + # Get broadcasted values + degree_dist = target_distribution_bc.value + seed_val = seed_bc.value + + for pdf in pdf_iterator: + all_edges = [] + + for partition_id in pdf['id'].values: + partition_id = int(partition_id) + + g = configuration_model_with_distribution( + num_nodes_per_graph, + degree_dist, + seed_val + partition_id + ) + + node_map = {node: random_node() for node in g.nodes()} + + for edge in g.edges(): + all_edges.append({ + 'col_a': int(node_map[edge[0]]), + 'col_b': int(node_map[edge[1]]), + 'col_c': int(random_feature()), + 'col_d': int(random_feature()), + 'col_e': random_col_e() + }) + + if all_edges: + yield pd.DataFrame(all_edges) + +target_distribution_bc = spark.sparkContext.broadcast(target_distribution) + +seed = 1000 +seed_bc = spark.sparkContext.broadcast(seed) + +edge_df = ( + spark.range(num_graphs) + .repartition(num_graphs) + .mapInPandas( + generate_graph_partition, + schema="col_a long, col_b long, col_c int, col_d int, col_e string" + ) + .distinct() +) + +edge_df.write.mode("overwrite").parquet(directory_path) + +print(f"Initial write complete. Size: {get_disk_usage(directory_path) / 1024**3:.2f} GB") + + +# ======================================================================================= +# ================================ REVAMPED DATA SCALER ================================= +# ======================================================================================= + +from pyspark.sql.functions import col, floor, rand, lit, when, hash as spark_hash +import math + +initial_dir_size = get_disk_usage(directory_path) + +print(f"Initial dataset size: {round(initial_dir_size / 1024**3, 2)} GB") + +copies_needed = math.ceil(target_size / initial_dir_size) + +print(f"Target size: {round(target_size / 1024**3, 2)} GB") +print(f"Copies needed (including original): {copies_needed}") + +long_cols = ["col_a", "col_b"] +integer_cols = ["col_c", "col_d"] +string_cols = ["col_e"] + +print(f"Reading data from: {directory_path}") +df_original = spark.read.parquet(directory_path) + +if 'source' in df_original.columns: + df_original = df_original.drop('source') + +# Create a "copy_id" Dataframe +df_copies = spark.range(copies_needed).toDF("copy_id") + +# Cross join to create all copies at once +df_expanded = df_original.crossJoin(df_copies) + +print(f"Creating {copies_needed} versions in parallel...") + +# Define noise range +NOISE_MIN = -1 +NOISE_MAX = 1 + +# Create a seed column based on copy_id and row hash +# This gives us different randomness for each day +df_with_seed = df_expanded.withColumn( + "rand_seed", + (spark_hash(col("col_a"), col("col_b"), col("copy_id")) % 1000000).cast("integer") +) + +# Apply noise based on copy_id (copy_id=0 is orignal, no noise) +# Use rand() with a base seed, then add variation based on the row's values +df_augmented = df_with_seed.select( + *[ + when(col("copy_id") == 0, col(c)).otherwise( + (col(c) + floor(rand(42) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN) + ).cast("long").alias(c) + for c in long_cols + ], + *[ + when(col("copy_id") == 0, col(c)).otherwise( + (col(c) + floor(rand(1042) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN) + ).cast("integer").alias(c) + for c in integer_cols + ], + *[col(c) for c in string_cols] +) + +# Calculate partitions based on target file size +target_file_size_mb = 250 +estimated_size = initial_dir_size * copies_needed +repartitions = max(1, int(estimated_size / (target_file_size_mb * 1024**2))) + +print(f"Writing combined dataset with {repartitions} partitions...") +print(f"Estimated final size: {round(estimated_size / 1024**3, 2)} GB") + +# Write everytihing in one shot +df_augmented.repartition(repartitions).write.mode("overwrite").parquet(directory_path) + +# Verify final size +final_size = get_disk_usage(directory_path) +print(f"\nFinal size: {round(final_size / 1024**3, 2)} GB") +print(f"Target was: {round(target_size / 1024**3, 2)} GB") +print(f"Achieved: {round(100 * final_size / target_size, 1)}% of target") diff --git a/Data-Gen/notes.md b/Data-Gen/notes.md new file mode 100644 index 0000000..5993da7 --- /dev/null +++ b/Data-Gen/notes.md @@ -0,0 +1,122 @@ +1. new_datagen_faster.py +2. create_join_tables.py + +new_datagen_faster.py runs first. It generates the base table_a data by: + +Creating synthetic graph partitions using a power-law degree distribution (via networkx configuration models) +Writing the initial edge data as parquet to PATH_PREFIX +Scaling it up to the target size (1 GB) by cross-joining copies with noise +Then create_join_tables.py runs second. It: + +Reads the table_a parquet that new_datagen_faster.py produced (line 37: table_a = spark.read.parquet(PATH_PREFIX)) +Generates the other tables (table_b, table_c, table_d, table_e) from schema metadata in an Excel file +Forces join-key alignment by overwriting a percentage of rows in each table with values from table_b + + +## 50 GB +target_sze = 50 * 1024**3 # 1 GB +num_nodes_per_graph is still 500_000 + +### Spark CPU + +workflow_join-50gb.py changes the config. But I doubt this is optimal. Running on `2xGrace` (144 cores) +.config("spark.driver.memory", "128g") +.config("spark.memory.fraction", "0.8") +.config("spark.sql.shuffle.partitions", "800") + +``` +Workflow join completed in 1235.04 seconds + Output: /scratch/prestouser/test-data/500000-50GB/workflow_join_spark_output + Result rows: 2662631628 +``` + +### cuDF-Polars +workflow_join_polars-50gb.py Running on full NVL4: 2xGrace 4xB200 + +workflow_join_polars-50gb.py + +``` +Workflow join completed in 43.47 seconds + Output: /scratch/prestouser/test-data/500000-50GB/workflow_join_polars_output + Result rows: 2662631628 + +RapidsMPF statistics: +Statistics: + - alloc-device: 353.05 GiB | 1.63 s | 216.76 GiB/s | avg-stream-delay 2.02 ms + - alloc-host: 1.98 KiB | 270.61 us | 7.14 MiB/s | avg-stream-delay 97.29 us + - copy-device-to-pinned_host: 322.07 GiB | 2.94 s | 109.71 GiB/s | avg-stream-delay 996.55 us + - copy-pinned_host-to-device: 322.07 GiB | 1.71 s | 188.09 GiB/s | avg-stream-delay 26.02 us +``` + + +## 1GB + +target_size = 1 * 1024**3 # 1 GB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB" + +Reading data from: /scratch/prestouser/test-data/500000-1GB +Creating 1 versions in parallel... +Writing combined dataset with 10 partitions... +Estimated final size: 2.68 GB + Final size: 2.76 GB +Target was: 1.0 GB +Achieved: 276.0% of target + + +1. Processing table_a... + Distinct combinations for table_a: 14000 + ✓ table_a_forced created (0.1% forced) + +2. Processing table_c... + Distinct combinations for table_c: 14000 + ✓ table_c_forced created (1.0% forced) + +3. Processing table_d... + Distinct combinations for table_d: 14000 + ✓ table_d_forced created (1.0% forced) + +4. Processing table_e... + Distinct values for table_e: 14000 + ✓ table_e_forced created (1.0% forced) + + + +## 10GB +target_size = 10 * 1024**3 # 1 GB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB" + +Reading data from: /scratch/prestouser/test-data/500000-1GB +Creating 4 versions in parallel... +Writing combined dataset with 43 partitions... +Estimated final size: 10.7 GB + +Final size: 11.04 GB +Target was: 10.0 GB +Achieved: 110.4% of target + +Created DataFrame for table 'table_b' with 14000 random rows. +Created DataFrame for table 'table_c' with 21000000 random rows. +Created DataFrame for table 'table_d' with 12000000 random rows. +Created DataFrame for table 'table_e' with 33000000 random rows. +============================================================ +FORCING TABLES TO MATCH TABLE_B VALUES +Match percentages: A=0.1%, C=1.0%, D=1.0%, E=1.0% +============================================================ + +1. Processing table_a... + Distinct combinations for table_a: 14000 + ✓ table_a_forced created (0.1% forced) + +2. Processing table_c... + Distinct combinations for table_c: 14000 + ✓ table_c_forced created (1.0% forced) + +3. Processing table_d... + Distinct combinations for table_d: 14000 + ✓ table_d_forced created (1.0% forced) + +4. Processing table_e... + Distinct values for table_e: 14000 + ✓ table_e_forced created (1.0% forced) diff --git a/Data-Gen/workflow_join-50gb.py b/Data-Gen/workflow_join-50gb.py new file mode 100644 index 0000000..cccf238 --- /dev/null +++ b/Data-Gen/workflow_join-50gb.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +from pathlib import Path + +from pyspark.sql import SparkSession + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("WorkflowJoin") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "128g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .config("spark.memory.fraction", "0.8") + .config("spark.sql.shuffle.partitions", "800") + .getOrCreate() +) + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" +PATH_PREFIX = "/scratch/prestouser/test-data/500000-50GB" + +# ─── Read all tables from parquet ──────────────────────────────────────────── +print("Reading tables from parquet...") +table_a = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") +table_c = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_d = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_e = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") + +print(f" table_a: {table_a.count()} rows") +print(f" table_b: {table_b.count()} rows") +print(f" table_c: {table_c.count()} rows") +print(f" table_d: {table_d.count()} rows") +print(f" table_e: {table_e.count()} rows") + +# ─── Workflow Join ─────────────────────────────────────────────────────────── +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +start = time.time() + +workflow_result = ( + table_a + .join( + table_b, + [ + table_a["col_a"] == table_b["col_b_8"], + table_a["col_b"] == table_b["col_b_3"], + table_a["col_c"] == table_b["col_b_9"], + table_a["col_d"] == table_b["col_b_1"], + ], + how="left", + ) + .join( + table_c, + [ + table_a["col_a"] == table_c["col_c_10"], + table_a["col_b"] == table_c["col_c_9"], + table_a["col_e"] == table_c["col_c_11"].cast("string"), + ], + how="left", + ) + .join( + table_d, + [ + table_a["col_a"] == table_d["col_d_0"], + table_a["col_c"] == table_d["col_d_1"], + ], + how="left", + ) + .join( + table_e, + table_a["col_a"] == table_e["col_e_0"], + how="left", + ) +) + +OUTPUT = f"{PATH_PREFIX}/workflow_join_spark_output" +workflow_result.write.mode("overwrite").parquet(OUTPUT) + +elapsed = time.time() - start +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") +print(f" Output: {OUTPUT}") +print(f" Result rows: {spark.read.parquet(OUTPUT).count()}") + +spark.stop() diff --git a/Data-Gen/workflow_join.py b/Data-Gen/workflow_join.py new file mode 100644 index 0000000..903b8c4 --- /dev/null +++ b/Data-Gen/workflow_join.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +from pathlib import Path + +from pyspark.sql import SparkSession + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("WorkflowJoin") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "32g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .getOrCreate() +) + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" + +# ─── Read all tables from parquet ──────────────────────────────────────────── +print("Reading tables from parquet...") +table_a = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") +table_c = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_d = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_e = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") + +print(f" table_a: {table_a.count()} rows") +print(f" table_b: {table_b.count()} rows") +print(f" table_c: {table_c.count()} rows") +print(f" table_d: {table_d.count()} rows") +print(f" table_e: {table_e.count()} rows") + +# ─── Workflow Join ─────────────────────────────────────────────────────────── +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +start = time.time() + +workflow_result = ( + table_a + .join( + table_b, + [ + table_a["col_a"] == table_b["col_b_8"], + table_a["col_b"] == table_b["col_b_3"], + table_a["col_c"] == table_b["col_b_9"], + table_a["col_d"] == table_b["col_b_1"], + ], + how="left", + ) + .join( + table_c, + [ + table_a["col_a"] == table_c["col_c_10"], + table_a["col_b"] == table_c["col_c_9"], + table_a["col_e"] == table_c["col_c_11"].cast("string"), + ], + how="left", + ) + .join( + table_d, + [ + table_a["col_a"] == table_d["col_d_0"], + table_a["col_c"] == table_d["col_d_1"], + ], + how="left", + ) + .join( + table_e, + table_a["col_a"] == table_e["col_e_0"], + how="left", + ) +) + +workflow_result.write.format("noop").mode("overwrite").save() + +elapsed = time.time() - start +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") + +spark.stop() diff --git a/Data-Gen/workflow_join_polars-1tb-ray.py b/Data-Gen/workflow_join_polars-1tb-ray.py new file mode 100755 index 0000000..d3e7800 --- /dev/null +++ b/Data-Gen/workflow_join_polars-1tb-ray.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +import argparse +import os +import shutil +import time +from pathlib import Path + +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine +from cudf_polars.utils.config import MemoryResourceConfig + +PATH_PREFIX = "/scratch/prestouser/test-data/500000-1TB" +OUTPUT = Path(PATH_PREFIX) / "workflow_join_polars_ray_output" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run workflow join with cudf-polars RayEngine." + ) + parser.add_argument("--path-prefix", default=os.environ.get("DATASET_PATH", PATH_PREFIX)) + parser.add_argument("--output", default=os.environ.get("OUTPUT_PATH", str(OUTPUT))) + parser.add_argument("--ray-address", default=os.environ.get("RAY_ADDRESS")) + parser.add_argument("--num-py-executors", type=int, default=8) + parser.add_argument("--target-partition-size", type=int, default=3_221_225_472) + parser.add_argument("--spill-device-limit", default="70%") + parser.add_argument("--pinned-initial-pool-size", type=int, default=51_539_607_552) + parser.add_argument("--rmm-release-threshold", type=int, default=160_000_000_000) + return parser.parse_args() + + +def prepare_output_path(path: Path) -> None: + if path.is_dir(): + shutil.rmtree(path) + elif path.exists(): + path.unlink() + + +def scan_tables(path_prefix: str) -> dict[str, pl.LazyFrame]: + return { + "table_a": pl.scan_parquet(f"{path_prefix}/temp_forced_table_a/*.parquet"), + "table_b": pl.scan_parquet(f"{path_prefix}/temp_table_b/*.parquet"), + "table_c": pl.scan_parquet(f"{path_prefix}/temp_forced_table_c/*.parquet"), + "table_d": pl.scan_parquet(f"{path_prefix}/temp_forced_table_d/*.parquet"), + "table_e": pl.scan_parquet(f"{path_prefix}/temp_forced_table_e/*.parquet"), + } + + +def print_row_counts(tables: dict[str, pl.LazyFrame]) -> None: + for name, table in tables.items(): + row_count = table.select(pl.len()).collect().item() + print(f" {name}: {row_count} rows") + + +def build_workflow_join_query(tables: dict[str, pl.LazyFrame]) -> pl.LazyFrame: + table_a = tables["table_a"] + table_b = tables["table_b"] + table_c = tables["table_c"].with_columns(pl.col("col_c_11").cast(pl.Utf8)) + table_d = tables["table_d"] + table_e = tables["table_e"] + + return ( + table_a + .join( + table_b, + left_on=["col_a", "col_b", "col_c", "col_d"], + right_on=["col_b_8", "col_b_3", "col_b_9", "col_b_1"], + how="left", + ) + .join( + table_c, + left_on=["col_a", "col_b", "col_e"], + right_on=["col_c_10", "col_c_9", "col_c_11"], + how="left", + ) + .join( + table_d, + left_on=["col_a", "col_c"], + right_on=["col_d_0", "col_d_1"], + how="left", + ) + .join( + table_e, + left_on=["col_a"], + right_on=["col_e_0"], + how="left", + ) + ) + + +def main() -> None: + args = parse_args() + path_prefix = str(args.path_prefix) + output = Path(args.output) + + print("Reading tables from parquet...") + print(f" path_prefix: {path_prefix}") + tables = scan_tables(path_prefix) + print_row_counts(tables) + + print("\nRunning workflow join: table_a -> table_b -> table_c -> table_d -> table_e ...") + workflow_query = build_workflow_join_query(tables) + + memory_resource_config = MemoryResourceConfig( + qualname="rmm.mr.CudaAsyncMemoryResource", + options={"release_threshold": args.rmm_release_threshold}, + ) + + streaming_options = StreamingOptions( + spill_device_limit=args.spill_device_limit, + pinned_memory=True, + pinned_initial_pool_size=args.pinned_initial_pool_size, + statistics=True, + num_py_executors=args.num_py_executors, + fallback_mode="silent", + target_partition_size=args.target_partition_size, + memory_resource_config=memory_resource_config, + ) + + ray_init_options = {} + if args.ray_address: + ray_init_options["address"] = args.ray_address + print(f"Connecting to Ray cluster at {args.ray_address}") + + with RayEngine.from_options( + streaming_options, ray_init_options=ray_init_options + ) as engine: + print(f"RayEngine ranks: {engine.nranks}") + prepare_output_path(output) + start = time.time() + workflow_query.sink_parquet(output, engine=engine, mkdir=True) + elapsed = time.time() - start + statistics = engine.global_statistics() + + print(f"\nWorkflow join completed in {elapsed:.2f} seconds") + print(f" Output: {output}") + row_count = pl.scan_parquet(f"{output}/*.parquet").select(pl.len()).collect().item() + print(f" Result rows: {row_count}") + print("\nRapidsMPF statistics:") + print(statistics.report()) + + +if __name__ == "__main__": + main() diff --git a/Data-Gen/workflow_join_polars-1tb-ray.sh b/Data-Gen/workflow_join_polars-1tb-ray.sh new file mode 100755 index 0000000..e61275a --- /dev/null +++ b/Data-Gen/workflow_join_polars-1tb-ray.sh @@ -0,0 +1,130 @@ +#!/bin/bash +# Multi-node Ray launcher for workflow_join_polars-1tb-ray.py. +set -x +set -e + +export UCX_TLS=^ib,ud:aux +export UCX_NET_DEVICES=bond0 +export UCX_MAX_RNDV_RAILS=1 +export UCX_RNDV_PIPELINE_ERROR_HANDLING=y +export UCX_TCP_CM_REUSEADDR=y +export UCX_RNDV_MTYPE_WORKER_MAX_MEM=1G +export UCX_RNDV_MTYPE_WORKER_FC_ENABLE=y +export UCX_RNDV_FRAG_MEM_TYPES=cuda + +export PYTHONUNBUFFERED=1 +export POLARS_MAX_THREADS=1 +export RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0 +export CUDF_POLARS_LOG_TRACES=1 +export KVIKIO_TASK_SIZE=$((16 * 1024 * 1024)) +export CUFILE_LOGGING_LEVEL=INFO +export KVIKIO_COMPAT_MODE=OFF + +source /opt/conda/etc/profile.d/conda.sh +conda activate rapidsmpf + +PROJECT_DIR=${REAL_HOME:-$HOME}/bzaitlen/test-data/Data-Gen +SCHEDULER_DIR=${PROJECT_DIR}/scheduler +RESULTS_DIR=${PROJECT_DIR}/results +RAY_ADDR_FILE=${SCHEDULER_DIR}/ray-address-job${SLURM_JOB_ID}.txt +DONE_FLAG=${SCHEDULER_DIR}/ray-done-job${SLURM_JOB_ID}.flag +mkdir -p "$SCHEDULER_DIR" "$RESULTS_DIR" + +CURRENT_NODE=$(hostname -s) +echo "Node: $CURRENT_NODE Head: $HEAD_NODE NodeID: $SLURM_NODEID Procid: $SLURM_PROCID" + +NODE_GPUS=${SLURM_GPUS_ON_NODE:-4} +EXPECTED_GPUS=$((NODE_GPUS * SLURM_NNODES)) +echo "[$CURRENT_NODE] NODE_GPUS=$NODE_GPUS EXPECTED_GPUS=$EXPECTED_GPUS" +echo "[$CURRENT_NODE] CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-unset}" + +HEAD_PORT=${RAY_HEAD_PORT:-6379} +RAY_TMP_DIR=/tmp/ray-${SLURM_JOB_ID} + +ray stop --force 2>/dev/null || true +killall -q raylet gcs_server 2>/dev/null || true +pkill -f "ray::" 2>/dev/null || true +rm -rf "$RAY_TMP_DIR" 2>/dev/null || true +sleep 2 + +if [[ "$CURRENT_NODE" == "$HEAD_NODE" ]]; then + rm -f "$RAY_ADDR_FILE" "$DONE_FLAG" + HEAD_IP=$(hostname -I | awk '{print $1}') + echo "[$CURRENT_NODE] Starting Ray head at ${HEAD_IP}:${HEAD_PORT}" + + ray start --head \ + --node-ip-address="$HEAD_IP" \ + --port="$HEAD_PORT" \ + --num-gpus="$NODE_GPUS" \ + --temp-dir="$RAY_TMP_DIR" \ + --disable-usage-stats + + echo "${HEAD_IP}:${HEAD_PORT}" > "$RAY_ADDR_FILE" + + echo "[$CURRENT_NODE] Waiting for $EXPECTED_GPUS GPUs to join cluster..." + python - <= ${EXPECTED_GPUS}: + break + time.sleep(2) +else: + print("Timed out waiting for GPUs", file=sys.stderr) + sys.exit(1) +ray.shutdown() +PY + + DATETIME=$(date +%Y-%m-%d_%H-%M-%S) + output_log="$RESULTS_DIR/workflow-join-polars-1tb-ray-${EXPECTED_GPUS}gpus-job-${SLURM_JOB_ID}-$DATETIME.txt" + dataset_path=${DATASET_PATH:-/scratch/prestouser/test-data/500000-1TB} + output_path=${OUTPUT_PATH:-${dataset_path}/workflow_join_polars_ray_output} + + cp "$PROJECT_DIR/workflow_join_polars-1tb-ray.py" "$RESULTS_DIR/workflow_join_polars-1tb-ray-driver-job${SLURM_JOB_ID}-$DATETIME.py" + cp "$PROJECT_DIR/workflow_join_polars-1tb-ray.sh" "$RESULTS_DIR/workflow_join_polars-1tb-ray-job${SLURM_JOB_ID}-$DATETIME.sh" + cp "$PROJECT_DIR/workflow_join_polars-1tb-ray.slurm" "$RESULTS_DIR/workflow_join_polars-1tb-ray-job${SLURM_JOB_ID}-$DATETIME.slurm" + + set +e + python "$PROJECT_DIR/workflow_join_polars-1tb-ray.py" \ + --path-prefix "$dataset_path" \ + --output "$output_path" \ + --ray-address "${HEAD_IP}:${HEAD_PORT}" \ + --num-py-executors "${NUM_PY_EXECUTORS:-8}" \ + --target-partition-size "${TARGET_PARTITION_SIZE:-3221225472}" \ + --spill-device-limit "${SPILL_DEVICE_LIMIT:-70%}" \ + 2>&1 | tee -a "$output_log" + DRIVER_RC=${PIPESTATUS[0]} + set -e + + touch "$DONE_FLAG" + ray stop --force || true + rm -f "$RAY_ADDR_FILE" + exit "$DRIVER_RC" +else + echo -n "[$CURRENT_NODE] Worker waiting for Ray address" + set +x + while [ ! -f "$RAY_ADDR_FILE" ]; do + echo -n "." + sleep 2 + done + echo " found" + set -x + + HEAD_ADDR=$(cat "$RAY_ADDR_FILE") + echo "[$CURRENT_NODE] Joining Ray at ${HEAD_ADDR} with $NODE_GPUS GPUs" + ray start \ + --address="$HEAD_ADDR" \ + --num-gpus="$NODE_GPUS" \ + --temp-dir="$RAY_TMP_DIR" \ + --disable-usage-stats + + while [ ! -f "$DONE_FLAG" ]; do + sleep 5 + done + ray stop --force || true +fi + +echo "[$CURRENT_NODE] Exiting." diff --git a/Data-Gen/workflow_join_polars-1tb-ray.slurm b/Data-Gen/workflow_join_polars-1tb-ray.slurm new file mode 100755 index 0000000..ef8363b --- /dev/null +++ b/Data-Gen/workflow_join_polars-1tb-ray.slurm @@ -0,0 +1,53 @@ +#!/bin/bash +#SBATCH --job-name=bz-workflow-join-polars-1tb-ray +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err +#SBATCH --time=00:40:00 +#SBATCH --nodes=6 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=144 +#SBATCH --mem=0 +#SBATCH --gres=gpu:4 +#SBATCH --exclusive +#SBATCH --nodelist=presto-gb200-gcn-[01-10] + +export DATASET_PATH=${DATASET_PATH:-/scratch/prestouser/test-data/500000-1TB} +export OUTPUT_PATH=${OUTPUT_PATH:-${DATASET_PATH}/workflow_join_polars_ray_output} + +export LIBCUDF_HW_DECOMPRESSION=${LIBCUDF_HW_DECOMPRESSION:-OFF} +export NUM_PY_EXECUTORS=${NUM_PY_EXECUTORS:-8} +export TARGET_PARTITION_SIZE=${TARGET_PARTITION_SIZE:-3221225472} +export SPILL_DEVICE_LIMIT=${SPILL_DEVICE_LIMIT:-70%} + +export NVIDIA_VISIBLE_DEVICES=all +export NVIDIA_DRIVER_CAPABILITIES=compute,utility +export REAL_HOME=$HOME + +LOCAL_CONTAINER_PATH=${LOCAL_CONTAINER_PATH:-/scratch/prestouser/images/mg-polars-tpch-nvl72-2026-05-23-arm64-cuda-13-1-ucxthrottling-nvcr.sqsh} +CONTAINER_MOUNTS="${HOME}:${HOME},/scratch:/scratch" + +export HEAD_NODE=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + +mkdir -p logs ${HOME}/bzaitlen/test-data/Data-Gen/scheduler + +echo "========================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Nodes: $SLURM_JOB_NUM_NODES" +echo "Node list: $SLURM_JOB_NODELIST" +echo "Head node: $HEAD_NODE" +echo "Dataset: $DATASET_PATH" +echo "Output: $OUTPUT_PATH" +echo "HW decompression: $LIBCUDF_HW_DECOMPRESSION" +echo "Frontend: ray" +echo "Allocated GPUs: ${SLURM_GPUS_ON_NODE:-4} per node" +echo "Target partition: $TARGET_PARTITION_SIZE" +echo "========================================" + +srun \ + --export=ALL \ + --container-image=${LOCAL_CONTAINER_PATH} \ + --container-mounts=${CONTAINER_MOUNTS} \ + --container-remap-root \ + --container-writable \ + --pty \ + bash ${HOME}/bzaitlen/test-data/Data-Gen/workflow_join_polars-1tb-ray.sh diff --git a/Data-Gen/workflow_join_polars-50gb.py b/Data-Gen/workflow_join_polars-50gb.py new file mode 100644 index 0000000..046b5bc --- /dev/null +++ b/Data-Gen/workflow_join_polars-50gb.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test using Polars. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +import shutil +from pathlib import Path + +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine +from cudf_polars.utils.config import MemoryResourceConfig + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" +PATH_PREFIX = "/scratch/prestouser/test-data/500000-50GB" +OUTPUT = Path(PATH_PREFIX) / "workflow_join_polars_output" + + +def prepare_output_path(path: Path) -> None: + if path.is_dir(): + shutil.rmtree(path) + elif path.exists(): + path.unlink() + +def scan_tables(path_prefix: str) -> dict[str, pl.LazyFrame]: + return { + "table_a": pl.scan_parquet(f"{path_prefix}/temp_forced_table_a/*.parquet"), + "table_b": pl.scan_parquet(f"{path_prefix}/temp_table_b/*.parquet"), + "table_c": pl.scan_parquet(f"{path_prefix}/temp_forced_table_c/*.parquet"), + "table_d": pl.scan_parquet(f"{path_prefix}/temp_forced_table_d/*.parquet"), + "table_e": pl.scan_parquet(f"{path_prefix}/temp_forced_table_e/*.parquet"), + } + + +def print_row_counts(tables: dict[str, pl.LazyFrame]) -> None: + for name, table in tables.items(): + row_count = table.select(pl.len()).collect().item() + print(f" {name}: {row_count} rows") + + +def build_workflow_join_query(tables: dict[str, pl.LazyFrame]) -> pl.LazyFrame: + table_a = tables["table_a"] + table_b = tables["table_b"] + table_c = tables["table_c"].with_columns(pl.col("col_c_11").cast(pl.Utf8)) + table_d = tables["table_d"] + table_e = tables["table_e"] + + return ( + table_a + .join( + table_b, + left_on=["col_a", "col_b", "col_c", "col_d"], + right_on=["col_b_8", "col_b_3", "col_b_9", "col_b_1"], + how="left", + ) + .join( + table_c, + left_on=["col_a", "col_b", "col_e"], + right_on=["col_c_10", "col_c_9", "col_c_11"], + how="left", + ) + .join( + table_d, + left_on=["col_a", "col_c"], + right_on=["col_d_0", "col_d_1"], + how="left", + ) + .join( + table_e, + left_on=["col_a"], + right_on=["col_e_0"], + how="left", + ) + ) + + +print("Reading tables from parquet...") +tables = scan_tables(PATH_PREFIX) +print_row_counts(tables) + +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +workflow_query = build_workflow_join_query(tables) + +RMM_MEMORY_RESOURCE_CONFIG = MemoryResourceConfig( + qualname="rmm.mr.CudaAsyncMemoryResource", + options={"release_threshold": 160_000_000_000}, +) + +STREAMING_OPTIONS = StreamingOptions( + # RapidsMPF options + spill_device_limit="70%", + pinned_memory=True, + pinned_initial_pool_size=51_539_607_552, + statistics=True, + # Executor options + num_py_executors=8, + fallback_mode="silent", + target_partition_size=3_221_225_472, + # Engine options + memory_resource_config=RMM_MEMORY_RESOURCE_CONFIG, +) + +engine = RayEngine.from_options(STREAMING_OPTIONS) +try: + prepare_output_path(OUTPUT) + start = time.time() + workflow_query.sink_parquet(OUTPUT, engine=engine, mkdir=True) + elapsed = time.time() - start + statistics = engine.global_statistics() +finally: + engine.shutdown() + +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") +print(f" Output: {OUTPUT}") +print(f" Result rows: {pl.scan_parquet(f'{OUTPUT}/*.parquet').select(pl.len()).collect().item()}") +print("\nRapidsMPF statistics:") +print(statistics.report()) diff --git a/Data-Gen/workflow_join_polars.py b/Data-Gen/workflow_join_polars.py new file mode 100644 index 0000000..c708daa --- /dev/null +++ b/Data-Gen/workflow_join_polars.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test using Polars. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +from pathlib import Path + +import polars as pl + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" + + +def scan_tables(path_prefix: str) -> dict[str, pl.LazyFrame]: + return { + "table_a": pl.scan_parquet(f"{path_prefix}/temp_forced_table_a/*.parquet"), + "table_b": pl.scan_parquet(f"{path_prefix}/temp_table_b/*.parquet"), + "table_c": pl.scan_parquet(f"{path_prefix}/temp_forced_table_c/*.parquet"), + "table_d": pl.scan_parquet(f"{path_prefix}/temp_forced_table_d/*.parquet"), + "table_e": pl.scan_parquet(f"{path_prefix}/temp_forced_table_e/*.parquet"), + } + + +def print_row_counts(tables: dict[str, pl.LazyFrame]) -> None: + for name, table in tables.items(): + row_count = table.select(pl.len()).collect().item() + print(f" {name}: {row_count} rows") + + +def build_workflow_join_query(tables: dict[str, pl.LazyFrame]) -> pl.LazyFrame: + table_a = tables["table_a"] + table_b = tables["table_b"] + table_c = tables["table_c"].with_columns(pl.col("col_c_11").cast(pl.Utf8)) + table_d = tables["table_d"] + table_e = tables["table_e"] + + return ( + table_a + .join( + table_b, + left_on=["col_a", "col_b", "col_c", "col_d"], + right_on=["col_b_8", "col_b_3", "col_b_9", "col_b_1"], + how="left", + ) + .join( + table_c, + left_on=["col_a", "col_b", "col_e"], + right_on=["col_c_10", "col_c_9", "col_c_11"], + how="left", + ) + .join( + table_d, + left_on=["col_a", "col_c"], + right_on=["col_d_0", "col_d_1"], + how="left", + ) + .join( + table_e, + left_on=["col_a"], + right_on=["col_e_0"], + how="left", + ) + ) + + +print("Reading tables from parquet...") +tables = scan_tables(PATH_PREFIX) +print_row_counts(tables) + +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +workflow_query = build_workflow_join_query(tables) + +start = time.time() +workflow_result = workflow_query.collect() +elapsed = time.time() - start + +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") +print(f" Result: {len(workflow_result)} rows, {len(workflow_result.columns)} columns")