diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java index 816d757391b9..3bf677f16fb9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandle.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.opcda; +import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -40,6 +41,7 @@ import com.sun.jna.ptr.IntByReference; import com.sun.jna.ptr.PointerByReference; import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -51,11 +53,17 @@ import java.io.Closeable; import java.sql.Date; import java.time.LocalDate; +import java.util.Arrays; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE; import static org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCItemMgt; import static org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCServer; import static org.apache.iotdb.db.pipe.sink.protocol.opcda.OpcDaHeader.IID_IOPCSyncIO; @@ -70,11 +78,19 @@ public class OpcDaServerHandle implements Closeable { private final OpcDaHeader.IOPCSyncIO syncIO; private final Map serverHandleMap = new ConcurrentHashMap<>(); private final Map serverTimestampMap = new ConcurrentHashMap<>(); + private final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig; // Save it here to avoid memory leakage private WTypes.BSTR bstr; OpcDaServerHandle(String clsOrProgID) { + this(clsOrProgID, TableModelItemIdEncodingConfig.humanReadableDefaults()); + } + + OpcDaServerHandle( + final String clsOrProgID, + final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) { + this.tableModelItemIdEncodingConfig = tableModelItemIdEncodingConfig; final Guid.CLSID CLSID_OPC_SERVER = new Guid.CLSID(clsOrProgID); Ole32.INSTANCE.CoInitializeEx(null, Ole32.COINIT_MULTITHREADED); @@ -181,6 +197,20 @@ static String getClsIDFromProgID(final String progID) { } void transfer(final Tablet tablet) { + transfer(tablet, false, null); + } + + void transfer( + final Tablet tablet, final boolean isTableModel, final String tableModelDatabaseName) { + if (!isTableModel) { + transferTreeModelTablet(tablet); + return; + } + + transferTableModelTablet(tablet, tableModelDatabaseName); + } + + private void transferTreeModelTablet(final Tablet tablet) { new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); final List schemas = tablet.getSchemas(); @@ -206,6 +236,233 @@ void transfer(final Tablet tablet) { } } + private void transferTableModelTablet(final Tablet tablet, final String tableModelDatabaseName) { + final List schemas = tablet.getSchemas(); + final Map itemIdToValuePosition = + collectLatestTableModelItemPositions( + tablet, tableModelDatabaseName, tableModelItemIdEncodingConfig); + + for (final Map.Entry entry : itemIdToValuePosition.entrySet()) { + final String itemId = entry.getKey(); + final ItemValuePosition itemValuePosition = entry.getValue(); + final IMeasurementSchema schema = schemas.get(itemValuePosition.getColumnIndex()); + + if (!serverHandleMap.containsKey(itemId)) { + addItem(itemId, schema.getType()); + } + + final long timestamp = tablet.getTimestamp(itemValuePosition.getRowIndex()); + if (serverTimestampMap.get(itemId) <= timestamp) { + writeData( + itemId, + getTabletObjectValue4Opc( + tablet.getValues()[itemValuePosition.getColumnIndex()], + itemValuePosition.getRowIndex(), + schema.getType())); + serverTimestampMap.put(itemId, timestamp); + } + } + } + + static Map collectLatestTableModelItemPositions( + final Tablet tablet, final String tableModelDatabaseName) { + return collectLatestTableModelItemPositions( + tablet, tableModelDatabaseName, TableModelItemIdEncodingConfig.humanReadableDefaults()); + } + + static Map collectLatestTableModelItemPositions( + final Tablet tablet, + final String tableModelDatabaseName, + final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) { + if (Objects.isNull(tableModelDatabaseName)) { + throw new PipeException("The table model database name must exist when transferring tablet."); + } + + new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp(); + final List schemas = tablet.getSchemas(); + final Map itemIdToValuePosition = new LinkedHashMap<>(); + + // Iterate backward so the first value captured for an item id is the latest non-null one. + for (int rowIndex = tablet.getRowSize() - 1; rowIndex >= 0; --rowIndex) { + for (int columnIndex = 0; columnIndex < schemas.size(); ++columnIndex) { + if (!tablet.getColumnTypes().get(columnIndex).equals(ColumnCategory.FIELD) + || tablet.isNull(rowIndex, columnIndex)) { + continue; + } + + itemIdToValuePosition.putIfAbsent( + generateTableModelItemId( + tableModelDatabaseName, + tablet, + rowIndex, + schemas.get(columnIndex).getMeasurementName(), + tableModelItemIdEncodingConfig), + new ItemValuePosition(rowIndex, columnIndex)); + } + } + + return itemIdToValuePosition; + } + + static String generateTableModelItemId( + final String tableModelDatabaseName, + final Tablet tablet, + final int rowIndex, + final String measurementName) { + return generateTableModelItemId( + tableModelDatabaseName, + tablet, + rowIndex, + measurementName, + TableModelItemIdEncodingConfig.humanReadableDefaults()); + } + + static String generateTableModelItemId( + final String tableModelDatabaseName, + final Tablet tablet, + final int rowIndex, + final String measurementName, + final TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) { + // In table model, tablet.getDeviceID(rowIndex) already contains the table name as the first + // segment, followed by tag columns in declaration order. We prefix it with the database name + // and append the field measurement to build the OPC DA item id. + // + // Examples: + // 1. database=factory, deviceId=status.d1, measurement=s1 -> factory.status.d1.s1 + // 2. database=factory, logical device-id segments are [status, "a.b", "c"], + // measurement=s1 -> factory.status.a__ESC__DOT__ESC__b.c.s1 + // This case exists because '.' is also the OPC DA item-id path separator. If the raw tag + // value "a.b" were written directly, it would be indistinguishable from two segments "a" + // and "b", so it must be escaped first. + // 3. database=factory, deviceId=status.__NULL__, measurement=s1 + // where the tag column is null -> factory.status.__NULL__.s1 + final StringBuilder builder = + new StringBuilder(tableModelItemIdEncodingConfig.encodeSegment(tableModelDatabaseName)); + for (final Object segment : tablet.getDeviceID(rowIndex).getSegments()) { + builder + .append(TsFileConstant.PATH_SEPARATOR) + .append(tableModelItemIdEncodingConfig.encodeSegment(segment)); + } + return builder + .append(TsFileConstant.PATH_SEPARATOR) + .append(tableModelItemIdEncodingConfig.encodeSegment(measurementName)) + .toString(); + } + + static final class ItemValuePosition { + private final int rowIndex; + private final int columnIndex; + + private ItemValuePosition(final int rowIndex, final int columnIndex) { + this.rowIndex = rowIndex; + this.columnIndex = columnIndex; + } + + int getRowIndex() { + return rowIndex; + } + + int getColumnIndex() { + return columnIndex; + } + } + + static final class TableModelItemIdEncodingConfig { + private final String nullTagSentinel; + private final String segmentEscape; + private final String escapedSegmentEscape; + private final String escapedPathSeparator; + + static TableModelItemIdEncodingConfig humanReadableDefaults() { + return new TableModelItemIdEncodingConfig( + CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE, + CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE, + CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE, + CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE); + } + + TableModelItemIdEncodingConfig( + final String nullTagSentinel, + final String segmentEscape, + final String escapedSegmentEscape, + final String escapedPathSeparator) { + validate(nullTagSentinel, segmentEscape, escapedSegmentEscape, escapedPathSeparator); + this.nullTagSentinel = nullTagSentinel; + this.segmentEscape = segmentEscape; + this.escapedSegmentEscape = escapedSegmentEscape; + this.escapedPathSeparator = escapedPathSeparator; + } + + private static void validate( + final String nullTagSentinel, + final String segmentEscape, + final String escapedSegmentEscape, + final String escapedPathSeparator) { + final String pathSeparator = String.valueOf(TsFileConstant.PATH_SEPARATOR); + final List tokens = + Arrays.asList(nullTagSentinel, segmentEscape, escapedSegmentEscape, escapedPathSeparator); + if (tokens.stream().anyMatch(token -> Objects.isNull(token) || token.isEmpty())) { + throw new PipeException( + "The OPC DA table model item id encoding tokens must be non-empty."); + } + if (tokens.stream().anyMatch(token -> token.contains(pathSeparator))) { + throw new PipeException( + "The OPC DA table model item id encoding tokens must not contain path separator '.'."); + } + if (nullTagSentinel.contains(segmentEscape) + || escapedSegmentEscape.contains(segmentEscape) + || escapedPathSeparator.contains(segmentEscape)) { + throw new PipeException( + "The OPC DA table model item id encoding tokens must not contain the segment escape token."); + } + if (tokens.stream().distinct().count() != tokens.size()) { + throw new PipeException( + "The OPC DA table model item id encoding tokens must be pairwise different."); + } + } + + String encodeSegment(final Object segment) { + if (Objects.isNull(segment)) { + return nullTagSentinel; + } + + // Escape reserved markers inside one device-id segment so the final item id can still be + // split by '.' unambiguously. For example, with the default config: + // - "a.b" becomes "a__ESC__DOT__ESC__b" + // - "__NULL__" becomes "__ESC____NULL____ESC__" + return segment + .toString() + .replace(segmentEscape, escapedMarker(escapedSegmentEscape)) + .replace(nullTagSentinel, escapedMarker(nullTagSentinel)) + .replace(TsFileConstant.PATH_SEPARATOR, escapedMarker(escapedPathSeparator)); + } + + private String escapedMarker(final String marker) { + return segmentEscape + marker + segmentEscape; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof TableModelItemIdEncodingConfig)) { + return false; + } + final TableModelItemIdEncodingConfig that = (TableModelItemIdEncodingConfig) obj; + return Objects.equals(nullTagSentinel, that.nullTagSentinel) + && Objects.equals(segmentEscape, that.segmentEscape) + && Objects.equals(escapedSegmentEscape, that.escapedSegmentEscape) + && Objects.equals(escapedPathSeparator, that.escapedPathSeparator); + } + + @Override + public int hashCode() { + return Objects.hash( + nullTagSentinel, segmentEscape, escapedSegmentEscape, escapedPathSeparator); + } + } + private void addItem(final String itemId, final TSDataType type) { final OpcDaHeader.OPCITEMDEF[] itemDefs = new OpcDaHeader.OPCITEMDEF[1]; itemDefs[0] = new OpcDaHeader.OPCITEMDEF(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java index 322943de402e..ce3cde4d7647 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaSink.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.pipe.sink.protocol.opcda; -import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -30,29 +32,49 @@ import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_CLSID_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_PROGID_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_DA_SEGMENT_ESCAPE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_CLSID_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_NULL_TAG_SENTINEL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_PROGID_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_DA_SEGMENT_ESCAPE_KEY; /** * Send data in IoTDB based on Opc Da protocol, using JNA. All data are converted into tablets, and * then push the newest value to the local COM server in another process. + * + *

For table-model events, the OPC DA item id is generated as {@code ..}. For example, if {@code tableModelDatabaseName=factory}, {@code + * deviceId=status.d1} and the field is {@code s1}, the sink writes to OPC DA item id {@code + * factory.status.d1.s1}. */ @TreeModel +@TableModel public class OpcDaSink implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(OpcDaSink.class); - private static final Map> + private static final Map> CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP = new ConcurrentHashMap<>(); - private String clsID; + private ServerHandleKey serverHandleKey; private OpcDaServerHandle handle; @Override @@ -74,6 +96,8 @@ public void validate(final PipeParameterValidator validator) throws Exception { if (!System.getProperty("os.name").toLowerCase().startsWith("windows")) { throw new PipeParameterNotValidException("opc-da-sink must run on windows system."); } + + createTableModelItemIdEncodingConfig(validator.getParameters()); } @Override @@ -81,18 +105,24 @@ public void customize( final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) throws Exception { synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) { - clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, SINK_OPC_DA_CLSID_KEY); + String clsID = parameters.getStringByKeys(CONNECTOR_OPC_DA_CLSID_KEY, SINK_OPC_DA_CLSID_KEY); if (Objects.isNull(clsID)) { clsID = OpcDaServerHandle.getClsIDFromProgID( parameters.getStringByKeys(CONNECTOR_OPC_DA_PROGID_KEY, SINK_OPC_DA_PROGID_KEY)); } + serverHandleKey = + new ServerHandleKey(clsID, createTableModelItemIdEncodingConfig(parameters)); handle = CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP .computeIfAbsent( - clsID, key -> new Pair<>(new AtomicInteger(0), new OpcDaServerHandle(clsID))) + serverHandleKey, + key -> + new Pair<>( + new AtomicInteger(0), + new OpcDaServerHandle(key.clsID, key.tableModelItemIdEncodingConfig))) .getRight(); - CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID).getLeft().incrementAndGet(); + CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(serverHandleKey).getLeft().incrementAndGet(); } } @@ -108,8 +138,10 @@ public void heartbeat() throws Exception { @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { - OpcUaSink.transferByTablet( - tabletInsertionEvent, LOGGER, (tablet, isTableModel) -> handle.transfer(tablet)); + transferByTablet( + tabletInsertionEvent, + (tablet, isTableModel, tableModelDatabaseName) -> + handle.transfer(tablet, isTableModel, tableModelDatabaseName)); } @Override @@ -119,13 +151,13 @@ public void transfer(final Event event) throws Exception { @Override public void close() throws Exception { - if (Objects.isNull(clsID)) { + if (Objects.isNull(serverHandleKey)) { return; } synchronized (CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP) { final Pair pair = - CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(clsID); + CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.get(serverHandleKey); if (pair == null) { return; } @@ -134,9 +166,125 @@ public void close() throws Exception { try { pair.getRight().close(); } finally { - CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(clsID); + CLS_ID_TO_REFERENCE_COUNT_AND_HANDLE_MAP.remove(serverHandleKey); } } } } + + private static OpcDaServerHandle.TableModelItemIdEncodingConfig + createTableModelItemIdEncodingConfig(final PipeParameters parameters) { + return new OpcDaServerHandle.TableModelItemIdEncodingConfig( + parameters.getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_KEY, SINK_OPC_DA_NULL_TAG_SENTINEL_KEY), + CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE), + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_DA_SEGMENT_ESCAPE_KEY, SINK_OPC_DA_SEGMENT_ESCAPE_KEY), + CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE), + parameters.getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY, + SINK_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY), + CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE), + parameters.getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY, + SINK_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY), + CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE)); + } + + private static void transferByTablet( + final TabletInsertionEvent tabletInsertionEvent, + final ThrowingTriConsumer transferTablet) + throws Exception { + if (!(tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) + && !(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { + LOGGER.warn( + "This Connector only support " + + "PipeInsertNodeTabletInsertionEvent and PipeRawTabletInsertionEvent. " + + "Ignore {}.", + tabletInsertionEvent); + return; + } + + if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { + transferTabletWrapper( + (PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent, transferTablet); + } else { + transferTabletWrapper((PipeRawTabletInsertionEvent) tabletInsertionEvent, transferTablet); + } + } + + private static void transferTabletWrapper( + final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent, + final ThrowingTriConsumer transferTablet) + throws Exception { + if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(OpcDaSink.class.getName())) { + return; + } + try { + final boolean isTableModel = pipeInsertNodeTabletInsertionEvent.isTableModelEvent(); + final String tableModelDatabaseName = + isTableModel ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() : null; + for (final Tablet tablet : pipeInsertNodeTabletInsertionEvent.convertToTablets()) { + transferTablet.accept(tablet, isTableModel, tableModelDatabaseName); + } + } finally { + pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(OpcDaSink.class.getName(), false); + } + } + + private static void transferTabletWrapper( + final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent, + final ThrowingTriConsumer transferTablet) + throws Exception { + if (!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcDaSink.class.getName())) { + return; + } + try { + final boolean isTableModel = pipeRawTabletInsertionEvent.isTableModelEvent(); + transferTablet.accept( + pipeRawTabletInsertionEvent.convertToTablet(), + isTableModel, + isTableModel ? pipeRawTabletInsertionEvent.getTableModelDatabaseName() : null); + } finally { + pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcDaSink.class.getName(), false); + } + } + + @FunctionalInterface + private interface ThrowingTriConsumer { + void accept(final T t, final U u, final V v) throws E; + } + + private static final class ServerHandleKey { + private final String clsID; + private final OpcDaServerHandle.TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig; + + private ServerHandleKey( + final String clsID, + final OpcDaServerHandle.TableModelItemIdEncodingConfig tableModelItemIdEncodingConfig) { + this.clsID = clsID; + this.tableModelItemIdEncodingConfig = tableModelItemIdEncodingConfig; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof ServerHandleKey)) { + return false; + } + final ServerHandleKey that = (ServerHandleKey) obj; + return Objects.equals(clsID, that.clsID) + && Objects.equals(tableModelItemIdEncodingConfig, that.tableModelItemIdEncodingConfig); + } + + @Override + public int hashCode() { + return Objects.hash(clsID, tableModelItemIdEncodingConfig); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandleTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandleTest.java new file mode 100644 index 000000000000..96286a658ae1 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/protocol/opcda/OpcDaServerHandleTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcda; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.record.Tablet; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class OpcDaServerHandleTest { + + @Test + public void testCollectLatestTableModelItemPositions() { + final Tablet tablet = generateTableModelTablet(); + + final Map itemPositions = + OpcDaServerHandle.collectLatestTableModelItemPositions(tablet, "factory"); + + Assert.assertEquals(6, itemPositions.size()); + Assert.assertFalse(itemPositions.containsKey("factory.status.d1.site")); + + assertItem(itemPositions, tablet, "factory.status.d1.s1", 13L, 2L); + assertItem(itemPositions, tablet, "factory.status.d1.s2", false, 2L); + assertItem(itemPositions, tablet, "factory.status.d2.s1", 21L, 1L); + assertItem(itemPositions, tablet, "factory.status.d2.s2", true, 1L); + assertItem(itemPositions, tablet, "factory.status.__NULL__.s1", 31L, 3L); + assertItem(itemPositions, tablet, "factory.status.__NULL__.s2", false, 3L); + } + + @Test + public void testGenerateTableModelItemIdShouldEscapeSpecialSegments() { + final Tablet tablet = generateEscapedTableModelTablet(); + + final Map itemPositions = + OpcDaServerHandle.collectLatestTableModelItemPositions(tablet, "factory"); + + Assert.assertEquals(4, itemPositions.size()); + assertItem(itemPositions, tablet, "factory.status.a.b__ESC__DOT__ESC__c.s1", 11L, 1L); + assertItem(itemPositions, tablet, "factory.status.a__ESC__DOT__ESC__b.c.s1", 22L, 2L); + assertItem(itemPositions, tablet, "factory.status.__NULL__.c.s1", 33L, 3L); + assertItem(itemPositions, tablet, "factory.status.null.c.s1", 44L, 4L); + } + + @Test + public void testGenerateTableModelItemIdShouldHonorCustomEncodingConfig() { + final Tablet tablet = generateCustomEncodingTablet(); + final Map itemPositions = + OpcDaServerHandle.collectLatestTableModelItemPositions( + tablet, + "factory", + new OpcDaServerHandle.TableModelItemIdEncodingConfig("[NULL]", "[ESC]", "ESC", "DOT")); + + Assert.assertEquals(3, itemPositions.size()); + assertItem(itemPositions, tablet, "factory.status.[NULL].c.s1", 33L, 1L); + assertItem(itemPositions, tablet, "factory.status.[ESC][NULL][ESC].c.s1", 55L, 2L); + assertItem(itemPositions, tablet, "factory.status.[ESC]ESC[ESC].c.s1", 66L, 3L); + } + + private static void assertItem( + final Map itemPositions, + final Tablet tablet, + final String itemId, + final Object expectedValue, + final long expectedTimestamp) { + final OpcDaServerHandle.ItemValuePosition itemValuePosition = itemPositions.get(itemId); + Assert.assertNotNull(itemValuePosition); + Assert.assertEquals(expectedTimestamp, tablet.getTimestamp(itemValuePosition.getRowIndex())); + Assert.assertEquals( + expectedValue, + tablet.getValue(itemValuePosition.getRowIndex(), itemValuePosition.getColumnIndex())); + } + + private static Tablet generateTableModelTablet() { + final List measurementNames = Arrays.asList("site", "s1", "s2"); + final List dataTypes = + Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.BOOLEAN); + final List columnTypes = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); + + final Tablet tablet = new Tablet("status", measurementNames, dataTypes, columnTypes, 5); + tablet.initBitMaps(); + + addRow(tablet, 0, 2L, "d1", 12L, true); + addRow(tablet, 1, 3L, null, 31L, false); + addRow(tablet, 2, 1L, "d1", 11L, null); + addRow(tablet, 3, 1L, "d2", 21L, true); + addRow(tablet, 4, 2L, "d1", 13L, false); + return tablet; + } + + private static Tablet generateEscapedTableModelTablet() { + final List measurementNames = Arrays.asList("tag1", "tag2", "s1"); + final List dataTypes = + Arrays.asList(TSDataType.STRING, TSDataType.STRING, TSDataType.INT64); + final List columnTypes = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.TAG, ColumnCategory.FIELD); + + final Tablet tablet = new Tablet("status", measurementNames, dataTypes, columnTypes, 4); + tablet.initBitMaps(); + + addRow(tablet, 0, 1L, new String[] {"a", "b.c"}, 11L); + addRow(tablet, 1, 2L, new String[] {"a.b", "c"}, 22L); + addRow(tablet, 2, 3L, new String[] {null, "c"}, 33L); + addRow(tablet, 3, 4L, new String[] {"null", "c"}, 44L); + return tablet; + } + + private static Tablet generateCustomEncodingTablet() { + final List measurementNames = Arrays.asList("tag1", "tag2", "s1"); + final List dataTypes = + Arrays.asList(TSDataType.STRING, TSDataType.STRING, TSDataType.INT64); + final List columnTypes = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.TAG, ColumnCategory.FIELD); + + final Tablet tablet = new Tablet("status", measurementNames, dataTypes, columnTypes, 3); + tablet.initBitMaps(); + + addRow(tablet, 0, 1L, new String[] {null, "c"}, 33L); + addRow(tablet, 1, 2L, new String[] {"[NULL]", "c"}, 55L); + addRow(tablet, 2, 3L, new String[] {"[ESC]", "c"}, 66L); + return tablet; + } + + private static void addRow( + final Tablet tablet, + final int rowIndex, + final long timestamp, + final String site, + final Long s1, + final Boolean s2) { + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue( + "site", rowIndex, site == null ? null : new Binary(site.getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s1", rowIndex, s1); + tablet.addValue("s2", rowIndex, s2); + tablet.setRowSize(rowIndex + 1); + } + + private static void addRow( + final Tablet tablet, + final int rowIndex, + final long timestamp, + final String[] tags, + final Long s1) { + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue( + "tag1", + rowIndex, + tags[0] == null ? null : new Binary(tags[0].getBytes(StandardCharsets.UTF_8))); + tablet.addValue( + "tag2", + rowIndex, + tags[1] == null ? null : new Binary(tags[1].getBytes(StandardCharsets.UTF_8))); + tablet.addValue("s1", rowIndex, s1); + tablet.setRowSize(rowIndex + 1); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java index 76dd6b420723..ccb462f09cdb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/sink/opcda/OpcDaSink.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.opcda; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.PlaceholderSink; +import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; /** @@ -29,4 +30,5 @@ * OPC DA connector. */ @TreeModel +@TableModel public class OpcDaSink extends PlaceholderSink {} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index e6f14911e66a..e3428f4eb416 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -351,6 +351,27 @@ public class PipeSinkConstant { public static final String CONNECTOR_OPC_DA_PROGID_KEY = "connector.opcda.progid"; public static final String SINK_OPC_DA_PROGID_KEY = "sink.opcda.progid"; + public static final String CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_KEY = + "connector.opcda.null-tag-sentinel"; + public static final String SINK_OPC_DA_NULL_TAG_SENTINEL_KEY = "sink.opcda.null-tag-sentinel"; + public static final String CONNECTOR_OPC_DA_NULL_TAG_SENTINEL_DEFAULT_VALUE = "__NULL__"; + + public static final String CONNECTOR_OPC_DA_SEGMENT_ESCAPE_KEY = "connector.opcda.segment-escape"; + public static final String SINK_OPC_DA_SEGMENT_ESCAPE_KEY = "sink.opcda.segment-escape"; + public static final String CONNECTOR_OPC_DA_SEGMENT_ESCAPE_DEFAULT_VALUE = "__ESC__"; + + public static final String CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY = + "connector.opcda.escaped-segment-escape"; + public static final String SINK_OPC_DA_ESCAPED_SEGMENT_ESCAPE_KEY = + "sink.opcda.escaped-segment-escape"; + public static final String CONNECTOR_OPC_DA_ESCAPED_SEGMENT_ESCAPE_DEFAULT_VALUE = "ESC"; + + public static final String CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY = + "connector.opcda.escaped-path-separator"; + public static final String SINK_OPC_DA_ESCAPED_PATH_SEPARATOR_KEY = + "sink.opcda.escaped-path-separator"; + public static final String CONNECTOR_OPC_DA_ESCAPED_PATH_SEPARATOR_DEFAULT_VALUE = "DOT"; + public static final String CONNECTOR_USE_EVENT_USER_NAME_KEY = "connector.use-event-user-name"; public static final String SINK_USE_EVENT_USER_NAME_KEY = "sink.use-event-user-name"; public static final boolean CONNECTOR_USE_EVENT_USER_NAME_DEFAULT_VALUE = false;