From 440c0b4a5e71cd4da116824b141706bd38f01c6c Mon Sep 17 00:00:00 2001 From: Julin <142230457+c-julin@users.noreply.github.com> Date: Thu, 21 May 2026 16:44:20 +0100 Subject: [PATCH 1/3] feat(produce): schema-aware Produce Record UI (UX-1292) Frontend half of the schema-aware Produce Record flow (stacked on the backend PR). - topic-produce: subject + version selection, encoding-aware schema picker, Protobuf message-type dropdown reading off the new messageTypes field on subject details, sample-JSON generation via GenerateSchemaSample. - backend-api / rest-interfaces: new SchemaMessageType type + messageTypes field on SchemaRegistryVersionedSchema; supporting REST bindings for the schemas / subject-types endpoints. --- .../components/pages/topics/topic-produce.tsx | 323 ++++++++++++++++-- frontend/src/state/backend-api.ts | 49 +++ frontend/src/state/rest-interfaces.ts | 23 ++ 3 files changed, 359 insertions(+), 36 deletions(-) diff --git a/frontend/src/components/pages/topics/topic-produce.tsx b/frontend/src/components/pages/topics/topic-produce.tsx index 8340ea1947..89e30349bc 100644 --- a/frontend/src/components/pages/topics/topic-produce.tsx +++ b/frontend/src/components/pages/topics/topic-produce.tsx @@ -18,16 +18,36 @@ import { } from '@redpanda-data/ui'; import { Link } from '@tanstack/react-router'; import { TrashIcon } from 'components/icons'; -import { type FC, useEffect, useState } from 'react'; +import { type FC, useEffect, useMemo, useRef, useState } from 'react'; import { Controller, type SubmitHandler, useFieldArray, useForm, useWatch } from 'react-hook-form'; -import { setMonacoTheme } from '../../../config'; +import { config as appConfig, setMonacoTheme } from '../../../config'; import { CompressionType, CompressionTypeSchema, KafkaRecordHeaderSchema, PayloadEncoding, } from '../../../protogen/redpanda/api/console/v1alpha1/common_pb'; +import { GenerateSchemaSampleRequestSchema } from '../../../protogen/redpanda/api/console/v1alpha1/publish_messages_pb'; +import type { SchemaMessageType } from '../../../state/rest-interfaces'; +import { SchemaType, type SchemaTypeType } from '../../../state/rest-interfaces'; + +// Maps Schema Registry's schema type (AVRO/PROTOBUF/JSON) onto the payload +// encoding the Console PublishMessage RPC expects. JSON-Schema rather than +// plain JSON so the schema-registry-backed serde path is used. +const schemaTypeToEncoding = (t: SchemaTypeType): PayloadEncoding | undefined => { + if (t === SchemaType.PROTOBUF) { + return PayloadEncoding.PROTOBUF; + } + if (t === SchemaType.AVRO) { + return PayloadEncoding.AVRO; + } + if (t === SchemaType.JSON) { + return PayloadEncoding.JSON_SCHEMA; + } + return; +}; + import { PublishMessagePayloadOptionsSchema, PublishMessageRequestSchema, @@ -68,10 +88,16 @@ const encodingOptions: EncodingOption[] = [ label: 'Avro', tooltip: 'The given JSON will be serialized using the selected schema', }, - // We hide Protobuf until we can provide a better UX with selecting types rather than having users - // specify an index that points to the type within the proto schema. - // {value: PayloadEncoding.PROTOBUF, label: 'Protobuf', tooltip: 'The given JSON will be serialized using the selected schema'}, - + { + value: PayloadEncoding.PROTOBUF, + label: 'Protobuf', + tooltip: 'The given JSON will be serialized using the selected schema', + }, + { + value: PayloadEncoding.JSON_SCHEMA, + label: 'JSON Schema', + tooltip: 'The given JSON will be validated against the selected JSON Schema and tagged with its schema ID', + }, { value: PayloadEncoding.BINARY, label: 'Binary (Base64)', @@ -95,6 +121,9 @@ function encodingToLanguage(encoding: PayloadEncoding) { if (encoding === PayloadEncoding.JSON) { return 'json'; } + if (encoding === PayloadEncoding.JSON_SCHEMA) { + return 'json'; + } if (encoding === PayloadEncoding.PROTOBUF) { return 'protobuf'; } @@ -113,9 +142,12 @@ type PayloadOptions = { schemaVersion?: number; schemaId?: number; - protobufIndex?: number; // if encoding is protobuf, we also need an index + // Confluent Protobuf message-index path. Empty = first top-level message. + protobufIndexPath?: number[]; }; +const indexPathKey = (path: number[] | undefined): string => JSON.stringify(path ?? []); + type Inputs = { partition: number; compressionType: CompressionType; @@ -187,10 +219,10 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { } }, [valuePayloadOptions.encoding, valuePayloadOptions.data, setError, clearErrors]); - const showKeySchemaSelection = - keyPayloadOptions.encoding === PayloadEncoding.AVRO || keyPayloadOptions.encoding === PayloadEncoding.PROTOBUF; - const showValueSchemaSelection = - valuePayloadOptions.encoding === PayloadEncoding.AVRO || valuePayloadOptions.encoding === PayloadEncoding.PROTOBUF; + const encodingNeedsSchema = (enc?: PayloadEncoding | 'base64') => + enc === PayloadEncoding.AVRO || enc === PayloadEncoding.PROTOBUF || enc === PayloadEncoding.JSON_SCHEMA; + const showKeySchemaSelection = encodingNeedsSchema(keyPayloadOptions.encoding); + const showValueSchemaSelection = encodingNeedsSchema(valuePayloadOptions.encoding); const compressionTypes = CompressionTypeSchema.values .filter((value) => value.number !== CompressionType.UNSPECIFIED) @@ -225,17 +257,164 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { if (!api.schemaSubjects) { api.refreshSchemaSubjects(); } + if (!api.schemas) { + api.refreshSchemas(undefined, { latestOnly: true }); + } }, []); - const availableValues = api.schemaSubjects?.filter((x) => !x.isSoftDeleted) ?? []; + // UX-1292: auto-detect Avro/Protobuf/JSON-Schema on first mount by checking + // for the conventional `${topic}-key` / `${topic}-value` subjects (Kafka + // TopicNameStrategy). One-shot — once we've pre-filled (or determined there + // is nothing to pre-fill), this never runs again so we don't clobber edits. + const autoDetectedRef = useRef(false); + const [autoDetected, setAutoDetected] = useState<{ key?: string; value?: string }>({}); + // biome-ignore lint/correctness/useExhaustiveDependencies: one-shot on first subjects load + useEffect(() => { + if (autoDetectedRef.current || !api.schemaSubjects) { + return; + } + autoDetectedRef.current = true; + + const tryAutoFill = async (target: 'key' | 'value') => { + const subjectName = `${topicName}-${target}`; + const subj = api.schemaSubjects?.find((s) => s.name === subjectName && !s.isSoftDeleted); + if (!subj) { + return; + } + try { + await api.refreshSchemaDetails(subjectName); + } catch { + return; + } + const detail = api.schemaDetails.get(subjectName); + if (!detail) { + return; + } + const encoding = schemaTypeToEncoding(detail.type); + if (encoding === undefined) { + return; + } + setValue(`${target}.encoding`, encoding); + setValue(`${target}.schemaName`, subjectName); + if (detail.latestActiveVersion) { + setValue(`${target}.schemaVersion`, detail.latestActiveVersion); + } + setAutoDetected((prev) => ({ ...prev, [target]: subjectName })); + }; + + tryAutoFill('key'); + tryAutoFill('value'); + }, [api.schemaSubjects, topicName]); + + // Subscribe reactively so the Schema dropdown updates once subjects finish + // loading. Direct `api.schemaSubjects` access doesn't track in a render and + // would otherwise stay empty when nothing else triggers a re-render. + const schemaSubjectsReactive = useApiStoreHook((s) => s.schemaSubjects); + const schemasReactive = useApiStoreHook((s) => s.schemas); + const availableValues = schemaSubjectsReactive?.filter((x) => !x.isSoftDeleted) ?? []; + const subjectTypeMap = useMemo(() => { + const m = new Map(); + for (const entry of schemasReactive ?? []) { + m.set(entry.subject, entry.type); + } + return m; + }, [schemasReactive]); const keySchemaName = useWatch({ control, name: 'key.schemaName' }); const valueSchemaName = useWatch({ control, name: 'value.schemaName' }); + const keySchemaVersion = useWatch({ control, name: 'key.schemaVersion' }); + const valueSchemaVersion = useWatch({ control, name: 'value.schemaVersion' }); + const keyEncoding = useWatch({ control, name: 'key.encoding' }); + const valueEncoding = useWatch({ control, name: 'value.encoding' }); + + // UX-1292 follow-up: filter the Schema dropdown to subjects that match the + // selected encoding. Subject→type comes from the dedicated subject-types + // endpoint (one round trip via Schema Registry's GET /schemas?latestOnly). + const encodingToSchemaType = (enc?: PayloadEncoding | 'base64'): SchemaTypeType | undefined => { + if (enc === PayloadEncoding.AVRO) { + return SchemaType.AVRO; + } + if (enc === PayloadEncoding.PROTOBUF) { + return SchemaType.PROTOBUF; + } + if (enc === PayloadEncoding.JSON_SCHEMA) { + return SchemaType.JSON; + } + return; + }; + + const filterSubjectsByEncoding = (enc?: PayloadEncoding | 'base64') => { + const wanted = encodingToSchemaType(enc); + if (!wanted) { + return availableValues; + } + // If the subject isn't in the type map (e.g. types haven't loaded yet, or + // the backend couldn't determine it), keep it visible rather than silently + // dropping it. + return availableValues.filter((subj) => { + const t = subjectTypeMap.get(subj.name); + return t === undefined || t === wanted; + }); + }; + + const keyAvailableValues = filterSubjectsByEncoding(keyEncoding); + const valueAvailableValues = filterSubjectsByEncoding(valueEncoding); + + const resolveSchemaId = (subjectName?: string, version?: number): number | undefined => { + if (!(subjectName && version)) { + return; + } + const detail = api.schemaDetails.get(subjectName); + if (!detail) { + return; + } + const match = detail.schemas.find((s) => s.version === version && !s.isSoftDeleted); + return match?.id; + }; + + const keySchemaId = encodingNeedsSchema(keyEncoding) ? resolveSchemaId(keySchemaName, keySchemaVersion) : undefined; + const valueSchemaId = encodingNeedsSchema(valueEncoding) + ? resolveSchemaId(valueSchemaName, valueSchemaVersion) + : undefined; + const keySchemaDetail = useApiStoreHook((s) => (keySchemaName ? s.schemaDetails.get(keySchemaName) : undefined)); const valueSchemaDetail = useApiStoreHook((s) => valueSchemaName ? s.schemaDetails.get(valueSchemaName) : undefined ); + const keyMessageTypes = useMemo(() => { + if (!keySchemaId) return []; + return keySchemaDetail?.schemas.find((s) => s.id === keySchemaId)?.messageTypes ?? []; + }, [keySchemaId, keySchemaDetail]); + + const valueMessageTypes = useMemo(() => { + if (!valueSchemaId) return []; + return valueSchemaDetail?.schemas.find((s) => s.id === valueSchemaId)?.messageTypes ?? []; + }, [valueSchemaId, valueSchemaDetail]); + + const generateSample = async ( + target: 'key' | 'value', + schemaId: number | undefined, + indexPath: number[] | undefined + ) => { + if (!schemaId) { + return; + } + const client = appConfig.consoleClient; + if (!client) { + return; + } + // Single RPC for all three schema types — backend dispatches on schema kind. + // indexPath is only consulted for Protobuf and may be empty otherwise. + const req = create(GenerateSchemaSampleRequestSchema); + req.schemaId = schemaId; + req.indexPath = indexPath ?? []; + const res = await client.generateSchemaSample(req).catch(() => undefined); + if (res?.sampleJson) { + setValue(`${target}.data`, res.sampleJson, { shouldDirty: true }); + } + }; + // biome-ignore lint/complexity: This will be refactored anyway as part of MobX removal const onSubmit: SubmitHandler = async (data) => { const req = create(PublishMessageRequestSchema); @@ -281,12 +460,8 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { req.key.data = encodeData(data.key.data, data.key.encoding); req.key.encoding = data.key.encoding; - // Determine schemaId from schemaVersion if schema is selected and encoding is Avro or Protobuf - if ( - (data.key.encoding === PayloadEncoding.AVRO || data.key.encoding === PayloadEncoding.PROTOBUF) && - data.key.schemaName && - data.key.schemaVersion - ) { + // Determine schemaId from schemaVersion if schema is selected and encoding is Avro, Protobuf, or JSON Schema + if (encodingNeedsSchema(data.key.encoding) && data.key.schemaName && data.key.schemaVersion) { const schemaDetail = api.schemaDetails.get(data.key.schemaName); if (schemaDetail) { const selectedSchema = schemaDetail.schemas.find( @@ -298,7 +473,9 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { } } - req.key.index = data.key.protobufIndex; + if (data.key.encoding === PayloadEncoding.PROTOBUF && data.key.protobufIndexPath) { + req.key.indexPath = data.key.protobufIndexPath; + } } // Value @@ -314,12 +491,8 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { } req.value.encoding = data.value.encoding; - // Determine schemaId from schemaVersion if schema is selected and encoding is Avro or Protobuf - if ( - (data.value.encoding === PayloadEncoding.AVRO || data.value.encoding === PayloadEncoding.PROTOBUF) && - data.value.schemaName && - data.value.schemaVersion - ) { + // Determine schemaId from schemaVersion if schema is selected and encoding is Avro, Protobuf, or JSON Schema + if (encodingNeedsSchema(data.value.encoding) && data.value.schemaName && data.value.schemaVersion) { const schemaDetail = api.schemaDetails.get(data.value.schemaName); if (schemaDetail) { const selectedSchema = schemaDetail.schemas.find( @@ -331,7 +504,9 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { } } - req.value.index = data.value.protobufIndex; + if (data.value.encoding === PayloadEncoding.PROTOBUF && data.value.protobufIndexPath) { + req.value.indexPath = data.value.protobufIndexPath; + } } const result = await api.publishMessage(req).catch((err) => { @@ -355,7 +530,7 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { } }; - const filteredEncodingOptions = encodingOptions.filter((x) => x.value !== PayloadEncoding.AVRO); + const filteredEncodingOptions = encodingOptions; return (
@@ -433,6 +608,11 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { )} /> + {autoDetected.key && ( + + Auto-detected from {autoDetected.key} + + )} {Boolean(showKeySchemaSelection) && ( @@ -460,7 +640,7 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { }); } }} - options={availableValues.map((schema) => ({ + options={keyAvailableValues.map((schema) => ({ key: schema.name, value: schema.name, }))} @@ -502,10 +682,29 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { {keyPayloadOptions.encoding === PayloadEncoding.PROTOBUF && ( - + + )} {keyPayloadOptions.encoding !== PayloadEncoding.NULL && ( @@ -535,6 +734,21 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { + {encodingNeedsSchema(keyPayloadOptions.encoding) && ( + + )} )} @@ -561,6 +775,11 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { )} /> + {autoDetected.value && ( + + Auto-detected from {autoDetected.value} + + )} {Boolean(showValueSchemaSelection) && ( @@ -583,7 +802,7 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { }); } }} - options={availableValues.map((schema) => ({ + options={valueAvailableValues.map((schema) => ({ key: schema.name, value: schema.name, }))} @@ -625,10 +844,29 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { {valuePayloadOptions.encoding === PayloadEncoding.PROTOBUF && ( - + + )} {valuePayloadOptions.encoding !== PayloadEncoding.NULL && ( @@ -648,11 +886,24 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { /> )} /> + {encodingNeedsSchema(valuePayloadOptions.encoding) && ( + + )} )} {Boolean(errors?.value?.data) && {errors?.value?.data?.message}} - diff --git a/frontend/src/state/backend-api.ts b/frontend/src/state/backend-api.ts index d9571b1931..7aba2b59c8 100644 --- a/frontend/src/state/backend-api.ts +++ b/frontend/src/state/backend-api.ts @@ -93,6 +93,7 @@ import { type SchemaRegistryDeleteSubjectResponse, type SchemaRegistryDeleteSubjectVersionResponse, type SchemaRegistryModeResponse, + type SchemaRegistrySchema, type SchemaRegistrySchemaTypesResponse, type SchemaRegistrySetCompatibilityModeRequest, type SchemaRegistrySubject, @@ -353,6 +354,44 @@ class CacheEntry { } } +// GetAllSchemasParams mirrors the query params accepted by the backend's +// GET /schema-registry/schemas handler (console.GetAllSchemasOptions). +export type GetAllSchemasParams = { + subjectPrefix?: string; + latestOnly?: boolean; + deleted?: boolean; + deletedOnly?: boolean; + offset?: number; + limit?: number; +}; + +function buildSchemasQuery(opts?: GetAllSchemasParams): string { + if (!opts) { + return ''; + } + const params = new URLSearchParams(); + if (opts.subjectPrefix) { + params.set('subjectPrefix', opts.subjectPrefix); + } + if (opts.latestOnly) { + params.set('latestOnly', 'true'); + } + if (opts.deleted) { + params.set('deleted', 'true'); + } + if (opts.deletedOnly) { + params.set('deletedOnly', 'true'); + } + if (opts.offset !== undefined && opts.offset > 0) { + params.set('offset', String(opts.offset)); + } + if (opts.limit !== undefined && opts.limit > 0) { + params.set('limit', String(opts.limit)); + } + const qs = params.toString(); + return qs ? `?${qs}` : ''; +} + function cachedApiRequest(url: string, force = false): Promise { const entry = cache.get(url); @@ -431,6 +470,7 @@ const _apiCreator = (set: any, get: any) => ({ schemaMode: undefined as string | null | undefined, // undefined = not yet known, null = got not configured response schemaCompatibility: undefined as string | null | undefined, // undefined = not yet known, null = got not configured response schemaSubjects: undefined as SchemaRegistrySubject[] | undefined, + schemas: undefined as SchemaRegistrySchema[] | undefined, schemaTypes: undefined as string[] | undefined, schemaDetails: new Map(), // subjectName => details schemaReferencedBy: new Map>(), // subjectName => version => details @@ -1337,6 +1377,15 @@ const _apiCreator = (set: any, get: any) => ({ ); }, + refreshSchemas(force?: boolean, opts?: GetAllSchemasParams) { + const url = `${appConfig.restBasePath}/schema-registry/schemas${buildSchemasQuery(opts)}`; + cachedApiRequest(url, force).then((schemas) => { + if (Array.isArray(schemas)) { + set({ schemas }); + } + }, addError); + }, + refreshSchemaTypes(force?: boolean) { cachedApiRequest( `${appConfig.restBasePath}/schema-registry/schemas/types`, diff --git a/frontend/src/state/rest-interfaces.ts b/frontend/src/state/rest-interfaces.ts index b3f9e01fd9..3890cc2d80 100644 --- a/frontend/src/state/rest-interfaces.ts +++ b/frontend/src/state/rest-interfaces.ts @@ -1407,6 +1407,23 @@ export type SchemaRegistrySubject = { isSoftDeleted: boolean; }; +// GET /schema-registry/schemas +// Mirrors backend console.SchemaRegistrySchema — one entry per (subject, version) +// returned by the schema registry's GET /schemas endpoint. +export type SchemaRegistrySchema = { + subject: string; + version: number; + id: number; + type: SchemaTypeType; + schema?: string; + references?: SchemaReference[]; + metadata?: { + tags?: Record; + properties?: Record; + sensitive?: string[]; + }; +}; + // GET /schema-registry/schemas/types export type SchemaRegistrySchemaTypesResponse = { isConfigured?: false; // can only ever be undefined (schema reg is configured properly), or false (is not configured) @@ -1438,6 +1455,12 @@ export type SchemaRegistryVersionedSchema = { schema: string; references: SchemaReference[]; metadata?: SchemaMetadata; + messageTypes?: SchemaMessageType[]; +}; + +export type SchemaMessageType = { + fullyQualifiedName: string; + indexPath: number[]; }; export type SchemaReference = { From 057467cbe2cd32b67ae60ad36bbc97a1903f95ad Mon Sep 17 00:00:00 2001 From: Julin <142230457+c-julin@users.noreply.github.com> Date: Thu, 21 May 2026 16:45:36 +0100 Subject: [PATCH 2/3] fix(produce): clear schema name + version on encoding change (UX-1292) A subject's compatible schema set is encoding-specific (Avro vs Protobuf vs JSON Schema), so any pre-selected subject/version becomes stale the moment the user changes encoding. Reset key/value schemaName, schemaVersion, and protobufIndexPath on encoding change so the dropdown re-populates from the encoding-filtered subject list. --- .../components/pages/topics/topic-produce.tsx | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/frontend/src/components/pages/topics/topic-produce.tsx b/frontend/src/components/pages/topics/topic-produce.tsx index 89e30349bc..52cdac0ea0 100644 --- a/frontend/src/components/pages/topics/topic-produce.tsx +++ b/frontend/src/components/pages/topics/topic-produce.tsx @@ -327,6 +327,23 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { const keyEncoding = useWatch({ control, name: 'key.encoding' }); const valueEncoding = useWatch({ control, name: 'value.encoding' }); + // A subject's compatible schema set is encoding-specific, so changing encoding invalidates any + // pre-selected subject/version. Called from the encoding picker's onChange. + const clearSchemaSelection = (target: 'key' | 'value') => { + setValue(`${target}.schemaName`, undefined); + setValue(`${target}.schemaVersion`, undefined); + setValue(`${target}.protobufIndexPath`, undefined); + }; + + // Default the Protobuf message-type picker to the first available type for the given (subject, + // version), or clear it. Called from the subject/version picker onChange handlers. + const applyDefaultMessageType = (target: 'key' | 'value', subjectName: string, version: number) => { + const detail = api.schemaDetails.get(subjectName); + const matching = detail?.schemas.find((s) => s.version === version && !s.isSoftDeleted); + const firstType = matching?.messageTypes?.[0]; + setValue(`${target}.protobufIndexPath`, firstType ? firstType.indexPath : undefined); + }; + // UX-1292 follow-up: filter the Schema dropdown to subjects that match the // selected encoding. Subject→type comes from the dedicated subject-types // endpoint (one round trip via Schema Registry's GET /schemas?latestOnly). @@ -601,7 +618,10 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { name="key.encoding" render={({ field: { onChange, value } }) => ( - onChange={onChange} + onChange={(newEnc) => { + onChange(newEnc); + clearSchemaSelection('key'); + }} options={filteredEncodingOptions} value={value} /> @@ -633,6 +653,7 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { const detail = api.schemaDetails.get(newVal); if (detail?.latestActiveVersion) { setValue('key.schemaVersion', detail.latestActiveVersion); + applyDefaultMessageType('key', newVal, detail.latestActiveVersion); } }) .catch(() => { @@ -661,7 +682,12 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { const schemaDetail = keySchemaDetail; return ( - onChange={onChange} + onChange={(newVer) => { + onChange(newVer); + if (keySchemaName && newVer !== undefined) { + applyDefaultMessageType('key', keySchemaName, newVer); + } + }} options={ schemaDetail?.versions .slice() @@ -768,7 +794,10 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { name="value.encoding" render={({ field: { onChange, value } }) => ( - onChange={onChange} + onChange={(newEnc) => { + onChange(newEnc); + clearSchemaSelection('value'); + }} options={filteredEncodingOptions} value={value} /> @@ -798,6 +827,7 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { const detail = api.schemaDetails.get(newVal); if (detail?.latestActiveVersion) { setValue('value.schemaVersion', detail.latestActiveVersion); + applyDefaultMessageType('value', newVal, detail.latestActiveVersion); } }); } @@ -823,7 +853,12 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { const schemaDetail = valueSchemaDetail; return ( - onChange={onChange} + onChange={(newVer) => { + onChange(newVer); + if (valueSchemaName && newVer !== undefined) { + applyDefaultMessageType('value', valueSchemaName, newVer); + } + }} options={ schemaDetail?.versions .slice() From ddb3fbcb87f652eb9df7d2648006c782571eccda Mon Sep 17 00:00:00 2001 From: Julin <142230457+c-julin@users.noreply.github.com> Date: Thu, 21 May 2026 19:27:32 +0100 Subject: [PATCH 3/3] fix(produce): frontend review fixes (UX-1292) Addresses adversarial code review findings on PR #2462: - Stale refreshSchemaDetails-response guard on both schemaName onChange handlers: if the user switches subject while a fetch is in flight, drop the response instead of overwriting the new selection's version and message-type. - Submit-time validation: block publish when the encoding requires a schema but schemaName/schemaVersion are missing or schemaId hasn't resolved yet. Without this the form silently submitted with encoding=PROTOBUF and no schema_id, producing a confusing backend error. - Auto-detect on mount no longer clobbers user edits: a getValues() check bails out if the user has already changed encoding or picked a subject/version while the detection fetch was in flight. - Auto-detect now calls applyDefaultMessageType for Protobuf subjects so the message-type picker isn't left blank after a successful detection. - generateSchemaSample errors surface as toasts via formatToastErrorMessageGRPC instead of being silently swallowed. --- .../components/pages/topics/topic-produce.tsx | 66 ++++++++++++++++++- 1 file changed, 63 insertions(+), 3 deletions(-) diff --git a/frontend/src/components/pages/topics/topic-produce.tsx b/frontend/src/components/pages/topics/topic-produce.tsx index 52cdac0ea0..b670cb214f 100644 --- a/frontend/src/components/pages/topics/topic-produce.tsx +++ b/frontend/src/components/pages/topics/topic-produce.tsx @@ -1,4 +1,5 @@ import { create } from '@bufbuild/protobuf'; +import type { ConnectError } from '@connectrpc/connect'; import { Alert, Box, @@ -55,6 +56,7 @@ import { import { appGlobal } from '../../../state/app-global'; import { api, useApiStoreHook } from '../../../state/backend-api'; import { uiState } from '../../../state/ui-state'; +import { formatToastErrorMessageGRPC } from '../../../utils/toast.utils'; import { Label } from '../../../utils/tsx-utils'; import { base64ToUInt8Array, isValidBase64, substringWithEllipsis } from '../../../utils/utils'; import KowlEditor from '../../misc/kowl-editor'; @@ -168,6 +170,7 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { control, register, setValue, + getValues, handleSubmit, setError, formState: { isSubmitting, errors }, @@ -294,10 +297,20 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { if (encoding === undefined) { return; } + // Race guard: don't clobber user edits made while the refresh was in flight. + // The form's defaults are encoding=TEXT and undefined schema/version, so any deviation + // means the user has touched the form and we should skip the pre-fill. + const current = getValues(target); + if (current.encoding !== PayloadEncoding.TEXT || current.schemaName || current.schemaVersion) { + return; + } setValue(`${target}.encoding`, encoding); setValue(`${target}.schemaName`, subjectName); if (detail.latestActiveVersion) { setValue(`${target}.schemaVersion`, detail.latestActiveVersion); + if (encoding === PayloadEncoding.PROTOBUF) { + applyDefaultMessageType(target, subjectName, detail.latestActiveVersion); + } } setAutoDetected((prev) => ({ ...prev, [target]: subjectName })); }; @@ -426,14 +439,51 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { const req = create(GenerateSchemaSampleRequestSchema); req.schemaId = schemaId; req.indexPath = indexPath ?? []; - const res = await client.generateSchemaSample(req).catch(() => undefined); - if (res?.sampleJson) { - setValue(`${target}.data`, res.sampleJson, { shouldDirty: true }); + try { + const res = await client.generateSchemaSample(req); + if (res?.sampleJson) { + setValue(`${target}.data`, res.sampleJson, { shouldDirty: true }); + } + } catch (err) { + toast({ + status: 'error', + description: formatToastErrorMessageGRPC({ + error: err as ConnectError, + action: 'generate schema sample', + entity: 'schema', + }), + }); } }; // biome-ignore lint/complexity: This will be refactored anyway as part of MobX removal const onSubmit: SubmitHandler = async (data) => { + // Validate schema-encoded payloads have a fully-resolved (subject, version) pair before we + // build the request. Without this guard the form silently submits with encoding=PROTOBUF + // and no schema_id, which the backend then rejects with a confusing error. + const validateSchemaSelection = (target: 'key' | 'value'): boolean => { + const opts = data[target]; + if (!encodingNeedsSchema(opts.encoding)) { + return true; + } + if (!opts.schemaName) { + setError(`${target}.schemaName`, { type: 'manual', message: 'Select a schema' }); + return false; + } + if (!opts.schemaVersion) { + setError(`${target}.schemaVersion`, { type: 'manual', message: 'Select a schema version' }); + return false; + } + if (!resolveSchemaId(opts.schemaName, opts.schemaVersion)) { + setError(`${target}.schemaName`, { type: 'manual', message: 'Schema is still loading — try again' }); + return false; + } + return true; + }; + if (!(validateSchemaSelection('key') && validateSchemaSelection('value'))) { + return; + } + const req = create(PublishMessageRequestSchema); req.topic = topicName; req.partitionId = data.partition; @@ -650,6 +700,12 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { api .refreshSchemaDetails(newVal) .then(() => { + // Stale-response guard: if the user has already switched to a + // different subject while this request was in flight, drop the + // result so we don't overwrite their new selection. + if (getValues('key.schemaName') !== newVal) { + return; + } const detail = api.schemaDetails.get(newVal); if (detail?.latestActiveVersion) { setValue('key.schemaVersion', detail.latestActiveVersion); @@ -824,6 +880,10 @@ const PublishTopicForm: FC<{ topicName: string }> = ({ topicName }) => { if (newVal) { // Fetch schema details to get available versions api.refreshSchemaDetails(newVal).then(() => { + // Stale-response guard: drop if user switched subject in-flight. + if (getValues('value.schemaName') !== newVal) { + return; + } const detail = api.schemaDetails.get(newVal); if (detail?.latestActiveVersion) { setValue('value.schemaVersion', detail.latestActiveVersion);