Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 68 additions & 25 deletions latencypredictor/prediction_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,25 @@ def is_ready(self) -> bool:
def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str) -> pd.DataFrame:
"""
Prepare features with interaction terms to match training server.

Args:
df: DataFrame with raw features
model_type: 'ttft' or 'tpot'

Returns:
DataFrame with engineered features including interactions
"""
# Encode pod_type as categorical (common for both TTFT and TPOT)
# Convert to categorical with known categories for consistent encoding
if 'pod_type' in df.columns:
df['pod_type'] = df['pod_type'].fillna('') # Handle NaN
df['pod_type_cat'] = pd.Categorical(
df['pod_type'],
categories=['', 'prefill', 'decode'], # '' = monolithic, prefill, decode
ordered=False
)
else:
# If pod_type column doesn't exist, create it as empty (monolithic)
df['pod_type_cat'] = pd.Categorical([''] * len(df), categories=['', 'prefill', 'decode'], ordered=False)
Comment thread
RishabhSaini marked this conversation as resolved.

if model_type == "ttft":
# Create interaction: prefix score * input length
df['effective_input_tokens'] = (1-df['prefix_cache_score']) * df['input_token_length']
Expand All @@ -238,31 +249,33 @@ def _prepare_features_with_interaction(self, df: pd.DataFrame, model_type: str)

# make it categorical for tree models (safe for LGB, XGB with enable_categorical)
df['prefill_score_bucket'] = pd.Categorical(df['prefill_score_bucket'], categories=[0,1,2,3], ordered=True)
# Return TTFT features with interaction


# Return TTFT features with interaction and pod_type
feature_cols = [
'kv_cache_percentage',
'input_token_length',
'num_request_waiting',
'num_request_running',
'prefix_cache_score',
'effective_input_tokens',
'prefill_score_bucket'
'prefill_score_bucket',
'pod_type_cat'
]

return df[feature_cols]

else: # tpot
# TPOT doesn't use prefix_cache_score, so no interaction needed
feature_cols = [
'kv_cache_percentage',
'input_token_length',
'num_request_waiting',
'num_request_running',
'num_tokens_generated'
'num_tokens_generated',
'pod_type_cat'
]

return df[feature_cols]

def load_models(self) -> bool:
Expand Down Expand Up @@ -314,29 +327,42 @@ def predict(self, features: dict) -> Tuple[float, float]:
'num_request_running': features['num_request_running'],
'prefix_cache_score': features['prefix_cache_score']
}

tpot_raw_data = {
'kv_cache_percentage': features['kv_cache_percentage'],
'input_token_length': features['input_token_length'],
'num_request_waiting': features['num_request_waiting'],
'num_request_running': features['num_request_running'],
'num_tokens_generated': features['num_tokens_generated']
}

# Prepare features with interactions
df_ttft_raw = pd.DataFrame([ttft_raw_data])
# Add pod_type if present
if 'pod_type' in features:
df_ttft_raw['pod_type'] = features['pod_type']
df_ttft = self._prepare_features_with_interaction(df_ttft_raw, "ttft")


df_tpot_raw = pd.DataFrame([tpot_raw_data])
# Add pod_type if present
if 'pod_type' in features:
df_tpot_raw['pod_type'] = features['pod_type']
df_tpot = self._prepare_features_with_interaction(df_tpot_raw, "tpot")
#df_tpot = pd.DataFrame([tpot_raw_data])

if self.model_type == ModelType.BAYESIAN_RIDGE:

