Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
476 changes: 476 additions & 0 deletions Data-Gen/create_join_tables.py

Large diffs are not rendered by default.

265 changes: 265 additions & 0 deletions Data-Gen/new_datagen_faster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import random
import subprocess

import networkx as nx
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, Row

target_size = 1 * 1024**4 # 1 TB
num_nodes_per_graph = 500_000
PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1TB"
directory_path = PATH_PREFIX

spark = (
SparkSession.builder
.master("local[*]")
.appName("LargeGraph")
.config("spark.driver.host", "127.0.0.1")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.driver.memory", "128g")
.config("spark.executor.heartbeatInterval", "60s")
.config("spark.network.timeout", "300s")
.config("spark.memory.fraction", "0.8")
.config("spark.sql.shuffle.partitions", "800")
.config("spark.local.dir", "/scratch/prestouser/spark-tmp")
.getOrCreate()
)
def create_synthetic_distribution(params, plot=True):

slope = params.get('slope', -2)
min_degree = params.get('min_degree', 1)
max_degree = params.get('max_degree', 200_000)
max_prob = params.get('max_prob', 0.5)

degrees = np.arange(min_degree, max_degree + 1, dtype=float)

A = max_prob / (min_degree ** slope)

y_values = A * degrees ** slope

degrees_int = degrees.astype(int)

decay_dict = dict(zip(degrees_int, y_values))

return decay_dict

params = {
'slope': -2,
'intercpet': 0.8,
'r_squared': 0.98,
'max_degree': 200_000,
'min_degree': 1,
'max_prob': 0.5,
'degree_range': list(np.arange(1, 200_000))
}

target_distribution = create_synthetic_distribution(params, 200_000)

