Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,20 @@ public enum ApiFeature {
*
* <p>Disabled by default.
*/
BILLING_EVENTS_LOGGING("billing-events-logging", false);
BILLING_EVENTS_LOGGING("billing-events-logging", false),

/**
* Billing events response feature flag: if enabled, the API will include the per-request billing
* events as a JSON array on the {@code Billing-Events} HTTP response header. Independent from
* {@link #BILLING_EVENTS_LOGGING} — both can be enabled simultaneously.
*
* <p>Set via {@code stargate.feature.flags.billing-events-response=true} at startup
* (authoritative; request headers cannot disable a startup-enabled flag) or per-request via
* {@code Feature-Flag-billing-events-response} header when not configured at startup.
*
* <p>Disabled by default.
*/
BILLING_EVENTS_RESPONSE("billing-events-response", false);

/**
* Prefix for HTTP headers used to override feature flags for specific requests: prepended before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.stargate.sgv2.jsonapi.config.feature.ApiFeatures;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
Expand All @@ -33,9 +34,18 @@
* the {@code external_*} variant is used. Events whose type is not in {@link
* BillingConfig#enabledEventTypes()} are dropped.
*
* <p>Eligibility requires all of: {@link ApiFeature#BILLING_EVENTS_LOGGING} is enabled, the {@code
* billing.events} logger is enabled, and the {@link ModelUsage} is non-null. The region for each
* event is read from {@link ModelUsage#tenant()} ({@code Tenant.region()}).
* <p>Each emitted event can be sent to two independent sinks:
*
* <ul>
* <li>The {@code billing.events} logger (one JSON line per event) — gated by {@link
* ApiFeature#BILLING_EVENTS_LOGGING}.
* <li>An in-memory buffer (read later by a response filter and returned as the {@code
* Billing-Events} HTTP response header) — gated by {@link
* ApiFeature#BILLING_EVENTS_RESPONSE}.
* </ul>
*
* If neither flag is enabled (or {@link ModelUsage} is null), {@link #emitEvent(ModelUsage)} is a
* no-op. The two flags are independent — both can be on at once.
*/
public class Billing {

Expand All @@ -52,6 +62,12 @@ public class Billing {
private final Set<BillingEventType> enabledEventTypes;
private final ApiFeatures apiFeatures;

// Buffered events collected for the BILLING_EVENTS_RESPONSE flag. Populated only when that
// feature is enabled. emitEvent can be invoked from concurrent tasks within one request
// (async embedding / reranking calls), so the list is synchronized.
private final List<BillingEvent> collectedEvents =
Collections.synchronizedList(new ArrayList<>());

public Billing(BillingConfig config, ApiFeatures apiFeatures) {
Objects.requireNonNull(config, "config must not be null");
this.apiFeatures = Objects.requireNonNull(apiFeatures, "apiFeatures must not be null");
Expand All @@ -66,28 +82,60 @@ public Billing(BillingConfig config, ApiFeatures apiFeatures) {
}

/**
* Emits billing events for the given aggregated model usage, if the request and configuration
* allow it. No-op otherwise (feature disabled, logger disabled, null usage).
* Builds billing events for the given usage and dispatches them to whichever sinks are enabled:
* the {@code billing.events} logger (when {@link ApiFeature#BILLING_EVENTS_LOGGING} is on) and/or
* an in-memory buffer surfaced via {@link #collectedEvents()} (when {@link
* ApiFeature#BILLING_EVENTS_RESPONSE} is on). No-op if neither flag is on or {@code modelUsage}
* is null.
*/
public void emitEvent(ModelUsage modelUsage) {
if (!shouldEmit(modelUsage)) {
if (modelUsage == null) {
return;
}
for (BillingEvent event : buildEvents(modelUsage)) {
try {
BILLING_LOGGER.info(OBJECT_WRITER.writeValueAsString(event));
} catch (JacksonException e) {
LOGGER.error("Failed to serialize billing event of type {}", event.eventType(), e);
boolean shouldLog =
apiFeatures.isFeatureEnabled(ApiFeature.BILLING_EVENTS_LOGGING)
&& BILLING_LOGGER.isInfoEnabled();
boolean shouldBuffer = apiFeatures.isFeatureEnabled(ApiFeature.BILLING_EVENTS_RESPONSE);
if (!shouldLog && !shouldBuffer) {
return;
}
List<BillingEvent> events = buildEvents(modelUsage);
if (shouldLog) {
for (BillingEvent event : events) {
try {
BILLING_LOGGER.info(OBJECT_WRITER.writeValueAsString(event));
} catch (JacksonException e) {
LOGGER.error("Failed to serialize billing event of type {}", event.eventType(), e);
}
}
}
if (shouldBuffer) {
collectedEvents.addAll(events);
}
}

/** Whether a billing event should be emitted for the given request. */
/**
* Snapshot of billing events accumulated by {@link #emitEvent(ModelUsage)} for this request when
* {@link ApiFeature#BILLING_EVENTS_RESPONSE} is enabled. Returns an unmodifiable copy so callers
* can iterate safely while other tasks may still be writing.
*/
public List<BillingEvent> collectedEvents() {
synchronized (collectedEvents) {
return List.copyOf(collectedEvents);
}
}

/** Whether a billing event would be emitted for the given request. */
@VisibleForTesting
boolean shouldEmit(ModelUsage modelUsage) {
return apiFeatures.isFeatureEnabled(ApiFeature.BILLING_EVENTS_LOGGING)
&& BILLING_LOGGER.isInfoEnabled()
&& modelUsage != null;
if (modelUsage == null) {
return false;
}
boolean shouldLog =
apiFeatures.isFeatureEnabled(ApiFeature.BILLING_EVENTS_LOGGING)
&& BILLING_LOGGER.isInfoEnabled();
boolean shouldBuffer = apiFeatures.isFeatureEnabled(ApiFeature.BILLING_EVENTS_RESPONSE);
return shouldLog || shouldBuffer;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.stargate.sgv2.jsonapi.service.provider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.feature.ApiFeature;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.container.ContainerResponseContext;
import java.util.List;
import org.jboss.resteasy.reactive.server.ServerResponseFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Adds the {@code Billing-Events} HTTP response header (a JSON array of {@link BillingEvent}s
* collected during the request) when {@link ApiFeature#BILLING_EVENTS_RESPONSE} is enabled.
*
* <p>If the feature is off, or no billing events were emitted, the header is not added. Failures to
* serialize are logged and silently dropped so a serialization bug never breaks the actual API
* response.
*/
@ApplicationScoped
public class BillingResponseFilter {

/** HTTP response header that carries the JSON array of billing events. */
public static final String BILLING_EVENTS_HEADER = "Billing-Events";

private static final Logger LOGGER = LoggerFactory.getLogger(BillingResponseFilter.class);

// ObjectWriter is thread-safe and expensive to build; share one across all requests.
private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writer();

private final RequestContext requestContext;

@Inject
public BillingResponseFilter(RequestContext requestContext) {
this.requestContext = requestContext;
}

@ServerResponseFilter
public void addBillingHeader(ContainerResponseContext responseContext) {
if (!requestContext.apiFeatures().isFeatureEnabled(ApiFeature.BILLING_EVENTS_RESPONSE)) {
return;
}
List<BillingEvent> events = requestContext.billing().collectedEvents();
if (events.isEmpty()) {
return;
}
try {
responseContext
.getHeaders()
.add(BILLING_EVENTS_HEADER, OBJECT_WRITER.writeValueAsString(events));
} catch (JsonProcessingException e) {
LOGGER.error("Failed to serialize {} billing events to response header", events.size(), e);
}
}
}
4 changes: 4 additions & 0 deletions src/main/resources/embedding-providers-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ stargate:
vector-dimension: 1024
properties:
max-tokens: 512
- name: nvidia/nv-embedqa-e5-v5
vector-dimension: 1024
properties:
max-tokens: 512
jinaAI:
#see https://jina.ai/embeddings/#apiform
display-name: Jina AI
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package io.stargate.sgv2.jsonapi.service.provider;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.stargate.sgv2.jsonapi.api.request.RequestContext;
import io.stargate.sgv2.jsonapi.config.BillingConfig;
import io.stargate.sgv2.jsonapi.config.DatabaseType;
import io.stargate.sgv2.jsonapi.config.feature.ApiFeature;
import io.stargate.sgv2.jsonapi.config.feature.ApiFeatures;
import io.stargate.sgv2.jsonapi.config.feature.FeaturesConfig;
import jakarta.ws.rs.container.ContainerResponseContext;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;

class BillingResponseFilterTest {

private static final ObjectMapper MAPPER = new ObjectMapper();

private record BillingAndFeatures(Billing billing, ApiFeatures apiFeatures) {}

private static BillingAndFeatures newBillingWith(boolean logging, boolean response) {
BillingConfig config = mock(BillingConfig.class);
when(config.product()).thenReturn("serverless");
when(config.resourceType()).thenReturn("serverless_database");
when(config.internalModelProviders()).thenReturn(List.of("nvidia"));
when(config.enabledEventTypes()).thenReturn(Optional.empty());

FeaturesConfig featuresConfig = mock(FeaturesConfig.class);
Map<ApiFeature, String> flags = new HashMap<>();
flags.put(ApiFeature.BILLING_EVENTS_LOGGING, String.valueOf(logging));
flags.put(ApiFeature.BILLING_EVENTS_RESPONSE, String.valueOf(response));
when(featuresConfig.flags()).thenReturn(flags);

ApiFeatures apiFeatures = ApiFeatures.fromConfigAndRequest(featuresConfig, null);
return new BillingAndFeatures(new Billing(config, apiFeatures), apiFeatures);
}

private static ModelUsage usage() {
return new ModelUsage(
ModelProvider.NVIDIA,
ModelType.EMBEDDING,
"test-model",
io.stargate.sgv2.jsonapi.api.request.tenant.Tenant.create(
DatabaseType.ASTRA, "tenant-1", "us-west-2"),
ModelInputType.INDEX,
10,
20,
100,
200,
1000L);
}

private static BillingResponseFilter filterFor(Billing billing, ApiFeatures apiFeatures) {
RequestContext rc = mock(RequestContext.class);
when(rc.billing()).thenReturn(billing);
when(rc.apiFeatures()).thenReturn(apiFeatures);
return new BillingResponseFilter(rc);
}

private static ContainerResponseContext responseContextWithHeaders(
MultivaluedMap<String, Object> headers) {
ContainerResponseContext response = mock(ContainerResponseContext.class);
when(response.getHeaders()).thenReturn(headers);
return response;
}

@Test
void addsHeaderWhenFeatureOnAndEventsPresent() throws Exception {
BillingAndFeatures bf = newBillingWith(false, true);
bf.billing().emitEvent(usage());
BillingResponseFilter filter = filterFor(bf.billing(), bf.apiFeatures());

MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
filter.addBillingHeader(responseContextWithHeaders(headers));

Object headerValue = headers.getFirst(BillingResponseFilter.BILLING_EVENTS_HEADER);
assertThat(headerValue).isNotNull();
JsonNode parsed = MAPPER.readTree(headerValue.toString());
assertThat(parsed.isArray()).isTrue();
assertThat(parsed.size()).isEqualTo(3);
assertThat(parsed.get(0).get("event_type").asText()).isEqualTo("internal_model_total_tokens");
}

@Test
void skipsHeaderWhenFeatureOff() {
// RESPONSE off — header must not be added even if LOGGING was on for this request.
BillingAndFeatures bf = newBillingWith(true, false);
bf.billing().emitEvent(usage());
BillingResponseFilter filter = filterFor(bf.billing(), bf.apiFeatures());

MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
ContainerResponseContext response = responseContextWithHeaders(headers);
filter.addBillingHeader(response);

assertThat(headers.containsKey(BillingResponseFilter.BILLING_EVENTS_HEADER)).isFalse();
// We should never touch the headers either (early return saves the work).
verify(response, never()).getHeaders();
}

@Test
void skipsHeaderWhenNoEventsCollected() {
// RESPONSE on, but no emitEvent calls — header skipped because buffer is empty.
BillingAndFeatures bf = newBillingWith(false, true);
BillingResponseFilter filter = filterFor(bf.billing(), bf.apiFeatures());

MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
filter.addBillingHeader(responseContextWithHeaders(headers));

assertThat(headers.containsKey(BillingResponseFilter.BILLING_EVENTS_HEADER)).isFalse();
}
}
Loading