Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,10 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
NoOpIoTracker.INSTANCE,
0,
ImmutableMap.of(),
null);
null,
obj -> {
throw new UnsupportedOperationException("Unexpected method call.");
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
Expand Down Expand Up @@ -148,10 +149,14 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** Entries holder per execution thread. */
private static final ThreadLocal<Collection<QueryTxEntry>> txEntriesHolder = new ThreadLocal<>();

/** Binary marshaller. */
private final Function<Object, Object> binaryMarshaller;

/**
* @param qctx Parent base query context.
* @param qryId Query ID.
* @param fragmentDesc Partitions information.
* @param binaryMarshaller Binary marshaller.
* @param params Parameters.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
Expand All @@ -169,7 +174,8 @@ public ExecutionContext(
IoTracker ioTracker,
long timeout,
Map<String, Object> params,
@Nullable Collection<QueryTxEntry> qryTxEntries
@Nullable Collection<QueryTxEntry> qryTxEntries,
Function<Object, Object> binaryMarshaller
) {
super(qctx);

Expand All @@ -186,6 +192,7 @@ public ExecutionContext(
this.params = params;
this.timeout = timeout;
this.qryTxEntries = qryTxEntries == null ? txEntriesHolder.get() : qryTxEntries;
this.binaryMarshaller = binaryMarshaller;

startTs = U.currentTimeMillis();

Expand All @@ -199,6 +206,11 @@ public ExecutionContext(
);
}

/** Binary marshaller. */
public Function<Object, Object> binaryMarshaller() {
return binaryMarshaller;
}

/**
* @return Query ID.
*/
Expand Down Expand Up @@ -302,11 +314,16 @@ public IgniteLogger logger() {
return baseDataContext.get(name);
}

/** */
/** Returns a parameter with internal representation or {@link BinaryObject}. */
public Object getParameter(String name, Type storageType) {
assert name.startsWith("?") : name;

return TypeUtils.toInternal(this, params.get(name), storageType);
Object param = params.get(name);

if (param != null && storageType == java.lang.Object.class && !(param instanceof BinaryObject))
return binaryMarshaller.apply(param);

Comment thread
zstan marked this conversation as resolved.
return TypeUtils.toInternal(this, param, storageType);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
/** */
private final Map<String, FragmentPlan> fragmentPlanCache = new GridBoundedConcurrentLinkedHashMap<>(1024);

/** Binary marshaller. */
private Function<Object, Object> binaryMarshaller;

/**
* @param ctx Kernal.
*/
Expand Down Expand Up @@ -472,6 +475,10 @@ public void injectService(InjectResourcesService injectSvc) {

udfQryLimit.set(ctx.config().getQueryThreadPoolSize() - 1);

binaryMarshaller = obj -> {
return ctx.cacheObjects().binary().toBinary(obj);
};

init();
}

Expand Down Expand Up @@ -683,7 +690,9 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
qryParams,
userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()));
userTx == null ? null : ExecutionContext.transactionChanges(userTx.writeEntries()),
binaryMarshaller
);

Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
exchangeService(), failureProcessor()).go(fragment.root());
Expand Down Expand Up @@ -960,7 +969,8 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
createIoTracker(nodeId, msg.originatingQueryId()),
msg.timeout(),
Commons.parametersMap(msg.parameters()),
msg.queryTransactionEntries()
msg.queryTransactionEntries(),
binaryMarshaller
);

