日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
使用AirFlow調(diào)度MaxCompute

背景

成都創(chuàng)新互聯(lián)公司主營通州網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,app開發(fā)定制,通州h5成都微信小程序搭建,通州網(wǎng)站營銷推廣歡迎通州等地區(qū)企業(yè)咨詢

airflow是Airbnb開源的一個用python編寫的調(diào)度工具,基于有向無環(huán)圖(DAG),airflow可以定義一組有依賴的任務(wù),按照依賴依次執(zhí)行,通過python代碼定義子任務(wù),并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調(diào)度MaxCompute 任務(wù)

一、環(huán)境準(zhǔn)備

Python 2.7.5 PyODPS支持Python2.6以上版本
Airflow apache-airflow-1.10.7

1.安裝MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可選,安裝后能加速Tunnel上傳。

pip install cython>=0.19.0 # 可選,不建議Windows用戶安裝。

pip install pyodps

注意:如果requests包沖突,先卸載再安裝對應(yīng)的版本

2.執(zhí)行如下命令檢查安裝是否成功

python -c "from odps import ODPS"

二、開發(fā)步驟

1.在Airflow家目錄編寫python調(diào)度腳本Airiflow_MC.py

 
 
 
 
  1. # -*- coding: UTF-8 -*-
  2. import sys
  3. import os
  4. from odps import ODPS
  5. from odps import options
  6. from airflow import DAG
  7. from airflow.operators.python_operator import PythonOperator
  8. from datetime import datetime, timedelta
  9. from configparser import ConfigParser
  10. import time
  11. reload(sys)
  12. sys.setdefaultencoding('utf8')
  13. #修改系統(tǒng)默認(rèn)編碼。
  14. # MaxCompute參數(shù)設(shè)置
  15. options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
  16. cfg = ConfigParser()
  17. cfg.read("odps.ini")
  18. print(cfg.items())
  19. odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

 
 
 
 
  1. default_args = {
  2. 'owner': 'airflow',
  3. 'depends_on_past': False,
  4. 'retry_delay': timedelta(minutes=5),
  5. 'start_date':datetime(2020,1,15)
  6. # 'email': ['airflow@cdxwcx.com'],
  7. # 'email_on_failure': False,
  8. # 'email_on_retry': False,
  9. # 'retries': 1,
  10. # 'queue': 'bash_queue',
  11. # 'pool': 'backfill',
  12. # 'priority_weight': 10,
  13. # 'end_date': datetime(2016, 1, 1),
  14. }
  15. dag = DAG(
  16. 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
  17. def read_sql(sqlfile):
  18. with io.open(sqlfile, encoding='utf-8', mode='r') as f:
  19. sql=f.read()
  20. f.closed
  21. return sql
  22. def get_time():
  23. print '當(dāng)前時間是{}'.format(time.time())
  24. return time.time()
  25. def mc_job ():
  26. project = odps.get_project() # 取到默認(rèn)項目。
  27. instance=odps.run_sql("select * from long_chinese;")
  28. print(instance.get_logview_address())
  29. instance.wait_for_success()
  30. with instance.open_reader() as reader:
  31. count = reader.count
  32. print("查詢表數(shù)據(jù)條數(shù):{}".format(count))
  33. for record in reader:
  34. print record
  35. return count
  36. t1 = PythonOperator (
  37. task_id = 'get_time' ,
  38. provide_context = False ,
  39. python_callable = get_time,
  40. dag = dag )
  41. t2 = PythonOperator (
  42. task_id = 'mc_job' ,
  43. provide_context = False ,
  44. python_callable = mc_job ,
  45. dag = dag )
  46. t2.set_upstream(t1)

2.提交

 
 
 
 
  1. python Airiflow_MC.py

3.進(jìn)行測試

 
 
 
 
  1. # print the list of active DAGs
  2. airflow list_dags
  3. # prints the list of tasks the "tutorial" dag_id
  4. airflow list_tasks Airiflow_MC
  5. # prints the hierarchy of tasks in the tutorial DAG
  6. airflow list_tasks Airiflow_MC --tree
  7. #測試task
  8. airflow test Airiflow_MC get_time 2010-01-16
  9. airflow test Airiflow_MC mc_job 2010-01-16

4.運行調(diào)度任務(wù)

登錄到web界面點擊按鈕運行

5.查看任務(wù)運行結(jié)果

1.點擊view log

2.查看結(jié)果


分享名稱:使用AirFlow調(diào)度MaxCompute
文章網(wǎng)址:http://www.5511xx.com/article/dpidcgd.html