# Bayesian Ridge can't handle categorical features directly
# Drop categorical bucket, but one-hot encode pod_type
ttft_for_scale = df_ttft.drop(columns=['prefill_score_bucket'], errors='ignore')
if 'pod_type_cat' in ttft_for_scale.columns:
ttft_for_scale = pd.get_dummies(ttft_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
ttft_scaled = self.ttft_scaler.transform(ttft_for_scale)
tpot_scaled = self.tpot_scaler.transform(df_tpot)

tpot_for_scale = df_tpot.copy()
if 'pod_type_cat' in tpot_for_scale.columns:
tpot_for_scale = pd.get_dummies(tpot_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
tpot_scaled = self.tpot_scaler.transform(tpot_for_scale)

ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
tpot_pred_mean, tpot_std = self.tpot_model.predict(tpot_scaled, return_std=True)
Expand Down Expand Up @@ -388,37 +414,53 @@ def predict_batch(self, features_list: List[dict]) -> Tuple[np.ndarray, np.ndarr
# Create raw feature data (without interaction)
ttft_raw_data = []
tpot_raw_data = []

for features in features_list:
ttft_raw_data.append({
ttft_entry = {
'kv_cache_percentage': features['kv_cache_percentage'],
'input_token_length': features['input_token_length'],
'num_request_waiting': features['num_request_waiting'],
'num_request_running': features['num_request_running'],
'prefix_cache_score': features['prefix_cache_score']
})

tpot_raw_data.append({
}
# Add pod_type if present
if 'pod_type' in features:
ttft_entry['pod_type'] = features['pod_type']
ttft_raw_data.append(ttft_entry)

tpot_entry = {
'kv_cache_percentage': features['kv_cache_percentage'],
'input_token_length': features['input_token_length'],
'num_request_waiting': features['num_request_waiting'],
'num_request_running': features['num_request_running'],
'num_tokens_generated': features['num_tokens_generated']
})

}
# Add pod_type if present
if 'pod_type' in features:
tpot_entry['pod_type'] = features['pod_type']
tpot_raw_data.append(tpot_entry)

# Prepare features with interactions
df_ttft_raw = pd.DataFrame(ttft_raw_data)
df_ttft_batch = self._prepare_features_with_interaction(df_ttft_raw, "ttft")
#df_ttft_batch = pd.DataFrame(ttft_raw_data)

df_tpot_raw = pd.DataFrame(tpot_raw_data)
df_tpot_batch = self._prepare_features_with_interaction(df_tpot_raw, "tpot")
#df_tpot_batch = pd.DataFrame(tpot_raw_data)

if self.model_type == ModelType.BAYESIAN_RIDGE:
# Bayesian Ridge can't handle categorical features directly
# Drop categorical bucket, but one-hot encode pod_type
ttft_for_scale = df_ttft_batch.drop(columns=['prefill_score_bucket'], errors='ignore')
if 'pod_type_cat' in ttft_for_scale.columns:
ttft_for_scale = pd.get_dummies(ttft_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
ttft_scaled = self.ttft_scaler.transform(ttft_for_scale)
tpot_scaled = self.tpot_scaler.transform(df_tpot_batch)

tpot_for_scale = df_tpot_batch.copy()
if 'pod_type_cat' in tpot_for_scale.columns:
tpot_for_scale = pd.get_dummies(tpot_for_scale, columns=['pod_type_cat'], prefix='pod_type', drop_first=False)
tpot_scaled = self.tpot_scaler.transform(tpot_for_scale)

ttft_pred_mean, ttft_std = self.ttft_model.predict(ttft_scaled, return_std=True)
tpot_pred_mean, tpot_std = self.tpot_model.predict(tpot_scaled, return_std=True)
Expand Down Expand Up @@ -471,6 +513,7 @@ class PredictionRequest(BaseModel):
num_request_running: int = Field(..., ge=0)
num_tokens_generated: int = Field(..., ge=0)
prefix_cache_score: float = Field(..., ge=0.0, le=1.0, description="Prefix cache hit ratio score (0.0 to 1.0)")
pod_type: Optional[str] = Field(default="", description="Pod type: 'prefill', 'decode', or '' for monolithic")


class PredictionResponse(BaseModel):
Expand Down
196 changes: 196 additions & 0 deletions latencypredictor/test_dual_server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,202 @@ def test_prediction_missing_prefix_cache_score():
print("✓ Prediction correctly failed when prefix_cache_score was missing")


def test_prediction_with_pod_type_prefill():
    """Verify /predict accepts pod_type='prefill' and returns sane latencies."""
    print("Testing prediction with pod_type='prefill'...")

    request_body = {
        "kv_cache_percentage": 0.5,
        "input_token_length": 200,
        "num_request_waiting": 4,
        "num_request_running": 1,
        "num_tokens_generated": 0,  # prefill pods produce no output tokens
        "prefix_cache_score": 0.7,
        "pod_type": "prefill",
    }

    resp = requests.post(f"{PREDICTION_URL}/predict", json=request_body)
    assert resp.status_code == 200

    data = resp.json()
    # Both latency fields must exist; TTFT strictly positive, TPOT non-negative.
    assert "ttft_ms" in data and "tpot_ms" in data
    assert data["ttft_ms"] > 0
    assert data["tpot_ms"] >= 0

    print(f"✓ Prefill prediction: TTFT={data['ttft_ms']:.2f}ms, TPOT={data['tpot_ms']:.2f}ms")


def test_prediction_with_pod_type_decode():
    """Verify /predict accepts pod_type='decode' and returns sane latencies."""
    print("Testing prediction with pod_type='decode'...")

    request_body = {
        "kv_cache_percentage": 0.5,
        "input_token_length": 200,
        "num_request_waiting": 4,
        "num_request_running": 1,
        "num_tokens_generated": 10,
        "prefix_cache_score": 0.7,
        "pod_type": "decode",
    }

    resp = requests.post(f"{PREDICTION_URL}/predict", json=request_body)
    assert resp.status_code == 200

    data = resp.json()
    # Both latency fields must exist; TTFT strictly positive, TPOT non-negative.
    assert "ttft_ms" in data and "tpot_ms" in data
    assert data["ttft_ms"] > 0
    assert data["tpot_ms"] >= 0

    print(f"✓ Decode prediction: TTFT={data['ttft_ms']:.2f}ms, TPOT={data['tpot_ms']:.2f}ms")


def test_bulk_prediction_with_pod_type():
    """Exercise the strict bulk endpoint with prefill, decode, and legacy requests."""
    print("Testing bulk prediction with pod_type...")

    requests_data = [
        # Prefill pod request
        {
            "kv_cache_percentage": 0.5,
            "input_token_length": 200,
            "num_request_waiting": 4,
            "num_request_running": 1,
            "num_tokens_generated": 0,
            "prefix_cache_score": 0.7,
            "pod_type": "prefill",
        },
        # Decode pod request
        {
            "kv_cache_percentage": 0.3,
            "input_token_length": 150,
            "num_request_waiting": 2,
            "num_request_running": 1,
            "num_tokens_generated": 10,
            "prefix_cache_score": 0.5,
            "pod_type": "decode",
        },
        # Legacy request (no pod_type)
        {
            "kv_cache_percentage": 0.6,
            "input_token_length": 300,
            "num_request_waiting": 3,
            "num_request_running": 2,
            "num_tokens_generated": 5,
            "prefix_cache_score": 0.8,
        }
    ]

    resp = requests.post(f"{PREDICTION_URL}/predict/bulk/strict", json={"requests": requests_data})
    assert resp.status_code == 200

    data = resp.json()
    # All three requests must succeed.
    assert data["total_requests"] == 3
    assert data["successful_predictions"] == 3
    assert data["failed_predictions"] == 0

    predictions = data["predictions"]

    # (label, tpot must be strictly positive?) per prediction, in request order.
    # TPOT may legitimately be 0 for prefill pods, hence the relaxed bound there.
    expectations = [("Prefill", False), ("Decode", True), ("Legacy", True)]
    for pred, (label, tpot_strict) in zip(predictions, expectations):
        assert pred["ttft_ms"] > 0
        if tpot_strict:
            assert pred["tpot_ms"] > 0
        else:
            assert pred["tpot_ms"] >= 0
        print(f"  {label}: TTFT={pred['ttft_ms']:.2f}ms, TPOT={pred['tpot_ms']:.2f}ms")

    print("✓ Bulk prediction with mixed pod types passed")


def test_training_data_with_pod_type():
    """Check that the training server accepts bulk samples carrying pod_type."""
    print("Testing training data with pod_type...")

    # Prefill samples: prefill never emits tokens, so TPOT is fixed at 0.
    prefill_entries = [
        {
            "kv_cache_percentage": 0.5,
            "input_token_length": 200 + i * 10,
            "num_request_waiting": i % 5,
            "num_request_running": 1,
            "actual_ttft_ms": 100.0 + i * 5,
            "actual_tpot_ms": 0.0,
            "num_tokens_generated": 0,
            "prefix_cache_score": 0.7,
            "pod_type": "prefill",
        }
        for i in range(10)
    ]

    # Decode samples: both TTFT and TPOT labels are populated.
    decode_entries = [
        {
            "kv_cache_percentage": 0.5,
            "input_token_length": 200 + i * 10,
            "num_request_waiting": i % 5,
            "num_request_running": 1,
            "actual_ttft_ms": 100.0 + i * 5,
            "actual_tpot_ms": 10.0 + i * 2,
            "num_tokens_generated": 5 + i,
            "prefix_cache_score": 0.7,
            "pod_type": "decode",
        }
        for i in range(10)
    ]

    all_entries = prefill_entries + decode_entries

    resp = requests.post(f"{TRAINING_URL}/add_training_data_bulk", json={"entries": all_entries})
    assert resp.status_code == 202
    assert resp.json().get("message") == f"Accepted {len(all_entries)} training samples."

    print(f"✓ Successfully sent {len(all_entries)} training samples with pod_type")


def test_invalid_pod_type():
    """Test that invalid pod_type values are handled correctly.

    The server is allowed to either reject an unknown pod_type with a 422
    validation error (strict validation) or accept it and fall back to
    legacy/monolithic behavior (permissive validation). Any other status
    code is a test failure.
    """
    print("Testing invalid pod_type handling...")

    features = {
        "kv_cache_percentage": 0.5,
        "input_token_length": 200,
        "num_request_waiting": 4,
        "num_request_running": 1,
        "num_tokens_generated": 10,
        "prefix_cache_score": 0.7,
        "pod_type": "invalid_type",  # Invalid pod type
    }

    r = requests.post(f"{PREDICTION_URL}/predict", json=features)

    # Should either accept it (treating as legacy) or reject with validation error
    if r.status_code == 422:
        print("✓ Invalid pod_type rejected with validation error (strict validation)")
    elif r.status_code == 200:
        data = r.json()
        # If accepted, should still return valid predictions
        assert data["ttft_ms"] > 0
        assert data["tpot_ms"] >= 0
        print("✓ Invalid pod_type accepted with fallback behavior (permissive validation)")
    else:
        # Raise explicitly: `assert False` is stripped under `python -O`,
        # which would silently turn this failure branch into a pass.
        raise AssertionError(f"Unexpected status code {r.status_code} for invalid pod_type")


def test_training_server_metrics():
"""Test training server metrics endpoint."""
r = requests.get(f"{TRAINING_URL}/metrics")
Expand Down
Loading