av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

使用AirFlow調(diào)度MaxCompute

背景

成都創(chuàng)新互聯(lián)公司主營(yíng)通州網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,app開(kāi)發(fā)定制,通州h5成都微信小程序搭建,通州網(wǎng)站營(yíng)銷(xiāo)推廣歡迎通州等地區(qū)企業(yè)咨詢(xún)

airflow是Airbnb開(kāi)源的一個(gè)用python編寫(xiě)的調(diào)度工具,基于有向無(wú)環(huán)圖(DAG),airflow可以定義一組有依賴(lài)的任務(wù),按照依賴(lài)依次執(zhí)行,通過(guò)python代碼定義子任務(wù),并支持各種Operate操作器,靈活性大,能滿足用戶的各種需求。本文主要介紹使用Airflow的python Operator調(diào)度MaxCompute 任務(wù)

一、環(huán)境準(zhǔn)備

Python 2.7.5 PyODPS支持Python2.6以上版本
Airflow apache-airflow-1.10.7

1.安裝MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可選,安裝后能加速Tunnel上傳。

pip install cython>=0.19.0 # 可選,不建議Windows用戶安裝。

pip install pyodps

注意:如果requests包沖突,先卸載再安裝對(duì)應(yīng)的版本

2.執(zhí)行如下命令檢查安裝是否成功

python -c "from odps import ODPS"

二、開(kāi)發(fā)步驟

1.在Airflow家目錄編寫(xiě)python調(diào)度腳本Airiflow_MC.py

 
 
 
 
  1. # -*- coding: UTF-8 -*-
  2. import sys
  3. import os
  4. from odps import ODPS
  5. from odps import options
  6. from airflow import DAG
  7. from airflow.operators.python_operator import PythonOperator
  8. from datetime import datetime, timedelta
  9. from configparser import ConfigParser
  10. import time
  11. reload(sys)
  12. sys.setdefaultencoding('utf8')
  13. #修改系統(tǒng)默認(rèn)編碼。
  14. # MaxCompute參數(shù)設(shè)置
  15. options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
  16. cfg = ConfigParser()
  17. cfg.read("odps.ini")
  18. print(cfg.items())
  19. odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

 
 
 
 
  1. default_args = {
  2. 'owner': 'airflow',
  3. 'depends_on_past': False,
  4. 'retry_delay': timedelta(minutes=5),
  5. 'start_date':datetime(2020,1,15)
  6. # 'email': ['[email protected]'],
  7. # 'email_on_failure': False,
  8. # 'email_on_retry': False,
  9. # 'retries': 1,
  10. # 'queue': 'bash_queue',
  11. # 'pool': 'backfill',
  12. # 'priority_weight': 10,
  13. # 'end_date': datetime(2016, 1, 1),
  14. }
  15. dag = DAG(
  16. 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
  17. def read_sql(sqlfile):
  18. with io.open(sqlfile, encoding='utf-8', mode='r') as f:
  19. sql=f.read()
  20. f.closed
  21. return sql
  22. def get_time():
  23. print '當(dāng)前時(shí)間是{}'.format(time.time())
  24. return time.time()
  25. def mc_job ():
  26. project = odps.get_project() # 取到默認(rèn)項(xiàng)目。
  27. instance=odps.run_sql("select * from long_chinese;")
  28. print(instance.get_logview_address())
  29. instance.wait_for_success()
  30. with instance.open_reader() as reader:
  31. count = reader.count
  32. print("查詢(xún)表數(shù)據(jù)條數(shù):{}".format(count))
  33. for record in reader:
  34. print record
  35. return count
  36. t1 = PythonOperator (
  37. task_id = 'get_time' ,
  38. provide_context = False ,
  39. python_callable = get_time,
  40. dag = dag )
  41. t2 = PythonOperator (
  42. task_id = 'mc_job' ,
  43. provide_context = False ,
  44. python_callable = mc_job ,
  45. dag = dag )
  46. t2.set_upstream(t1)

2.提交

 
 
 
 
  1. python Airiflow_MC.py

3.進(jìn)行測(cè)試

 
 
 
 
  1. # print the list of active DAGs
  2. airflow list_dags
  3. # prints the list of tasks the "tutorial" dag_id
  4. airflow list_tasks Airiflow_MC
  5. # prints the hierarchy of tasks in the tutorial DAG
  6. airflow list_tasks Airiflow_MC --tree
  7. #測(cè)試task
  8. airflow test Airiflow_MC get_time 2010-01-16
  9. airflow test Airiflow_MC mc_job 2010-01-16

4.運(yùn)行調(diào)度任務(wù)

登錄到web界面點(diǎn)擊按鈕運(yùn)行

5.查看任務(wù)運(yùn)行結(jié)果

1.點(diǎn)擊view log

2.查看結(jié)果


本文名稱(chēng):使用AirFlow調(diào)度MaxCompute
文章源于:http://uogjgqi.cn/article/dpidcgd.html
掃二維碼與項(xiàng)目經(jīng)理溝通

我們?cè)谖⑿派?4小時(shí)期待你的聲音

解答本文疑問(wèn)/技術(shù)咨詢(xún)/運(yùn)營(yíng)咨詢(xún)/技術(shù)建議/互聯(lián)網(wǎng)交流