Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -96,9 +97,8 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
private static final String UDF_OPTIONS_KEY = "options";

// Model related keys
private static final String MODEL_NAME_KEY = "model-name";

private static final String MODEL_CLASS_NAME_KEY = "class-name";
private static final String MODEL_NAME_KEY = "name";
private static final String MODEL_TYPE_KEY = "type";
Comment thread
yuxiqian marked this conversation as resolved.

public static final String TRANSFORM_PRIMARY_KEY_KEY = "primary-keys";

Expand Down Expand Up @@ -145,7 +145,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe

Optional.ofNullable(
((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(MODEL_KEY))
.map(node -> validateArray("model", node))
.ifPresent(node -> modelDefs.addAll(parseModels(node)));
}

Expand Down Expand Up @@ -428,24 +427,45 @@ private List<ModelDef> parseModels(JsonNode modelsNode) {
} else {
modelDefs.add(convertJsonNodeToModelDef(modelsNode));
}
Set<String> seenNames = new HashSet<>();
for (ModelDef model : modelDefs) {
if (!seenNames.add(model.getName())) {
throw new IllegalArgumentException(
"Duplicate model name '" + model.getName() + "' in pipeline definition.");
}
}
return modelDefs;
}

private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) {
Preconditions.checkArgument(
modelNode instanceof ObjectNode,
"`model` in `pipeline` should be an object, but got %s",
modelNode);
ObjectNode node = (ObjectNode) modelNode;
String name =
checkNotNull(
modelNode.get(MODEL_NAME_KEY),
node.remove(MODEL_NAME_KEY),
"Missing required field \"%s\" in `model`",
MODEL_NAME_KEY)
.asText();
String model =
Preconditions.checkArgument(
name.matches("[a-zA-Z_][a-zA-Z0-9_]*") && !name.startsWith("__"),
"Model name \"%s\" is not a valid identifier. "
+ "It must start with a letter or underscore, "
+ "contain only letters, digits, or underscores, "
+ "and must not start with double underscores.",
name);
Comment thread
yuxiqian marked this conversation as resolved.
String type =
checkNotNull(
modelNode.get(MODEL_CLASS_NAME_KEY),
node.remove(MODEL_TYPE_KEY),
"Missing required field \"%s\" in `model`",
MODEL_CLASS_NAME_KEY)
MODEL_TYPE_KEY)
.asText();
Map<String, String> properties = mapper.convertValue(modelNode, Map.class);
return new ModelDef(name, model, properties);
Map<String, String> options = new HashMap<>();
node.fields()
.forEachRemaining(entry -> options.put(entry.getKey(), entry.getValue().asText()));
return new ModelDef(name, type, options);
}

