Skip to content

Commit 9855a9f

Browse files
committed
Stash
1 parent b35ecf9 commit 9855a9f

10 files changed

Lines changed: 368 additions & 44 deletions

File tree

client/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@
2525
<artifactId>commons-cli</artifactId>
2626
<version>1.4</version>
2727
</dependency>
28+
<dependency>
29+
<groupId>commons-io</groupId>
30+
<artifactId>commons-io</artifactId>
31+
<version>2.11.0</version>
32+
</dependency>
2833
<dependency>
2934
<groupId>org.jline</groupId>
3035
<artifactId>jline</artifactId>
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package cn.edu.tsinghua.iginx.client;
20+
21+
import cn.edu.tsinghua.iginx.exceptions.ExecutionException;
22+
import cn.edu.tsinghua.iginx.exceptions.SessionException;
23+
import cn.edu.tsinghua.iginx.session.CurveMatchResult;
24+
import cn.edu.tsinghua.iginx.session.Session;
25+
import cn.edu.tsinghua.iginx.session.SessionExecuteSqlResult;
26+
27+
import java.util.Arrays;
28+
import java.util.List;
29+
import java.util.stream.Collectors;
30+
31+
public class CurveMatchClient {
32+
33+
private static Session session;
34+
35+
public static void main(String[] args) throws SessionException, ExecutionException {
36+
session = new Session("127.0.0.1", 6888, "root", "root");
37+
// 打开 Session
38+
session.openSession();
39+
40+
// 曲线匹配
41+
curveMatch(args);
42+
43+
// 关闭 Session
44+
session.closeSession();
45+
}
46+
47+
private static void curveMatch(String[] args) throws ExecutionException, SessionException {
48+
if (args.length != 5) {
49+
System.out.println("参数个数必须为5!");
50+
return;
51+
}
52+
53+
List<String> paths = Arrays.stream(args[0].split(";")).collect(Collectors.toList());
54+
55+
long startTime = Long.parseLong(args[1]);
56+
long endTime = Long.parseLong(args[2]);
57+
58+
List<Double> queryList = Arrays.stream(args[3].split(",")).map(Double::parseDouble).collect(Collectors.toList());
59+
60+
long curveUnit = Long.parseLong(args[4]);
61+
62+
CurveMatchResult result = session.curveMatch(paths, startTime, endTime, queryList, curveUnit);
63+
64+
long matchedTimestamp = result.getMatchedTimestamp();
65+
String matchedPath = result.getMatchedPath();
66+
67+
String[] parts = matchedPath.split("\\.");
68+
StringBuilder sql = new StringBuilder();
69+
sql.append("select first_value(");
70+
sql.append(parts[parts.length - 1]);
71+
sql.append(") from ");
72+
sql.append(matchedPath, 0, matchedPath.lastIndexOf('.'));
73+
sql.append(" group (");
74+
sql.append(matchedTimestamp);
75+
sql.append(", ");
76+
sql.append(matchedTimestamp + curveUnit * queryList.size());
77+
sql.append(") by ");
78+
sql.append(curveUnit);
79+
sql.append("ms");
80+
81+
SessionExecuteSqlResult dataSet = session.executeSql(sql.toString());
82+
dataSet.print(false, "ms");
83+
}
84+
}
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package cn.edu.tsinghua.iginx.client;
20+
21+
import cn.edu.tsinghua.iginx.exceptions.IginxException;
22+
import cn.edu.tsinghua.iginx.exceptions.SessionException;
23+
import cn.edu.tsinghua.iginx.session.Session;
24+
import cn.edu.tsinghua.iginx.thrift.DataType;
25+
import org.apache.commons.io.FileUtils;
26+
import org.apache.commons.io.LineIterator;
27+
28+
import java.io.File;
29+
import java.io.IOException;
30+
import java.text.ParseException;
31+
import java.text.SimpleDateFormat;
32+
import java.util.ArrayList;
33+
import java.util.Arrays;
34+
import java.util.List;
35+
import java.util.stream.Collectors;
36+
37+
import static java.lang.Double.parseDouble;
38+
39+
public class InsertSingleFileClient {
40+
41+
private static Session session;
42+
43+
private static final String separator = "\\s+";
44+
45+
private static final ThreadLocal<SimpleDateFormat> format = new ThreadLocal<>();
46+
47+
// 存储组
48+
private static String storageGroupName;
49+
// 设备
50+
private static String deviceId;
51+
52+
// 飞机型号
53+
private static String planeType;
54+
// 架机号
55+
private static String planeId;
56+
// 任务单号/任务编号
57+
private static String taskId;
58+
// 试飞地点
59+
private static String place;
60+
// 试飞日期
61+
private static String date;
62+
// 试验类型
63+
private static String flightType;
64+
65+
// 采样率
66+
private static int frequency;
67+
68+
// 传感器
69+
private static List<String> measurements;
70+
71+
public static void main(String[] args) throws SessionException {
72+
session = new Session("127.0.0.1", 6888, "root", "root");
73+
// 打开 Session
74+
session.openSession();
75+
76+
// 写入单个文件
77+
insertSingleFile(args);
78+
79+
// 关闭 Session
80+
session.closeSession();
81+
}
82+
83+
private static void insertSingleFile(String[] args) {
84+
if (args.length != 1) {
85+
System.out.println("参数个数必须为1!");
86+
return;
87+
}
88+
89+
format.set(new SimpleDateFormat("yyyyMMddHH:mm:ss:SSS"));
90+
91+
String filename = args[0];
92+
extractInfo(filename);
93+
try {
94+
LineIterator it = FileUtils.lineIterator(new File(filename));
95+
96+
if(!it.hasNext()){
97+
System.out.println(filename + "是空文件!");
98+
it.close();
99+
return;
100+
}
101+
102+
String line = it.nextLine();
103+
extractMeasurements(line);
104+
int lineNum = 1;
105+
106+
int batchSize = Math.max(100, Math.min(10000, 512 * 1024 / measurements.size()));
107+
108+
int cnt = 0;
109+
int singleCnt = 0;
110+
long[] timestamps = new long[batchSize];
111+
Object[] valuesList = new Object[batchSize];
112+
113+
System.out.println("文件 " + filename + " 参数数量 " + measurements.size() + " 批处理大小 " + batchSize);
114+
115+
while (it.hasNext()) {
116+
line = it.nextLine();
117+
lineNum++;
118+
String[] contents = line.trim().split(separator);
119+
long timestamp;
120+
if (contents[0].length() == 12) {
121+
timestamp = format.get().parse(date + contents[0]).getTime() * 1000;
122+
} else if (contents[0].length() == 15) {
123+
timestamp = format.get().parse(date + contents[0].substring(0, 12)).getTime() * 1000 + Long.parseLong(contents[0].substring(12));
124+
} else {
125+
//logger.error("line {} 有非法的时间格式:{}!", lineNum, date + contents[0]);
126+
continue;
127+
}
128+
129+
//now check whether we need to read another line
130+
while(it.hasNext() && contents.length < measurements.size() + 1){
131+
// logger.error("Incomplete line: {}", line);
132+
// logger.error("Incomplete line with {} elements at line {}", contents.length, lineNum);
133+
line = it.nextLine();
134+
lineNum++;
135+
String[] tempContents = line.trim().split(separator);
136+
contents = Arrays.copyOf( contents, contents.length+tempContents.length );
137+
for(int i=0;i<tempContents.length;i++)
138+
contents[i+contents.length-tempContents.length] = tempContents[i];
139+
}
140+
//skip these illegal lines
141+
if(contents.length > measurements.size() + 1) {
142+
// logger.error("Skipping illegal line with {} elements: {}", contents.length, line);
143+
// logger.error("{} : =====================", lineNum);
144+
// logger.error("Skipping illegal line with {} elements at line {}", contents.length, lineNum);
145+
continue;
146+
}
147+
148+
timestamps[singleCnt] = timestamp;
149+
if (measurements.size() != contents.length - 1) {
150+
contents = line.trim().split("\t");
151+
if (measurements.size() != contents.length - 1) {
152+
// logger.error("文件{}存在某行数据点数目({})与参数数目({})不符。", filename, contents.length - 1, measurements.size());
153+
continue;
154+
}
155+
}
156+
Object[] tmpValues = new Object[measurements.size()];
157+
for (int i = 1; i < contents.length; i++) {
158+
tmpValues[i - 1] = (isValueValid(contents[i]) ? parseDouble(contents[i]) : 0);
159+
}
160+
valuesList[singleCnt] = tmpValues;
161+
162+
cnt++;
163+
singleCnt++;
164+
if (singleCnt % batchSize == 0) {
165+
insertBatch(singleCnt, deviceId, measurements, timestamps, valuesList);
166+
singleCnt = 0;
167+
}
168+
169+
if (cnt % 1_000_000 == 0) {
170+
System.out.println("文件 " + filename + " 已加载 " + cnt + " 行");
171+
}
172+
}
173+
if (singleCnt != 0) {
174+
insertBatch(singleCnt, deviceId, measurements, timestamps, valuesList);
175+
}
176+
System.out.println("文件 " + filename + " 内容 " + lineNum + " 行 " + cnt + " 共加载 {} 行。");
177+
it.close();
178+
} catch (IOException | ParseException e) {
179+
System.out.println(e.getMessage());
180+
}
181+
}
182+
183+
private static List<String> extractMeasurements(String line) {
184+
String myline = fixIllegalParameterChars(line);
185+
//process the list of measurements
186+
String[] tempMeasurements = myline.trim().split(separator);
187+
return new ArrayList<>(Arrays.asList(tempMeasurements).subList(1, tempMeasurements.length));
188+
}
189+
190+
private static String fixIllegalParameterChars(String line){
191+
String ret = line;
192+
ret = ret.replace("<", "小");
193+
ret = ret.replace(">", "大");
194+
ret = ret.replace("=", "等");
195+
ret = ret.replace(".", "点");
196+
ret = ret.replace("-", "一");
197+
return ret;
198+
}
199+
200+
private static void extractInfo(String filename) {
201+
String[] parts = filename.split("-");
202+
planeType = parts[1];
203+
planeId = parts[2];
204+
place = parts[3];
205+
date = "20" + parts[4];
206+
flightType = parts[5]+"-"+parts[6];
207+
flightType = flightType.replace('-', '一');
208+
209+
String taskIDPostfix = "";
210+
int pos4TaskId = parts.length - 2;
211+
try{
212+
frequency = Integer.parseInt(parts[parts.length - 1].substring(0, parts[parts.length - 1].indexOf(".")));
213+
}catch(NumberFormatException e){
214+
frequency = Integer.parseInt(parts[parts.length - 2]);
215+
taskIDPostfix = parts[parts.length - 1].substring(0, parts[parts.length - 1].indexOf("."));
216+
pos4TaskId = parts.length - 3;
217+
}
218+
219+
//the rest are in the taskID
220+
taskId="";
221+
for(int i=7;i<=pos4TaskId;i++)
222+
taskId = taskId+parts[i]+"-";
223+
taskId = taskIDPostfix.length()==0?taskId.substring( 0, taskId.length()-1 ):taskId+taskIDPostfix;
224+
//IoTDB does not support path with these characters
225+
taskId = taskId.replace('-', '一');
226+
taskId = taskId.replace('&', '二');
227+
228+
storageGroupName = "root." + planeType + "." + planeId + "." + taskId;
229+
deviceId = storageGroupName + "." + place + "." + date + "." + flightType;
230+
}
231+
232+
private static boolean isValueValid(String value) {
233+
try {
234+
Double.parseDouble(value);
235+
} catch (NumberFormatException e) {
236+
return false;
237+
}
238+
return true;
239+
}
240+
241+
private static void insertBatch(int size, String deviceId, List<String> measurements, long[] timestamps, Object[] values) {
242+
List<String> pathList = measurements.stream().map(str-> deviceId+"." +str).collect( Collectors.toList() );
243+
244+
List<DataType> datatypeList = new ArrayList<>();
245+
for (int i = 0; i < measurements.size(); i++) {
246+
datatypeList.add(DataType.DOUBLE);
247+
}
248+
249+
timestamps = Arrays.copyOfRange(timestamps, 0, size);
250+
values = Arrays.copyOfRange(values, 0, size);
251+
252+
try {
253+
session.insertRowRecords(pathList, timestamps, values, datatypeList, null);
254+
} catch (IginxException e) {
255+
System.out.println(e.getMessage());
256+
}
257+
}
258+
}