def get_disk_usage(path):
import platform
try:
if platform.system() == "Darwin":
result = subprocess.run(
['du', '-sk', path],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
size_in_bytes = int(result.stdout.split()[0]) * 1024
else:
result = subprocess.run(
['du', '-sb', path],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
size_in_bytes = int(result.stdout.split()[0])
return size_in_bytes
except subprocess.CalledProcessError as e:
raise RuntimeError(f"Failed to get disk usage: {e.stderr.strip()}")

def random_node():
return int(np.random.randint(1_000_000, 10_000_000_000))

def random_feature():
return int(np.random.randint(1, 70000)) # cast to native int

def random_col_e():
return str(np.random.choice(['col_e_A', 'col_e_B'])) # cast to native str

num_graphs = spark.sparkContext.defaultParallelism # number of cores available

def configuration_model_with_distribution(n, degree_distribution,seed):
"""
Generate a graph with a specific degree distribution
"""
degrees = []
remaining_nodes = n

for degree, prob in sorted(degree_distribution.items()):
if remaining_nodes <= 0:
break
count = min(int(n * prob + 0.5), remaining_nodes)
if count > 0:
degrees.extend([int(degree)] * count)
remaining_nodes -= count

if remaining_nodes > 0:
min_degree = min(degree_distribution.keys())
degrees.extend([min_degree] * remaining_nodes)

if len(degrees) < 2:
degrees = [1, 1]

if sum(degrees) % 2 != 0:
degrees[0] += 1

try:
g = nx.configuration_model(degrees, seed=seed)
g = nx.Graph(g)

if g.number_of_edges() == 0:
raise nx.NetworkXError("Generated graph has no edges")

return g
except Exception as e:
print(f"Error generating graph: {e}")
return nx.barabasi_albert_graph(n, 2)

def generate_graph_partition(pdf_iterator):
"""
Generator function that yeilds edges for each partition.
pdf_iterator yields pandas DataFrames (one per partition)
"""
# Get broadcasted values
degree_dist = target_distribution_bc.value
seed_val = seed_bc.value

for pdf in pdf_iterator:
all_edges = []

for partition_id in pdf['id'].values:
partition_id = int(partition_id)

g = configuration_model_with_distribution(
num_nodes_per_graph,
degree_dist,
seed_val + partition_id
)

node_map = {node: random_node() for node in g.nodes()}

for edge in g.edges():
all_edges.append({
'col_a': int(node_map[edge[0]]),
'col_b': int(node_map[edge[1]]),
'col_c': int(random_feature()),
'col_d': int(random_feature()),
'col_e': random_col_e()
})

if all_edges:
yield pd.DataFrame(all_edges)

target_distribution_bc = spark.sparkContext.broadcast(target_distribution)

seed = 1000
seed_bc = spark.sparkContext.broadcast(seed)

edge_df = (
spark.range(num_graphs)
.repartition(num_graphs)
.mapInPandas(
generate_graph_partition,
schema="col_a long, col_b long, col_c int, col_d int, col_e string"
)
.distinct()
)

edge_df.write.mode("overwrite").parquet(directory_path)

print(f"Initial write complete. Size: {get_disk_usage(directory_path) / 1024**3:.2f} GB")


# =======================================================================================
# ================================ REVAMPED DATA SCALER =================================
# =======================================================================================

from pyspark.sql.functions import col, floor, rand, lit, when, hash as spark_hash
import math

initial_dir_size = get_disk_usage(directory_path)

print(f"Initial dataset size: {round(initial_dir_size / 1024**3, 2)} GB")

copies_needed = math.ceil(target_size / initial_dir_size)

print(f"Target size: {round(target_size / 1024**3, 2)} GB")
print(f"Copies needed (including original): {copies_needed}")

long_cols = ["col_a", "col_b"]
integer_cols = ["col_c", "col_d"]
string_cols = ["col_e"]

print(f"Reading data from: {directory_path}")
df_original = spark.read.parquet(directory_path)

if 'source' in df_original.columns:
df_original = df_original.drop('source')

# Create a "copy_id" Dataframe
df_copies = spark.range(copies_needed).toDF("copy_id")

# Cross join to create all copies at once
df_expanded = df_original.crossJoin(df_copies)

print(f"Creating {copies_needed} versions in parallel...")

# Define noise range
NOISE_MIN = -1
NOISE_MAX = 1

# Create a seed column based on copy_id and row hash
# This gives us different randomness for each day
df_with_seed = df_expanded.withColumn(
"rand_seed",
(spark_hash(col("col_a"), col("col_b"), col("copy_id")) % 1000000).cast("integer")
)

# Apply noise based on copy_id (copy_id=0 is orignal, no noise)
# Use rand() with a base seed, then add variation based on the row's values
df_augmented = df_with_seed.select(
*[
when(col("copy_id") == 0, col(c)).otherwise(
(col(c) + floor(rand(42) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN)
).cast("long").alias(c)
for c in long_cols
],
*[
when(col("copy_id") == 0, col(c)).otherwise(
(col(c) + floor(rand(1042) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN)
).cast("integer").alias(c)
for c in integer_cols
],
*[col(c) for c in string_cols]
)

# Calculate partitions based on target file size
target_file_size_mb = 250
estimated_size = initial_dir_size * copies_needed
repartitions = max(1, int(estimated_size / (target_file_size_mb * 1024**2)))

print(f"Writing combined dataset with {repartitions} partitions...")
print(f"Estimated final size: {round(estimated_size / 1024**3, 2)} GB")

# Write everytihing in one shot
df_augmented.repartition(repartitions).write.mode("overwrite").parquet(directory_path)

# Verify final size
final_size = get_disk_usage(directory_path)
print(f"\nFinal size: {round(final_size / 1024**3, 2)} GB")
print(f"Target was: {round(target_size / 1024**3, 2)} GB")
print(f"Achieved: {round(100 * final_size / target_size, 1)}% of target")
122 changes: 122 additions & 0 deletions Data-Gen/notes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
1. new_datagen_faster.py
2. create_join_tables.py

new_datagen_faster.py runs first. It generates the base table_a data by:

Creating synthetic graph partitions using a power-law degree distribution (via networkx configuration models)
Writing the initial edge data as parquet to PATH_PREFIX
Scaling it up to the target size (1 GB) by cross-joining copies with noise
Then create_join_tables.py runs second. It:

Reads the table_a parquet that new_datagen_faster.py produced (line 37: table_a = spark.read.parquet(PATH_PREFIX))
Generates the other tables (table_b, table_c, table_d, table_e) from schema metadata in an Excel file
Forces join-key alignment by overwriting a percentage of rows in each table with values from table_b


## 50 GB
target_sze = 50 * 1024**3 # 1 GB
num_nodes_per_graph is still 500_000

### Spark CPU

workflow_join-50gb.py changes the config. But I doubt this is optimal. Running on `2xGrace` (144 cores)
.config("spark.driver.memory", "128g")
.config("spark.memory.fraction", "0.8")
.config("spark.sql.shuffle.partitions", "800")

```
Workflow join completed in 1235.04 seconds
Output: /scratch/prestouser/test-data/500000-50GB/workflow_join_spark_output
Result rows: 2662631628
```

### cuDF-Polars
workflow_join_polars-50gb.py Running on full NVL4: 2xGrace 4xB200

workflow_join_polars-50gb.py

```
Workflow join completed in 43.47 seconds
Output: /scratch/prestouser/test-data/500000-50GB/workflow_join_polars_output
Result rows: 2662631628

RapidsMPF statistics:
Statistics:
- alloc-device: 353.05 GiB | 1.63 s | 216.76 GiB/s | avg-stream-delay 2.02 ms
- alloc-host: 1.98 KiB | 270.61 us | 7.14 MiB/s | avg-stream-delay 97.29 us
- copy-device-to-pinned_host: 322.07 GiB | 2.94 s | 109.71 GiB/s | avg-stream-delay 996.55 us
- copy-pinned_host-to-device: 322.07 GiB | 1.71 s | 188.09 GiB/s | avg-stream-delay 26.02 us
```


## 1GB

target_size = 1 * 1024**3 # 1 GB
num_nodes_per_graph = 500_000
PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB"

Reading data from: /scratch/prestouser/test-data/500000-1GB
Creating 1 versions in parallel...
Writing combined dataset with 10 partitions...
Estimated final size: 2.68 GB
Final size: 2.76 GB
Target was: 1.0 GB
Achieved: 276.0% of target


1. Processing table_a...
Distinct combinations for table_a: 14000
✓ table_a_forced created (0.1% forced)

2. Processing table_c...
Distinct combinations for table_c: 14000
✓ table_c_forced created (1.0% forced)

3. Processing table_d...
Distinct combinations for table_d: 14000
✓ table_d_forced created (1.0% forced)

4. Processing table_e...
Distinct values for table_e: 14000
✓ table_e_forced created (1.0% forced)



## 10GB
target_size = 10 * 1024**3 # 1 GB
num_nodes_per_graph = 500_000
PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB"

Reading data from: /scratch/prestouser/test-data/500000-1GB
Creating 4 versions in parallel...
Writing combined dataset with 43 partitions...
Estimated final size: 10.7 GB

Final size: 11.04 GB
Target was: 10.0 GB
Achieved: 110.4% of target

Created DataFrame for table 'table_b' with 14000 random rows.
Created DataFrame for table 'table_c' with 21000000 random rows.
Created DataFrame for table 'table_d' with 12000000 random rows.
Created DataFrame for table 'table_e' with 33000000 random rows.
============================================================
FORCING TABLES TO MATCH TABLE_B VALUES
Match percentages: A=0.1%, C=1.0%, D=1.0%, E=1.0%
============================================================

1. Processing table_a...
Distinct combinations for table_a: 14000
✓ table_a_forced created (0.1% forced)

2. Processing table_c...
Distinct combinations for table_c: 14000
✓ table_c_forced created (1.0% forced)

3. Processing table_d...
Distinct combinations for table_d: 14000
✓ table_d_forced created (1.0% forced)

4. Processing table_e...
Distinct values for table_e: 14000
✓ table_e_forced created (1.0% forced)
Loading