Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.sink.protocol.opcda;

import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import org.apache.iotdb.pipe.api.exception.PipeException;

Expand All @@ -40,6 +41,7 @@
import com.sun.jna.ptr.IntByReference;
import com.sun.jna.ptr.PointerByReference;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.UnSupportedDataTypeException;
Expand All @@ -51,11 +53,17 @@
import java.io.Closeable;
import java.sql.Date;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE;
import static org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCItemMgt;
import static org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCServer;
import static org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCSyncIO;
Expand All @@ -70,11 +78,19 @@
private final OpcDaHeader.IOPCSyncIO syncIO;
private final Map<String, Integer> serverHandleMap = new ConcurrentHashMap<>();
private final Map<String, Long> serverTimestampMap = new ConcurrentHashMap<>();
private final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig;

// Save it here to avoid memory leakage
private WTypes.BSTR bstr;

OpcDaServerHandle(String clsOrProgID) {
this(clsOrProgID, TableModelItemIdEncodingConfig.humanReadableDefaults());
}

OpcDaServerHandle(
final String clsOrProgID,
final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) {
this.tableModelItemIdEncodingConfig = tableModelItemIdEncodingConfig;
final Guid.CLSID CLSID_OPC_SERVER = new Guid.CLSID(clsOrProgID);

Ole32.INSTANCE.CoInitializeEx(null, Ole32.COINIT_MULTITHREADED);
Expand Down Expand Up @@ -181,6 +197,20 @@
}

void transfer(final Tablet tablet) {
transfer(tablet, false, null);
}

void transfer(
final Tablet tablet, final boolean isTableModel, final String tableModelDatabaseName) {
if (!isTableModel) {
transferTreeModelTablet(tablet);
return;
}

transferTableModelTablet(tablet, tableModelDatabaseName);
}

private void transferTreeModelTablet(final Tablet tablet) {
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
final List<IMeasurementSchema> schemas = tablet.getSchemas();

Expand All @@ -206,6 +236,233 @@
}
}

private void transferTableModelTablet(final Tablet tablet, final String tableModelDatabaseName) {
final List<IMeasurementSchema> schemas = tablet.getSchemas();
final Map<String, ItemValuePosition> itemIdToValuePosition =
collectLatestTableModelItemPositions(
tablet, tableModelDatabaseName, tableModelItemIdEncodingConfig);

for (final Map.Entry<String, ItemValuePosition> entry : itemIdToValuePosition.entrySet()) {
final String itemId = entry.getKey();
final ItemValuePosition itemValuePosition = entry.getValue();
final IMeasurementSchema schema = schemas.get(itemValuePosition.getColumnIndex());

if (!serverHandleMap.containsKey(itemId)) {
addItem(itemId, schema.getType());
}

final long timestamp = tablet.getTimestamp(itemValuePosition.getRowIndex());
if (serverTimestampMap.get(itemId) <= timestamp) {
writeData(
itemId,
getTabletObjectValue4Opc(
tablet.getValues()[itemValuePosition.getColumnIndex()],
itemValuePosition.getRowIndex(),
schema.getType()));
serverTimestampMap.put(itemId, timestamp);
}
}
}

static Map<String, ItemValuePosition> collectLatestTableModelItemPositions(
final Tablet tablet, final String tableModelDatabaseName) {
return collectLatestTableModelItemPositions(
tablet, tableModelDatabaseName, TableModelItemIdEncodingConfig.humanReadableDefaults());
}

static Map<String, ItemValuePosition> collectLatestTableModelItemPositions(
final Tablet tablet,
final String tableModelDatabaseName,
final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) {
if (Objects.isNull(tableModelDatabaseName)) {
throw new PipeException("The table model database name must exist when transferring tablet.");
}

new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp();
final List<IMeasurementSchema> schemas = tablet.getSchemas();
final Map<String, ItemValuePosition> itemIdToValuePosition = new LinkedHashMap<>();

// Iterate backward so the first value captured for an item id is the latest non-null one.
for (int rowIndex = tablet.getRowSize() - 1; rowIndex >= 0; --rowIndex) {
for (int columnIndex = 0; columnIndex < schemas.size(); ++columnIndex) {
if (!tablet.getColumnTypes().get(columnIndex).equals(ColumnCategory.FIELD)
|| tablet.isNull(rowIndex, columnIndex)) {
continue;
}

itemIdToValuePosition.putIfAbsent(
generateTableModelItemId(
tableModelDatabaseName,
tablet,
rowIndex,
schemas.get(columnIndex).getMeasurementName(),
tableModelItemIdEncodingConfig),
new ItemValuePosition(rowIndex, columnIndex));
}
}

return itemIdToValuePosition;
}

static String generateTableModelItemId(
final String tableModelDatabaseName,
final Tablet tablet,
final int rowIndex,
final String measurementName) {
return generateTableModelItemId(
tableModelDatabaseName,
tablet,
rowIndex,
measurementName,
TableModelItemIdEncodingConfig.humanReadableDefaults());
}