Comment thread
yuxiqian marked this conversation as resolved.
private void validateJsonNodeKeys(
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,13 @@ pipeline:
schema-operator.rpc-timeout: 1 h
execution.runtime-mode: STREAMING
model:
- model-name: GET_EMBEDDING
class-name: OpenAIEmbeddingModel
openai.model: text-embedding-3-small
openai.host: https://xxxx
openai.apikey: abcd1234
- name: Sonnet
type: openai-compatible
model-name: claude-sonnet-4-6
endpoint: https://idealab.alibaba-inc.com/api/openai/v1
api-key: cafebabe
- name: Opus
type: openai-compatible
model-name: claude-opus-4-5
endpoint: https://idealab.alibaba-inc.com/api/openai/v1
api-key: cafebabe
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.factories;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.model.AiModelClient;

/**
* A factory to create {@link AiModelClient} instances. See also {@link Factory} for more
* information.
*
* <p>The interface adds a single creation method on top of {@link Factory}.
*/
@Experimental
public interface AiModelClientFactory extends Factory {

/**
* Creates a new {@link AiModelClient} instance.
*
* @param context the creation context. NOTE(review): {@code Context} is not declared in this
* file; presumably it is the context type inherited from {@link Factory} - confirm.
* @return the newly created client
*/
AiModelClient createClient(Context context);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.model;

import org.apache.flink.cdc.common.annotation.Experimental;

import java.io.Serializable;

/**
* Base interface for a runtime AI model client. It declares no model operations itself, only the
* {@link #open()} and {@link #close()} lifecycle hooks. Concrete capabilities are declared via
* ability interfaces in {@code org.apache.flink.cdc.common.model.abilities}.
*
* <p>Implementations must be {@link Serializable} so that they can be distributed across Flink task
* managers together with the operator that holds them.
*/
@Experimental
public interface AiModelClient extends Serializable, AutoCloseable {

/**
* Lifecycle hook for acquiring resources. The default implementation does nothing.
*
* <p>NOTE(review): presumably invoked by the holding operator before the client is first used -
* confirm against the caller.
*
* @throws Exception if the implementation fails to initialize
*/
default void open() throws Exception {
// Do nothing
}

/**
* Lifecycle hook for releasing resources, as required by {@link AutoCloseable}. The default
* implementation does nothing.
*
* @throws Exception if the implementation fails to release its resources
*/
@Override
default void close() throws Exception {
// Do nothing
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.model.abilities;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.model.AiModelClient;

/**
* Ability interface for {@link AiModelClient} implementations that can produce dense vector
* embeddings from text input.
*
* <p>This interface does not extend {@link AiModelClient} itself; it only declares the embedding
* operation.
*/
@Experimental
public interface SupportsEmbedding {

/**
* Converts the given text into a dense float vector.
*
* @param text the text to embed
* @return the embedding of {@code text}. NOTE(review): the vector length and the handling of
* {@code null} or empty input are not specified by this interface - confirm per
* implementation.
*/
float[] embed(String text);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.model.abilities;

import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.model.AiModelClient;

/**
* Ability interface for {@link AiModelClient} implementations that can perform chat-style text
* generation given a system prompt and a user input.
*
* <p>This interface does not extend {@link AiModelClient} itself; it only declares the generation
* operation.
*/
@Experimental
public interface SupportsTextGeneration {

/**
* Generates text based on a system-level prompt and a user-provided input message. Returns a
* JSON string conforming to the output schema declared by the calling AI function.
*
* @param systemPrompt the system-level prompt
* @param userInput the user-provided input message
* @return the generated text as a JSON string. NOTE(review): how the output schema reaches the
* implementation is not visible in this signature - confirm with the caller.
*/
String generate(String systemPrompt, String userInput);
}
4 changes: 3 additions & 1 deletion flink-cdc-composer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ limitations under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!-- This is for testing AI functions with the dummy model. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-model</artifactId>
<artifactId>flink-cdc-pipeline-model-dummy</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,41 +20,31 @@
import java.util.Map;
import java.util.Objects;

/**
* Common properties of model.
*
* <p>A transformation definition contains:
*
* <ul>
* <li>modelName: The name of function.
* <li>className: The model to transform data.
* <li>parameters: The parameters that used to configure the model.
* </ul>
*/
/** Common properties of a model. */
public class ModelDef {

private final String modelName;
private final String name;

private final String className;
private final String type;

private final Map<String, String> parameters;
private final Map<String, String> options;

public ModelDef(String modelName, String className, Map<String, String> parameters) {
this.modelName = modelName;
this.className = className;
this.parameters = parameters;
public ModelDef(String name, String type, Map<String, String> options) {
this.name = name;
this.type = type;
this.options = options;
}

public String getModelName() {
return modelName;
public String getName() {
return name;
}

public String getClassName() {
return className;
public String getType() {
return type;
}

public Map<String, String> getParameters() {
return parameters;
public Map<String, String> getOptions() {
return options;
}

@Override
Expand All @@ -66,27 +56,27 @@ public boolean equals(Object o) {
return false;
}
ModelDef modelDef = (ModelDef) o;
return Objects.equals(modelName, modelDef.modelName)
&& Objects.equals(className, modelDef.className)
&& Objects.equals(parameters, modelDef.parameters);
return Objects.equals(name, modelDef.name)
&& Objects.equals(type, modelDef.type)
&& Objects.equals(options, modelDef.options);
}

@Override
public int hashCode() {
return Objects.hash(modelName, className, parameters);
return Objects.hash(name, type, options);
}

@Override
public String toString() {
return "ModelDef{"
+ "name='"
+ modelName
+ name
+ '\''
+ ", model='"
+ className
+ ", type='"
+ type
+ '\''
+ ", parameters="
+ parameters
+ ", options="
+ options
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
stream,
pipelineDef.getTransforms(),
pipelineDef.getUdfs(),
pipelineDef.getModels(),
dataSource.supportedMetadataColumns());

// PreTransform ---> PostTransform
Expand Down
Loading
Loading