新聞中心
作者:vivo互聯(lián)網(wǎng)服務(wù)器團(tuán)隊(duì)-Hao Guangshi

創(chuàng)新互聯(lián)公司是一家集網(wǎng)站建設(shè),清水企業(yè)網(wǎng)站建設(shè),清水品牌網(wǎng)站建設(shè),網(wǎng)站定制,清水網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷(xiāo),網(wǎng)絡(luò)優(yōu)化,清水網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M(mǎn)足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專(zhuān)業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶(hù)成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
一、背景
字段血緣是在表處理的過(guò)程中將字段的處理過(guò)程保留下來(lái)。為什么會(huì)需要字段血緣呢?
有了字段間的血緣關(guān)系,便可以知道數(shù)據(jù)的來(lái)源去處,以及字段之間的轉(zhuǎn)換關(guān)系,這樣對(duì)數(shù)據(jù)的質(zhì)量,治理有很大的幫助。
Spark SQL 相對(duì)于 Hive 來(lái)說(shuō)通常情況下效率會(huì)比較高,對(duì)于運(yùn)行時(shí)間、資源的使用上面等都會(huì)有較大的收益。
平臺(tái)計(jì)劃將 Hive 任務(wù)遷移到 Spark SQL 上,同時(shí)也需要實(shí)現(xiàn)字段血緣的功能。
二、前期調(diào)研
開(kāi)發(fā)前我們做了很多相關(guān)調(diào)研,從中得知 Spark 是支持?jǐn)U展的:允許用戶(hù)對(duì) Spark SQL 的 SQL 解析、邏輯計(jì)劃的分析和檢查、邏輯計(jì)劃的優(yōu)化、物理計(jì)劃的形成等進(jìn)行擴(kuò)展。
該方案可行,且對(duì) Spark 的源碼沒(méi)有改動(dòng),代價(jià)也比較小,確定使用該方案。
三、Spark SQL 擴(kuò)展
3.1 Spark 可擴(kuò)展的內(nèi)容
SparkSessionExtensions是比較重要的一個(gè)類(lèi),其中定義了注入規(guī)則的方法,現(xiàn)在支持以下內(nèi)容:
- 【Analyzer Rules】邏輯計(jì)劃分析規(guī)則
- 【Check Analysis Rules】邏輯計(jì)劃?rùn)z查規(guī)則
- 【Optimizer Rules.】 邏輯計(jì)劃優(yōu)化規(guī)則
- 【Planning Strategies】形成物理計(jì)劃的策略
- 【Customized Parser】自定義的sql解析器
- 【(External) Catalog listeners catalog】監(jiān)聽(tīng)器
在以上六種可以用戶(hù)自定義的地方,我們選擇了【Check Analysis Rules】。因?yàn)樵摍z查規(guī)則在方法調(diào)用的時(shí)候是不需要有返回值的,也就意味著不需要對(duì)當(dāng)前遍歷的邏輯計(jì)劃樹(shù)進(jìn)行修改,這正是我們需要的。
而【Analyzer Rules】、【Optimizer Rules】則需要對(duì)當(dāng)前的邏輯計(jì)劃進(jìn)行修改,使得我們難以迭代整個(gè)樹(shù),難以得到我們想要的結(jié)果。
3.2 實(shí)現(xiàn)自己的擴(kuò)展
class ExtralSparkExtension extends (SparkSessionExtensions => Unit) {
override def apply(spark: SparkSessionExtensions): Unit = {
//字段血緣
spark.injectCheckRule(FieldLineageCheckRuleV3)
//sql解析器
spark.injectParser { case (_, parser) => new ExtraSparkParser(parser) }
}
}上面按照這種方式實(shí)現(xiàn)擴(kuò)展,并在 apply 方法中把自己需要的規(guī)則注入到 SparkSessionExtensions 即可,除了以上四種可以注入的以外還有其他的規(guī)則。要讓 ExtralSparkExtension 起到作用的話(huà)我們需要在spark-default.conf下配置
spark.sql.extensions=org.apache.spark.sql.hive.ExtralSparkExtension
在啟動(dòng) Spark 任務(wù)的時(shí)候即可生效。
注意到我們也實(shí)現(xiàn)了一個(gè)自定義的SQL解析器,其實(shí)該解析器并沒(méi)有做太多的事情。只是在判斷如果該語(yǔ)句包含insert的時(shí)候就將 SQLText(SQL語(yǔ)句)設(shè)置到一個(gè)為FIELD_LINE_AGE_SQL,之所以將SQLText放到FIELD_LINE_AGE_SQL里面。因?yàn)樵?DheckRule 里面是拿不到SparkPlan的我們需要對(duì)SQL再次解析拿到 SprkPlan,而FieldLineageCheckRuleV3的實(shí)現(xiàn)也特別簡(jiǎn)單,重要的在另一個(gè)線(xiàn)程實(shí)現(xiàn)里面。
這里我們只關(guān)注了insert語(yǔ)句,因?yàn)椴迦胝Z(yǔ)句里面有從某些個(gè)表里面輸入然后寫(xiě)入到某個(gè)表。
class ExtraSparkParser(delegate: ParserInterface) extends ParserInterface with Logging{
override def parsePlan(sqlText: String): LogicalPlan = {
val lineAgeEnabled = SparkSession.getActiveSession
.get.conf.getOption("spark.sql.xxx-xxx-xxx.enable").getOrElse("false").toBoolean
logDebug(s"SqlText: $sqlText")
if(sqlText.toLowerCase().contains("insert")){
if(lineAgeEnabled){
if(FIELD_LINE_AGE_SQL_COULD_SET.get()){
//線(xiàn)程本地變量在這里
FIELD_LINE_AGE_SQL.set(sqlText)
}
FIELD_LINE_AGE_SQL_COULD_SET.remove()
}
}
delegate.parsePlan(sqlText)
}
//調(diào)用原始的sqlparser
override def parseExpression(sqlText: String): Expression = {
delegate.parseExpression(sqlText)
}
//調(diào)用原始的sqlparser
override def parseTableIdentifier(sqlText: String): TableIdentifier = {
delegate.parseTableIdentifier(sqlText)
}
//調(diào)用原始的sqlparser
override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
delegate.parseFunctionIdentifier(sqlText)
}
//調(diào)用原始的sqlparser
override def parseTableSchema(sqlText: String): StructType = {
delegate.parseTableSchema(sqlText)
}
//調(diào)用原始的sqlparser
override def parseDataType(sqlText: String): DataType = {
delegate.parseDataType(sqlText)
}
}3.3 擴(kuò)展的規(guī)則類(lèi)
case class FieldLineageCheckRuleV3(sparkSession:SparkSession) extends (LogicalPlan=>Unit ) {
val executor: ThreadPoolExecutor =
ThreadUtils.newDaemonCachedThreadPool("spark-field-line-age-collector",3,6)
override def apply(plan: LogicalPlan): Unit = {
val sql = FIELD_LINE_AGE_SQL.get
FIELD_LINE_AGE_SQL.remove()
if(sql != null){
//這里我們拿到sql然后啟動(dòng)一個(gè)線(xiàn)程做剩余的解析任務(wù)
val task = new FieldLineageRunnableV3(sparkSession,sql)
executor.execute(task)
}
}
}很簡(jiǎn)單,我們只是拿到了 SQL 然后便啟動(dòng)了一個(gè)線(xiàn)程去得到 SparkPlan,實(shí)際邏輯在
FieldLineageRunnableV3。
3.4 具體的實(shí)現(xiàn)方法
3.4.1 得到 SparkPlan
我們?cè)?run 方法中得到 SparkPlan:
override def run(): Unit = {
val parser = sparkSession.sessionState.sqlParser
val analyzer = sparkSession.sessionState.analyzer
val optimizer = sparkSession.sessionState.optimizer
val planner = sparkSession.sessionState.planner
............
val newPlan = parser.parsePlan(sql)
PASS_TABLE_AUTH.set(true)
val analyzedPlan = analyzer.executeAndCheck(newPlan)
val optimizerPlan = optimizer.execute(analyzedPlan)
//得到sparkPlan
val sparkPlan = planner.plan(optimizerPlan).next()
...............
if(targetTable != null){
val levelProject = new ArrayBuffer[ArrayBuffer[NameExpressionHolder]]()
val predicates = new ArrayBuffer[(String,ArrayBuffer[NameExpressionHolder])]()
//projection
projectionLineAge(levelProject, sparkPlan.child)
//predication
predicationLineAge(predicates, sparkPlan.child)
...............
為什么要使用 SparkPlan 呢?當(dāng)初我們考慮的時(shí)候,物理計(jì)劃拿取字段關(guān)系的時(shí)候是比較準(zhǔn)的,且鏈路比較短也更直接。
在這里補(bǔ)充一下 Spark SQL 解析的過(guò)程如下:
經(jīng)過(guò)SqlParser后會(huì)得到邏輯計(jì)劃,此時(shí)表名、函數(shù)等都沒(méi)有解析,還不能執(zhí)行;經(jīng)過(guò)Analyzer會(huì)分析一些綁定信息,例如表驗(yàn)證、字段信息、函數(shù)信息;經(jīng)過(guò)Optimizer 后邏輯計(jì)劃會(huì)根據(jù)既定規(guī)則被優(yōu)化,這里的規(guī)則是RBO,當(dāng)然 Spark 還支持CBO的優(yōu)化;經(jīng)過(guò)SparkPlanner后就成了可執(zhí)行的物理計(jì)劃。
我們看一個(gè)邏輯計(jì)劃與物理計(jì)劃對(duì)比的例子:
一個(gè) SQL 語(yǔ)句:
select item_id,TYPE,v_value,imei from t1
union all
select item_id,TYPE,v_value,imei from t2
union all
select item_id,TYPE,v_value,imei from t3
邏輯計(jì)劃是這樣的:
物理計(jì)劃是這樣的:
顯然簡(jiǎn)化了很多。
得到 SparkPlan 后,我們就可以根據(jù)不同的SparkPlan節(jié)點(diǎn)做迭代處理。
我們將字段血緣分為兩種類(lèi)型:projection(select查詢(xún)字段)、predication(wehre查詢(xún)條件)。
這兩種是一種點(diǎn)對(duì)點(diǎn)的關(guān)系,即從原始表的字段生成目標(biāo)表的字段的對(duì)應(yīng)關(guān)系。
想象一個(gè)查詢(xún)是一棵樹(shù),那么迭代關(guān)系會(huì)如下從樹(shù)的頂端開(kāi)始迭代,直到樹(shù)的葉子節(jié)點(diǎn),葉子節(jié)點(diǎn)即為原始表:
那么我們迭代查詢(xún)的結(jié)果應(yīng)該為
id ->tab1.id ,
name->tab1.name,tabb2.name,
age→tabb2.age。
注意到有該變量
val levelProject = new ArrayBuffer
[ArrayBuffer[NameExpressionHolder]](),通過(guò)projecti-onLineAge 迭代后 levelProject
存儲(chǔ)了頂層id,name,age對(duì)應(yīng)的(tab1.id),(tab1.name,tabb2.name),(tabb2.age)。
當(dāng)然也不是簡(jiǎn)單的遞歸迭代,還需要考慮特殊情況例如:Join、ExplandExec、Aggregate、Explode、GenerateExec等都需要特殊考慮。
例子及效果:
SQL:
with A as (select id,name,age from tab1 where id > 100 ) ,
C as (select id,name,max(age) from A group by A.id,A.name) ,
B as (select id,name,age from tabb2 where age > 28)
insert into tab3
select C.id,concat(C.name,B.name) as name, B.age from
B,C where C.id = B.id
效果:
{
"edges": [
{
"sources": [
3
],
"targets": [
0
],
"expression": "id",
"edgeType": "PROJECTION"
},
{
"sources": [
4,
7
],
"targets": [
1
],
"expression": "name",
"edgeType": "PROJECTION"
},
{
"sources": [
5
],
"targets": [
2
],
"expression": "age",
"edgeType": "PROJECTION"
},
{
"sources": [
6,
3
],
"targets": [
0,
1,
2
],
"expression": "INNER",
"edgeType": "PREDICATE"
},
{
"sources": [
6,
5
],
"targets": [
0,
1,
2
],
"expression": "((((default.tabb2.`age` IS NOT NULL) AND (CAST(default.tabb2.`age` AS INT) > 28)) AND (B.`id` > 100)) AND (B.`id` IS NOT NULL))",
"edgeType": "PREDICATE"
},
{
"sources": [
3
],
"targets": [
0,
1,
2
],
"expression": "((default.tab1.`id` IS NOT NULL) AND (default.tab1.`id` > 100))",
"edgeType": "PREDICATE"
}
],
"vertices": [
{
"id": 0,
"vertexType": "COLUMN",
"vertexId": "default.tab3.id"
},
{
"id": 1,
"vertexType": "COLUMN",
"vertexId": "default.tab3.name"
},
{
"id": 2,
"vertexType": "COLUMN",
"vertexId": "default.tab3.age"
},
{
"id": 3,
"vertexType": "COLUMN",
"vertexId": "default.tab1.id"
},
{
"id": 4,
"vertexType": "COLUMN",
"vertexId": "default.tab1.name"
},
{
"id": 5,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.age"
},
{
"id": 6,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.id"
},
{
"id": 7,
"vertexType": "COLUMN",
"vertexId": "default.tabb2.name"
}
]
}
四、總結(jié)
在 Spark SQL 的字段血緣實(shí)現(xiàn)中,我們通過(guò)其自擴(kuò)展,首先拿到了 insert 語(yǔ)句,在我們自己的檢查規(guī)則中拿到
SQL 語(yǔ)句,通過(guò)SparkSqlParser、Analyzer、Optimizer、SparkPlanner,最終得到了物理計(jì)劃。
我們通過(guò)迭代物理計(jì)劃,根據(jù)不同執(zhí)行計(jì)劃做對(duì)應(yīng)的轉(zhuǎn)換,然后就得到了字段之間的對(duì)應(yīng)關(guān)系。當(dāng)前的實(shí)現(xiàn)是比較簡(jiǎn)單的,字段之間是直線(xiàn)的對(duì)應(yīng)關(guān)系,中間過(guò)程被忽略,如果想實(shí)現(xiàn)字段的轉(zhuǎn)換的整個(gè)過(guò)程也是沒(méi)有問(wèn)題的。
當(dāng)前題目:Spark SQL 字段血緣在 vivo 互聯(lián)網(wǎng)的實(shí)踐
瀏覽路徑:http://www.5511xx.com/article/coigdho.html


咨詢(xún)
建站咨詢(xún)
