Skip to content

Commit fe10036

Browse files
authored
Merge branch 'main' into fixSchemaPrefix
2 parents 2a0af95 + 303496c commit fe10036

12 files changed

Lines changed: 463 additions & 278 deletions

File tree

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java

Lines changed: 147 additions & 165 deletions
Large diffs are not rendered by default.

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashInnerJoinLazyStream.java

Lines changed: 26 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -4,15 +4,17 @@
44
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
55
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
66
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
7-
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
7+
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
88
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
99
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
1010
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
1111
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
1212
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
1313
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
1414
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
15+
import cn.edu.tsinghua.iginx.thrift.DataType;
1516
import cn.edu.tsinghua.iginx.utils.Pair;
17+
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
1618
import java.util.ArrayList;
1719
import java.util.Arrays;
1820
import java.util.Collections;
@@ -38,6 +40,8 @@ public class HashInnerJoinLazyStream extends BinaryLazyStream {
3840
private String joinColumnA;
3941

4042
private String joinColumnB;
43+
44+
private boolean needTypeCast = false;
4145

4246
public HashInnerJoinLazyStream(InnerJoin innerJoin, RowStream streamA, RowStream streamB) {
4347
super(streamA, streamB);
@@ -89,18 +93,28 @@ private void initialize() throws PhysicalException {
8993
}
9094
}
9195
this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB);
96+
97+
int indexA = headerA.indexOf(innerJoin.getPrefixA() + '.' + joinColumnA);
98+
DataType dataTypeA = headerA.getField(indexA).getType();
99+
DataType dataTypeB = headerB.getField(index).getType();
100+
if (ValueUtils.isNumericType(dataTypeA) && ValueUtils.isNumericType(dataTypeB)) {
101+
this.needTypeCast = true;
102+
}
92103

