From 89def7728eb38b2cb2c81ad0b2886d9f8b475649 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Tue, 16 Jun 2026 13:14:28 +0100
Subject: [PATCH 1/6] perf(postgres): Optimize table
---
internal/server/postgres/dataserverimpl.go | 89 ++++------
.../sql/migrations/00009_optimize_storage.sql | 168 ++++++++++++++++++
.../postgres/sql/queries/predictions.sql | 51 ++++--
internal/server/postgres/testdata/seeding.sql | 17 +-
4 files changed, 250 insertions(+), 75 deletions(-)
create mode 100644 internal/server/postgres/sql/migrations/00009_optimize_storage.sql
diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go
index e8544fb..30478e0 100644
--- a/internal/server/postgres/dataserverimpl.go
+++ b/internal/server/postgres/dataserverimpl.go
@@ -10,7 +10,6 @@ import (
"context"
"errors"
"fmt"
- "math"
"time"
"github.com/google/uuid"
@@ -21,7 +20,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp"
@@ -64,6 +62,19 @@ func timeptrToPgTimestamp(t *timestamppb.Timestamp) pgtype.Timestamp {
return pgtype.Timestamp{Time: t.AsTime().UTC(), Valid: true}
}
+// extractSIPStatPtrFromMap looks up key in m and returns a pointer to its value scaled
+// by 30000 (so 30000 represents 100%) as an int16. If the key is absent, it returns nil.
+func extractSIPStatPtrFromMap(m map[string]float32, key string) *int16 {
+ val, exists := m[key]
+ if !exists {
+ return nil
+ }
+
+ sip_val := int16(val * 30000.0)
+
+ return &sip_val
+}
+
// --- Server Implementation ----------------------------------------------------------------------
func NewDataPlatformDataServiceServerImpl() *DataPlatformDataServiceServerImpl {
@@ -172,48 +183,18 @@ func (s *DataPlatformDataServiceServerImpl) CreateForecast(
// Create the forecast data
paramsList := make([]db.CreatePredictedValuesParams, len(req.Values))
for i, value := range req.Values {
- // Since the database wants each numeric value to have 4 significant figures,
- // the float32 values need to be rounded accordingly.
- roundedStats := make(map[string]any)
- for k, val32 := range value.OtherStatisticsFractions {
- val64 := float64(val32)
-
- var roundedVal float64
- if val64 < 1 {
- roundedVal = math.Round(val64*10000) / 10000
- } else {
- roundedVal = math.Round(val64*1000) / 1000
- }
-
- roundedStats[k] = roundedVal
- }
-
// Since CreatePredictedValues uses COPYFROM, manually coerce empty metadata to nil
if value.Metadata != nil && len(value.Metadata.Fields) == 0 {
value.Metadata = nil
}
- var otherStats *structpb.Struct
-
- if len(roundedStats) > 0 {
- otherStats, err = structpb.NewStruct(roundedStats)
- if err != nil {
- l.Err(err).Msgf("structpb.NewStruct(%+v)", roundedStats)
- }
- }
-
paramsList[i] = db.CreatePredictedValuesParams{
HorizonMins: int16(value.HorizonMins),
+ P10Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p10"),
P50Sip: int16(value.P50Fraction * 30000.0),
+ P90Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p90"),
ForecastUuid: dbForecast.ForecastUuid,
- TargetTimeUtc: pgtype.Timestamp{
- Time: req.InitTimeUtc.AsTime().Add(
- time.Duration(value.HorizonMins) * time.Minute,
- ),
- Valid: true,
- },
- OtherStatsFractions: otherStats,
- Metadata: value.Metadata,
+ Metadata: value.Metadata,
}
}
@@ -548,10 +529,12 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
&row.ForecasterVersion,
&row.CreatedAtUtc,
&row.HorizonMins,
+ &row.P10Sip,
&row.P50Sip,
- &row.OtherStatsFractions,
+ &row.P90Sip,
&row.CapacityWatts,
&row.InitTimeUtc,
+ &row.TargetTimeUtc,
&row.Metadata,
)
if err != nil {
@@ -560,10 +543,12 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
}
otherStatistics := make(map[string]float32)
- if row.OtherStatsFractions != nil {
- for k, v := range row.OtherStatsFractions.AsMap() {
- otherStatistics[k] = float32(v.(float64))
- }
+ if row.P10Sip != nil {
+ otherStatistics["p10"] = float32(*row.P10Sip) / 30000.0
+ }
+
+ if row.P90Sip != nil {
+ otherStatistics["p90"] = float32(*row.P90Sip) / 30000.0
}
metadata := make(map[string]string)
@@ -1639,14 +1624,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
out := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbPreds))
for i, pred := range dbPreds {
otherStats := make(map[string]float32)
- for k, v := range pred.OtherStatsFractions.AsMap() {
- floatVal, ok := v.(float64)
- if !ok {
- l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic")
- continue
- }
+ if pred.P10Sip != nil {
+ otherStats["p10"] = float32(*pred.P10Sip) / 30000.0
+ }
- otherStats[k] = float32(floatVal)
+ if pred.P90Sip != nil {
+ otherStats["p90"] = float32(*pred.P90Sip) / 30000.0
}
out[i] = &pb.GetForecastAsTimeseriesResponse_Value{
@@ -1741,14 +1724,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
values := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbValues))
for i, value := range dbValues {
otherStats := make(map[string]float32)
- for k, v := range value.OtherStatsFractions.AsMap() {
- floatVal, ok := v.(float64)
- if !ok {
- l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic")
- continue
- }
+ if value.P10Sip != nil {
+ otherStats["p10"] = float32(*value.P10Sip) / 30000.0
+ }
- otherStats[k] = float32(floatVal)
+ if value.P90Sip != nil {
+ otherStats["p90"] = float32(*value.P90Sip) / 30000.0
}
values[i] = &pb.GetForecastAsTimeseriesResponse_Value{
diff --git a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
new file mode 100644
index 0000000..231e2af
--- /dev/null
+++ b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
@@ -0,0 +1,168 @@
+-- +goose NO TRANSACTION
+-- +goose Up
+
+/*
+ * Reduces the database size by approximately 40%.
+ *
+ * Modifies the predicted_generation_values table to optimize its use of storage.
+ * This is done through changing the index, removing redundant columns, and replacing
+ * dynamic columns with small static ones.
+ *
+ * The schema modifications and the corresponding data changes are separated out for
+ * faster migration. Since the predicted_generation_values table is very large, simple
+ * DELETES and UPDATES would take a long time, and not actually gain us any storage
+ * savings (at least until an autovacuum process ran). By instead moving the data
+ * partition-wise and then replacing the partitions, we keep the process light on CPU.
+ */
+
+DROP INDEX loc.idx_sources_mv_gist_sys_period;
+
+CREATE INDEX idx_sources_mv_composite_lookup
+ON loc.sources_mv USING gist (geometry_uuid, source_type_id, sys_period);
+
+ALTER TABLE pred.predicted_generation_values
+ ADD COLUMN p10_sip SMALLINT,
+ ADD COLUMN p90_sip SMALLINT,
+ DROP CONSTRAINT predicted_generation_values_pkey CASCADE,
+ ALTER COLUMN target_time_utc DROP NOT NULL;
+
+-- +goose StatementBegin
+CREATE OR REPLACE PROCEDURE pred.swap_predicted_generation_partitions()
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ partition_record RECORD;
+ new_part_name TEXT;
+ part_bound TEXT;
+BEGIN
+ FOR partition_record IN
+ SELECT child.relname AS table_name,
+ pg_get_expr(child.relpartbound, child.oid) AS bounds
+ FROM pg_inherits
+ JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+ JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+ JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
+ WHERE parent.relname = 'predicted_generation_values'
+ AND nmsp_parent.nspname = 'pred'
+ LOOP
+ new_part_name := partition_record.table_name || '_v2';
+ part_bound := partition_record.bounds;
+
+ EXECUTE format('CREATE TABLE pred.%I (LIKE pred.predicted_generation_values INCLUDING ALL);', new_part_name);
+
+ EXECUTE format('
+ INSERT INTO pred.%I (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, target_time_utc, metadata, other_stats_fractions)
+ SELECT
+ horizon_mins,
+ p50_sip,
+ ((other_stats_fractions->>''p10'')::REAL * 30000)::SMALLINT,
+ ((other_stats_fractions->>''p90'')::REAL * 30000)::SMALLINT,
+ forecast_uuid,
+ NULL,
+ metadata,
+ NULL
+ FROM pred.%I;
+ ', new_part_name, partition_record.table_name);
+
+ EXECUTE format('ALTER TABLE pred.%I ADD PRIMARY KEY (forecast_uuid, horizon_mins);', new_part_name);
+ EXECUTE format('ALTER TABLE pred.predicted_generation_values DETACH PARTITION pred.%I;', partition_record.table_name);
+ EXECUTE format('ALTER TABLE pred.predicted_generation_values ATTACH PARTITION pred.%I %s;', new_part_name, part_bound);
+ EXECUTE format('DROP TABLE pred.%I;', partition_record.table_name);
+ EXECUTE format('ALTER TABLE pred.%I RENAME TO %I;', new_part_name, partition_record.table_name);
+
+ COMMIT;
+ END LOOP;
+END;
+$$;
+-- +goose StatementEnd
+
+CALL pred.swap_predicted_generation_partitions();
+DROP PROCEDURE pred.swap_predicted_generation_partitions;
+
+ALTER TABLE pred.predicted_generation_values
+ DROP COLUMN target_time_utc,
+ DROP COLUMN other_stats_fractions;
+
+ALTER TABLE pred.predicted_generation_values
+ ADD PRIMARY KEY (forecast_uuid, horizon_mins);
+
+DROP TABLE IF EXISTS pred.predicted_generation_values_template;
+CREATE TABLE pred.predicted_generation_values_template (
+ horizon_mins SMALLINT NOT NULL,
+ CONSTRAINT horizon_mins_nonnegative_check CHECK (horizon_mins >= 0),
+ CONSTRAINT horizon_mins_fiveminutely_check CHECK (horizon_mins % 5 = 0),
+ p50_sip SMALLINT NOT NULL,
+ CONSTRAINT p50_sip_nonnegative_check CHECK (p50_sip >= 0),
+ p10_sip SMALLINT,
+ p90_sip SMALLINT,
+ forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE,
+ metadata JSONB DEFAULT NULL CONSTRAINT metadata_nullifempty CHECK (metadata IS NULL OR metadata != '{}'),
+ PRIMARY KEY (forecast_uuid, horizon_mins)
+);
+
+ANALYZE pred.predicted_generation_values;
+
+
+-- +goose Down
+ALTER TABLE pred.predicted_generation_values ADD COLUMN target_time_utc TIMESTAMP;
+ALTER TABLE pred.predicted_generation_values ADD COLUMN other_stats_fractions JSONB DEFAULT NULL;
+
+ALTER TABLE pred.predicted_generation_values
+ ADD CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'),
+ ADD CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions));
+
+
+-- +goose StatementBegin
+CREATE OR REPLACE PROCEDURE pred.rollback_predicted_generation_partitions()
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ partition_record RECORD;
+BEGIN
+ FOR partition_record IN
+ SELECT child.relname AS table_name
+ FROM pg_inherits
+ JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+ JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+ JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
+ WHERE parent.relname = 'predicted_generation_values'
+ AND nmsp_parent.nspname = 'pred'
+ LOOP
+ EXECUTE format('
+ UPDATE pred.%I
+ SET target_time_utc = UUIDV7_EXTRACT_TIMESTAMP(forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => horizon_mins::INTEGER),
+ other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN JSONB_BUILD_OBJECT(''p10'', p10_sip::REAL / 30000, ''p90'', p90_sip::REAL / 30000) ELSE NULL END;
+ ', partition_record.table_name);
+
+ EXECUTE format('ALTER TABLE pred.%I ALTER COLUMN target_time_utc SET NOT NULL;', partition_record.table_name);
+ EXECUTE format('ALTER TABLE pred.%I ADD PRIMARY KEY (forecast_uuid, target_time_utc);', partition_record.table_name);
+
+ COMMIT;
+ END LOOP;
+END;
+$$;
+-- +goose StatementEnd
+
+CALL pred.rollback_predicted_generation_partitions();
+DROP PROCEDURE pred.rollback_predicted_generation_partitions;
+
+ALTER TABLE pred.predicted_generation_values ALTER COLUMN target_time_utc SET NOT NULL;
+ALTER TABLE pred.predicted_generation_values ADD PRIMARY KEY (forecast_uuid, target_time_utc);
+ALTER TABLE pred.predicted_generation_values DROP COLUMN p10_sip, DROP COLUMN p90_sip;
+
+DROP TABLE IF EXISTS pred.predicted_generation_values_template;
+CREATE TABLE pred.predicted_generation_values_template (
+ horizon_mins SMALLINT NOT NULL,
+ CONSTRAINT horizon_mins_nonnegative_check CHECK (horizon_mins >= 0),
+ CONSTRAINT horizon_mins_fiveminutely_check CHECK (horizon_mins % 5 = 0),
+ p50_sip SMALLINT NOT NULL,
+ CONSTRAINT p50_sip_nonnegative_check CHECK (p50_sip >= 0),
+ target_time_utc TIMESTAMP NOT NULL,
+ forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE,
+ metadata JSONB DEFAULT NULL CONSTRAINT metadata_nullifempty CHECK (metadata IS NULL OR metadata != '{}'),
+ other_stats_fractions JSONB DEFAULT NULL CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'),
+ CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions)),
+ PRIMARY KEY (forecast_uuid, target_time_utc)
+);
+
+ANALYZE pred.predicted_generation_values;
diff --git a/internal/server/postgres/sql/queries/predictions.sql b/internal/server/postgres/sql/queries/predictions.sql
index 0974373..37d6019 100644
--- a/internal/server/postgres/sql/queries/predictions.sql
+++ b/internal/server/postgres/sql/queries/predictions.sql
@@ -123,7 +123,7 @@ WHERE forecast_uuid IN (SELECT forecast_uuid FROM forecasts_to_delete);
* with 0 representing 0% and 30000 representing 100% of capacity.
*/
INSERT INTO pred.predicted_generation_values (
- horizon_mins, p50_sip, forecast_uuid, target_time_utc, other_stats_fractions, metadata
+ horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, metadata
) VALUES (
$1, $2, $3, $4, $5, $6
);
@@ -153,10 +153,14 @@ SELECT
mf.forecaster_version,
f.created_at_utc,
pg.horizon_mins,
+ pg.p10_sip,
pg.p50_sip,
- pg.other_stats_fractions,
+ pg.p90_sip,
sv.capacity_watts,
UUIDV7_EXTRACT_TIMESTAMP(f.forecast_uuid)::TIMESTAMP AS init_time_utc,
+ (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )::TIMESTAMP AS target_time_utc,
COALESCE(pg.metadata || f.metadata, pg.metadata, f.metadata) AS metadata
FROM pred.forecasts AS f
INNER JOIN matched_forecasters AS mf USING (forecaster_id)
@@ -169,7 +173,8 @@ FROM pred.forecasts AS f
FROM loc.sources_mv AS s
WHERE s.geometry_uuid = f.geometry_uuid
AND s.source_type_id = f.source_type_id
- AND s.sys_period @> pg.target_time_utc
+ AND s.sys_period
+ @> (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER))
LIMIT 1
) AS sv ON TRUE
WHERE f.geometry_uuid = sqlc.arg(geometry_uuid)::UUID
@@ -256,29 +261,39 @@ WITH allowed_forecasts_overlapping_window AS (
)
),
winning_predictions AS (
- SELECT DISTINCT ON (pg.target_time_utc)
- pg.target_time_utc,
+ SELECT DISTINCT ON (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )
fow.forecast_uuid,
fow.init_time_utc,
fow.created_at_utc,
fow.geometry_uuid,
fow.source_type_id,
pg.horizon_mins,
+ pg.p10_sip,
pg.p50_sip,
- pg.other_stats_fractions,
+ pg.p90_sip,
+ (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )::TIMESTAMP AS target_time_utc,
COALESCE(pg.metadata || fow.metadata, pg.metadata, fow.metadata) AS metadata
FROM allowed_forecasts_overlapping_window AS fow
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
- WHERE pg.target_time_utc BETWEEN sqlc.arg(start_timestamp_utc)::TIMESTAMP AND sqlc.arg(end_timestamp_utc)::TIMESTAMP
- AND pg.horizon_mins >= sqlc.arg(horizon_mins)::INTEGER
+ WHERE (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ ) BETWEEN sqlc.arg(start_timestamp_utc)::TIMESTAMP AND sqlc.arg(end_timestamp_utc)::TIMESTAMP
+ AND pg.horizon_mins >= sqlc.arg(horizon_mins)::INTEGER
-- Sorting by decreasing init time ensures the DISTINCT captures the lowest allowed horizon
- ORDER BY pg.target_time_utc ASC, fow.init_time_utc DESC
+ ORDER BY
+ (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)) ASC,
+ fow.init_time_utc DESC
)
SELECT
wp.horizon_mins,
+ wp.p10_sip,
wp.p50_sip,
+ wp.p90_sip,
wp.target_time_utc,
- wp.other_stats_fractions,
wp.metadata,
wp.init_time_utc,
wp.created_at_utc,
@@ -343,22 +358,24 @@ SELECT
laf.geometry_uuid,
laf.source_type_id,
pg.horizon_mins,
+ pg.p10_sip,
pg.p50_sip,
- pg.target_time_utc,
+ pg.p90_sip,
laf.created_at_utc,
laf.init_time_utc,
- pg.other_stats_fractions,
sv.capacity_watts,
sv.latitude,
sv.longitude,
sv.geometry_name,
+ sqlc.arg(target_timestamp_utc)::TIMESTAMP AS target_time_utc,
COALESCE(pg.metadata || laf.metadata, pg.metadata, laf.metadata) AS metadata
FROM latest_allowed_forecast_per_location AS laf
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
INNER JOIN loc.sources_mv AS sv USING (geometry_uuid, source_type_id)
WHERE
- pg.target_time_utc = sqlc.arg(target_timestamp_utc)::TIMESTAMP
- AND sv.sys_period @> pg.target_time_utc;
+ (UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER))
+ = sqlc.arg(target_timestamp_utc)::TIMESTAMP
+ AND sv.sys_period @> sqlc.arg(target_timestamp_utc)::TIMESTAMP;
-- name: GetWeekAverageDeltasForLocations :many
/* GetWeekAverageDeltasForLocations retrieves the average deltas between predicted and observed generation values
@@ -385,9 +402,11 @@ relevant_predicted_values AS MATERIALIZED (
SELECT
rf.geometry_uuid,
rf.source_type_id,
- pg.target_time_utc,
pg.horizon_mins,
- pg.p50_sip
+ pg.p50_sip,
+ (
+ UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
+ )::TIMESTAMP AS target_time_utc
FROM relevant_forecasts AS rf
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
WHERE pg.forecast_uuid >= UUIDV7_BOUNDARY(sqlc.arg(pivot_timestamp)::TIMESTAMP - INTERVAL '8 days')
diff --git a/internal/server/postgres/testdata/seeding.sql b/internal/server/postgres/testdata/seeding.sql
index 895c260..4044f70 100644
--- a/internal/server/postgres/testdata/seeding.sql
+++ b/internal/server/postgres/testdata/seeding.sql
@@ -67,12 +67,18 @@ BEGIN
RETURNING forecast_uuid, init_time_utc
),
static_json AS (
- SELECT '{"source": "benchmark"}'::jsonb AS meta, '{"p10": 0.1, "p90": 0.9}'::jsonb AS stats
+ -- Removed the stats JSON payload entirely to mirror the dropped column
+ SELECT '{"source": "benchmark"}'::jsonb AS meta
)
INSERT INTO pred.predicted_generation_values
- (horizon_mins, p50_sip, forecast_uuid, target_time_utc, metadata, other_stats_fractions)
- SELECT gs.h, (random() * 30000)::SMALLINT, inf.forecast_uuid,
- inf.init_time_utc + (gs.h * INTERVAL '1 minute'), sj.meta, sj.stats
+ (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, metadata)
+ SELECT
+ gs.h,
+ (random() * 30000)::SMALLINT,
+ 3000::SMALLINT,
+ 27000::SMALLINT,
+ inf.forecast_uuid,
+ sj.meta
FROM inserted_forecasts inf
CROSS JOIN static_json sj
CROSS JOIN LATERAL generate_series(0, forecast_len_mins - pgv_res_mins, pgv_res_mins) AS gs(h)
@@ -80,7 +86,8 @@ BEGIN
-- Spoof the table size so Postgres uses indexes rather than seq scan in testing
UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values';
- UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values_forecast_uuid_idx';
+ UPDATE pg_class SET reltuples = 346000000, relpages = 5000000 WHERE relname = 'predicted_generation_values_pkey';
+
REFRESH MATERIALIZED VIEW loc.sources_mv;
RETURN QUERY SELECT target_locations * (history_window_mins / forecast_freq_mins) * (forecast_len_mins / pgv_res_mins), geo_list;
From 4797b5b096a271d0dfea1841f504d8cb48b46407 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Tue, 16 Jun 2026 16:46:10 +0100
Subject: [PATCH 2/6] feat(proto): Remove metadata from ForecastValue
This was a field that as of yet wasn't being used. In order to make sure
this very large table doesn't take up more storage space than it needs
to, I'm removing this dynamic column. Any value-related metadata can
live in the forecast metadata, I'm pretty confident.
---
README.md | 2 +-
internal/server/postgres/bench_test.go | 7 -----
internal/server/postgres/dataserverimpl.go | 6 ----
.../server/postgres/dataserverimpl_test.go | 18 ++----------
.../sql/migrations/00009_optimize_storage.sql | 29 ++++++++++---------
.../postgres/sql/queries/predictions.sql | 16 +++++-----
proto/ocf/dp/dp-data.messages.proto | 2 +-
7 files changed, 28 insertions(+), 52 deletions(-)
diff --git a/README.md b/README.md
index 9e5a699..a57d723 100644
--- a/README.md
+++ b/README.md
@@ -235,7 +235,7 @@ This places the generated code in `gen/python`. See the `Makefile` for more exte
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
-| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. || metadata | [google.protobuf.Struct](#google-protobuf-Struct) | optional | |
+| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. |
CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry
diff --git a/internal/server/postgres/bench_test.go b/internal/server/postgres/bench_test.go
index 1ff73f2..6e2bbaa 100644
--- a/internal/server/postgres/bench_test.go
+++ b/internal/server/postgres/bench_test.go
@@ -12,7 +12,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
- "google.golang.org/protobuf/types/known/structpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp"
@@ -116,12 +115,6 @@ var yields = func() []*pb.CreateForecastRequest_ForecastValue {
"p90": 0.3,
},
HorizonMins: uint32(i * 30),
- Metadata: func() *structpb.Struct {
- s, _ := structpb.NewStruct(map[string]any{
- "example_key": fmt.Sprintf("example_value_%d", i),
- })
- return s
- }(),
})
}
diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go
index 30478e0..1abd72a 100644
--- a/internal/server/postgres/dataserverimpl.go
+++ b/internal/server/postgres/dataserverimpl.go
@@ -183,18 +183,12 @@ func (s *DataPlatformDataServiceServerImpl) CreateForecast(
// Create the forecast data
paramsList := make([]db.CreatePredictedValuesParams, len(req.Values))
for i, value := range req.Values {
- // Since CreatePredictedValues uses COPYFROM, manually coerce empty metadata to nil
- if value.Metadata != nil && len(value.Metadata.Fields) == 0 {
- value.Metadata = nil
- }
-
paramsList[i] = db.CreatePredictedValuesParams{
HorizonMins: int16(value.HorizonMins),
P10Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p10"),
P50Sip: int16(value.P50Fraction * 30000.0),
P90Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p90"),
ForecastUuid: dbForecast.ForecastUuid,
- Metadata: value.Metadata,
}
}
diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go
index 98ac857..3b5108c 100644
--- a/internal/server/postgres/dataserverimpl_test.go
+++ b/internal/server/postgres/dataserverimpl_test.go
@@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
- "maps"
"math/rand/v2"
"strings"
"testing"
@@ -589,7 +588,6 @@ func TestGetForecastAtTimestamp(t *testing.T) {
"p90": float32(0.6 + float32(i)*0.05),
"p10": float32(0.4 + float32(i)*0.05),
},
- Metadata: metadata,
}
}
@@ -1061,7 +1059,6 @@ func TestGetForecastAsTimeseries(t *testing.T) {
"p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)),
"p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)),
},
- Metadata: metadata,
}
}
@@ -1801,8 +1798,6 @@ func TestCreateObservations(t *testing.T) {
func TestGetWeekAverageDeltas(t *testing.T) {
pivotTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
- metadata, err := structpb.NewStruct(map[string]any{"source": "test"})
- require.NoError(t, err)
// Create a site to attach the observations to
siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{
@@ -1862,7 +1857,6 @@ func TestGetWeekAverageDeltas(t *testing.T) {
"p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)),
"p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)),
},
- Metadata: metadata,
}
}
@@ -1925,7 +1919,6 @@ func TestCreateForecast(t *testing.T) {
"p10": 0.4 + float32(i)*0.05,
"p90": 0.6 + float32(i)*0.05,
},
- Metadata: metadata,
}
}
@@ -1935,7 +1928,6 @@ func TestCreateForecast(t *testing.T) {
HorizonMins: uint32(i * 30),
P50Fraction: 0.0,
OtherStatisticsFractions: map[string]float32{},
- Metadata: &structpb.Struct{},
}
}
@@ -1948,7 +1940,6 @@ func TestCreateForecast(t *testing.T) {
"p10": 1.3,
"p90": -0.2,
},
- Metadata: metadata,
}
}
@@ -2078,11 +2069,8 @@ func TestCreateForecast(t *testing.T) {
})
require.NoError(t, err)
- expectedMetadata := tc.req.Values[0].Metadata.AsMap()
- maps.Copy(expectedMetadata, tc.req.Metadata.AsMap())
-
for _, val := range fResp.Values {
- require.Equal(t, expectedMetadata, val.Metadata.AsMap())
+ require.Equal(t, tc.req.Metadata.AsMap(), val.Metadata.AsMap())
}
}
})
@@ -2130,7 +2118,6 @@ func TestGetLatestForecasts(t *testing.T) {
"p10": 0.4 + float32(i)*0.05,
"p90": 0.6 + float32(i)*0.05,
},
- Metadata: metadata,
}
}
@@ -2247,7 +2234,6 @@ func TestStreamForecastData(t *testing.T) {
OtherStatisticsFractions: map[string]float32{
"p90": float32(i)*0.25 + 0.1,
},
- Metadata: metadata,
}
}
@@ -2261,6 +2247,7 @@ func TestStreamForecastData(t *testing.T) {
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
InitTimeUtc: initTime,
Values: yields,
+ Metadata: metadata,
})
require.NoError(t, err)
@@ -2270,6 +2257,7 @@ func TestStreamForecastData(t *testing.T) {
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
InitTimeUtc: initTime,
Values: yields,
+ Metadata: metadata,
})
require.NoError(t, err)
}
diff --git a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
index 231e2af..95df7f8 100644
--- a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
+++ b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
@@ -59,7 +59,7 @@ BEGIN
((other_stats_fractions->>''p90'')::REAL * 30000)::SMALLINT,
forecast_uuid,
NULL,
- metadata,
+ NULL,
NULL
FROM pred.%I;
', new_part_name, partition_record.table_name);
@@ -81,9 +81,8 @@ DROP PROCEDURE pred.swap_predicted_generation_partitions;
ALTER TABLE pred.predicted_generation_values
DROP COLUMN target_time_utc,
- DROP COLUMN other_stats_fractions;
-
-ALTER TABLE pred.predicted_generation_values
+ DROP COLUMN other_stats_fractions,
+ DROP COLUMN metadata,
ADD PRIMARY KEY (forecast_uuid, horizon_mins);
DROP TABLE IF EXISTS pred.predicted_generation_values_template;
@@ -96,7 +95,6 @@ CREATE TABLE pred.predicted_generation_values_template (
p10_sip SMALLINT,
p90_sip SMALLINT,
forecast_uuid UUID NOT NULL REFERENCES pred.forecasts (forecast_uuid) ON DELETE CASCADE ON UPDATE CASCADE,
- metadata JSONB DEFAULT NULL CONSTRAINT metadata_nullifempty CHECK (metadata IS NULL OR metadata != '{}'),
PRIMARY KEY (forecast_uuid, horizon_mins)
);
@@ -104,12 +102,13 @@ ANALYZE pred.predicted_generation_values;
-- +goose Down
-ALTER TABLE pred.predicted_generation_values ADD COLUMN target_time_utc TIMESTAMP;
-ALTER TABLE pred.predicted_generation_values ADD COLUMN other_stats_fractions JSONB DEFAULT NULL;
-
-ALTER TABLE pred.predicted_generation_values
+ALTER TABLE pred.predicted_generation_values
+ DROP CONSTRAINT predicted_generation_values_pkey CASCADE,
+ ADD COLUMN target_time_utc TIMESTAMP,
+ ADD COLUMN other_stats_fractions JSONB DEFAULT NULL,
ADD CONSTRAINT other_stats_nullifempty CHECK (other_stats_fractions IS NULL OR other_stats_fractions != '{}'),
- ADD CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions));
+ ADD CONSTRAINT other_stats_valid_fractions_check CHECK (pred.check_all_jsonb_values_are_valid_stat_fractions(other_stats_fractions)),
+ ADD COLUMN metadata JSONB DEFAULT NULL;
-- +goose StatementBegin
@@ -131,7 +130,8 @@ BEGIN
EXECUTE format('
UPDATE pred.%I
SET target_time_utc = UUIDV7_EXTRACT_TIMESTAMP(forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => horizon_mins::INTEGER),
- other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN JSONB_BUILD_OBJECT(''p10'', p10_sip::REAL / 30000, ''p90'', p90_sip::REAL / 30000) ELSE NULL END;
+ other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN jsonb_strip_nulls(jsonb_build_object('p10', p10_sip::REAL / 30000, 'p90', p90_sip::REAL / 30000))
+ ELSE NULL END;
', partition_record.table_name);
EXECUTE format('ALTER TABLE pred.%I ALTER COLUMN target_time_utc SET NOT NULL;', partition_record.table_name);
@@ -146,9 +146,10 @@ $$;
CALL pred.rollback_predicted_generation_partitions();
DROP PROCEDURE pred.rollback_predicted_generation_partitions;
-ALTER TABLE pred.predicted_generation_values ALTER COLUMN target_time_utc SET NOT NULL;
-ALTER TABLE pred.predicted_generation_values ADD PRIMARY KEY (forecast_uuid, target_time_utc);
-ALTER TABLE pred.predicted_generation_values DROP COLUMN p10_sip, DROP COLUMN p90_sip;
+ALTER TABLE pred.predicted_generation_values
+ ALTER COLUMN target_time_utc SET NOT NULL,
+ ADD PRIMARY KEY (forecast_uuid, target_time_utc),
+ DROP COLUMN p10_sip, DROP COLUMN p90_sip;
DROP TABLE IF EXISTS pred.predicted_generation_values_template;
CREATE TABLE pred.predicted_generation_values_template (
diff --git a/internal/server/postgres/sql/queries/predictions.sql b/internal/server/postgres/sql/queries/predictions.sql
index 37d6019..51b1aca 100644
--- a/internal/server/postgres/sql/queries/predictions.sql
+++ b/internal/server/postgres/sql/queries/predictions.sql
@@ -123,9 +123,9 @@ WHERE forecast_uuid IN (SELECT forecast_uuid FROM forecasts_to_delete);
* with 0 representing 0% and 30000 representing 100% of capacity.
*/
INSERT INTO pred.predicted_generation_values (
- horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, metadata
+ horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid
) VALUES (
- $1, $2, $3, $4, $5, $6
+ $1, $2, $3, $4, $5
);
-- name: ListPredictionsForForecasts :many
@@ -157,11 +157,11 @@ SELECT
pg.p50_sip,
pg.p90_sip,
sv.capacity_watts,
+ f.metadata,
UUIDV7_EXTRACT_TIMESTAMP(f.forecast_uuid)::TIMESTAMP AS init_time_utc,
(
UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
- )::TIMESTAMP AS target_time_utc,
- COALESCE(pg.metadata || f.metadata, pg.metadata, f.metadata) AS metadata
+ )::TIMESTAMP AS target_time_utc
FROM pred.forecasts AS f
INNER JOIN matched_forecasters AS mf USING (forecaster_id)
INNER JOIN pred.predicted_generation_values AS pg
@@ -273,10 +273,10 @@ winning_predictions AS (
pg.p10_sip,
pg.p50_sip,
pg.p90_sip,
+ fow.metadata,
(
UUIDV7_EXTRACT_TIMESTAMP(pg.forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => pg.horizon_mins::INTEGER)
- )::TIMESTAMP AS target_time_utc,
- COALESCE(pg.metadata || fow.metadata, pg.metadata, fow.metadata) AS metadata
+ )::TIMESTAMP AS target_time_utc
FROM allowed_forecasts_overlapping_window AS fow
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
WHERE (
@@ -367,8 +367,8 @@ SELECT
sv.latitude,
sv.longitude,
sv.geometry_name,
- sqlc.arg(target_timestamp_utc)::TIMESTAMP AS target_time_utc,
- COALESCE(pg.metadata || laf.metadata, pg.metadata, laf.metadata) AS metadata
+ laf.metadata,
+ sqlc.arg(target_timestamp_utc)::TIMESTAMP AS target_time_utc
FROM latest_allowed_forecast_per_location AS laf
INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid)
INNER JOIN loc.sources_mv AS sv USING (geometry_uuid, source_type_id)
diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto
index edcf2ba..7f25916 100644
--- a/proto/ocf/dp/dp-data.messages.proto
+++ b/proto/ocf/dp/dp-data.messages.proto
@@ -318,6 +318,7 @@ message ListForecastersResponse {
message CreateForecastRequest {
message ForecastValue {
+ reserved 4;
uint32 horizon_mins = 1;
float p50_fraction = 2 [
(buf.validate.field).float.gte = 0,
@@ -336,7 +337,6 @@ message CreateForecastRequest {
}
}
];
- optional google.protobuf.Struct metadata = 4;
}
Forecaster forecaster = 1 [
From d44553ce8d08ff6ddd524f9f5e84f8a6b876ed80 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Wed, 17 Jun 2026 08:06:56 +0100
Subject: [PATCH 3/6] fix: Correct quotes in down migration
---
.../postgres/sql/migrations/00009_optimize_storage.sql | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
index 95df7f8..ffab5ad 100644
--- a/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
+++ b/internal/server/postgres/sql/migrations/00009_optimize_storage.sql
@@ -15,9 +15,9 @@
* partition-wise and then replacing the partitions, we keep the process light on CPU.
*/
-DROP INDEX loc.idx_sources_mv_gist_sys_period;
+DROP INDEX IF EXISTS loc.idx_sources_mv_gist_sys_period;
-CREATE INDEX idx_sources_mv_composite_lookup
+CREATE INDEX IF NOT EXISTS idx_sources_mv_composite_lookup
ON loc.sources_mv USING gist (geometry_uuid, source_type_id, sys_period);
ALTER TABLE pred.predicted_generation_values
@@ -130,7 +130,7 @@ BEGIN
EXECUTE format('
UPDATE pred.%I
SET target_time_utc = UUIDV7_EXTRACT_TIMESTAMP(forecast_uuid)::TIMESTAMP + MAKE_INTERVAL(mins => horizon_mins::INTEGER),
- other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN jsonb_strip_nulls(jsonb_build_object('p10', p10_sip::REAL / 30000, 'p90', p90_sip::REAL / 30000))
+ other_stats_fractions = CASE WHEN p10_sip IS NOT NULL OR p90_sip IS NOT NULL THEN jsonb_strip_nulls(jsonb_build_object(''p10'', p10_sip::REAL / 30000, ''p90'', p90_sip::REAL / 30000))
ELSE NULL END;
', partition_record.table_name);
From c514f1f1d6f6fef16edb4281fee3e96bc9b236b7 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Wed, 17 Jun 2026 08:43:59 +0100
Subject: [PATCH 4/6] fix: Stream impl
---
internal/server/postgres/dataserverimpl.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go
index 1abd72a..4664d50 100644
--- a/internal/server/postgres/dataserverimpl.go
+++ b/internal/server/postgres/dataserverimpl.go
@@ -527,9 +527,9 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
&row.P50Sip,
&row.P90Sip,
&row.CapacityWatts,
+ &row.Metadata,
&row.InitTimeUtc,
&row.TargetTimeUtc,
- &row.Metadata,
)
if err != nil {
l.Err(err).Msg("rows.Scan failed")
From 6ee16501f9bf98633a69946725d3605e64adb783 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Wed, 17 Jun 2026 08:53:24 +0100
Subject: [PATCH 5/6] fix: bench seedings
---
internal/server/postgres/testdata/seeding.sql | 10 ++--------
1 file changed, 2 insertions(+), 8 deletions(-)
diff --git a/internal/server/postgres/testdata/seeding.sql b/internal/server/postgres/testdata/seeding.sql
index 4044f70..a9d9de2 100644
--- a/internal/server/postgres/testdata/seeding.sql
+++ b/internal/server/postgres/testdata/seeding.sql
@@ -65,22 +65,16 @@ BEGIN
TSRANGE(init_time_utc, init_time_utc + (forecast_len_mins * INTERVAL '1 minute'))
FROM generated_data
RETURNING forecast_uuid, init_time_utc
- ),
- static_json AS (
- -- Removed the stats JSON payload entirely to mirror the dropped column
- SELECT '{"source": "benchmark"}'::jsonb AS meta
)
INSERT INTO pred.predicted_generation_values
- (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid, metadata)
+ (horizon_mins, p50_sip, p10_sip, p90_sip, forecast_uuid)
SELECT
gs.h,
(random() * 30000)::SMALLINT,
3000::SMALLINT,
27000::SMALLINT,
- inf.forecast_uuid,
- sj.meta
+ inf.forecast_uuid
FROM inserted_forecasts inf
- CROSS JOIN static_json sj
CROSS JOIN LATERAL generate_series(0, forecast_len_mins - pgv_res_mins, pgv_res_mins) AS gs(h)
ORDER BY inf.init_time_utc ASC;
From 60baeee36a683614805b70a21e6c885d984a1530 Mon Sep 17 00:00:00 2001
From: devsjc <47188100+devsjc@users.noreply.github.com>
Date: Wed, 17 Jun 2026 10:44:43 +0100
Subject: [PATCH 6/6] fix: Validate other_stats_fractions keys
---
proto/ocf/dp/dp-data.messages.proto | 3 +++
1 file changed, 3 insertions(+)
diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto
index 7f25916..b833e7d 100644
--- a/proto/ocf/dp/dp-data.messages.proto
+++ b/proto/ocf/dp/dp-data.messages.proto
@@ -329,6 +329,9 @@ message CreateForecastRequest {
map<string, float> other_statistics_fractions = 3 [
(buf.validate.field).map.min_pairs = 0,
(buf.validate.field).map.max_pairs = 20,
+ (buf.validate.field).map.keys = {
+ string: {in: ["p10", "p90"]}
+ },
(buf.validate.field).map.values = {
float: {
gte: 0.0,