diff --git a/README.md b/README.md
index 9e5a699..a57d723 100644
--- a/README.md
+++ b/README.md
@@ -235,7 +235,7 @@ This places the generated code in `gen/python`. See the `Makefile` for more exte
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
-| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. || metadata | [google.protobuf.Struct](#google-protobuf-Struct) | optional | |
+| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. |
CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry
diff --git a/internal/server/postgres/bench_test.go b/internal/server/postgres/bench_test.go
index 1ff73f2..6e2bbaa 100644
--- a/internal/server/postgres/bench_test.go
+++ b/internal/server/postgres/bench_test.go
@@ -12,7 +12,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
- "google.golang.org/protobuf/types/known/structpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp"
@@ -116,12 +115,6 @@ var yields = func() []*pb.CreateForecastRequest_ForecastValue {
"p90": 0.3,
},
HorizonMins: uint32(i * 30),
- Metadata: func() *structpb.Struct {
- s, _ := structpb.NewStruct(map[string]any{
- "example_key": fmt.Sprintf("example_value_%d", i),
- })
- return s
- }(),
})
}
diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go
index e8544fb..4664d50 100644
--- a/internal/server/postgres/dataserverimpl.go
+++ b/internal/server/postgres/dataserverimpl.go
@@ -10,7 +10,6 @@ import (
"context"
"errors"
"fmt"
- "math"
"time"
"github.com/google/uuid"
@@ -21,7 +20,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp"
@@ -64,6 +62,19 @@ func timeptrToPgTimestamp(t *timestamppb.Timestamp) pgtype.Timestamp {
return pgtype.Timestamp{Time: t.AsTime().UTC(), Valid: true}
}
+// extractSIPStatPtrFromMap gets a key's value from a map as a pointer, and converts it
+// to a smallint percentage. If it doesn't exist, it returns nil.
+func extractSIPStatPtrFromMap(m map[string]float32, key string) *int16 {
+ val, exists := m[key]
+ if !exists {
+ return nil
+ }
+
+ sip_val := int16(val * 30000.0)
+
+ return &sip_val
+}
+
// --- Server Implementation ----------------------------------------------------------------------
func NewDataPlatformDataServiceServerImpl() *DataPlatformDataServiceServerImpl {
@@ -172,48 +183,12 @@ func (s *DataPlatformDataServiceServerImpl) CreateForecast(
// Create the forecast data
paramsList := make([]db.CreatePredictedValuesParams, len(req.Values))
for i, value := range req.Values {
- // Since the database wants each numeric value to have 4 significant figures,
- // the float32 values need to be rounded accordingly.
- roundedStats := make(map[string]any)
- for k, val32 := range value.OtherStatisticsFractions {
- val64 := float64(val32)
-
- var roundedVal float64
- if val64 < 1 {
- roundedVal = math.Round(val64*10000) / 10000
- } else {
- roundedVal = math.Round(val64*1000) / 1000
- }
-
- roundedStats[k] = roundedVal
- }
-
- // Since CreatePredictedValues uses COPYFROM, manually coerce empty metadata to nil
- if value.Metadata != nil && len(value.Metadata.Fields) == 0 {
- value.Metadata = nil
- }
-
- var otherStats *structpb.Struct
-
- if len(roundedStats) > 0 {
- otherStats, err = structpb.NewStruct(roundedStats)
- if err != nil {
- l.Err(err).Msgf("structpb.NewStruct(%+v)", roundedStats)
- }
- }
-
paramsList[i] = db.CreatePredictedValuesParams{
HorizonMins: int16(value.HorizonMins),
+ P10Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p10"),
P50Sip: int16(value.P50Fraction * 30000.0),
+ P90Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p90"),
ForecastUuid: dbForecast.ForecastUuid,
- TargetTimeUtc: pgtype.Timestamp{
- Time: req.InitTimeUtc.AsTime().Add(
- time.Duration(value.HorizonMins) * time.Minute,
- ),
- Valid: true,
- },
- OtherStatsFractions: otherStats,
- Metadata: value.Metadata,
}
}
@@ -548,11 +523,13 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
&row.ForecasterVersion,
&row.CreatedAtUtc,
&row.HorizonMins,
+ &row.P10Sip,
&row.P50Sip,
- &row.OtherStatsFractions,
+ &row.P90Sip,
&row.CapacityWatts,
- &row.InitTimeUtc,
&row.Metadata,
+ &row.InitTimeUtc,
+ &row.TargetTimeUtc,
)
if err != nil {
l.Err(err).Msg("rows.Scan failed")
@@ -560,10 +537,12 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
}
otherStatistics := make(map[string]float32)
- if row.OtherStatsFractions != nil {
- for k, v := range row.OtherStatsFractions.AsMap() {
- otherStatistics[k] = float32(v.(float64))
- }
+ if row.P10Sip != nil {
+ otherStatistics["p10"] = float32(*row.P10Sip) / 30000.0
+ }
+
+ if row.P90Sip != nil {
+ otherStatistics["p90"] = float32(*row.P90Sip) / 30000.0
}
metadata := make(map[string]string)
@@ -1639,14 +1618,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
out := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbPreds))
for i, pred := range dbPreds {
otherStats := make(map[string]float32)
- for k, v := range pred.OtherStatsFractions.AsMap() {
- floatVal, ok := v.(float64)
- if !ok {
- l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic")
- continue
- }
+ if pred.P10Sip != nil {
+ otherStats["p10"] = float32(*pred.P10Sip) / 30000.0
+ }
- otherStats[k] = float32(floatVal)
+ if pred.P90Sip != nil {
+ otherStats["p90"] = float32(*pred.P90Sip) / 30000.0
}
out[i] = &pb.GetForecastAsTimeseriesResponse_Value{
@@ -1741,14 +1718,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
values := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbValues))
for i, value := range dbValues {
otherStats := make(map[string]float32)
- for k, v := range value.OtherStatsFractions.AsMap() {
- floatVal, ok := v.(float64)
- if !ok {
- l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic")
- continue
- }
+ if value.P10Sip != nil {
+ otherStats["p10"] = float32(*value.P10Sip) / 30000.0
+ }
- otherStats[k] = float32(floatVal)
+ if value.P90Sip != nil {
+ otherStats["p90"] = float32(*value.P90Sip) / 30000.0
}
values[i] = &pb.GetForecastAsTimeseriesResponse_Value{
diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go
index 98ac857..3b5108c 100644
--- a/internal/server/postgres/dataserverimpl_test.go
+++ b/internal/server/postgres/dataserverimpl_test.go
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
- "maps"
"math/rand/v2"
"strings"
"testing"
@@ -589,7 +588,6 @@ func TestGetForecastAtTimestamp(t *testing.T) {
"p90": float32(0.6 + float32(i)*0.05),
"p10": float32(0.4 + float32(i)*0.05),
},
- Metadata: metadata,
}
}
@@ -1061,7 +1059,6 @@ func TestGetForecastAsTimeseries(t *testing.T) {
"p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)),
"p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)),
},
- Metadata: metadata,
}
}
@@ -1801,8 +1798,6 @@ func TestCreateObservations(t *testing.T) {
func TestGetWeekAverageDeltas(t *testing.T) {
pivotTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
- metadata, err := structpb.NewStruct(map[string]any{"source": "test"})
- require.NoError(t, err)
// Create a site to attach the observations to
siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{
@@ -1862,7 +1857,6 @@ func TestGetWeekAverageDeltas(t *testing.T) {
"p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)),
"p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)),
},
- Metadata: metadata,
}
}
@@ -1925,7 +1919,6 @@ func TestCreateForecast(t *testing.T) {
"p10": 0.4 + float32(i)*0.05,
"p90": 0.6 + float32(i)*0.05,
},
- Metadata: metadata,
}
}
@@ -1935,7 +1928,6 @@ func TestCreateForecast(t *testing.T) {
HorizonMins: uint32(i * 30),
P50Fraction: 0.0,
OtherStatisticsFractions: map[string]float32{},
- Metadata: &structpb.Struct{},
}
}
@@ -1948,7 +1940,6 @@ func TestCreateForecast(t *testing.T) {
"p10": 1.3,
"p90": -0.2,
},
- Metadata: metadata,
}
}
@@ -2078,11 +2069,8 @@ func TestCreateForecast(t *testing.T) {
})
require.NoError(t, err)
- expectedMetadata := tc.req.Values[0].Metadata.AsMap()
- maps.Copy(expectedMetadata, tc.req.Metadata.AsMap())
-
for _, val := range fResp.Values {
- require.Equal(t, expectedMetadata, val.Metadata.AsMap())
+ require.Equal(t, tc.req.Metadata.AsMap(), val.Metadata.AsMap())
}
}
})
@@ -2130,7 +2118,6 @@ func TestGetLatestForecasts(t *testing.T) {
"p10": 0.4 + float32(i)*0.05,
"p90": 0.6 + float32(i)*0.05,
},
- Metadata: metadata,
}
}
@@ -2247,7 +2234,6 @@ func TestStreamForecastData(t *testing.T) {
OtherStatisticsFractions: map[string]float32{
"p90": float32(i)*0.25 + 0.1,
},
- Metadata: metadata,
}
}
@@ -2261,6 +2247,7 @@ func TestStreamForecastData(t *testing.T) {
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
InitTimeUtc: initTime,
Values: yields,
+ Metadata: metadata,
})
require.NoError(t, err)
@@ -2270,6 +2257,7 @@ func TestStreamForecastData(t *testing.T) {
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
InitTimeUtc: initTime,
Values: yields,
+ Metadata: metadata,
})
require.NoError(t, err)
}
diff --git a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
new file mode 100644
index 0000000..ffab5ad
--- /dev/null
+++ b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
@@ -0,0 +1,169 @@
+-- +goose NO TRANSACTION
+-- +goose Up
+
+/*
+ * Reduces the database size by approximately 40%.
+ *
+ * Modifies the predicted_generation_values table to optimize it's use of storage.
+ * This is done through changing the index, removing redundant columns, and replacing
+ * dynamic columns with small static ones.
+ *
+ * The schema modifications and the corresponding data changes are seperated out for
+ * faster migration. Since the predicted_generation_values table is very large, simple
+ * DELETES and UPDATES would take a long time, and not actually gain us any storage
+ * savings (at least until an autovacuum process ran). By instead moving the data
+ * partition-wise and then replacing the partitions, we keep the process light on CPU.
+ */
+
+DROP INDEX IF EXISTS loc.idx_sources_mv_gist_sys_period;
+
+CREATE INDEX IF NOT EXISTS idx_sources_mv_composite_lookup
+ON loc.sources_mv USING gist (geometry_uuid, source_type_id, sys_period);
+
+ALTER TABLE pred.predicted_generation_values
+ ADD COLUMN p10_sip SMALLINT,
+ ADD COLUMN p90_sip SMALLINT,
+ DROP CONSTRAINT predicted_generation_values_pkey CASCADE,
+ ALTER COLUMN target_time_utc DROP NOT NULL;
+
+-- +goose StatementBegin
+CREATE OR REPLACE PROCEDURE pred.swap_predicted_generation_partitions()
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ partition_record RECORD;
+ new_part_name TEXT;
+ part_bound TEXT;
+BEGIN
+ FOR partition_record IN
+ SELECT child.relname AS table_name,
+ pg_get_expr(child.relpartbound, child.oid) AS bounds
+ FROM pg_inherits
+ JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+ JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+ JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
+ WHERE parent.relname = 'predicted_generation_values'
+ AND nmsp_parent.nspname = 'pred'
+ LOOP
+ new_part_name := partition_record.table_name || '_v2';
+ part_bound := partition_record.bounds;
+
+ EXECUTE format('CREATE TABLE pred.%I (LIKE pred.predicted_generation_values INCLUDING ALL);', new_part_name);
+
+ EXECUTE format('
+ INSERT INTO pred.%I (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, target_time_utc, metadata, other_stats_fractions)
+ SELECT
+ horizon_mins,
+ p50_sip,
+ ((other_stats_fractions->>''p10'')::REAL * 30000)::SMALLINT,
+ ((other_stats_fractions->>''p90'')::REAL * 30000)::SMALLINT,
+ forecast_uuid,
+ NULL,
+ NULL,
+ NULL
+ FROM pred.%I;
+ ', new_part_name, partition_record.table_name);
+
+ EXECUTE format('ALTER TABLE pred.%I ADD PRIMARY KEY (forecast_uuid, horizon_mins);', new_part_name);
+ EXECUTE format('ALTER TABLE pred.predicted_generation_values DETACH PARTITION pred.%I;', partition_record.table_name);
+ EXECUTE format('ALTER TABLE pred.predicted_generation_values ATTACH PARTITION pred.%I %s;', new_part_name, part_bound);
+ EXECUTE format('DROP TABLE pred.%I;', partition_record.table_name);
+ EXECUTE format('ALTER TABLE pred.%I RENAME TO %I;', new_part_name, partition_record.table_name);
+
+ COMMIT;
+ END LOOP;
+END;
+$$;
+-- +goose StatementEnd
+
+CALL pred.swap_predicted_generation_partitions();
+DROP PROCEDURE pred.swap_predicted_generation_partitions;
+
+ALTER TABLE pred.predicted_generation_values
+ DROP COLUMN target_time_utc,
+ DROP COLUMN other_stats_fractions,
+ DROP COLUMN metadata,
+ ADD PRIMARY KEY (forecast_uuid, horizon_mins);
+
+DROP TABLE IF EXISTS pred.predicted_generation_values_template;
+CREATE TABLE pred.predicted_generation_values_template (
+ horizon_mins SMALLINT NOT NULL,
+ CONSTRAINT horizon_mins_nonnegative_check CHECK (horizon_mins >= 0),
+ CONSTRAINT horizon_mins_fiveminutely_check CHECK (horizon_mins % 5 = 0),
+ p50_sip SMALLINT NOT NULL,
+ CONSTRAINT p50_sip_nonnegative_check CHECK (p50_sip >= 0),
+ p10_sip SMALLINT,
+ p90_sip SMALLINT,
+ forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE,
+ PRIMARY KEY (forecast_uuid, horizon_mins)
+);
+
+ANALYZE pred.predicted_generation_values;
+
+
+-- +goose Down
+ALTER TABLE pred.predicted_generation_values
+ DROP CONSTRAINT predicted_generation_values_pkey CASCADE,
+ ADD COLUMN target_time_utc TIMESTAMP,
+ ADD COLUMN other_stats_fractions JSONB DEFAULT NULL,
+ ADD CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'),
+ ADD CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions)),
+ ADD COLUMN metadata JSONB DEFAULT NULL;
+
+
+-- +goose StatementBegin
+CREATE OR REPLACE PROCEDURE pred.rollback_predicted_generation_partitions()
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ partition_record RECORD;
+BEGIN
+ FOR partition_record IN
+ SELECT child.relname AS table_name
+ FROM pg_inherits
+ JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+ JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+ JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
+ WHERE parent.relname = 'predicted_generation_values'
+ AND nmsp_parent.nspname = 'pred'
+ LOOP
+ EXECUTE format('
+ UPDATE pred.%I
+ SET target_time_utc = UUIDV7_EXTRACT_TIMESTAMP(forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => horizon_mins::INTEGER),
+ other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN jsonb_strip_nulls(jsonb_build_object(''p10'', p10_sip::REAL / 30000, ''p90'', p90_sip::REAL / 30000))
+ ELSE NULL END;
+ ', partition_record.table_name);
+
+ EXECUTE format('ALTER TABLE pred.%I ALTER COLUMN target_time_utc SET NOT NULL;', partition_record.table_name);
+ EXECUTE format('ALTER TABLE pred.%I ADD PRIMARY KEY (forecast_uuid, target_time_utc);', partition_record.table_name);
+
+ COMMIT;
+ END LOOP;
+END;
+$$;
+-- +goose StatementEnd
+
+CALL pred.rollback_predicted_generation_partitions();
+DROP PROCEDURE pred.rollback_predicted_generation_partitions;
+
+ALTER TABLE pred.predicted_generation_values
+ ALTER COLUMN target_time_utc SET NOT NULL,
+ ADD PRIMARY KEY (forecast_uuid, target_time_utc),
+ DROP COLUMN p10_sip, DROP COLUMN p90_sip;
+
+DROP TABLE IF EXISTS pred.predicted_generation_values_template;
+CREATE TABLE pred.predicted_generation_values_template (
+ horizon_mins SMALLINT NOT NULL,
+ CONSTRAINT horizon_mins_nonnegative_check CHECK (horizon_mins >= 0),
+ CONSTRAINT horizon_mins_fiveminutely_check CHECK (horizon_mins % 5 = 0),
+ p50_sip SMALLINT NOT NULL,
+ CONSTRAINT p50_sip_nonnegative_check CHECK (p50_sip >= 0),
+ target_time_utc TIMESTAMP NOT NULL,
+ forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE,
+ metadata JSONB DEFAULT NULL CONSTRAINT metadata_nullifempty CHECK (metadata IS NULL OR metadata != '{}'),
+ other_stats_fractions JSONB DEFAULT NULL CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'),
+ CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions)),
+ PRIMARY KEY (forecast_uuid, target_time_utc)
+);
+
+ANALYZE pred.predicted_generation_values;
diff --git a/internal/server/postgres/sql/queries/predictions.sql b/internal/server/postgres/sql/queries/predictions.sql
index 0974373..51b1aca 100644
--- a/internal/server/postgres/sql/queries/predictions.sql
+++ b/internal/server/postgres/sql/queries/predictions.sql
@@ -123,9 +123,9 @@ WHERE forecast_uuid IN (SELECT forecast_uuid FROM forecasts_to_delete);
* with 0 representing 0% and 30000 representing 100% of capacity.
*/
INSERT INTO pred.predicted_generation_values (
- horizon_mins, p50_sip, forecast_uuid, target_time_utc, other_stats_fractions, metadata
+ horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid
) VALUES (
- $1, $2, $3, $4, $5, $6
+ $1, $2, $3, $4, $5
);
-- name: ListPredictionsForForecasts :many
@@ -153,11 +153,15 @@ SELECT
mf.forecaster_version,
f.created_at_utc,
pg.horizon_mins,
+ pg.p10_sip,
pg.p50_sip,
- pg.other_stats_fractions,
+ pg.p90_sip,
sv.capacity_watts,
+ f.metadata,
UUIDV7_EXTRACT_TIMESTAMP(f.forecast_uuid)::TIMESTAMP AS init_time_utc,
- COALESCE(pg.metadata || f.metadata, pg.metadata, f.metadata) AS metadata
+ (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )::TIMESTAMP AS target_time_utc
FROM pred.forecasts AS f
INNER JOIN matched_forecasters AS mf USING (forecaster_id)
INNER JOIN pred.predicted_generation_values AS pg
@@ -169,7 +173,8 @@ FROM pred.forecasts AS f
FROM loc.sources_mv AS s
WHERE s.geometry_uuid = f.geometry_uuid
AND s.source_type_id = f.source_type_id
- AND s.sys_period @> pg.target_time_utc
+ AND s.sys_period
+ @> (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER))
LIMIT 1
) AS sv ON TRUE
WHERE f.geometry_uuid = sqlc.arg(geometry_uuid)::UUID
@@ -256,29 +261,39 @@ WITH allowed_forecasts_overlapping_window AS (
)
),
winning_predictions AS (
- SELECT DISTINCT ON (pg.target_time_utc)
- pg.target_time_utc,
+ SELECT DISTINCT ON (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )
fow.forecast_uuid,
fow.init_time_utc,
fow.created_at_utc,
fow.geometry_uuid,
fow.source_type_id,
pg.horizon_mins,
+ pg.p10_sip,
pg.p50_sip,
- pg.other_stats_fractions,
- COALESCE(pg.metadata || fow.metadata, pg.metadata, fow.metadata) AS metadata
+ pg.p90_sip,
+ fow.metadata,
+ (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )::TIMESTAMP AS target_time_utc
FROM allowed_forecasts_overlapping_window AS fow
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
- WHERE pg.target_time_utc BETWEEN sqlc.arg(start_timestamp_utc)::TIMESTAMP AND sqlc.arg(end_timestamp_utc)::TIMESTAMP
- AND pg.horizon_mins >= sqlc.arg(horizon_mins)::INTEGER
+ WHERE (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ ) BETWEEN sqlc.arg(start_timestamp_utc)::TIMESTAMP AND sqlc.arg(end_timestamp_utc)::TIMESTAMP
+ AND pg.horizon_mins >= sqlc.arg(horizon_mins)::INTEGER
-- Sorting by decreasing init time ensures the DISTINCT captures the lowest allowed horizon
- ORDER BY pg.target_time_utc ASC, fow.init_time_utc DESC
+ ORDER BY
+ (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)) ASC,
+ fow.init_time_utc DESC
)
SELECT
wp.horizon_mins,
+ wp.p10_sip,
wp.p50_sip,
+ wp.p90_sip,
wp.target_time_utc,
- wp.other_stats_fractions,
wp.metadata,
wp.init_time_utc,
wp.created_at_utc,
@@ -343,22 +358,24 @@ SELECT
laf.geometry_uuid,
laf.source_type_id,
pg.horizon_mins,
+ pg.p10_sip,
pg.p50_sip,
- pg.target_time_utc,
+ pg.p90_sip,
laf.created_at_utc,
laf.init_time_utc,
- pg.other_stats_fractions,
sv.capacity_watts,
sv.latitude,
sv.longitude,
sv.geometry_name,
- COALESCE(pg.metadata || laf.metadata, pg.metadata, laf.metadata) AS metadata
+ laf.metadata,
+ sqlc.arg(target_timestamp_utc)::TIMESTAMP AS target_time_utc
FROM latest_allowed_forecast_per_location AS laf
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
INNER JOIN loc.sources_mv AS sv USING (geometry_uuid, source_type_id)
WHERE
- pg.target_time_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP
- AND sv.sys_period @> pg.target_time_utc;
+ (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER))
+ = sqlc.arg(target_timestamp_utc)::TIMESTAMP
+ AND sv.sys_period @> sqlc.arg(target_timestamp_utc)::TIMESTAMP;
-- name: GetWeekAverageDeltasForLocations :many
/* GetWeekAverageDeltasForLocations retrieves the average deltas between predicted and observed generation values
@@ -385,9 +402,11 @@ relevant_predicted_values AS MATERIALIZED (
SELECT
rf.geometry_uuid,
rf.source_type_id,
- pg.target_time_utc,
pg.horizon_mins,
- pg.p50_sip
+ pg.p50_sip,
+ (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )::TIMESTAMP AS target_time_utc
FROM relevant_forecasts AS rf
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
WHERE pg.forecast_uuid >= UUIDV7_BOUNDARY(sqlc.arg(pivot_timestamp)::TIMESTAMP - INTERVAL '8 days')
diff --git a/internal/server/postgres/testdata/seeding.sql b/internal/server/postgres/testdata/seeding.sql
index 895c260..a9d9de2 100644
--- a/internal/server/postgres/testdata/seeding.sql
+++ b/internal/server/postgres/testdata/seeding.sql
@@ -65,22 +65,23 @@ BEGIN
TSRANGE(init_time_utc, init_time_utc + (forecast_len_mins * INTERVAL '1 minute'))
FROM generated_data
RETURNING forecast_uuid, init_time_utc
- ),
- static_json AS (
- SELECT '{"source": "benchmark"}'::jsonb AS meta, '{"p10": 0.1, "p90": 0.9}'::jsonb AS stats
)
INSERT INTO pred.predicted_generation_values
- (horizon_mins, p50_sip, forecast_uuid, target_time_utc, metadata, other_stats_fractions)
- SELECT gs.h, (random() * 30000)::SMALLINT, inf.forecast_uuid,
- inf.init_time_utc + (gs.h * INTERVAL '1 minute'), sj.meta, sj.stats
+ (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid)
+ SELECT
+ gs.h,
+ (random() * 30000)::SMALLINT,
+ 3000::SMALLINT,
+ 27000::SMALLINT,
+ inf.forecast_uuid
FROM inserted_forecasts inf
- CROSS JOIN static_json sj
CROSS JOIN LATERAL generate_series(0, forecast_len_mins - pgv_res_mins, pgv_res_mins) AS gs(h)
ORDER BY inf.init_time_utc ASC;
-- Spoof the table size so Postgres uses indexes rather than seq scan in testing
UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values';
- UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values_forecast_uuid_idx';
+ UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values_pkey';
+
REFRESH MATERIALIZED VIEW loc.sources_mv;
RETURN QUERY SELECT target_locations * (history_window_mins / forecast_freq_mins) * (forecast_len_mins / pgv_res_mins), geo_list;
diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto
index edcf2ba..b833e7d 100644
--- a/proto/ocf/dp/dp-data.messages.proto
+++ b/proto/ocf/dp/dp-data.messages.proto
@@ -318,6 +318,7 @@ message ListForecastersResponse {
message CreateForecastRequest {
message ForecastValue {
+ reserved 4;
uint32 horizon_mins = 1;
float p50_fraction = 2 [
(buf.validate.field).float.gte = 0,
@@ -328,6 +329,9 @@ message CreateForecastRequest {
map other_statistics_fractions = 3 [
(buf.validate.field).map.min_pairs = 0,
(buf.validate.field).map.max_pairs = 20,
+ (buf.validate.field).map.keys = {
+ string: {in: ["p10", "p90"]}
+ },
(buf.validate.field).map.values = {
float: {
gte: 0.0,
@@ -336,7 +340,6 @@ message CreateForecastRequest {
}
}
];
- optional google.protobuf.Struct metadata = 4;
}
Forecaster forecaster = 1 [