Skip to content

Latest commit

 

History

History
205 lines (155 loc) · 7.83 KB

File metadata and controls

205 lines (155 loc) · 7.83 KB

Defining a processor

What is a processor

A Processor is a component that converts incoming data to a different model.

A processor must have a method decorated with @ElementListener taking an incoming data and returning the processed data:

@ElementListener
public MyNewData map(final MyData data) {
    return ...;
}

Processors must be Serializable because they are distributed components.

If you just need to access data on a map-based ruleset, you can use Record or JsonObject as parameter type.
From there, Talend Component Kit wraps the data to allow you to access it as a map. The parameter type is not enforced.
This means that if you know that you will get a SuperCustomDto bean, then you can use it as parameter type. But for generic components that are reusable in any chain, it is highly encouraged to use Record until you have an evaluation language-based processor that has its own way to access components.

For example:

@ElementListener
public MyNewData map(final Record incomingData) {
    String name = incomingData.getString("name");
    int name = incomingData.getInt("age");
    return ...;
}

// equivalent to (using POJO subclassing)

public class Person {
    private String age;
    private int age;

    // getters/setters
}

@ElementListener
public MyNewData map(final Person person) {
    String name = person.getName();
    int age = person.getAge();
    return ...;
}

A processor also supports @BeforeGroup and @AfterGroup methods, which must not have any parameter and return void values. Any other result would be ignored. These methods are used by the runtime to mark a chunk of the data in a way which is estimated good for the execution flow size.

Note
Because the size is estimated, the size of a group can vary. It is even possible to have groups of size 1.

It is recommended to batch records, for performance reasons:

@BeforeGroup
public void initBatch() {
    // ...
}

@AfterGroup
public void endBatch() {
    // ...
}

You can optimize the data batch processing by using the maxBatchSize parameter. This parameter is automatically implemented on the component when it is deployed to a Talend application. Only the logic needs to be implemented. You can however customize its value setting in your LocalConfiguration the property _maxBatchSize.value - for the family - or ${component simple class name}._maxBatchSize.value - for a particular component, otherwise its default will be 1000. If you replace value by active, you can also configure if this feature is enabled or not. This is useful when you don’t want to use it at all. Learn how to implement chunking/bulking in this document.

Defining output connections

In some cases, you may need to split the output of a processor in two or more connections. A common example is to have "main" and "reject" output connections where parts of the incoming data are passed to a specific bucket for later processing.

Talend Component Kit supports two types of output connections: Flow and Reject.

  • Flow is the main and standard output connection.

  • The Reject connection handles records rejected during the processing. A component can only have one reject connection, if any. Its name must be REJECT to be processed correctly in Talend applications.

Note
You can also define the different output connections of your component in the Starter.

To define an output connection, you can use @Output as replacement of the returned value in the @ElementListener:

@ElementListener
public void map(final MyData data, @Output final OutputEmitter<MyNewData> output) {
    output.emit(createNewData(data));
}

Alternatively, you can pass a string that represents the new branch:

@ElementListener
public void map(final MyData data,
                @Output final OutputEmitter<MyNewData> main,
                @Output("REJECT") final OutputEmitter<MyNewDataWithError> rejected) {
    if (isRejected(data)) {
        rejected.emit(createNewData(data));
    } else {
        main.emit(createNewData(data));
    }
}

// or

@ElementListener
public MyNewData map(final MyData data,
                    @Output("REJECT") final OutputEmitter<MyNewDataWithError> rejected) {
    if (isSuspicious(data)) {
        rejected.emit(createNewData(data));
        return createNewData(data); // in this case the processing continues but notifies another channel
    }
    return createNewData(data);
}

Defining conditional outputs flows

Processors @ElementListener methods can declare several output flows. During the design phase, all output flows are typically available. However, in some cases, we may want to disable certain flows based on the user’s configuration settings. In such instances, a service will be called to determine the available output flows. (Currently, only Studio supports this feature.)

  • A processor without @ConditionalOutputFlows keep the current behavior. All declared flows are visible at design time

  • A processor with @ConditionalOutputFlows has its output flows list conditioned by its configuration

    • The @ConditionalOutputFlows has one parameter: the name of a @AvailableOutputFlows service: @ConditionalOutputFlows("the_name")

    • Each processor in a same TCK container can use a different service to retrieve the list of available output flows

      • So several services can be annotated with @AvailableOutputFlows but their name must be different @AvailableOutputFlows("the_name"), @AvailableOutputFlow("another_name"), …​

  • The return type of a @AvailableOutputFlows service is a List<String>, the build should fail if not.

@Processor(name = "Processor")
@ConditionalOutputFlows("my_service_that_return_active_flows")
public class MyProcessor implements Serializable {

  @ElementListener
  public void process(@Input final Record input,
          @Output final OutputEmitter<Record> main,
          @Output("second_flow") final OutputEmitter<Record> second,
          @Output("third_flow") final OutputEmitter<Record> third) {
      [.........]
  }
}

 @AvailableOutputFlows("my_service_that_return_active_flows")
  //After the @Option, must follow ("configuration")
  public Collection<String> getAvailableFlows(@Option("configuration") Config config) {
      final List<String> flows = Arrays.asList("default", "second_flow");
      if (config.isABoolean()) {
          flows.add("third_flow");
      }
      return flows;
  }

Defining multiple inputs

Having multiple inputs is similar to having multiple outputs, except that an OutputEmitter wrapper is not needed:

@ElementListener
public MyNewData map(@Input final MyData data, @Input("input2") final MyData2 data2) {
    return createNewData(data1, data2);
}

@Input takes the input name as parameter. If no name is set, it defaults to the "main (default)" input branch. It is recommended to use the default branch when possible and to avoid naming branches according to the component semantic.

Shortcut syntax for bulk output processors

For the case of output components (not emitting any data) using bulking you can pass the list of records to the @AfterGroup method:

@Processor(name = "DocOutput")
public class DocOutput implements Serializable {

    @AfterGroup
    public void onCommit(final Collection<Record> records) {
        // save records
    }
}