executeFragment(qry, fragmentPlan, ectx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ private int[] fieldToInlinedKeysMapping() {

for (InlineIndexKeyType keyType : inlinedKeys) {
// Variable length types can be not fully inlined, so it's probably better to directly read full cache row
// instead of trying to read inlined value and than falllback to cache row reading.
// instead of trying to read inlined value and then fallback to cache row reading.
// Inlined JAVA_OBJECT can't be compared with fill cache row in case of hash collision, this can lead to
// issues when processing the next index page in cursor if current page was concurrently splitted.
// issues when processing the next index page in cursor if current page was concurrently splited.
Comment thread
zstan marked this conversation as resolved.
if (keyType.keySize() < 0 || keyType.type() == IndexKeyType.JAVA_OBJECT)
Comment thread
zstan marked this conversation as resolved.
return null;
}
Expand Down Expand Up @@ -204,7 +204,7 @@ protected TreeIndex<IndexRow> treeIndex() {
}

/** From Row to IndexRow convertor. */
protected IndexRow row2indexRow(Row bound) {
protected @Nullable IndexRow row2indexRow(Row bound) {
if (bound == null)
return null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import java.util.Map;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyType;
import org.apache.ignite.internal.cache.query.index.sorted.IndexPlainRowImpl;
Expand Down Expand Up @@ -54,7 +54,7 @@ public IndexWrappedKeyScan(
}

/** */
@Override protected IndexRow row2indexRow(Row bound) {
@Override @Nullable protected IndexRow row2indexRow(Row bound) {
if (bound == null)
return null;

Expand All @@ -63,22 +63,29 @@ public IndexWrappedKeyScan(
Object key = rowHnd.get(QueryUtils.KEY_COL, bound);
assert key != null : String.format("idxName=%s, bound=%s", idx.name(), Commons.toString(rowHnd, bound));

if (key instanceof BinaryObject)
return binaryObject2indexRow((BinaryObject)key);
String idxTypeName = idx.indexDefinition().typeDescriptor().keyTypeName();

throw new IgniteException(String.format(
"Unsupported type for index boundary: [expected=%s, current=%s]",
BinaryObject.class.getName(), key.getClass().getName()
));
if (key instanceof BinaryObject bo) {
try {
String searchTypeName = bo.type().typeName();

if (!idxTypeName.equals(searchTypeName))
// The same behavior as for table scan.
return null;
}
catch (BinaryObjectException ex) {
// The same behavior as for table scan.
return null;
}

return binaryObject2indexRow(bo);
}
else
throw new AssertionError("Invalid types for comparison: %s %s".formatted(idxTypeName, key.getClass().getName()));
}

/** */
private IndexRow binaryObject2indexRow(BinaryObject o) {
assert o.type().typeName().equals(idx.indexDefinition().typeDescriptor().keyTypeName()) : String.format(
"idx=%s, o=%s, oType=%s, idxKeyType=%s",
idx.name(), o, o.type().typeName(), idx.indexDefinition().typeDescriptor().keyTypeName()
);

InlineIndexRowHandler idxRowHnd = idx.segment(0).rowHandler();
IndexKey[] keys = new IndexKey[idx.indexDefinition().indexKeyDefinitions().size()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ else if (rel instanceof Intersect)

RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);

return new ScanNode<>(ctx, rowType, new TableFunctionScan<>(rowType, dataSupplier, rowFactory));
return new ScanNode<>(ctx, rowType, new TableFunctionScan<>(rowType, dataSupplier, rowFactory, ctx.binaryMarshaller()));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collection;
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
Expand All @@ -36,34 +37,57 @@ public class TableFunctionScan<Row> implements Iterable<Row> {
/** */
private final RowFactory<Row> rowFactory;

/** */
Function<Object, Object> binaryMarshaller;

/** */
private static final String ERR_SIZE_TEMPLATE = "Unable to process table function data: row length [%d]" +
" doesn't match defined columns number [%d].";

/** */
public TableFunctionScan(
RelDataType rowType,
Supplier<Iterable<?>> dataSupplier,
RowFactory<Row> rowFactory
RowFactory<Row> rowFactory,
Function<Object, Object> marshaller
) {
this.rowType = rowType;
this.dataSupplier = dataSupplier;
this.rowFactory = rowFactory;
binaryMarshaller = marshaller;
}

/** {@inheritDoc} */
@Override public Iterator<Row> iterator() {
return F.iterator(dataSupplier.get(), this::convertToRow, true);
}

/** */
private static void rowSizeChecker(int rowSize, int fldCount) {
if (rowSize != fldCount)
throw new IgniteSQLException(ERR_SIZE_TEMPLATE.formatted(rowSize, fldCount));
}

/** */
private Row convertToRow(Object rowContainer) {
if (rowContainer.getClass() != Object[].class && !Collection.class.isAssignableFrom(rowContainer.getClass()))
throw new IgniteSQLException("Unable to process table function data: row type is neither Collection or Object[].");

Object[] rowArr = rowContainer.getClass() == Object[].class
? (Object[])rowContainer
: ((Collection<?>)rowContainer).toArray();
if (rowContainer instanceof Object[])
rowSizeChecker(((Object[])rowContainer).length, rowType.getFieldCount());
else
rowSizeChecker(((Collection<?>)rowContainer).size(), rowType.getFieldCount());

if (rowArr.length != rowType.getFieldCount()) {
throw new IgniteSQLException("Unable to process table function data: row length [" + rowArr.length
+ "] doesn't match defined columns number [" + rowType.getFieldCount() + "].");
Object[] rowArr;

if (rowContainer.getClass().isArray()) {
rowArr = (Object[])rowContainer;
for (int pos = 0; pos < rowArr.length; ++pos)
rowArr[pos] = binaryMarshaller.apply(rowArr[pos]);
}
else {
Collection<?> coll = (Collection<?>)rowContainer;
rowArr = coll.stream().map(e -> binaryMarshaller.apply(e)).toArray();
}

return rowFactory.create(rowArr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,15 @@ public static BigDecimal toBigDecimal(boolean val, int precision, int scale) {
}

/** CAST(VARCHAR AS DECIMAL). */
public static BigDecimal toBigDecimal(String s, int precision, int scale) {
public static @Nullable BigDecimal toBigDecimal(String s, int precision, int scale) {
if (s == null)
return null;

return toBigDecimal(new BigDecimal(s.trim()), precision, scale);
}

/** Converts {@code val} to a {@link BigDecimal} with the given {@code precision} and {@code scale}. */
public static BigDecimal toBigDecimal(Number val, int precision, int scale) {
public static @Nullable BigDecimal toBigDecimal(Number val, int precision, int scale) {
assert precision > 0 : "Invalid precision: " + precision;
assert scale >= 0 : "Invalid scale: " + scale;

Expand All @@ -146,7 +146,7 @@ public static BigDecimal toBigDecimal(Number val, int precision, int scale) {
}

/** Cast object depending on type to DECIMAL. */
public static BigDecimal toBigDecimal(Object o, int precision, int scale) {
public static @Nullable BigDecimal toBigDecimal(Object o, int precision, int scale) {
if (o == null)
return null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ private <Row> Object insertVal(Row row, ExecutionContext<Row> ectx) throws Ignit
}

/** */
private Object newVal(String typeName) throws IgniteCheckedException {
private Object newVal(String typeName) {
GridCacheContext<?, ?> cctx = cacheContext();

BinaryObjectBuilder builder = cctx.grid().binary().builder(typeName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,12 @@ public static boolean hasScale(RelDataType type) {
}

/** */
public static Object toInternal(DataContext ctx, Object val) {
public static @Nullable Object toInternal(DataContext ctx, Object val) {
return val == null ? null : toInternal(ctx, val, val.getClass());
}

/** */
public static Object toInternal(DataContext ctx, Object val, Type storageType) {
public static @Nullable Object toInternal(DataContext ctx, Object val, Type storageType) {
if (val == null)
return null;
else if (storageType == java.sql.Date.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ public class LogicalRelImplementorTest extends GridCommonAbstractTest {
NoOpIoTracker.INSTANCE,
0,
null,
null
null,
obj -> {
throw new UnsupportedOperationException("Unexpected method call.");
}
) {
@Override public ColocationGroup group(long srcId) {
return ColocationGroup.forNodes(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ private RuntimeSortedIndex<Object[]> generate(RelDataType rowType, final List<In
NoOpIoTracker.INSTANCE,
0,
null,
null),
null,
obj -> {
throw new UnsupportedOperationException("Unexpected method call.");
}),
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
(o1, o2) -> {
for (int colIdx : idxCols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,10 @@ protected ExecutionContext<Object[]> executionContext(UUID nodeId, UUID qryId, l
NoOpIoTracker.INSTANCE,
0,
ImmutableMap.of(),
null
null,
obj -> {
throw new UnsupportedOperationException("Unexpected method call.");
}
);
}

Expand Down
Loading
Loading