Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ This places the generated code in `gen/python`. See the `Makefile` for more exte

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. || metadata | [google.protobuf.Struct](#google-protobuf-Struct) | optional | |
| horizon_mins | [uint32](#uint32) | | || p50_fraction | [float](#float) | | || other_statistics_fractions | [CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry](#ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry) | repeated | Struct for storing additional statistics like p10, p90, mean etc. |
</details><a name="ocf-dp-CreateForecastRequest-ForecastValue-OtherStatisticsFractionsEntry"></a>
<details><summary>CreateForecastRequest.ForecastValue.OtherStatisticsFractionsEntry</summary>

Expand Down
7 changes: 0 additions & 7 deletions internal/server/postgres/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"

pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp"
Expand Down Expand Up @@ -116,12 +115,6 @@ var yields = func() []*pb.CreateForecastRequest_ForecastValue {
"p90": 0.3,
},
HorizonMins: uint32(i * 30),
Metadata: func() *structpb.Struct {
s, _ := structpb.NewStruct(map[string]any{
"example_key": fmt.Sprintf("example_value_%d", i),
})
return s
}(),
})
}

Expand Down
95 changes: 35 additions & 60 deletions internal/server/postgres/dataserverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/google/uuid"
Expand All @@ -21,7 +20,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

pb "github.com/openclimatefix/data-platform/internal/gen/ocf/dp"
Expand Down Expand Up @@ -64,6 +62,19 @@ func timeptrToPgTimestamp(t *timestamppb.Timestamp) pgtype.Timestamp {
return pgtype.Timestamp{Time: t.AsTime().UTC(), Valid: true}
}

// extractSIPStatPtrFromMap gets a key's value from a map as a pointer, and converts it
// to a smallint percentage. If it doesn't exist, it returns nil.
func extractSIPStatPtrFromMap(m map[string]float32, key string) *int16 {
val, exists := m[key]
if !exists {
return nil
}

sip_val := int16(val * 30000.0)

return &sip_val
}

// --- Server Implementation ----------------------------------------------------------------------

func NewDataPlatformDataServiceServerImpl() *DataPlatformDataServiceServerImpl {
Expand Down Expand Up @@ -172,48 +183,12 @@ func (s *DataPlatformDataServiceServerImpl) CreateForecast(
// Create the forecast data
paramsList := make([]db.CreatePredictedValuesParams, len(req.Values))
for i, value := range req.Values {
// Since the database wants each numeric value to have 4 significant figures,
// the float32 values need to be rounded accordingly.
roundedStats := make(map[string]any)
for k, val32 := range value.OtherStatisticsFractions {
val64 := float64(val32)

var roundedVal float64
if val64 < 1 {
roundedVal = math.Round(val64*10000) / 10000
} else {
roundedVal = math.Round(val64*1000) / 1000
}

roundedStats[k] = roundedVal
}

// Since CreatePredictedValues uses COPYFROM, manually coerce empty metadata to nil
if value.Metadata != nil && len(value.Metadata.Fields) == 0 {
value.Metadata = nil
}

var otherStats *structpb.Struct

if len(roundedStats) > 0 {
otherStats, err = structpb.NewStruct(roundedStats)
if err != nil {
l.Err(err).Msgf("structpb.NewStruct(%+v)", roundedStats)
}
}

paramsList[i] = db.CreatePredictedValuesParams{
HorizonMins: int16(value.HorizonMins),
P10Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p10"),
P50Sip: int16(value.P50Fraction * 30000.0),
P90Sip: extractSIPStatPtrFromMap(value.OtherStatisticsFractions, "p90"),
ForecastUuid: dbForecast.ForecastUuid,
TargetTimeUtc: pgtype.Timestamp{
Time: req.InitTimeUtc.AsTime().Add(
time.Duration(value.HorizonMins) * time.Minute,
),
Valid: true,
},
OtherStatsFractions: otherStats,
Metadata: value.Metadata,
}
}

Expand Down Expand Up @@ -548,22 +523,26 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
&row.ForecasterVersion,
&row.CreatedAtUtc,
&row.HorizonMins,
&row.P10Sip,
&row.P50Sip,
&row.OtherStatsFractions,
&row.P90Sip,
&row.CapacityWatts,
&row.InitTimeUtc,
&row.Metadata,
&row.InitTimeUtc,
&row.TargetTimeUtc,
)
if err != nil {
l.Err(err).Msg("rows.Scan failed")
return status.Errorf(codes.Internal, "Error reading prediction stream")
}

otherStatistics := make(map[string]float32)
if row.OtherStatsFractions != nil {
for k, v := range row.OtherStatsFractions.AsMap() {
otherStatistics[k] = float32(v.(float64))
}
if row.P10Sip != nil {
otherStatistics["p10"] = float32(*row.P10Sip) / 30000.0
}

if row.P90Sip != nil {
otherStatistics["p90"] = float32(*row.P90Sip) / 30000.0
}

metadata := make(map[string]string)
Expand Down Expand Up @@ -1639,14 +1618,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
out := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbPreds))
for i, pred := range dbPreds {
otherStats := make(map[string]float32)
for k, v := range pred.OtherStatsFractions.AsMap() {
floatVal, ok := v.(float64)
if !ok {
l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic")
continue
}
if pred.P10Sip != nil {
otherStats["p10"] = float32(*pred.P10Sip) / 30000.0
}

otherStats[k] = float32(floatVal)
if pred.P90Sip != nil {
otherStats["p90"] = float32(*pred.P90Sip) / 30000.0
}

out[i] = &pb.GetForecastAsTimeseriesResponse_Value{
Expand Down Expand Up @@ -1741,14 +1718,12 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
values := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbValues))
for i, value := range dbValues {
otherStats := make(map[string]float32)
for k, v := range value.OtherStatsFractions.AsMap() {
floatVal, ok := v.(float64)
if !ok {
l.Warn().Str("key", k).Interface("value", v).Msg("skipping non-float statistic")
continue
}
if value.P10Sip != nil {
otherStats["p10"] = float32(*value.P10Sip) / 30000.0
}

otherStats[k] = float32(floatVal)
if value.P90Sip != nil {
otherStats["p90"] = float32(*value.P90Sip) / 30000.0
}

values[i] = &pb.GetForecastAsTimeseriesResponse_Value{
Expand Down
18 changes: 3 additions & 15 deletions internal/server/postgres/dataserverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
"maps"
"math/rand/v2"
"strings"
"testing"
Expand Down Expand Up @@ -589,7 +588,6 @@ func TestGetForecastAtTimestamp(t *testing.T) {
"p90": float32(0.6 + float32(i)*0.05),
"p10": float32(0.4 + float32(i)*0.05),
},
Metadata: metadata,
}
}

Expand Down Expand Up @@ -1061,7 +1059,6 @@ func TestGetForecastAsTimeseries(t *testing.T) {
"p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)),
"p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)),
},
Metadata: metadata,
}
}

