diff --git a/README.md b/README.md index 9e5a699..a57d723 100644 --- a/README.md +++ b/README.md @@ -235,7 +235,7 @@ This places the generated code in `gen/python`. See the `Makefile` for more exte | Field | Type | Label | Description | | ----- | ---- | ----- | ----------- | -| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. || metadata | [google.protobuf.Struct](#google-protobuf-Struct) | optional | | +| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. |
CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry diff --git a/internal/server/postgres/bench_test.go b/internal/server/postgres/bench_test.go index 1ff73f2..6e2bbaa 100644 --- a/internal/server/postgres/bench_test.go +++ b/internal/server/postgres/bench_test.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/rs/zerolog" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/types/known/structpb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp" @@ -116,12 +115,6 @@ var yields = func() []*pb.CreateForecastRequest_ForecastValue { "p90": 0.3, }, HorizonMins: uint32(i * 30), - Metadata: func() *structpb.Struct { - s, _ := structpb.NewStruct(map[string]any{ - "example_key": fmt.Sprintf("example_value_%d", i), - }) - return s - }(), }) } diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index e8544fb..4664d50 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -10,7 +10,6 @@ import ( "context" "errors" "fmt" - "math" "time" "github.com/google/uuid" @@ -21,7 +20,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp" @@ -64,6 +62,19 @@ func timeptrToPgTimestamp(t *timestamppb.Timestamp) pgtype.Timestamp { return pgtype.Timestamp{Time: t.AsTime().UTC(), Valid: true} } +// extractSIPStatPtrFromMap gets a key's value from a map as a pointer, and converts it +// to a smallint percentage. If it doesn't exist, it returns nil. +func extractSIPStatPtrFromMap(m map[string]float32, key string) *int16 { + val, exists := m[key] + if !exists { + return nil + } + + sip_val := int16(val * 30000.0) + + return &sip_val +} + // --- Server Implementation ---------------------------------------------------------------------- func NewDataPlatformDataServiceServerImpl() *DataPlatformDataServiceServerImpl { @@ -172,48 +183,12 @@ func (s *DataPlatformDataServiceServerImpl) CreateForecast( // Create the forecast data paramsList := make([]db.CreatePredictedValuesParams, len(req.Values)) for i, value := range req.Values { - // Since the database wants each numeric value to have 4 significant figures, - // the float32 values need to be rounded accordingly. - roundedStats := make(map[string]any) - for k, val32 := range value.OtherStatisticsFractions { - val64 := float64(val32) - - var roundedVal float64 - if val64 < 1 { - roundedVal = math.Round(val64*10000) / 10000 - } else { - roundedVal = math.Round(val64*1000) / 1000 - } - - roundedStats[k] = roundedVal - } - - // Since CreatePredictedValues uses COPYFROM, manually coerce empty metadata to nil - if value.Metadata != nil && len(value.Metadata.Fields) == 0 { - value.Metadata = nil - } - - var otherStats *structpb.Struct - - if len(roundedStats) > 0 { - otherStats, err = structpb.NewStruct(roundedStats) - if err != nil { - l.Err(err).Msgf("structpb.NewStruct(%+v)", roundedStats) - } - } - paramsList[i] = db.CreatePredictedValuesParams{ HorizonMins: int16(value.HorizonMins), + P10Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p10"), P50Sip: int16(value.P50Fraction * 30000.0), + P90Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p90"), ForecastUuid: dbForecast.ForecastUuid, - TargetTimeUtc: pgtype.Timestamp{ - Time: req.InitTimeUtc.AsTime().Add( - time.Duration(value.HorizonMins) * time.Minute, - ), - Valid: true, - }, - OtherStatsFractions: otherStats, - Metadata: value.Metadata, } } @@ -548,11 +523,13 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData( &row.ForecasterVersion, &row.CreatedAtUtc, &row.HorizonMins, + &row.P10Sip, &row.P50Sip, - &row.OtherStatsFractions, + &row.P90Sip, &row.CapacityWatts, - &row.InitTimeUtc, &row.Metadata, + &row.InitTimeUtc, + &row.TargetTimeUtc, ) if err != nil { l.Err(err).Msg("rows.Scan failed") @@ -560,10 +537,12 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData( } otherStatistics := make(map[string]float32) - if row.OtherStatsFractions != nil { - for k, v := range row.OtherStatsFractions.AsMap() { - otherStatistics[k] = float32(v.(float64)) - } + if row.P10Sip != nil { + otherStatistics["p10"] = float32(*row.P10Sip) / 30000.0 + } + + if row.P90Sip != nil { + otherStatistics["p90"] = float32(*row.P90Sip) / 30000.0 } metadata := make(map[string]string) @@ -1639,14 +1618,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries( out := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbPreds)) for i, pred := range dbPreds { otherStats := make(map[string]float32) - for k, v := range pred.OtherStatsFractions.AsMap() { - floatVal, ok := v.(float64) - if !ok { - l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic") - continue - } + if pred.P10Sip != nil { + otherStats["p10"] = float32(*pred.P10Sip) / 30000.0 + } - otherStats[k] = float32(floatVal) + if pred.P90Sip != nil { + otherStats["p90"] = float32(*pred.P90Sip) / 30000.0 } out[i] = &pb.GetForecastAsTimeseriesResponse_Value{ @@ -1741,14 +1718,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries( values := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbValues)) for i, value := range dbValues { otherStats := make(map[string]float32) - for k, v := range value.OtherStatsFractions.AsMap() { - floatVal, ok := v.(float64) - if !ok { - l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic") - continue - } + if value.P10Sip != nil { + otherStats["p10"] = float32(*value.P10Sip) / 30000.0 + } - otherStats[k] = float32(floatVal) + if value.P90Sip != nil { + otherStats["p90"] = float32(*value.P90Sip) / 30000.0 } values[i] = &pb.GetForecastAsTimeseriesResponse_Value{ diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index 98ac857..3b5108c 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "maps" "math/rand/v2" "strings" "testing" @@ -589,7 +588,6 @@ func TestGetForecastAtTimestamp(t *testing.T) { "p90": float32(0.6 + float32(i)*0.05), "p10": float32(0.4 + float32(i)*0.05), }, - Metadata: metadata, } } @@ -1061,7 +1059,6 @@ func TestGetForecastAsTimeseries(t *testing.T) { "p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)), "p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)), }, - Metadata: metadata, } } @@ -1801,8 +1798,6 @@ func TestCreateObservations(t *testing.T) { func TestGetWeekAverageDeltas(t *testing.T) { pivotTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) - metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) - require.NoError(t, err) // Create a site to attach the observations to siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{ @@ -1862,7 +1857,6 @@ func TestGetWeekAverageDeltas(t *testing.T) { "p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)), "p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)), }, - Metadata: metadata, } } @@ -1925,7 +1919,6 @@ func TestCreateForecast(t *testing.T) { "p10": 0.4 + float32(i)*0.05, "p90": 0.6 + float32(i)*0.05, }, - Metadata: metadata, } } @@ -1935,7 +1928,6 @@ func TestCreateForecast(t *testing.T) { HorizonMins: uint32(i * 30), P50Fraction: 0.0, OtherStatisticsFractions: map[string]float32{}, - Metadata: &structpb.Struct{}, } } @@ -1948,7 +1940,6 @@ func TestCreateForecast(t *testing.T) { "p10": 1.3, "p90": -0.2, }, - Metadata: metadata, } } @@ -2078,11 +2069,8 @@ func TestCreateForecast(t *testing.T) { }) require.NoError(t, err) - expectedMetadata := tc.req.Values[0].Metadata.AsMap() - maps.Copy(expectedMetadata, tc.req.Metadata.AsMap()) - for _, val := range fResp.Values { - require.Equal(t, expectedMetadata, val.Metadata.AsMap()) + require.Equal(t, tc.req.Metadata.AsMap(), val.Metadata.AsMap()) } } }) @@ -2130,7 +2118,6 @@ func TestGetLatestForecasts(t *testing.T) { "p10": 0.4 + float32(i)*0.05, "p90": 0.6 + float32(i)*0.05, }, - Metadata: metadata, } } @@ -2247,7 +2234,6 @@ func TestStreamForecastData(t *testing.T) { OtherStatisticsFractions: map[string]float32{ "p90": float32(i)*0.25 + 0.1, }, - Metadata: metadata, } } @@ -2261,6 +2247,7 @@ func TestStreamForecastData(t *testing.T) { EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, InitTimeUtc: initTime, Values: yields, + Metadata: metadata, }) require.NoError(t, err) @@ -2270,6 +2257,7 @@ func TestStreamForecastData(t *testing.T) { EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, InitTimeUtc: initTime, Values: yields, + Metadata: metadata, }) require.NoError(t, err) } diff --git a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql new file mode 100644 index 0000000..ffab5ad --- /dev/null +++ b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql @@ -0,0 +1,169 @@ +-- +goose NO TRANSACTION +-- +goose Up + +/* + * Reduces the database size by approximately 40%. + * + * Modifies the predicted_generation_values table to optimize it's use of storage. + * This is done through changing the index, removing redundant columns, and replacing + * dynamic columns with small static ones. + * + * The schema modifications and the corresponding data changes are seperated out for + * faster migration. Since the predicted_generation_values table is very large, simple + * DELETES and UPDATES would take a long time, and not actually gain us any storage + * savings (at least until an autovacuum process ran). By instead moving the data + * partition-wise and then replacing the partitions, we keep the process light on CPU. + */ + +DROP INDEX IF EXISTS loc.idx_sources_mv_gist_sys_period; + +CREATE INDEX IF NOT EXISTS idx_sources_mv_composite_lookup +ON loc.sources_mv USING gist (geometry_uuid, source_type_id, sys_period); + +ALTER TABLE pred.predicted_generation_values + ADD COLUMN p10_sip SMALLINT, + ADD COLUMN p90_sip SMALLINT, + DROP CONSTRAINT predicted_generation_values_pkey CASCADE, + ALTER COLUMN target_time_utc DROP NOT NULL; + +-- +goose StatementBegin +CREATE OR REPLACE PROCEDURE pred.swap_predicted_generation_partitions() +LANGUAGE plpgsql +AS $$ +DECLARE + partition_record RECORD; + new_part_name TEXT; + part_bound TEXT; +BEGIN + FOR partition_record IN + SELECT child.relname AS table_name, + pg_get_expr(child.relpartbound, child.oid) AS bounds + FROM pg_inherits + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace + WHERE parent.relname = 'predicted_generation_values' + AND nmsp_parent.nspname = 'pred' + LOOP + new_part_name := partition_record.table_name || '_v2'; + part_bound := partition_record.bounds; + + EXECUTE format('CREATE TABLE pred.%I (LIKE pred.predicted_generation_values INCLUDING ALL);', new_part_name); + + EXECUTE format(' + INSERT INTO pred.%I (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, target_time_utc, metadata, other_stats_fractions) + SELECT + horizon_mins, + p50_sip, + ((other_stats_fractions->>''p10'')::REAL * 30000)::SMALLINT, + ((other_stats_fractions->>''p90'')::REAL * 30000)::SMALLINT, + forecast_uuid, + NULL, + NULL, + NULL + FROM pred.%I; + ', new_part_name, partition_record.table_name); + + EXECUTE format('ALTER TABLE pred.%I ADD PRIMARY KEY (forecast_uuid, horizon_mins);', new_part_name); + EXECUTE format('ALTER TABLE pred.predicted_generation_values DETACH PARTITION pred.%I;', partition_record.table_name); + EXECUTE format('ALTER TABLE pred.predicted_generation_values ATTACH PARTITION pred.%I %s;', new_part_name, part_bound); + EXECUTE format('DROP TABLE pred.%I;', partition_record.table_name); + EXECUTE format('ALTER TABLE pred.%I RENAME TO %I;', new_part_name, partition_record.table_name); + + COMMIT; + END LOOP; +END; +$$; +-- +goose StatementEnd + +CALL pred.swap_predicted_generation_partitions(); +DROP PROCEDURE pred.swap_predicted_generation_partitions; + +ALTER TABLE pred.predicted_generation_values + DROP COLUMN target_time_utc, + DROP COLUMN other_stats_fractions, + DROP COLUMN metadata, + ADD PRIMARY KEY (forecast_uuid, horizon_mins); + +DROP TABLE IF EXISTS pred.predicted_generation_values_template; +CREATE TABLE pred.predicted_generation_values_template ( + horizon_mins SMALLINT NOT NULL, + CONSTRAINT horizon_mins_nonnegative_check CHECK (horizon_mins >= 0), + CONSTRAINT horizon_mins_fiveminutely_check CHECK (horizon_mins % 5 = 0), + p50_sip SMALLINT NOT NULL, + CONSTRAINT p50_sip_nonnegative_check CHECK (p50_sip >= 0), + p10_sip SMALLINT, + p90_sip SMALLINT, + forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE, + PRIMARY KEY (forecast_uuid, horizon_mins) +); + +ANALYZE pred.predicted_generation_values; + + +-- +goose Down +ALTER TABLE pred.predicted_generation_values + DROP CONSTRAINT predicted_generation_values_pkey CASCADE, + ADD COLUMN target_time_utc TIMESTAMP, + ADD COLUMN other_stats_fractions JSONB DEFAULT NULL, + ADD CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'), + ADD CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions)), + ADD COLUMN metadata JSONB DEFAULT NULL; + + +-- +goose StatementBegin +CREATE OR REPLACE PROCEDURE pred.rollback_predicted_generation_partitions() +LANGUAGE plpgsql +AS $$ +DECLARE + partition_record RECORD; +BEGIN + FOR partition_record IN + SELECT child.relname AS table_name + FROM pg_inherits + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace + WHERE parent.relname = 'predicted_generation_values' + AND nmsp_parent.nspname = 'pred' + LOOP + EXECUTE format(' + UPDATE pred.%I + SET target_time_utc = UUIDV7_EXTRACT_TIMESTAMP(forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => horizon_mins::INTEGER), + other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN jsonb_strip_nulls(jsonb_build_object(''p10'', p10_sip::REAL / 30000, ''p90'', p90_sip::REAL / 30000)) + ELSE NULL END; + ', partition_record.table_name); + + EXECUTE format('ALTER TABLE pred.%I ALTER COLUMN target_time_utc SET NOT NULL;', partition_record.table_name); + EXECUTE format('ALTER TABLE pred.%I ADD PRIMARY KEY (forecast_uuid, target_time_utc);', partition_record.table_name); + + COMMIT; + END LOOP; +END; +$$; +-- +goose StatementEnd + +CALL pred.rollback_predicted_generation_partitions(); +DROP PROCEDURE pred.rollback_predicted_generation_partitions; + +ALTER TABLE pred.predicted_generation_values + ALTER COLUMN target_time_utc SET NOT NULL, + ADD PRIMARY KEY (forecast_uuid, target_time_utc), + DROP COLUMN p10_sip, DROP COLUMN p90_sip; + +DROP TABLE IF EXISTS pred.predicted_generation_values_template; +CREATE TABLE pred.predicted_generation_values_template ( + horizon_mins SMALLINT NOT NULL, + CONSTRAINT horizon_mins_nonnegative_check CHECK (horizon_mins >= 0), + CONSTRAINT horizon_mins_fiveminutely_check CHECK (horizon_mins % 5 = 0), + p50_sip SMALLINT NOT NULL, + CONSTRAINT p50_sip_nonnegative_check CHECK (p50_sip >= 0), + target_time_utc TIMESTAMP NOT NULL, + forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE, + metadata JSONB DEFAULT NULL CONSTRAINT metadata_nullifempty CHECK (metadata IS NULL OR metadata != '{}'), + other_stats_fractions JSONB DEFAULT NULL CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'), + CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions)), + PRIMARY KEY (forecast_uuid, target_time_utc) +); + +ANALYZE pred.predicted_generation_values; diff --git a/internal/server/postgres/sql/queries/predictions.sql b/internal/server/postgres/sql/queries/predictions.sql index 0974373..51b1aca 100644 --- a/internal/server/postgres/sql/queries/predictions.sql +++ b/internal/server/postgres/sql/queries/predictions.sql @@ -123,9 +123,9 @@ WHERE forecast_uuid IN (SELECT forecast_uuid FROM forecasts_to_delete); * with 0 representing 0% and 30000 representing 100% of capacity. */ INSERT INTO pred.predicted_generation_values ( - horizon_mins, p50_sip, forecast_uuid, target_time_utc, other_stats_fractions, metadata + horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid ) VALUES ( - $1, $2, $3, $4, $5, $6 + $1, $2, $3, $4, $5 ); -- name: ListPredictionsForForecasts :many @@ -153,11 +153,15 @@ SELECT mf.forecaster_version, f.created_at_utc, pg.horizon_mins, + pg.p10_sip, pg.p50_sip, - pg.other_stats_fractions, + pg.p90_sip, sv.capacity_watts, + f.metadata, UUIDV7_EXTRACT_TIMESTAMP(f.forecast_uuid)::TIMESTAMP AS init_time_utc, - COALESCE(pg.metadata || f.metadata, pg.metadata, f.metadata) AS metadata + ( + UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER) + )::TIMESTAMP AS target_time_utc FROM pred.forecasts AS f INNER JOIN matched_forecasters AS mf USING (forecaster_id) INNER JOIN pred.predicted_generation_values AS pg @@ -169,7 +173,8 @@ FROM pred.forecasts AS f FROM loc.sources_mv AS s WHERE s.geometry_uuid = f.geometry_uuid AND s.source_type_id = f.source_type_id - AND s.sys_period @> pg.target_time_utc + AND s.sys_period + @> (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)) LIMIT 1 ) AS sv ON TRUE WHERE f.geometry_uuid = sqlc.arg(geometry_uuid)::UUID @@ -256,29 +261,39 @@ WITH allowed_forecasts_overlapping_window AS ( ) ), winning_predictions AS ( - SELECT DISTINCT ON (pg.target_time_utc) - pg.target_time_utc, + SELECT DISTINCT ON ( + UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER) + ) fow.forecast_uuid, fow.init_time_utc, fow.created_at_utc, fow.geometry_uuid, fow.source_type_id, pg.horizon_mins, + pg.p10_sip, pg.p50_sip, - pg.other_stats_fractions, - COALESCE(pg.metadata || fow.metadata, pg.metadata, fow.metadata) AS metadata + pg.p90_sip, + fow.metadata, + ( + UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER) + )::TIMESTAMP AS target_time_utc FROM allowed_forecasts_overlapping_window AS fow INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid) - WHERE pg.target_time_utc BETWEEN sqlc.arg(start_timestamp_utc)::TIMESTAMP AND sqlc.arg(end_timestamp_utc)::TIMESTAMP - AND pg.horizon_mins >= sqlc.arg(horizon_mins)::INTEGER + WHERE ( + UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER) + ) BETWEEN sqlc.arg(start_timestamp_utc)::TIMESTAMP AND sqlc.arg(end_timestamp_utc)::TIMESTAMP + AND pg.horizon_mins >= sqlc.arg(horizon_mins)::INTEGER -- Sorting by decreasing init time ensures the DISTINCT captures the lowest allowed horizon - ORDER BY pg.target_time_utc ASC, fow.init_time_utc DESC + ORDER BY + (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)) ASC, + fow.init_time_utc DESC ) SELECT wp.horizon_mins, + wp.p10_sip, wp.p50_sip, + wp.p90_sip, wp.target_time_utc, - wp.other_stats_fractions, wp.metadata, wp.init_time_utc, wp.created_at_utc, @@ -343,22 +358,24 @@ SELECT laf.geometry_uuid, laf.source_type_id, pg.horizon_mins, + pg.p10_sip, pg.p50_sip, - pg.target_time_utc, + pg.p90_sip, laf.created_at_utc, laf.init_time_utc, - pg.other_stats_fractions, sv.capacity_watts, sv.latitude, sv.longitude, sv.geometry_name, - COALESCE(pg.metadata || laf.metadata, pg.metadata, laf.metadata) AS metadata + laf.metadata, + sqlc.arg(target_timestamp_utc)::TIMESTAMP AS target_time_utc FROM latest_allowed_forecast_per_location AS laf INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid) INNER JOIN loc.sources_mv AS sv USING (geometry_uuid, source_type_id) WHERE - pg.target_time_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP - AND sv.sys_period @> pg.target_time_utc; + (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)) + = sqlc.arg(target_timestamp_utc)::TIMESTAMP + AND sv.sys_period @> sqlc.arg(target_timestamp_utc)::TIMESTAMP; -- name: GetWeekAverageDeltasForLocations :many /* GetWeekAverageDeltasForLocations retrieves the average deltas between predicted and observed generation values @@ -385,9 +402,11 @@ relevant_predicted_values AS MATERIALIZED ( SELECT rf.geometry_uuid, rf.source_type_id, - pg.target_time_utc, pg.horizon_mins, - pg.p50_sip + pg.p50_sip, + ( + UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER) + )::TIMESTAMP AS target_time_utc FROM relevant_forecasts AS rf INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid) WHERE pg.forecast_uuid >= UUIDV7_BOUNDARY(sqlc.arg(pivot_timestamp)::TIMESTAMP - INTERVAL '8 days') diff --git a/internal/server/postgres/testdata/seeding.sql b/internal/server/postgres/testdata/seeding.sql index 895c260..a9d9de2 100644 --- a/internal/server/postgres/testdata/seeding.sql +++ b/internal/server/postgres/testdata/seeding.sql @@ -65,22 +65,23 @@ BEGIN TSRANGE(init_time_utc, init_time_utc + (forecast_len_mins * INTERVAL '1 minute')) FROM generated_data RETURNING forecast_uuid, init_time_utc - ), - static_json AS ( - SELECT '{"source": "benchmark"}'::jsonb AS meta, '{"p10": 0.1, "p90": 0.9}'::jsonb AS stats ) INSERT INTO pred.predicted_generation_values - (horizon_mins, p50_sip, forecast_uuid, target_time_utc, metadata, other_stats_fractions) - SELECT gs.h, (random() * 30000)::SMALLINT, inf.forecast_uuid, - inf.init_time_utc + (gs.h * INTERVAL '1 minute'), sj.meta, sj.stats + (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid) + SELECT + gs.h, + (random() * 30000)::SMALLINT, + 3000::SMALLINT, + 27000::SMALLINT, + inf.forecast_uuid FROM inserted_forecasts inf - CROSS JOIN static_json sj CROSS JOIN LATERAL generate_series(0, forecast_len_mins - pgv_res_mins, pgv_res_mins) AS gs(h) ORDER BY inf.init_time_utc ASC; -- Spoof the table size so Postgres uses indexes rather than seq scan in testing UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values'; - UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values_forecast_uuid_idx'; + UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values_pkey'; + REFRESH MATERIALIZED VIEW loc.sources_mv; RETURN QUERY SELECT target_locations * (history_window_mins / forecast_freq_mins) * (forecast_len_mins / pgv_res_mins), geo_list; diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto index edcf2ba..b833e7d 100644 --- a/proto/ocf/dp/dp-data.messages.proto +++ b/proto/ocf/dp/dp-data.messages.proto @@ -318,6 +318,7 @@ message ListForecastersResponse { message CreateForecastRequest { message ForecastValue { + reserved 4; uint32 horizon_mins = 1; float p50_fraction = 2 [ (buf.validate.field).float.gte = 0, @@ -328,6 +329,9 @@ message CreateForecastRequest { map other_statistics_fractions = 3 [ (buf.validate.field).map.min_pairs = 0, (buf.validate.field).map.max_pairs = 20, + (buf.validate.field).map.keys = { + string: {in: ["p10", "p90"]} + }, (buf.validate.field).map.values = { float: { gte: 0.0, @@ -336,7 +340,6 @@ message CreateForecastRequest { } } ]; - optional google.protobuf.Struct metadata = 4; } Forecaster forecaster = 1 [