Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion backend/pkg/api/connect/service/console/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func rpcPublishMessagePayloadOptionsToSerializeInput(po *v1alpha.PublishMessageP
encoding = serde.PayloadEncodingProtobufBSR
}

// When the client picks Protobuf together with a schema ID, route through the schema-registry
// path (ProtobufSchemaSerde) instead of the static-config ProtobufSerde — the latter requires
// a configured proto.Service and panics when only schema-registry-backed schemas are in use.
if encoding == serde.PayloadEncodingProtobuf && po.GetSchemaId() > 0 {
encoding = serde.PayloadEncodingProtobufSchema
}

input := &serde.RecordPayloadInput{
Payload: po.GetData(),
Encoding: encoding,
Expand All @@ -63,7 +70,13 @@ func rpcPublishMessagePayloadOptionsToSerializeInput(po *v1alpha.PublishMessageP
input.Options = []serde.SerdeOpt{serde.WithSchemaID(uint32(po.GetSchemaId()))}
}

if po.GetIndex() > 0 {
if path := po.GetIndexPath(); len(path) > 0 {
ints := make([]int, len(path))
for i, v := range path {
ints[i] = int(v)
}
input.Options = append(input.Options, serde.WithIndex(ints...))
} else if po.GetIndex() > 0 {
input.Options = append(input.Options, serde.WithIndex(int(po.GetIndex())))
}

Expand Down
27 changes: 27 additions & 0 deletions backend/pkg/api/connect/service/console/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,30 @@ func (api *Service) PublishMessage(
},
), nil
}

// GenerateSchemaSample renders a zero-valued JSON skeleton for any Schema
// Registry-backed schema (Avro / Protobuf / JSON Schema). The backend
// dispatches on the registered schema type so the frontend can call a single
// RPC regardless of which encoding the user picked.
func (api *Service) GenerateSchemaSample(
ctx context.Context,
req *connect.Request[v1alpha.GenerateSchemaSampleRequest],
) (*connect.Response[v1alpha.GenerateSchemaSampleResponse], error) {
indexPath := make([]int, 0, len(req.Msg.GetIndexPath()))
for _, v := range req.Msg.GetIndexPath() {
indexPath = append(indexPath, int(v))
}

sample, err := api.consoleSvc.GenerateSchemaSampleJSON(ctx, int(req.Msg.GetSchemaId()), indexPath)
if err != nil {
return nil, apierrors.NewConnectError(
connect.CodeInvalidArgument,
fmt.Errorf("failed to generate schema sample JSON: %w", err),
apierrors.NewErrorInfo(commonv1alpha1.Reason_REASON_INVALID_INPUT.String()),
)
}

return connect.NewResponse(&v1alpha.GenerateSchemaSampleResponse{
SampleJson: string(sample),
}), nil
}
77 changes: 77 additions & 0 deletions backend/pkg/api/handle_schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,83 @@ func (api *API) handleGetSchemaSubjects() http.HandlerFunc {
}
}

func (api *API) handleGetAllSchemas() http.HandlerFunc {
if !api.Cfg.SchemaRegistry.Enabled {
return api.handleSchemaRegistryNotConfigured()
}

parseBool := func(r *http.Request, name string) (bool, error) {
raw := rest.GetQueryParam(r, name)
if raw == "" {
return false, nil
}
v, err := strconv.ParseBool(raw)
if err != nil {
return false, fmt.Errorf("invalid %q query param: %w", name, err)
}
return v, nil
}
parseInt := func(r *http.Request, name string) (int, error) {
raw := rest.GetQueryParam(r, name)
if raw == "" {
return 0, nil
}
v, err := strconv.Atoi(raw)
if err != nil {
return 0, fmt.Errorf("invalid %q query param: %w", name, err)
}
if v < 0 {
return 0, fmt.Errorf("invalid %q query param: must be non-negative", name)
}
return v, nil
}

return func(w http.ResponseWriter, r *http.Request) {
opts := console.GetAllSchemasOptions{
SubjectPrefix: rest.GetQueryParam(r, "subjectPrefix"),
}
var err error
if opts.LatestOnly, err = parseBool(r, "latestOnly"); err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{Err: err, Status: http.StatusBadRequest, Message: err.Error()})
return
}
if opts.Deleted, err = parseBool(r, "deleted"); err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{Err: err, Status: http.StatusBadRequest, Message: err.Error()})
return
}
if opts.DeletedOnly, err = parseBool(r, "deletedOnly"); err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{Err: err, Status: http.StatusBadRequest, Message: err.Error()})
return
}
if opts.Offset, err = parseInt(r, "offset"); err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{Err: err, Status: http.StatusBadRequest, Message: err.Error()})
return
}
if opts.Limit, err = parseInt(r, "limit"); err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{Err: err, Status: http.StatusBadRequest, Message: err.Error()})
return
}
// Defense in depth: cap unbounded fetches. A registry with thousands of schemas would
// otherwise stream MBs of schema text per request.
const maxSchemasLimit = 1000
if opts.Limit == 0 || opts.Limit > maxSchemasLimit {
opts.Limit = maxSchemasLimit
}

