01 The Problem
In this post we look at how Apache IoTDB can solve the reader question raised in No8.

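In short: how do we compute on the values of two time series? The example in that question is a "+" addition.
02 Data Preparation
To analyze multiple time series, we first need two time series, so let's prepare the data:
- Start the IoTDB server instance
First, build the latest code on the master branch from source and start a server instance: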
iotdb git:(master) mvn clean install -DskipTests
...
...
[INFO] -------------------------------------------
[INFO] BUILD SUCCESS
[INFO] -------------------------------------------
[INFO] Total time: 02:41 min
[INFO] Finished at: 2021-04-07T07:13:32+08:00
[INFO] -------------------------------------------
iotdb git:(master) cd distribution/target/apache-iotdb-0.12.0-SNAPSHOT-server-bin/apache-iotdb-0.12.0-SNAPSHOT-server-bin/
apache-iotdb-0.12.0-SNAPSHOT-server-bin git:(master) sbin/start-server.sh
---------------------
Starting IoTDB
...
...
2021-04-07 07:33:50,636 [main] INFO o.a.i.db.service.IoTDB:93 - IoTDB has started.
- Start the CLI
iotdb git:(master) cd distribution/target/apache-iotdb-0.12.0-SNAPSHOT-server-bin/apache-iotdb-0.12.0-SNAPSHOT-server-bin/
apache-iotdb-0.12.0-SNAPSHOT-server-bin git:(master) sbin/start-cli.sh -h 127.0.0.1 -p 6667 -u root -pw root
- Create a storage group
IoTDB> set storage group to root.lemming
- Create the time series
IoTDB> create timeseries root.lemming.device1.m1 with datatype=FLOAT,encoding=RLE
IoTDB> create timeseries root.lemming.device1.m2 with datatype=FLOAT,encoding=RLE
...
IoTDB> show timeseries;
- Insert time series data
IoTDB> insert into root.lemming.device1(timestamp,m1, m2) values(1,3333,4444)
IoTDB> select * from root.lemming.device1;
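You may have noticed by now that although IoTDB stores each time series as its own column at the storage layer, at the API level it already presents a two-dimensional table: you can treat several time series as different columns of one table. This abstraction spares you the row-to-column PIVOT step you would need in InfluxDB, which makes multi-series queries and analysis easier to write and understand. Now let's see how to implement m1 + m2.
03 Preparing the Data in Code
Rather than typing commands one at a time, let's write a Java example that is easy to rerun. For this post we only depend on the iotdb-jdbc module; the pom dependency is: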
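To make the two-dimensional table idea concrete, here is a minimal sketch of aligning two series by timestamp into rows, the way `select * from root.lemming.device1` shows m1 and m2 as columns. It is an illustration only, not IoTDB's implementation; the class and method names (`AlignSketch`, `align`) are hypothetical, and a timestamp present in only one series is represented here by a null cell.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class AlignSketch {
  /** Aligns two series (timestamp -> value) into rows of [time, m1, m2]; missing cells are null. */
  public static List<Object[]> align(Map<Long, Float> m1, Map<Long, Float> m2) {
    // The union of both series' timestamps, sorted ascending, acts as the Time column.
    TreeSet<Long> times = new TreeSet<>(m1.keySet());
    times.addAll(m2.keySet());
    List<Object[]> rows = new ArrayList<>();
    for (long t : times) {
      // Each series contributes one column; get() returns null where it has no point.
      rows.add(new Object[] {t, m1.get(t), m2.get(t)});
    }
    return rows;
  }

  public static void main(String[] args) {
    Map<Long, Float> m1 = new TreeMap<>();
    Map<Long, Float> m2 = new TreeMap<>();
    m1.put(1L, 3333f); // the point inserted above
    m2.put(1L, 4444f);
    m1.put(2L, 1f); // hypothetical extra point that exists only in m1
    for (Object[] row : align(m1, m2)) {
      System.out.println(Arrays.toString(row)); // [1, 3333.0, 4444.0] then [2, 1.0, null]
    }
  }
}
```

Because each series keeps its own timestamps, a row-oriented view needs exactly this kind of alignment step; IoTDB does it for you in the query layer.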
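<dependency>
  <groupId>org.apache.iotdb</groupId>
  <artifactId>iotdb-jdbc</artifactId>
  <version>${iotdb.version}</version>
</dependency>
The test code that creates the storage group and time series and prepares the data: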
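import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.iotdb.jdbc.IoTDBSQLException;

public class No9JDBCExample {
/**
* Run init() first; if it completes without errors, the environment and data are ready.
*/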
public static void main(String[] args) throws Exception {
Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
init(statement);
} catch (IoTDBSQLException e) {
e.printStackTrace();
}
}
public static void init(Statement statement) throws Exception {
setStorageGroup(statement);
statement.execute(
"CREATE TIMESERIES root.lemming.device1.m1 with datatype=FLOAT,encoding=RLE");
statement.execute(
"CREATE TIMESERIES root.lemming.device1.m2 with datatype=FLOAT,encoding=RLE");
statement.execute("INSERT INTO root.lemming.device1(timestamp,m1, m2) VALUES (1,3333,4444)");
ResultSet resultSet = statement.executeQuery("SELECT timestamp, m1 as m1, m2 as m2 FROM root.lemming.device1");
outputResult(resultSet);
}
public static void setStorageGroup(Statement statement) throws Exception{
try {
statement.execute("SET STORAGE GROUP TO root.lemming");
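} catch (Exception e) {
// The storage group most likely exists from an earlier run: delete it (and all its data), then recreate it.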
statement.execute("DELETE STORAGE GROUP root.lemming");
statement.execute("SET STORAGE GROUP TO root.lemming");
}
}
private static void outputResult(ResultSet resultSet) throws SQLException {
if (resultSet != null) {
System.out.println("--------------------------");
final ResultSetMetaData metaData = resultSet.getMetaData();
final int columnCount = metaData.getColumnCount();
for (int i = 0; i < columnCount; i++) {
System.out.print(metaData.getColumnLabel(i + 1) + " ");
}
System.out.println();
while (resultSet.next()) {
// Print one row per line, with column values separated by ", ".
StringBuilder line = new StringBuilder();
for (int i = 1; i <= columnCount; i++) {
if (i > 1) {
line.append(", ");
}
line.append(resultSet.getString(i));
}
System.out.println(line);
}
System.out.println("--------------------------\n");
}
}
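}
If the program runs and prints the row inserted at timestamp 1 (m1 = 3333, m2 = 4444), all the preparation is done.
04 Solving Addition with a UDF
Next, let's see how Apache IoTDB handles the addition. IoTDB currently lets users implement custom operations as UDFs (user-defined functions). To write a UDF we need the following dependency: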
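<dependency>
  <groupId>org.apache.iotdb</groupId>
  <artifactId>iotdb-server</artifactId>
  <version>0.12.0-SNAPSHOT</version>
  <scope>provided</scope>
</dependency>
For the full configuration, see: https://github.com/sunjincheng121/know_how_know_why/blob/master/khkw_iotdb/No9udf/pom.xml
With the dependency in place, we can implement a UDTF (User Defined Timeseries Generating Function). I spell out the full name because otherwise it is easily mistaken for a User Defined Table Function. Admittedly, the UDF interface still has room for improvement; here we simply describe it as it currently stands: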
public interface UDTF extends UDF {
/**
* This method is mainly used to customize UDTF. In this method, the user can do the following
* things:
*
* - Use UDFParameters to get the time series paths and parse key-value pair attributes
* entered by the user.
* - Set the strategy to access the original data and set the output data type in
* UDTFConfigurations.
* - Create resources, such as establishing external connections, opening files, etc.
*
* This method is called after the UDTF is instantiated and before the beginning of the
* transformation process.
*
* @param parameters used to parse the input parameters entered by the user
* @param configurations used to set the required properties in the UDTF
* @throws Exception the user can throw errors if necessary
*/
@SuppressWarnings("squid:S112")
void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) throws Exception;
/**
* When the user specifies {@link RowByRowAccessStrategy} to access the original data in {@link
* UDTFConfigurations}, this method will be called to process the transformation. In a single UDF
* query, this method may be called multiple times.
*
* @param row original input data row (aligned by time)
* @param collector used to collect output data points
* @throws Exception the user can throw errors if necessary
* @see RowByRowAccessStrategy
*/
@SuppressWarnings("squid:S112")
default void transform(Row row, PointCollector collector) throws Exception {}
/**
* When the user specifies {@link SlidingSizeWindowAccessStrategy} or {@link
* SlidingTimeWindowAccessStrategy} to access the original data in {@link UDTFConfigurations},
* this method will be called to process the transformation. In a single UDF query, this method
* may be called multiple times.
*
* @param rowWindow original input data window (rows inside the window are aligned by time)
* @param collector used to collect output data points
* @throws Exception the user can throw errors if necessary
* @see SlidingSizeWindowAccessStrategy
* @see SlidingTimeWindowAccessStrategy
*/
@SuppressWarnings("squid:S112")
default void transform(RowWindow rowWindow, PointCollector collector) throws Exception {}
/**
* This method will be called once after all {@link UDTF#transform(Row, PointCollector)} calls or
* {@link UDTF#transform(RowWindow, PointCollector)} calls have been executed. In a single UDF
* query, this method is called exactly once.
*
* @param collector used to collect output data points
* @throws Exception the user can throw errors if necessary
*/
@SuppressWarnings("squid:S112")
default void terminate(PointCollector collector) throws Exception {}
}
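The Javadoc above describes a three-phase lifecycle: beforeStart once, transform once per row (with a row-by-row access strategy), then terminate once. The sketch below only mimics that call order so it can run without IoTDB; `MiniUdtf`, `runRowByRow`, and the `List<String>` collector are hypothetical, simplified stand-ins, not IoTDB's API.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
  /** Hypothetical, simplified stand-in for the UDTF lifecycle; not IoTDB's interface. */
  interface MiniUdtf {
    void beforeStart(List<String> out);

    void transform(long time, Double[] row, List<String> out);

    default void terminate(List<String> out) {}
  }

  /** Calls the hooks in the order the UDTF Javadoc describes for row-by-row access. */
  static List<String> runRowByRow(MiniUdtf udtf, long[] times, Double[][] rows) {
    List<String> out = new ArrayList<>();
    udtf.beforeStart(out); // once, before any transform call
    for (int i = 0; i < times.length; i++) {
      udtf.transform(times[i], rows[i], out); // once per time-aligned row
    }
    udtf.terminate(out); // exactly once, after all transform calls
    return out;
  }

  public static void main(String[] args) {
    // A tracing implementation that records which hook ran, in order.
    MiniUdtf tracer =
        new MiniUdtf() {
          @Override
          public void beforeStart(List<String> out) {
            out.add("beforeStart");
          }

          @Override
          public void transform(long time, Double[] row, List<String> out) {
            out.add("transform@" + time);
          }

          @Override
          public void terminate(List<String> out) {
            out.add("terminate");
          }
        };
    long[] times = {1L, 2L};
    Double[][] rows = {{3333.0, 4444.0}, {1.0, null}};
    // Prints [beforeStart, transform@1, transform@2, terminate]
    System.out.println(runRowByRow(tracer, times, rows));
  }
}
```

This is why per-row logic belongs in transform, while one-time setup (reading parameters, choosing the access strategy and output type) belongs in beforeStart.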
The core of it is the transform method: its implementation is our business logic. For addition, everything happens in transform:
public class AddFunc implements UDTF {
...
...
@Override
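public void transform(Row row, PointCollector collector) throws Exception {
// Skip timestamps where either m1 or m2 has no value: no output point is produced.
if (row.isNull(0) || row.isNull(1)) {
return;
}
// Widen both inputs to double, add them plus addend (a field set in the elided part above),
// then truncate to long because the result is collected with putLong.
collector.putLong(
row.getTime(), (long) (extractDoubleValue(row, 0) + extractDoubleValue(row, 1) + addend));
}
// Reads column `index` as a double according to its data type; other types are rejected.
private double extractDoubleValue(Row row, int index) {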
double value;
switch (row.getDataType(index)) {
case INT32:
value = row.getInt(index);
break;
case INT64:
value = (double) row.getLong(index);
break;
case FLOAT:
value = row.getFloat(index);
break;
case DOUBLE:
value = row.getDouble(index);
break;
default:
throw new UnSupportedDataTypeException(row.getDataType(index).toString());
}
return value;
}
}
Full source: https://github.com/sunjincheng121/know_how_know_why/blob/master/khkw_iotdb/No9udf/src/main/java/org/khkw/iotdb/no9/AddFunc.java
Once the UDF is written, you still need the query logic:
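The arithmetic inside transform can be checked without a server. This sketch reproduces it in plain Java: skip when an input is missing, widen both values to double, add them plus `addend`, and truncate with a `(long)` cast. `AddSketch` and `add` are hypothetical names, and `addend` is passed in because its initialization is elided in the excerpt; 0 is assumed here.

```java
public class AddSketch {
  /**
   * Mirrors AddFunc.transform's arithmetic outside IoTDB: returns null (no output point)
   * if either input is missing; otherwise widens both to double, adds them plus addend,
   * and truncates to long as the (long) cast in AddFunc does.
   */
  static Long add(Number a, Number b, double addend) {
    if (a == null || b == null) {
      return null; // AddFunc returns without collecting a point
    }
    return (long) (a.doubleValue() + b.doubleValue() + addend);
  }

  public static void main(String[] args) {
    System.out.println(add(3333f, 4444f, 0)); // 7777: two FLOAT values, as in root.lemming.device1
    System.out.println(add(7, 8L, 0)); // 15: INT32 + INT64 works after widening
    System.out.println(add(1.5f, 1.4f, 0)); // 2: the fractional part is truncated
    System.out.println(add(1.0f, null, 0)); // null: missing value, no output
  }
}
```

The third line highlights a design consequence of emitting INT64 via putLong: sums of FLOAT inputs lose their fractional part. If that matters, a variant would output DOUBLE instead.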
public static void udf(Statement statement) {
try{
statement.execute("CREATE FUNCTION plus AS \"org.khkw.iotdb.no9.AddFunc\"");
statement.execute("SHOW FUNCTIONS");
ResultSet resultSet = statement.executeQuery("SELECT timestamp, plus(m1, m2) FROM root.lemming.device1");
outputResult(resultSet);
}catch (Exception e) {
e.printStackTrace();
}
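}
The code above registers a UDF named plus and then computes the sum as plus(m1, m2). However, we can't run udf() yet: run it now and it fails because the class org.khkw.iotdb.no9.AddFunc cannot be found.
This is because IoTDB loads UDF JARs only from a configured directory. We build the JAR with mvn clean package and then change the UDF directory setting in the iotdb-engine.properties configuration file.
I pointed it at the directory where my project's JAR is built (the project's target directory).
After the change, restart the IoTDB server instance. The startup log shows the UDF directory being loaded: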
apache-iotdb-0.12.0-SNAPSHOT-server-bin git:(master) sbin/start-server.sh
...
...
2021-04-07 08:17:53,398 [main] INFO o.a.i.d.q.u.s.UDFClassLoaderManager:56 - UDF lib root: /Users/jincheng/work/know_how_know_why/khkw_iotdb/No9udf/target
...
2021-04-07 12:34:11,622 [main] INFO o.a.i.d.s.t.ThriftService:125 - IoTDB: start RPC ServerService successfully, listening on ip 0.0.0.0 port 6667
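Once we've confirmed that our custom UDF is loaded, we run udf() again, and this time the query returns the sum of m1 and m2.
So far we have solved the addition problem with a UDF. Can UDFs also be used in the CLI?
05 Using UDFs in the CLI
Yes, UDFs can be used in the CLI. Start a CLI client and list the registered functions with SHOW FUNCTIONS.
The output shows that our UDF was already registered when we ran the Java program. For demonstration, we drop it and register it again in the CLI: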
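One practical snag: udf() runs CREATE FUNCTION unconditionally, and IoTDB refuses to register a non-temporary UDF whose name is already registered (error 411), so a second run of the program fails at that step. A minimal sketch of a drop-then-create helper follows. `RegisterSketch`, `SqlExecutor`, and `registerFunction` are hypothetical names, and it assumes that a failed DROP FUNCTION surfaces as a `SQLException` through the IoTDB JDBC driver; verify that against your driver version.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class RegisterSketch {
  /** Anything that can execute one SQL statement; java.sql.Statement fits via statement::execute. */
  @FunctionalInterface
  interface SqlExecutor {
    void execute(String sql) throws SQLException;
  }

  /** Drops the function if present (ignoring a failed drop), then registers it again. */
  static void registerFunction(SqlExecutor executor, String name, String className)
      throws SQLException {
    try {
      executor.execute("DROP FUNCTION " + name);
    } catch (SQLException e) {
      // Most likely the function was not registered yet, so there was nothing to drop.
      // Any other problem (e.g. a broken connection) will surface on CREATE below.
    }
    executor.execute("CREATE FUNCTION " + name + " AS \"" + className + "\"");
  }

  public static void main(String[] args) throws SQLException {
    // A fake executor that only records SQL, so the sketch runs without an IoTDB server.
    List<String> executed = new ArrayList<>();
    registerFunction(executed::add, "plus", "org.khkw.iotdb.no9.AddFunc");
    executed.forEach(System.out::println);
  }
}
```

Inside udf(), the registration line would then become `registerFunction(statement::execute, "plus", "org.khkw.iotdb.no9.AddFunc");`, which makes the example safe to rerun.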
IoTDB> CREATE FUNCTION plus AS "org.khkw.iotdb.no9.AddFunc"
Msg: 411: Failed to register non-TEMPORARY UDF PLUS(org.khkw.iotdb.no9.AddFunc), because a non-TEMPORARY UDF PLUS(org.khkw.iotdb.no9.AddFunc) with the same function name and the class name has already been registered.
IoTDB> DROP FUNCTION plus
Msg: The statement is executed successfully.
IoTDB> CREATE FUNCTION plus AS "org.khkw.iotdb.no9.AddFunc"
Msg: The statement is executed successfully.
IoTDB> SELECT timestamp, plus(m1, m2) FROM root.lemming.device1;
+-----------------------------+------------------------------------------------------+
| Time|plus(root.lemming.device1.m1, root.lemming.device1.m2)|
+-----------------------------+------------------------------------------------------+
|1970-01-01T08:00:00.001+08:00| 7777|
+-----------------------------+------------------------------------------------------+
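Two details in that result are worth decoding: 7777 is 3333 + 4444, and the Time column shows timestamp 1 as 1970-01-01T08:00:00.001+08:00. Assuming IoTDB's default millisecond timestamp precision and the +08:00 time zone the CLI displays, that is simply 1 ms after the Unix epoch. The sketch below reproduces the rendering with java.time; `TimeSketch` and `render` are hypothetical names.

```java
import java.time.Instant;
import java.time.ZoneOffset;

public class TimeSketch {
  /** Renders an epoch-millisecond timestamp at a fixed UTC offset in ISO-8601 form. */
  static String render(long epochMillis, int offsetHours) {
    return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.ofHours(offsetHours)).toString();
  }

  public static void main(String[] args) {
    System.out.println(render(1, 8)); // 1970-01-01T08:00:00.001+08:00, the Time column above
    System.out.println(render(1, 0)); // 1970-01-01T00:00:00.001Z, the same instant in UTC
  }
}
```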