core/src/main/java/cn/edu/tsinghua/iginx/rest/query/QueryParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ public Map<String, String> getTagsFromPaths(String path, StringBuilder name) {
626626
int lastBrace = path.indexOf("}");
627627
if(firstBrace==-1 || lastBrace==-1) {
628628
name.append(path);
629-
return null;
629+
return ret;
630630
}
631631
name.append(path.substring(0, firstBrace));
632632
String tagLists = path.substring(firstBrace+1, lastBrace);

core/src/main/java/cn/edu/tsinghua/iginx/transform/data/LogWriter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ public class LogWriter extends ExportWriter {
1414
@Override
1515
public void write(BatchData batchData) {
1616
Header header = batchData.getHeader();
17-
logger.info(header.toString());
17+
// logger.info(header.toString());
1818

1919
List<Row> rowList = batchData.getRowList();
20-
rowList.forEach(row -> {
21-
logger.info(row.toCSVTypeString());
22-
});
20+
// rowList.forEach(row -> {
21+
// logger.info(row.toCSVTypeString());
22+
// });
2323
}
2424
}

dataSources/iotdb11/src/main/java/cn/edu/tsinghua/iginx/iotdb/tools/DataViewWrapper.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ public String getPath(int index) {
5151
}
5252
String path = dataView.getPath(index);
5353
Map<String, String> tags = dataView.getTags(index);
54-
path += '.';
55-
path += TagKVUtils.tagPrefix;
5654
if (tags != null && !tags.isEmpty()) {
5755
TreeMap<String, String> sortedTags = new TreeMap<>(tags);
5856
StringBuilder pathBuilder = new StringBuilder();
@@ -61,8 +59,6 @@ public String getPath(int index) {
6159
});
6260
path += pathBuilder.toString();
6361
}
64-
path += '.';
65-
path += TagKVUtils.tagSuffix;
6662
pathCache.put(index, path);
6763
return path;
6864
}

0 commit comments

Comments
 (0)