res, err := api.ConsoleSvc.GetAllSchemas(r.Context(), opts)
if err != nil {
rest.SendRESTError(w, r, api.Logger, &rest.Error{
Err: err,
Status: http.StatusBadGateway,
Message: fmt.Sprintf("Failed to retrieve schemas from the schema registry: %v", err.Error()),
IsSilent: false,
})
return
}
rest.SendResponse(w, r, api.Logger, http.StatusOK, res)
}
}

func (api *API) handleGetSchemaSubjectDetails() http.HandlerFunc {
if !api.Cfg.SchemaRegistry.Enabled {
return api.handleSchemaRegistryNotConfigured()
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ func (api *API) routes() *chi.Mux {
r.Delete("/schema-registry/config/{subject}", api.handleDeleteSchemaRegistrySubjectConfig())
r.Get("/schema-registry/contexts", api.handleGetSchemaRegistryContexts())
r.Get("/schema-registry/subjects", api.handleGetSchemaSubjects())
r.Get("/schema-registry/schemas", api.handleGetAllSchemas())
r.Get("/schema-registry/schemas/types", api.handleGetSchemaRegistrySchemaTypes())
r.Get("/schema-registry/schemas/ids/{id}/versions", api.handleGetSchemaUsagesByID())
r.Delete("/schema-registry/subjects/{subject}", api.handleDeleteSubject())
Expand Down
62 changes: 62 additions & 0 deletions backend/pkg/console/proto_message_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file https://github.com/redpanda-data/redpanda/blob/dev/licenses/bsl.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package console

import (
"context"
"errors"
"fmt"
"slices"

"google.golang.org/protobuf/reflect/protoreflect"

"github.com/redpanda-data/console/backend/pkg/proto"
)

// protoMessageTypesByID walks the descriptor tree of the schema, returning each message paired with
// its Confluent wire-format index path. Goes through cachedSchemaClient so it works without the
// optional static Protobuf service.
func (s *Service) protoMessageTypesByID(ctx context.Context, schemaID int) ([]proto.MessageTypeInfo, error) {
if s.cachedSchemaClient == nil {
return nil, errors.New("schema registry is not configured")
}

files, rootFilename, err := s.cachedSchemaClient.ProtoFilesByID(ctx, schemaID)
if err != nil {
return nil, fmt.Errorf("failed to load proto files for schema %d: %w", schemaID, err)
}

rootFile := files.FindFileByPath(rootFilename)
if rootFile == nil {
return nil, fmt.Errorf("root proto file %q not found for schema %d", rootFilename, schemaID)
}

var out []proto.MessageTypeInfo
walkMessageTypes(rootFile.Messages(), nil, &out)
return out, nil
}

func walkMessageTypes(msgs protoreflect.MessageDescriptors, prefix []int32, out *[]proto.MessageTypeInfo) {
for i := 0; i < msgs.Len(); i++ {
md := msgs.Get(i)
path := append(slices.Clone(prefix), int32(i))
// Synthetic map-entry descriptors (e.g. Foo.LabelsEntry for a map<string,string> field)
// are not user-selectable message types — skip them but keep walking so real siblings
// keep their indices.
if md.IsMapEntry() {
continue
}
*out = append(*out, proto.MessageTypeInfo{
FullyQualifiedName: string(md.FullName()),
IndexPath: path,
})
walkMessageTypes(md.Messages(), path, out)
}
}
139 changes: 132 additions & 7 deletions backend/pkg/console/schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/twmb/franz-go/pkg/sr"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"

"github.com/redpanda-data/console/backend/pkg/proto"
)

// SchemaRegistryMode returns the schema registry mode.
Expand Down Expand Up @@ -214,6 +216,101 @@ func (s *Service) GetSchemaRegistrySubjects(ctx context.Context, subjectPrefix s
return result, nil
}

// SchemaRegistrySchema is a single (subject, version) entry returned by the
// registry's GET /schemas endpoint. The shape mirrors sr.SubjectSchema so the
// payload is suitable for a future dataplane API surface.
type SchemaRegistrySchema struct {
Subject string `json:"subject"`
Version int `json:"version"`
ID int `json:"id"`
Type sr.SchemaType `json:"type"`
Schema string `json:"schema,omitempty"`
References []Reference `json:"references,omitempty"`
Metadata *SchemaMetadata `json:"metadata,omitempty"`
}

// GetAllSchemasOptions controls the query parameters forwarded to the schema
// registry's GET /schemas endpoint.
type GetAllSchemasOptions struct {
SubjectPrefix string
LatestOnly bool
Deleted bool // include soft-deleted entries
DeletedOnly bool
Offset int
Limit int
}

// GetAllSchemas lists schemas from the registry's GET /schemas endpoint and
// returns the full (subject, version, id, type, schema, references, metadata)
// for each entry. Callers control filtering and pagination via opts.
func (s *Service) GetAllSchemas(ctx context.Context, opts GetAllSchemasOptions) ([]SchemaRegistrySchema, error) {
srClient, err := s.schemaClientFactory.GetSchemaRegistryClient(ctx)
if err != nil {
return nil, err
}

params := make([]sr.Param, 0, 6)
if opts.SubjectPrefix != "" {
params = append(params, sr.SubjectPrefix(opts.SubjectPrefix))
}
if opts.LatestOnly {
params = append(params, sr.LatestOnly)
}
if opts.Deleted {
params = append(params, sr.ShowDeleted)
}
if opts.DeletedOnly {
params = append(params, sr.DeletedOnly)
}
if opts.Offset > 0 {
params = append(params, sr.Offset(opts.Offset))
}
if opts.Limit > 0 {
params = append(params, sr.Limit(opts.Limit))
}

schemas, err := srClient.AllSchemas(sr.WithParams(ctx, params...))
if err != nil {
return nil, err
}

result := make([]SchemaRegistrySchema, 0, len(schemas))
for _, schema := range schemas {
references := make([]Reference, len(schema.References))
for i, ref := range schema.References {
references[i] = Reference{
Name: ref.Name,
Subject: ref.Subject,
Version: ref.Version,
}
}
var metadata *SchemaMetadata
if schema.SchemaMetadata != nil {
metadata = &SchemaMetadata{
Tags: schema.SchemaMetadata.Tags,
Properties: schema.SchemaMetadata.Properties,
Sensitive: schema.SchemaMetadata.Sensitive,
}
}
result = append(result, SchemaRegistrySchema{
Subject: schema.Subject,
Version: schema.Version,
ID: schema.ID,
Type: schema.Type,
Schema: schema.Schema.Schema,
References: references,
Metadata: metadata,
})
}
slices.SortFunc(result, func(a, b SchemaRegistrySchema) int {
if c := strings.Compare(a.Subject, b.Subject); c != 0 {
return c
}
return a.Version - b.Version
})
return result, nil
}

// SchemaRegistrySubjectDetails represents a schema registry subject along
// with other information such as the registered versions that belong to it,
// or the full schema information that's part of the subject.
Expand Down Expand Up @@ -344,6 +441,8 @@ func (s *Service) GetSchemaRegistrySubjectDetails(ctx context.Context, subjectNa
return nil, err
}

s.populateProtoMessageTypes(ctx, subjectName, schemas)

var schemaType sr.SchemaType
if len(schemas) > 0 {
schemaType = schemas[len(schemas)-1].Type
Expand All @@ -360,6 +459,31 @@ func (s *Service) GetSchemaRegistrySubjectDetails(ctx context.Context, subjectNa
}, nil
}

// populateProtoMessageTypes resolves message types for every Protobuf version in schemas. Failures
// are non-fatal: a parse error leaves MessageTypes nil rather than failing the parent call.
func (s *Service) populateProtoMessageTypes(ctx context.Context, subjectName string, schemas []SchemaRegistryVersionedSchema) {
grp, grpCtx := errgroup.WithContext(ctx)
grp.SetLimit(10)
for i := range schemas {
if schemas[i].Type != sr.TypeProtobuf {
continue
}
grp.Go(func() error {
types, err := s.protoMessageTypesByID(grpCtx, schemas[i].ID)
if err != nil {
s.logger.WarnContext(grpCtx, "failed to resolve protobuf message types",
slog.String("subject", subjectName),
slog.Int("schemaId", schemas[i].ID),
slog.Any("err", err))
return nil
}
schemas[i].MessageTypes = types
return nil
})
}
_ = grp.Wait()
}

// SchemaRegistrySubjectDetailsVersion represents a schema version and if it's
// soft-deleted or not.
type SchemaRegistrySubjectDetailsVersion struct {
Expand Down Expand Up @@ -486,13 +610,14 @@ func (s *Service) getSubjectMode(ctx context.Context, srClient *rpsr.Client, sub

// SchemaRegistryVersionedSchema describes a retrieved schema.
type SchemaRegistryVersionedSchema struct {
ID int `json:"id"`
Version int `json:"version"`
IsSoftDeleted bool `json:"isSoftDeleted"`
Type sr.SchemaType `json:"type"`
Schema string `json:"schema"`
References []Reference `json:"references"`
Metadata *SchemaMetadata `json:"metadata,omitempty"`
ID int `json:"id"`
Version int `json:"version"`
IsSoftDeleted bool `json:"isSoftDeleted"`
Type sr.SchemaType `json:"type"`
Schema string `json:"schema"`
References []Reference `json:"references"`
Metadata *SchemaMetadata `json:"metadata,omitempty"`
MessageTypes []proto.MessageTypeInfo `json:"messageTypes,omitempty"` // Protobuf only.
}

// Reference describes a reference to a different schema stored in the schema registry.
Expand Down
Loading
Loading