新聞中心
01 問題
正文本篇我們要解決 ??No?6,?No7提到的網(wǎng)友問題,如下:

?簡單說就是如何處理兩條時間線的數(shù)值計算?上面例子是一個 “+” 加法。
02 數(shù)據(jù)準備
我們首先利用InfluxDB解決上述問題,首先進行數(shù)據(jù)準備,建立一個測試的bucket,建立之前先檢查一下現(xiàn)有的bucket。
啟動InfluxDB實例,如下:
啟動之后,我們查看一下現(xiàn)有的bucket,如下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx bucket list
ID Name Retention Shard group duration Organization ID
98e86f05543f5866 _monitoring 168h0m0s 24h0m0s 56b35f89025991c8
b9b9609ae3e08b97 _tasks 72h0m0s 24h0m0s 56b35f89025991c8
創(chuàng)建名為iot的bucket,如下命令:
bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx setup \
--username iot \
--password 2021iotdb \
--org org \
--bucket 2021iotdb \
--retention 1h \
--token iot_test_token \
--host http://localhost:8086 \
--force
執(zhí)行成功之后會顯示如下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx setup \
--username iot \
--password 2021iotdb \
--org org \
--bucket 2021iotdb \
--retention 1h \
--token iot_test_token \
--host http://localhost:8086 \
--force
Config default has been stored in /Users/jincheng/.influxdbv2/configs.
User Organization Bucket
iot org 2021iotdb
我們用命令查看一下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx bucket list
ID Name Retention Shard group duration Organization ID
c05283f56bf9cead 2021iotdb 1h0m0s 1h0m0s 0b1ad4c0cd4db9ca
e70f5bb2fdaa5dd2 _monitoring 168h0m0s 24h0m0s 0b1ad4c0cd4db9ca
56241b01789c1a1b _tasks 72h0m0s 24h0m0s 0b1ad4c0cd4db9ca
插入兩條時間線數(shù)據(jù),如下:
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx write --bucket 2021iotdb --precision s "m1 vm=3333 $(date +%s)"
influxdb git:(master) bin/$(uname -s | tr '[:upper:]' '[:lower:]')/influx write --bucket 2021iotdb --precision s "m2 vn=4444 $(date +%s)"
我們插入兩條時間線數(shù)據(jù),m1的vm=3333,m2的vn=4444,我們的需求是vm + vn。
03 JOIN查詢
我們看一下JOIN的功能定義:
The join() function merges two or more input streams, whose values are equal on a set of common columns, into a single output stream. Flux allows you to join on any columns common between two data streams and opens the door for operations such as cross-measurement joins and math across measurements.
語法:join(tables: {key1: table1, key2: table2}, on: ["_time", "_field"], method: "inner")
這個和我們標準數(shù)據(jù)庫的JOIN語義基本一致,我們先查看一下用于測試的數(shù)據(jù),我們既可以用influxCLI,如下:
我們發(fā)現(xiàn)數(shù)據(jù)已經(jīng)插入成功。也可以用fluxCLI,InlfuxDB社區(qū)更推進用flux,我們打開一個flux repl。細節(jié)可以查閱 前面一篇No6。我用IDE打開如下:
> from(bucket:"2021iotdb") |> range(start:-1h)
Result: _result
Error: unauthorized access
如圖,我們在IDE里面執(zhí)行查詢時候,提示我們需要token,那么influx query為啥不需要呢,IDE沒有默認去讀取配置文件,我們可以配置環(huán)境變量也可以直接添加token,查詢語句如下:
> from(bucket:"2021iotdb", org:"org", token:"iot_test_token") |> range(start:-1h)
Result: _result
Table: keys: [_start, _stop, _field, _measurement]
_start:time _stop:time _field:string _measurement:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ------------------------------ ----------------------------
2021-04-06T05:36:50.079542000Z 2021-04-06T06:36:50.079542000Z vm m1 2021-04-06T06:23:16.000000000Z 3333
Table: keys: [_start, _stop, _field, _measurement]
_start:time _stop:time _field:string _measurement:string _time:time _value:float
------------------------------ ------------------------------ ---------------------- ---------------------- ------------------------------ ----------------------------
2021-04-06T05:36:50.079542000Z 2021-04-06T06:36:50.079542000Z
好的,一切都還算順利,我們看看如果計算 vm + vn呢?如果我們把 m1和m2兩個時間序列看成是兩個流(表),那么我們要進行兩個表的操作,第一想到的應(yīng)該是兩個表進行JOIN將兩個表的數(shù)據(jù)合并成一個寬表,然后在進行列求值,如下:
tab1 = from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> filter(fn:(r) => r._measurement == "m1")
tab2 = from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> filter(fn:(r) => r._measurement == "m2")
得到兩個表之后我們在進行JOIN操作,查詢語句如下:
join(tables: {m1:tab1, m2:tab2},
on: ["_time"]
) |> map(fn:(r) => ({_time: r._time,
_value: r._value_m1 + r._value_m2
}))上面的on表示JOIN的條件,但是我們發(fā)現(xiàn),tab1和tab2中時間字段并不相同,如下:
所以我們需要再快速的插入兩條數(shù)據(jù),使得時間字段相同,我們才能拿到結(jié)果,插入之后數(shù)據(jù)如下:
這樣我們再進行查詢:
join(tables: {m1:tab1, m2:tab2},
on: ["_time"]
) |> map(fn:(r) => ({_time: r._time,
_value: r._value_m1 + r._value_m2
}))如上我們完成了查詢需求。哈哈,那是不是在InfluxDB里面進行這類查詢都是用JOIN的方式嗎?是否有更簡單的方式?看下面部分:)
03 PIOVT查詢
我們看一下PIVOT的功能定義:
The pivot() function collects values stored vertically (column-wise) in a table and aligns them horizontally (row-wise) into logical sets.
語法:pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
其實在標準數(shù)據(jù)庫里面也有PIVOT,在InfluxDB里面pivot可以將行轉(zhuǎn)換為列,進而將兩個時序數(shù)據(jù)值變成一個Table中的兩個列,這個內(nèi)置也可以為用戶進行內(nèi)部優(yōu)化處理。我們看看如何操作:
> from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> pivot(rowKey:["_time"], columnKey: ["_measurement","_field"],valueColumn: "_value")
如上語句執(zhí)行結(jié)果如下:
我們發(fā)現(xiàn)m1的vm和m2的vn都變成一個表的某一列了,這樣pivot就完美的將兩個時序數(shù)據(jù)合并成寬表的列了。我們再加上具體的過濾條件,如下:
接下來我們再進行計算,如下:
from(bucket:"2021iotdb", org:"org", token:"iot_test_token")
|> range(start:-1h)
|> filter(fn:(r) => r._measurement == "m1" or r._measurement == "m2")
|> pivot(rowKey:["_time"], columnKey: ["_measurement","_field"],valueColumn: "_value")
|> map(fn:(r) => ({_time: r._time, _value:r.m1_vm + r.m2_vn}))
OK, 大家是不是趕緊PIVOT非常方便?:)
04 問題
最后,留個問題給大家,大家知道標準數(shù)據(jù)庫里面PIVOT和UNPIVOT的使用場景嗎?或者Flink&Spark如何支持PIVOT?或者知道Apache IoTDB里面如何處理多條時序數(shù)據(jù)分析梳理嗎?我們下一篇見。
作者介紹
孫金城,社區(qū)編輯,Apache Flink PMC 成員,Apache Beam Committer,Apache IoTDB PMC 成員,ALC Beijing 成員,Apache ShenYu 導(dǎo)師,Apache 軟件基金會成員。關(guān)注技術(shù)領(lǐng)域流計算和時序數(shù)據(jù)存儲。
分享文章:No.8-時序數(shù)據(jù)庫隨筆-InfluxDB多條時序數(shù)據(jù)聯(lián)合分析
文章網(wǎng)址:http://www.5511xx.com/article/cdhodjd.html


咨詢
建站咨詢
