Merged
Changes from 3 commits
3 changes: 2 additions & 1 deletion src/current/_includes/v25.4/cdc/message-format-list.md
```diff
@@ -3,4 +3,5 @@ By default, changefeeds emit messages in JSON format. You can use a different fo
 - `json`
 - `csv`
 - `avro`
-- `parquet`
+- `parquet`
+- `protobuf`
```
3 changes: 2 additions & 1 deletion src/current/_includes/v26.1/cdc/message-format-list.md
```diff
@@ -3,4 +3,5 @@ By default, changefeeds emit messages in JSON format. You can use a different fo
 - `json`
 - `csv`
 - `avro`
-- `parquet`
+- `parquet`
+- `protobuf`
```
2 changes: 1 addition & 1 deletion src/current/v25.4/changefeed-message-envelopes.md
```diff
@@ -462,7 +462,7 @@ Option | Description | Sink support
 <a id="diff-option"></a>`diff` | Include a [`"before"`](#before) field in each message, showing the state of the row before the change. Supported with [`wrapped`](#wrapped-option) or [`enriched`](#enriched-option) envelopes. | All
 <a id="enriched-properties-option"></a>`enriched_properties` (**Preview**) | (Only applicable when [`envelope=enriched`](#enriched-option) is set) Specify the type of metadata included in the message payload. Values: `source`, `schema`. | Kafka, Pub/Sub, webhook, sinkless
 <a id="bare-option"></a>`envelope=bare` | Emit an envelope without the `"after"` wrapper. The row's column data is at the top level of the message. Metadata that would typically be separate will be under a [`"__crdb__"`](#__crdb__) field. Provides a more compact structure to the envelope. `bare` is the default envelope when using [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}). When `bare` is used with the Avro format, `"record"` will replace the `"after"` keyword. | All
-<a id="enriched-option"></a>`envelope=enriched` (**Preview**) | Extend the envelope with [additional metadata fields](#field-reference). With `enriched_properties`, includes a [`"source"`](#source) field and/or a [`"schema"`](#schema) field with extra context. Supported in JSON and Avro message formats. | Kafka, Pub/Sub, webhook, sinkless
+<a id="enriched-option"></a>`envelope=enriched` (**Preview**) | Extend the envelope with [additional metadata fields](#field-reference). With `enriched_properties`, includes a [`"source"`](#source) field and/or a [`"schema"`](#schema) field with extra context. Supported in JSON, Avro, and Protobuf [message formats]({% link {{ page.version.version }}/changefeed-messages.md %}#message-formats). | Kafka, Pub/Sub, webhook, sinkless
 <a id="key-only-option"></a>`envelope=key_only` | [Send only the primary key](#key_only) of the changed row and no value payload, which is more efficient if only the key of the changed row is needed. Not compatible with the `updated` option. | Kafka, sinkless
 `envelope=row` | Emit the row data without any additional metadata field in the envelope. Not supported in Avro format or with the [`diff`](#diff-option) option. | Kafka, sinkless
 <a id="wrapped-option"></a>`envelope=wrapped` (default) | Produce changefeed messages in a wrapped structure with metadata and row data. [`wrapped`](#wrapped) includes an [`"after"`](#after) field, and optionally a [`"before"`](#before) field if [`diff`](#diff-option) is used. **Note:** Envelopes contain a primary key when your changefeed is emitting to a sink that does not have a message key as part of its protocol. By default, messages emitting to Kafka sinks do not have the primary key field. Use the [`key_in_value`](#key-in-value-option) option to include a primary key array field in messages emitted to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka). | All
```
30 changes: 30 additions & 0 deletions src/current/v25.4/changefeed-messages.md
```diff
@@ -567,6 +567,36 @@ With `encode_json_value_null_as_object` enabled, the changefeed will encode the
 
 When `encode_json_value_null_as_object` is enabled, if the changefeed encounters the literal value `{"__crdb_json_null__": true}` in JSON, it will have the same representation as a JSON `NULL` value and a warning will be printed to the [`DEV` logging channel]({% link {{ page.version.version }}/logging.md %}#dev).
 
+### Protobuf
+
+{% include_cached new-in.html version="v25.4" %} You can use the [`format=protobuf`]({% link {{ page.version.version }}/create-changefeed.md %}#format) option to emit Protocol Buffers (Protobuf) messages from your changefeed. Protobuf is a binary serialization format that can provide efficient integration with Protobuf-native messaging infrastructure, such as Kafka-based streaming systems.
+
+The following sections provide information on Protobuf usage with CockroachDB changefeeds.
+
+#### Protobuf limitations
+
+The following changefeed option is **not** supported with `format=protobuf`:
+
+- [`headers_json_column_name`]({% link {{ page.version.version }}/create-changefeed.md %}#headers-json-column-name): This option is specific to JSON format and does not work with Protobuf messages.
+
+The following changefeed options **are** supported with `format=protobuf`:
+
+- [`diff`]({% link {{ page.version.version }}/create-changefeed.md %}#diff)
+- [`key_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#key-in-value)
+- [`mvcc_timestamp`]({% link {{ page.version.version }}/create-changefeed.md %}#mvcc-timestamp)
+- [`resolved`]({% link {{ page.version.version }}/create-changefeed.md %}#resolved)
+- [`split_column_families`]({% link {{ page.version.version }}/create-changefeed.md %}#split-column-families)
+- [`topic_in_value`]({% link {{ page.version.version }}/create-changefeed.md %}#topic-in-value)
+- [`updated`]({% link {{ page.version.version }}/create-changefeed.md %}#updated)
+- [`virtual_columns`]({% link {{ page.version.version }}/create-changefeed.md %}#virtual-columns)
+
+#### Protobuf sink compatibility
+
+Protobuf format is supported with the following changefeed sinks:
+
+- [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka)
+- [Cloud storage]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink)
+
 ## See also
 
 - [Online Schema Changes]({% link {{ page.version.version }}/online-schema-changes.md %})
```
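The new Protobuf section above lists the supported options and sinks. As an illustrative sketch only (the table name and Kafka URI below are placeholders, not taken from the docs), a changefeed emitting Protobuf messages might be created like this:

```sql
-- Illustrative sketch: emit Protobuf-encoded changefeed messages to a Kafka sink.
-- The table name and sink URI are placeholders.
CREATE CHANGEFEED FOR TABLE movr.users
  INTO 'kafka://localhost:9092'
  WITH format = protobuf,
       resolved = '10s',  -- resolved timestamps are listed as supported with protobuf
       updated;           -- per-row update timestamps are listed as supported with protobuf
```

Both `resolved` and `updated` appear in the supported-options list above, while `headers_json_column_name` is called out as incompatible with `format=protobuf`.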
6 changes: 3 additions & 3 deletions src/current/v25.4/create-changefeed.md
```diff
@@ -5,7 +5,7 @@ toc: true
 docs_area: reference.sql
 ---
 
-The `CREATE CHANGEFEED` [statement]({% link {{ page.version.version }}/sql-statements.md %}) creates a new changefeed, which targets an allowlist of tables called "watched rows". Every change to a watched row is emitted as a record in a configurable format (`JSON` or Avro) to a [configurable sink]({% link {{ page.version.version }}/changefeed-sinks.md %}) or directly to the SQL session.
+The `CREATE CHANGEFEED` [statement]({% link {{ page.version.version }}/sql-statements.md %}) creates a new changefeed, which targets an allowlist of tables called "watched rows". Every change to a watched row is emitted as a record in a configurable format (such as JSON, Avro, CSV, Parquet, or Protobuf) to a [configurable sink]({% link {{ page.version.version }}/changefeed-sinks.md %}) or directly to the SQL session.
 
 When a changefeed emits messages to a sink, it works as a [job]({% link {{ page.version.version }}/how-does-a-changefeed-work.md %}). You can [create](#examples), [pause](#pause-a-changefeed), [resume](#resume-a-paused-changefeed), [alter]({% link {{ page.version.version }}/alter-changefeed.md %}), or [cancel](#cancel-a-changefeed) a changefeed job.
 
@@ -122,10 +122,10 @@ Option | Value | Description
 <a name="envelope"></a>`envelope` | `wrapped` / `enriched` / `bare` / `key_only` / `row` | `wrapped` is the default envelope structure for changefeed messages, containing an array of the primary key, a top-level field for the type of message, and the current state of the row (or `null` for deleted rows).<br><br>Refer to the [Changefeed Message Envelopes]({% link {{ page.version.version }}/changefeed-message-envelopes.md %}) page for more detail on each envelope.<br><br>Default: `envelope=wrapped`. Default for [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}): `envelope=bare`.
 <a name="execution-locality"></a>`execution_locality` | Key-value pairs | Restricts the execution of a changefeed to nodes that match the defined locality filter requirements, e.g., `WITH execution_locality = 'region=us-west-1a,cloud=aws'`. <br><br>See [Run a changefeed job by locality]({% link {{ page.version.version }}/changefeeds-in-multi-region-deployments.md %}#run-a-changefeed-job-by-locality) for usage and reference detail.
 <a name="extra_headers"></a>`extra_headers` | [`json`]({% link {{ page.version.version }}/string.md %}) | Specifies extra headers for [webhook sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink) and [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka). Use this option to add headers to all messages sent to the sink. <br><br>Example: `CREATE CHANGEFEED FOR foo WITH extra_headers='{"x-api-key": "xxx", "some-other-header": "yyy"}'`
-<a name="format"></a>`format` | `json` / `avro` / `csv` / `parquet` | Format of the emitted message. <br><br>`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`. <br><br>`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`. <br><br>`parquet`: Cloud storage is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format.<br><br>Default: `format=json`.
+<a name="format"></a>`format` | `json` / `avro` / `csv` / `parquet` / `protobuf` | Format of the emitted message. <br><br>`avro`: For mappings of CockroachDB types to Avro types, [refer to the table]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-types) and detail on [Avro limitations]({% link {{ page.version.version }}/changefeed-messages.md %}#avro-limitations). **Note:** [`confluent_schema_registry`](#confluent-schema-registry) is required with `format=avro`. <br><br>`csv`: You cannot combine `format=csv` with the [`diff`](#diff) or [`resolved`](#resolved) options. Changefeeds use the same CSV format as the [`EXPORT`](export.html) statement. Refer to [Export data with changefeeds]({% link {{ page.version.version }}/export-data-with-changefeeds.md %}) for details on using these options to create a changefeed as an alternative to `EXPORT`. **Note:** [`initial_scan = 'only'`](#initial-scan) is required with `format=csv`. <br><br>`parquet`: [Cloud storage]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) is the only supported sink. The [`topic_in_value`](#topic-in-value) option is not compatible with `parquet` format. <br><br>`protobuf`: [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka) and [Cloud storage]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink) are the only supported sinks. The [`headers_json_column_name`](#headers-json-column-name) option is not compatible with `protobuf` format.<br><br>Default: `format=json`.
 <a name="full-table-name"></a>`full_table_name` | N/A | Use fully qualified table name in topics, subjects, schemas, and record output instead of the default table name. This can prevent unintended behavior when the same table name is present in multiple databases.<br><br>**Note:** This option cannot modify existing table names used as topics, subjects, etc., as part of an [`ALTER CHANGEFEED`]({% link {{ page.version.version }}/alter-changefeed.md %}) statement. To modify a topic, subject, etc., to use a fully qualified table name, create a new changefeed with this option. <br><br>Example: `CREATE CHANGEFEED FOR foo... WITH full_table_name` will create the topic name `defaultdb.public.foo` instead of `foo`.
 <a name="gc-protect-expires-after"></a>`gc_protect_expires_after` | [Duration string](https://pkg.go.dev/time#ParseDuration) | Automatically expires protected timestamp records that are older than the defined duration. In the case where a changefeed job remains paused, `gc_protect_expires_after` will trigger the underlying protected timestamp record to expire and cancel the changefeed job to prevent accumulation of protected data.<br><br>Refer to [Protect Changefeed Data from Garbage Collection]({% link {{ page.version.version }}/protect-changefeed-data.md %}) for more detail on protecting changefeed data.
-<a name="headers-json-column-name"></a>`headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [JSONB]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed emits as Kafka headers, separate from the message payload, for each row’s change event. `headers_json_column_name` is supported for Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header).
+<a name="headers-json-column-name"></a>`headers_json_column_name` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specify a [JSONB]({% link {{ page.version.version }}/jsonb.md %}) column that the changefeed emits as Kafka headers, separate from the message payload, for each row’s change event. `headers_json_column_name` is supported for Kafka sinks. For more details, refer to [Specify a column as a Kafka header]({% link {{ page.version.version }}/changefeed-messages.md %}#specify-a-column-as-a-kafka-header). **Note:** `headers_json_column_name` is not compatible with changefeeds running in [`protobuf` format](#format).
 <a name="ignore-disable-changefeed-replication"></a>`ignore_disable_changefeed_replication` | [`BOOL`]({% link {{ page.version.version }}/bool.md %}) | When set to `true`, the changefeed **will emit** events even if CDC filtering for TTL jobs is configured using the `disable_changefeed_replication` [session variable]({% link {{ page.version.version }}/set-vars.md %}), `sql.ttl.changefeed_replication.disabled` [cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}), or the `ttl_disable_changefeed_replication` [table storage parameter]({% link {{ page.version.version }}/row-level-ttl.md %}).<br><br>Refer to [Filter changefeeds for tables using TTL](#filter-changefeeds-for-tables-using-row-level-ttl) for usage details.
 <a name="initial-scan"></a>`initial_scan` | `yes`/`no`/`only` | Control whether or not an initial scan will occur at the start time of a changefeed. Only one `initial_scan` option (`yes`, `no`, or `only`) can be used. If none of these are set, an initial scan will occur if there is no [`cursor`](#cursor), and will not occur if there is one. This preserves the behavior from previous releases. With `initial_scan = 'only'` set, the changefeed job will end with a successful status (`succeeded`) after the initial scan completes. You cannot specify `yes`, `no`, `only` simultaneously. <br><br>If used in conjunction with `cursor`, an initial scan will be performed at the cursor timestamp. If no `cursor` is specified, the initial scan is performed at `now()`. <br><br>Although the [`initial_scan` / `no_initial_scan`]({% link {{ page.version.version }}/create-changefeed.md %}#initial-scan) syntax from previous versions is still supported, you cannot combine the previous and current syntax.<br><br>Default: `initial_scan = 'yes'`
 <a name="kafka-sink-config"></a>`kafka_sink_config` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Set fields to configure the required level of message acknowledgement from the Kafka server, the version of the server, and batching parameters for Kafka sinks. Set the message file compression type. See [Kafka sink configuration]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-sink-configuration) for more detail on configuring all the available fields for this option. <br><br>Example: `CREATE CHANGEFEED FOR table INTO 'kafka://localhost:9092' WITH kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "RequiredAcks": "ONE"}'`
```
2 changes: 1 addition & 1 deletion src/current/v26.1/changefeed-message-envelopes.md
```diff
@@ -462,7 +462,7 @@ Option | Description | Sink support
 <a id="diff-option"></a>`diff` | Include a [`"before"`](#before) field in each message, showing the state of the row before the change. Supported with [`wrapped`](#wrapped-option) or [`enriched`](#enriched-option) envelopes. | All
 <a id="enriched-properties-option"></a>`enriched_properties` (**Preview**) | (Only applicable when [`envelope=enriched`](#enriched-option) is set) Specify the type of metadata included in the message payload. Values: `source`, `schema`. | Kafka, Pub/Sub, webhook, sinkless
 <a id="bare-option"></a>`envelope=bare` | Emit an envelope without the `"after"` wrapper. The row's column data is at the top level of the message. Metadata that would typically be separate will be under a [`"__crdb__"`](#__crdb__) field. Provides a more compact structure to the envelope. `bare` is the default envelope when using [CDC queries]({% link {{ page.version.version }}/cdc-queries.md %}). When `bare` is used with the Avro format, `"record"` will replace the `"after"` keyword. | All
-<a id="enriched-option"></a>`envelope=enriched` (**Preview**) | Extend the envelope with [additional metadata fields](#field-reference). With `enriched_properties`, includes a [`"source"`](#source) field and/or a [`"schema"`](#schema) field with extra context. Supported in JSON and Avro message formats. | Kafka, Pub/Sub, webhook, sinkless
+<a id="enriched-option"></a>`envelope=enriched` (**Preview**) | Extend the envelope with [additional metadata fields](#field-reference). With `enriched_properties`, includes a [`"source"`](#source) field and/or a [`"schema"`](#schema) field with extra context. Supported in JSON, Avro, and Protobuf [message formats]({% link {{ page.version.version }}/changefeed-messages.md %}#message-formats). | Kafka, Pub/Sub, webhook, sinkless
 <a id="key-only-option"></a>`envelope=key_only` | [Send only the primary key](#key_only) of the changed row and no value payload, which is more efficient if only the key of the changed row is needed. Not compatible with the `updated` option. | Kafka, sinkless
 `envelope=row` | Emit the row data without any additional metadata field in the envelope. Not supported in Avro format or with the [`diff`](#diff-option) option. | Kafka, sinkless
 <a id="wrapped-option"></a>`envelope=wrapped` (default) | Produce changefeed messages in a wrapped structure with metadata and row data. [`wrapped`](#wrapped) includes an [`"after"`](#after) field, and optionally a [`"before"`](#before) field if [`diff`](#diff-option) is used. **Note:** Envelopes contain a primary key when your changefeed is emitting to a sink that does not have a message key as part of its protocol. By default, messages emitting to Kafka sinks do not have the primary key field. Use the [`key_in_value`](#key-in-value-option) option to include a primary key array field in messages emitted to [Kafka sinks]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka). | All
```
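The `envelope=enriched` row in the option table now lists Protobuf alongside JSON and Avro. A hedged sketch of combining the two options (the table name and Kafka URI are placeholders, and the `enriched_properties` values follow the option table's `source`/`schema` values):

```sql
-- Illustrative sketch: enriched envelope emitted in Protobuf format.
-- Table name and sink URI are placeholders; envelope=enriched is in Preview.
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH format = protobuf,
       envelope = enriched,
       enriched_properties = 'source,schema';
```

Per the option table, this combination is supported for Kafka, Pub/Sub, webhook, and sinkless changefeeds.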