93104
while (streamB.hasNext()) {
94105
Row rowB = streamB.next();
95-
Object value = rowB.getValue(innerJoin.getPrefixB() + '.' + joinColumnB);
106+
Value value = rowB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumnB);
96107
if (value == null) {
97108
continue;
98109
}
110+
if (needTypeCast) {
111+
value = ValueUtils.transformToDouble(value);
112+
}
99113
int hash;
100-
if (value instanceof byte[]) {
101-
hash = Arrays.hashCode((byte[]) value);
114+
if (value.getDataType() == DataType.BINARY) {
115+
hash = Arrays.hashCode(value.getBinaryV());
102116
} else {
103-
hash = value.hashCode();
117+
hash = value.getValue().hashCode();
104118
}
105119
List<Row> rows = streamBHashMap.getOrDefault(hash, new ArrayList<>());
106120
rows.add(rowB);
@@ -139,16 +153,18 @@ public boolean hasNext() throws PhysicalException {
139153
private void tryMatch() throws PhysicalException {
140154
Row rowA = streamA.next();
141155

142-
Object value = rowA.getValue(innerJoin.getPrefixA() + '.' + joinColumnA);
156+
Value value = rowA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumnA);
143157
if (value == null) {
144158
return;
145159
}
146-
160+
if (needTypeCast) {
161+
value = ValueUtils.transformToDouble(value);
162+
}
147163
int hash;
148-
if (value instanceof byte[]) {
149-
hash = Arrays.hashCode((byte[]) value);
164+
if (value.getDataType() == DataType.BINARY) {
165+
hash = Arrays.hashCode(value.getBinaryV());
150166
} else {
151-
hash = value.hashCode();
167+
hash = value.getValue().hashCode();
152168
}
153169

154170
if (streamBHashMap.containsKey(hash)) {

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/HashOuterJoinLazyStream.java

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
55
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
66
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
7+
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
78
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
89
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
910
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
11+
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
1012
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
1113
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
1214
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
1315
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
1416
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
17+
import cn.edu.tsinghua.iginx.thrift.DataType;
1518
import cn.edu.tsinghua.iginx.utils.Pair;
1619
import java.util.ArrayList;
1720
import java.util.Arrays;
@@ -48,6 +51,8 @@ public class HashOuterJoinLazyStream extends BinaryLazyStream {
4851
private String joinColumnA;
4952

5053
private String joinColumnB;
54+
55+
private boolean needTypeCast = false;
5156

5257
public HashOuterJoinLazyStream(OuterJoin outerJoin, RowStream streamA, RowStream streamB) {
5358
super(streamA, streamB);
@@ -103,24 +108,35 @@ private void initialize() throws PhysicalException {
103108
}
104109
}
105110

111+
int indexAnother;
106112
if (outerJoinType == OuterJoinType.RIGHT) {
107113
this.index = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA);
114+
indexAnother = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
108115
} else {
109116
this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
117+
indexAnother = headerA.indexOf(outerJoin.getPrefixA() + '.' + joinColumnA);
110118
}
111119

120+
DataType dataType1 = headerA.getField(indexAnother).getType();
121+
DataType dataType2 = headerB.getField(index).getType();
122+
if (ValueUtils.isNumericType(dataType1) && ValueUtils.isNumericType(dataType2)) {
123+
this.needTypeCast = true;
124+
}
112125

113126
while (streamB.hasNext()) {
114127
Row rowB = streamB.next();
115-
Object value = rowB.getValue(outerJoin.getPrefixB() + '.' + joinColumnB);
128+
Value value = rowB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumnB);
116129
if (value == null) {
117130
continue;
118131
}
132+
if (needTypeCast) {
133+
value = ValueUtils.transformToDouble(value);
134+
}
119135
int hash;
120-
if (value instanceof byte[]) {
121-
hash = Arrays.hashCode((byte[]) value);
136+
if (value.getDataType() == DataType.BINARY) {
137+
hash = Arrays.hashCode(value.getBinaryV());
122138
} else {
123-
hash = value.hashCode();
139+
hash = value.getValue().hashCode();
124140
}
125141
List<Row> rows = streamBHashMap.getOrDefault(hash, new ArrayList<>());
126142
rows.add(rowB);
@@ -203,16 +219,18 @@ public boolean hasNext() throws PhysicalException {
203219
private void tryMatch() throws PhysicalException {
204220
Row rowA = streamA.next();
205221

206-
Object value = rowA.getValue(outerJoin.getPrefixA() + '.' + joinColumnA);
222+
Value value = rowA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumnA);
207223
if (value == null) {
208224
return;
209225
}
210-
226+
if (needTypeCast) {
227+
value = ValueUtils.transformToDouble(value);
228+
}
211229
int hash;
212-
if (value instanceof byte[]) {
213-
hash = Arrays.hashCode((byte[]) value);
230+
if (value.getDataType() == DataType.BINARY) {
231+
hash = Arrays.hashCode(value.getBinaryV());
214232
} else {
215-
hash = value.hashCode();
233+
hash = value.getValue().hashCode();
216234
}
217235

218236
if (streamBHashMap.containsKey(hash)) {

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopInnerJoinLazyStream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,15 @@
44
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
55
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
66
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
7-
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
87
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
98
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
109
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
10+
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
1111
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
1212
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
1313
import cn.edu.tsinghua.iginx.utils.Pair;
1414
import java.util.ArrayList;
1515
import java.util.List;
16-
import java.util.Objects;
1716

1817
public class NestedLoopInnerJoinLazyStream extends BinaryLazyStream {
1918

@@ -131,7 +130,8 @@ private Row tryMatch() throws PhysicalException {
131130
}
132131
} else { // Join condition: natural or using
133132
for (String joinColumn : joinColumns) {
134-
if (!Objects.equals(nextA.getValue(innerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(innerJoin.getPrefixB() + '.' + joinColumn))) {
133+
if (ValueUtils.compare(nextA.getAsValue(innerJoin.getPrefixA() + '.' + joinColumn),
134+
nextB.getAsValue(innerJoin.getPrefixB() + '.' + joinColumn)) != 0) {
135135
nextB = null;
136136
return null;
137137
}

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/NestedLoopOuterJoinLazyStream.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
88
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
99
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
10+
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
1011
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
1112
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
1213
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
1314
import cn.edu.tsinghua.iginx.utils.Pair;
1415
import java.util.ArrayList;
1516
import java.util.HashSet;
1617
import java.util.List;
17-
import java.util.Objects;
1818
import java.util.Set;
1919

2020
public class NestedLoopOuterJoinLazyStream extends BinaryLazyStream {
@@ -205,7 +205,8 @@ private Row tryMatch() throws PhysicalException {
205205
}
206206
} else { // Join condition: natural or using
207207
for (String joinColumn : joinColumns) {
208-
if (!Objects.equals(nextA.getValue(outerJoin.getPrefixA() + '.' + joinColumn), nextB.getValue(outerJoin.getPrefixB() + '.' + joinColumn))) {
208+
if (ValueUtils.compare(nextA.getAsValue(outerJoin.getPrefixA() + '.' + joinColumn),
209+
nextB.getAsValue(outerJoin.getPrefixB() + '.' + joinColumn)) != 0) {
209210
nextB = null;
210211
return null;
211212
}

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeInnerJoinLazyStream.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,21 @@
44
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
55
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
66
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
7-
import cn.edu.tsinghua.iginx.engine.shared.data.read.Field;
7+
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
88
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
99
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
1010
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
11+
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
1112
import cn.edu.tsinghua.iginx.engine.shared.operator.InnerJoin;
1213
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
1314
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
1415
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
15-
import cn.edu.tsinghua.iginx.thrift.DataType;
1616
import cn.edu.tsinghua.iginx.utils.Pair;
1717
import java.util.ArrayList;
1818
import java.util.Collections;
1919
import java.util.Deque;
2020
import java.util.LinkedList;
2121
import java.util.List;
22-
import java.util.Objects;
2322

2423
/**
2524
* input two stream must be ascending order.
@@ -36,15 +35,13 @@ public class SortedMergeInnerJoinLazyStream extends BinaryLazyStream {
3635

3736
private String joinColumnB;
3837

39-
private DataType joinColumnDataType;
40-
4138
private Row nextA;
4239

4340
private Row nextB;
4441

4542
private int index;
4643

47-
private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
44+
private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
4845

4946
private final List<Row> sameValueStreamBRows; // StreamB中join列的值相同的列缓存
5047

@@ -101,13 +98,6 @@ private void initialize() throws PhysicalException {
10198
}
10299
this.index = headerB.indexOf(innerJoin.getPrefixB() + '.' + joinColumnB);
103100

104-
DataType dataTypeA = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType();
105-
DataType dataTypeB = headerA.getField(headerA.indexOf(innerJoin.getPrefixA() + "." + joinColumnA)).getType();
106-
if (!dataTypeA.equals(dataTypeB)) {
107-
throw new InvalidOperatorParameterException("the datatype of join columns is different");
108-
}
109-
joinColumnDataType = dataTypeA;
110-
111101
if (filter != null) { // Join condition: on
112102
this.header = RowUtils.constructNewHead(headerA, headerB, innerJoin.getPrefixA(), innerJoin.getPrefixB());
113103
} else { // Join condition: natural or using
@@ -135,8 +125,8 @@ public boolean hasNext() throws PhysicalException {
135125
}
136126

137127
private void tryMatch() throws PhysicalException {
138-
Object curJoinColumnAValue = nextA.getValue(innerJoin.getPrefixA() + "." + joinColumnA);
139-
int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue);
128+
Value curJoinColumnAValue = nextA.getAsValue(innerJoin.getPrefixA() + "." + joinColumnA);
129+
int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue);
140130
if (cmp < 0) {
141131
nextA = null;
142132
} else if (cmp > 0) {
@@ -169,13 +159,13 @@ private boolean hasMoreRows() throws PhysicalException {
169159
}
170160
while (sameValueStreamBRows.isEmpty() && nextB != null) {
171161
sameValueStreamBRows.add(nextB);
172-
curJoinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB);
162+
curJoinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB);
173163
nextB = null;
174164

175165
while (streamB.hasNext()) {
176166
nextB = streamB.next();
177-
Object joinColumnBValue = nextB.getValue(innerJoin.getPrefixB() + "." + joinColumnB);
178-
if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) {
167+
Value joinColumnBValue = nextB.getAsValue(innerJoin.getPrefixB() + "." + joinColumnB);
168+
if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) {
179169
sameValueStreamBRows.add(nextB);
180170
nextB = null;
181171
} else {

core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/stream/SortedMergeOuterJoinLazyStream.java

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,22 @@
44
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
55
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
66
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.RowUtils;
7+
import cn.edu.tsinghua.iginx.engine.shared.data.Value;
78
import cn.edu.tsinghua.iginx.engine.shared.data.read.Header;
89
import cn.edu.tsinghua.iginx.engine.shared.data.read.Row;
910
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
11+
import cn.edu.tsinghua.iginx.engine.shared.function.system.utils.ValueUtils;
1012
import cn.edu.tsinghua.iginx.engine.shared.operator.OuterJoin;
1113
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
1214
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.FilterType;
1315
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.PathFilter;
1416
import cn.edu.tsinghua.iginx.engine.shared.operator.type.OuterJoinType;
15-
import cn.edu.tsinghua.iginx.thrift.DataType;
1617
import cn.edu.tsinghua.iginx.utils.Pair;
1718
import java.util.ArrayList;
1819
import java.util.Collections;
1920
import java.util.Deque;
2021
import java.util.LinkedList;
2122
import java.util.List;
22-
import java.util.Objects;
2323

2424
public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream {
2525

@@ -35,13 +35,11 @@ public class SortedMergeOuterJoinLazyStream extends BinaryLazyStream {
3535

3636
private String joinColumnB;
3737

38-
private DataType joinColumnDataType;
39-
4038
private Row nextA;
4139

4240
private Row nextB;
4341

44-
private Object curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
42+
private Value curJoinColumnBValue; // 当前StreamB中join列的值,用于同值join
4543

4644
private final List<Row> sameValueStreamBRows; // StreamB中join列的值相同的列缓存
4745

@@ -113,13 +111,6 @@ private void initialize() throws PhysicalException {
113111
this.index = headerB.indexOf(outerJoin.getPrefixB() + '.' + joinColumnB);
114112
}
115113

116-
DataType dataTypeA = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType();
117-
DataType dataTypeB = headerA.getField(headerA.indexOf(outerJoin.getPrefixA() + "." + joinColumnA)).getType();
118-
if (!dataTypeA.equals(dataTypeB)) {
119-
throw new InvalidOperatorParameterException("the datatype of join columns is different");
120-
}
121-
joinColumnDataType = dataTypeA;
122-
123114
if (filter != null) { // Join condition: on
124115
this.header = RowUtils.constructNewHead(headerA, headerB, outerJoin.getPrefixA(), outerJoin.getPrefixB());
125116
} else { // Join condition: natural or using
@@ -205,8 +196,8 @@ public boolean hasNext() throws PhysicalException {
205196
}
206197

207198
private void tryMatch() throws PhysicalException {
208-
Object curJoinColumnAValue = nextA.getValue(outerJoin.getPrefixA() + "." + joinColumnA);
209-
int cmp = RowUtils.compareObjects(joinColumnDataType, curJoinColumnAValue, curJoinColumnBValue);
199+
Value curJoinColumnAValue = nextA.getAsValue(outerJoin.getPrefixA() + "." + joinColumnA);
200+
int cmp = ValueUtils.compare(curJoinColumnAValue, curJoinColumnBValue);
210201
if (cmp < 0) {
211202
unmatchedStreamARows.add(nextA);
212203
nextA = null;
@@ -255,13 +246,13 @@ private boolean hasMoreRows() throws PhysicalException {
255246
}
256247
while (sameValueStreamBRows.isEmpty() && nextB != null) {
257248
sameValueStreamBRows.add(nextB);
258-
curJoinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB);
249+
curJoinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB);
259250
nextB = null;
260251

261252
while (streamB.hasNext()) {
262253
nextB = streamB.next();
263-
Object joinColumnBValue = nextB.getValue(outerJoin.getPrefixB() + "." + joinColumnB);
264-
if (Objects.equals(joinColumnBValue, curJoinColumnBValue)) {
254+
Value joinColumnBValue = nextB.getAsValue(outerJoin.getPrefixB() + "." + joinColumnB);
255+
if (ValueUtils.compare(joinColumnBValue, curJoinColumnBValue) == 0) {
265256
sameValueStreamBRows.add(nextB);
266257
nextB = null;
267258
} else {

0 commit comments

Comments
 (0)