static String generateTableModelItemId(
final String tableModelDatabaseName,
final Tablet tablet,
final int rowIndex,
final String measurementName,
final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) {
// In table model, tablet.getDeviceID(rowIndex) already contains the table name as the first
// segment, followed by tag columns in declaration order. We prefix it with the database name
// and append the field measurement to build the OPC DA item id.
//
// Examples:
// 1. database=factory, deviceId=status.d1, measurement=s1 -> factory.status.d1.s1
// 2. database=factory, logical device-id segments are [status, "a.b", "c"],
// measurement=s1 -> factory.status.a__ESC__DOT__ESC__b.c.s1
// This case exists because '.' is also the OPC DA item-id path separator. If the raw tag
// value "a.b" were written directly, it would be indistinguishable from two segments "a"
// and "b", so it must be escaped first.
// 3. database=factory, deviceId=status.__NULL__, measurement=s1
// where the tag column is null -> factory.status.__NULL__.s1
final StringBuilder builder =
new StringBuilder(tableModelItemIdEncodingConfig.encodeSegment(tableModelDatabaseName));
for (final Object segment : tablet.getDeviceID(rowIndex).getSegments()) {
builder
.append(TsFileConstant.PATH_SEPARATOR)
.append(tableModelItemIdEncodingConfig.encodeSegment(segment));
}
return builder
.append(TsFileConstant.PATH_SEPARATOR)
.append(tableModelItemIdEncodingConfig.encodeSegment(measurementName))
.toString();
}

static final class ItemValuePosition {
private final int rowIndex;
private final int columnIndex;

private ItemValuePosition(final int rowIndex, final int columnIndex) {
this.rowIndex = rowIndex;
this.columnIndex = columnIndex;
}

int getRowIndex() {
return rowIndex;
}

int getColumnIndex() {
return columnIndex;
}
}

static final class TableModelItemIdEncodingConfig {
private final String nullTagSentinel;
private final String segmentEscape;
private final String escapedSegmentEscape;
private final String escapedPathSeparator;

static TableModelItemIdEncodingConfig humanReadableDefaults() {
return new TableModelItemIdEncodingConfig(
CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE,
CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE,
CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE,
CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE);
}

TableModelItemIdEncodingConfig(
final String nullTagSentinel,
final String segmentEscape,
final String escapedSegmentEscape,
final String escapedPathSeparator) {
validate(nullTagSentinel, segmentEscape, escapedSegmentEscape, escapedPathSeparator);
this.nullTagSentinel = nullTagSentinel;
this.segmentEscape = segmentEscape;
this.escapedSegmentEscape = escapedSegmentEscape;
this.escapedPathSeparator = escapedPathSeparator;
}

private static void validate(
final String nullTagSentinel,
final String segmentEscape,
final String escapedSegmentEscape,
final String escapedPathSeparator) {
final String pathSeparator = String.valueOf(TsFileConstant.PATH_SEPARATOR);
final List<String> tokens =
Arrays.asList(nullTagSentinel, segmentEscape, escapedSegmentEscape, escapedPathSeparator);
if (tokens.stream().anyMatch(token -> Objects.isNull(token) || token.isEmpty())) {
throw new PipeException(
"The OPC DA table model item id encoding tokens must be non-empty.");
}
if (tokens.stream().anyMatch(token -> token.contains(pathSeparator))) {
throw new PipeException(
"The OPC DA table model item id encoding tokens must not contain path separator '.'.");
}
if (nullTagSentinel.contains(segmentEscape)
|| escapedSegmentEscape.contains(segmentEscape)
|| escapedPathSeparator.contains(segmentEscape)) {
throw new PipeException(
"The OPC DA table model item id encoding tokens must not contain the segment escape token.");

Check warning on line 416 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 105).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ4pUCPOgYpe4Bv-J49-&open=AZ4pUCPOgYpe4Bv-J49-&pullRequest=17676
}
if (tokens.stream().distinct().count() != tokens.size()) {
throw new PipeException(
"The OPC DA table model item id encoding tokens must be pairwise different.");
}
}

String encodeSegment(final Object segment) {
if (Objects.isNull(segment)) {
return nullTagSentinel;
}

// Escape reserved markers inside one device-id segment so the final item id can still be
// split by '.' unambiguously. For example, with the default config:
// - "a.b" becomes "a__ESC__DOT__ESC__b"
// - "__NULL__" becomes "__ESC____NULL____ESC__"
return segment
.toString()
.replace(segmentEscape, escapedMarker(escapedSegmentEscape))
.replace(nullTagSentinel, escapedMarker(nullTagSentinel))
.replace(TsFileConstant.PATH_SEPARATOR, escapedMarker(escapedPathSeparator));
}

private String escapedMarker(final String marker) {
return segmentEscape + marker + segmentEscape;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof TableModelItemIdEncodingConfig)) {
return false;
}
final TableModelItemIdEncodingConfig that = (TableModelItemIdEncodingConfig) obj;
return Objects.equals(nullTagSentinel, that.nullTagSentinel)
&& Objects.equals(segmentEscape, that.segmentEscape)
&& Objects.equals(escapedSegmentEscape, that.escapedSegmentEscape)
&& Objects.equals(escapedPathSeparator, that.escapedPathSeparator);
}

@Override
public int hashCode() {
return Objects.hash(
nullTagSentinel, segmentEscape, escapedSegmentEscape, escapedPathSeparator);
}
}

private void addItem(final String itemId, final TSDataType type) {
final OpcDaHeader.OPCITEMDEF[] itemDefs = new OpcDaHeader.OPCITEMDEF[1];
itemDefs[0] = new OpcDaHeader.OPCITEMDEF();
Expand Down
Loading
Loading