Expand Down Expand Up @@ -1801,8 +1798,6 @@ func TestCreateObservations(t *testing.T) {

func TestGetWeekAverageDeltas(t *testing.T) {
pivotTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
metadata, err := structpb.NewStruct(map[string]any{"source": "test"})
require.NoError(t, err)

// Create a site to attach the observations to
siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{
Expand Down Expand Up @@ -1862,7 +1857,6 @@ func TestGetWeekAverageDeltas(t *testing.T) {
"p10": float32(max(float32(i-1)*float32(100/len(yields))/100.0, 0)),
"p90": float32(min(float32(i+1)*float32(100/len(yields))/100.0, 1.1)),
},
Metadata: metadata,
}
}

Expand Down Expand Up @@ -1925,7 +1919,6 @@ func TestCreateForecast(t *testing.T) {
"p10": 0.4 + float32(i)*0.05,
"p90": 0.6 + float32(i)*0.05,
},
Metadata: metadata,
}
}

Expand All @@ -1935,7 +1928,6 @@ func TestCreateForecast(t *testing.T) {
HorizonMins: uint32(i * 30),
P50Fraction: 0.0,
OtherStatisticsFractions: map[string]float32{},
Metadata: &structpb.Struct{},
}
}

Expand All @@ -1948,7 +1940,6 @@ func TestCreateForecast(t *testing.T) {
"p10": 1.3,
"p90": -0.2,
},
Metadata: metadata,
}
}

Expand Down Expand Up @@ -2078,11 +2069,8 @@ func TestCreateForecast(t *testing.T) {
})
require.NoError(t, err)

expectedMetadata := tc.req.Values[0].Metadata.AsMap()
maps.Copy(expectedMetadata, tc.req.Metadata.AsMap())

for _, val := range fResp.Values {
require.Equal(t, expectedMetadata, val.Metadata.AsMap())
require.Equal(t, tc.req.Metadata.AsMap(), val.Metadata.AsMap())
}
}
})
Expand Down Expand Up @@ -2130,7 +2118,6 @@ func TestGetLatestForecasts(t *testing.T) {
"p10": 0.4 + float32(i)*0.05,
"p90": 0.6 + float32(i)*0.05,
},
Metadata: metadata,
}
}

Expand Down Expand Up @@ -2247,7 +2234,6 @@ func TestStreamForecastData(t *testing.T) {
OtherStatisticsFractions: map[string]float32{
"p90": float32(i)*0.25 + 0.1,
},
Metadata: metadata,
}
}

Expand All @@ -2261,6 +2247,7 @@ func TestStreamForecastData(t *testing.T) {
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
InitTimeUtc: initTime,
Values: yields,
Metadata: metadata,
})
require.NoError(t, err)

Expand All @@ -2270,6 +2257,7 @@ func TestStreamForecastData(t *testing.T) {
EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR,
InitTimeUtc: initTime,
Values: yields,
Metadata: metadata,
})
require.NoError(t, err)
}
Expand Down
Loading
Loading