日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
手把手教你在Windows下設(shè)置分布式隊列Celery的心跳輪詢

手把手教你在Windows下設(shè)置分布式隊列Celery的心跳輪詢

作者: 吳老板 2021-03-05 08:52:00

系統(tǒng)

Windows

分布式 大家好,我是吳老板。用Celery 官方的話來說,Celery 是一個非常優(yōu)秀的分布式隊列,可應(yīng)用于分布式共享中間隊列和定時任務(wù)等等。

10多年的章貢網(wǎng)站建設(shè)經(jīng)驗,針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。成都營銷網(wǎng)站建設(shè)的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整章貢建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)從事“章貢網(wǎng)站設(shè)計”,“章貢網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實執(zhí)行。

1 前言

大家好,我是吳老板。用Celery 官方的話來說,Celery 是一個非常優(yōu)秀的分布式隊列,可應(yīng)用于分布式共享中間隊列和定時任務(wù)等等。

2 版本的差異

Celery 有很多個版本,各版本之間的差異可謂不小,比如最新的 Celery6.0 版本在穩(wěn)定性遠(yuǎn)不如 Celery4.0,所以在使用不同版本的時候,系統(tǒng)給到我們的反饋可能并不能如我們所愿。

3 服務(wù)

在 windows 下掛在 Celery 服務(wù)有時候會出現(xiàn)不穩(wěn)定的情況(unix中暫時未發(fā)現(xiàn)這種情況),比如在執(zhí)行定時任務(wù)的時候,過了一段時間之后,Celery 出現(xiàn)了假死狀態(tài),以至于不能按照我們指定的時間點去執(zhí)行任務(wù)。

這些任務(wù)只是加入到待運行隊列中(堆積在 Redis 中),只能人為重啟 Celery 服務(wù)之后才能將堆積的任務(wù)釋放出來運行。

這樣一來,第一是定時任務(wù)在指定時間點沒有正常運行,其二是在其他時間運行了這些任務(wù),很可能會產(chǎn)生更新數(shù)據(jù)不及時,時間節(jié)點混亂的問題,不僅達(dá)不到業(yè)務(wù)需求,還會反受其害。

4 設(shè)置心跳

為了解決 Celery 在 windows 中的這種弊端,可以為 Celery 任務(wù)隊列設(shè)置一個心跳時間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊列服務(wù)就不會出現(xiàn)假死狀態(tài)。

5 舉個栗子

我總是很喜歡用示例來說話,前些時間在對某平臺的商家后臺進(jìn)行數(shù)據(jù)采集的時候,為了使用時能自動獲取該網(wǎng)站的 cookie ,

用Pyppeteer 寫了一個自動化登陸的腳本,和往常一樣仍在 Celery 隊列中并迅速的啟動服務(wù)。

腳本是這樣的(非常接近實際的偽代碼,沒辦法,保命要緊)

  
 
 
  1. # -*- coding: utf-8 -*- 
  2. from db.redisCurd import RedisQueue 
  3. import asyncio 
  4. import random 
  5. import tkinter 
  6. from pyppeteer.launcher import launch 
  7. from platLogin.config import USERNAME, PASSWORD, LOGIN_URL 
  8.  
  9. class Login(): 
  10.     def __init__(self, shopId): 
  11.         self.shopId = shopId 
  12.         self.RedisQueue = RedisQueue("cookie") 
  13.  
  14.     def screen_size(self): 
  15.         tk = tkinter.Tk() 
  16.         width = tk.winfo_screenwidth() 
  17.         height = tk.winfo_screenheight() 
  18.         tk.quit() 
  19.         return {'width': width, 'height': height} 
  20.  
  21.     async def login(self, username, password, url): 
  22.         browser = await launch( 
  23.             { 
  24.                 'headless': False, 
  25.                 'dumpio': True 
  26.             }, 
  27.             args=['--no-sandbox', '--disable-infobars', '--user-data-dir=./userData'], 
  28.         ) 
  29.         page = await browser.newPage()  # 啟動新的瀏覽器頁面 
  30.  
  31.         try: 
  32.             await page.setViewport(viewport=self.screen_size()) 
  33.             await page.setJavaScriptEnabled(enabled=True)  # 啟用js 
  34.             await page.setUserAgent( 
  35.                 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36 Edge/16.16299' 
  36.             ) 
  37.             await self.page_evaluate(page) 
  38.             await page.goto(url) 
  39.             await asyncio.sleep(2) 
  40.             # 輸入用戶名,密碼 
  41.             await page.evaluate(f'document.querySelector("#userName").value=""') 
  42.             await page.type('#userName', username, {'delay': self.input_time_random() - 50})  # delay是限制輸入的時間 
  43.             await page.evaluate('document.querySelector("#passWord").value=""') 
  44.             await page.type('#passWord', password, {'delay': self.input_time_random()}) 
  45.             await page.waitFor(6000) 
  46.  
  47.             loginImgVcode = await page.waitForSelector('#checkCode')   
  48.             await loginImgVcode.screenshot({'path': './loginImg.png'}) 
  49.             await page.waitFor(6000) 
  50.  
  51.             res = use_cjy("./loginImg.png") 
  52.             pic_str = res.get("pic_str") if res.get("err_str") == "OK" else "1234" 
  53.  
  54.             await page.waitFor(6000) 
  55.             await page.type('#checkWord', pic_str, {'delay': self.input_time_random() - 50}) 
  56.             await page.waitFor(6000) 
  57.  
  58.             await page.click('#subMit') 
  59.             await page.waitFor(6000) 
  60.             await asyncio.sleep(2) 
  61.             await self.get_cookie(page) 
  62.             await page.waitFor(3000) 
  63.             await self.page_close(browser) 
  64.             return {'code': 200, 'msg': '登陸成功'} 
  65.         except: 
  66.             return {'code': -1, 'msg': '出錯'} 
  67.  
  68.         finally: 
  69.             await page.waitFor(3000) 
  70.             await self.page_close(browser) 
  71.  
  72.     # 獲取登錄后cookie 
  73.     async def get_cookie(self, page): 
  74.         cookies_list = await page.cookies() 
  75.         cookies = '' 
  76.         for cookie in cookies_list: 
  77.             str_cookie = '{0}={1}; ' 
  78.             str_cookie = str_cookie.format(cookie.get('name'), cookie.get('value')) 
  79.             cookies += str_cookie 
  80.         # 將cookie 放入 cookie 池 
  81.         self.RedisQueue.put_hash(self.shopId, cookies) 
  82.         return cookies 
  83.  
  84.     async def page_evaluate(self, page): 
  85.         await page.evaluate('''() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => undefined } }) }''') 
  86.         await page.evaluate('''() =>{ window.navigator.chrome = { runtime: {},  }; }''') 
  87.         await page.evaluate( 
  88.             '''() =>{ Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] }); }''') 
  89.         await page.evaluate( 
  90.             '''() =>{ Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5,6], }); }''') 
  91.         await page.waitFor(3000) 
  92.  
  93.     async def page_close(self, browser): 
  94.         for _page in await browser.pages(): 
  95.             await _page.close() 
  96.         await browser.close() 
  97.  
  98.     def input_time_random(self): 
  99.         return random.randint(100, 151) 
  100.  
  101.     def run(self, username=USERNAME, password=PASSWORD, url=LOGIN_URL): 
  102.         loop = asyncio.get_event_loop() 
  103.         i_future = asyncio.ensure_future(self.login(username, password, url)) 
  104.         loop.run_until_complete(i_future) 
  105.         return i_future.result() 
  106.  
  107.  
  108. if __name__ == '__main__': 
  109.     Z = Login(shopId="001") 
  110.     Z.run() 

Celery 任務(wù)文件是這樣的

  
 
 
  1. # -*- coding: utf-8 -*- 
  2. from __future__ import absolute_import 
  3. import os 
  4. import sys 
  5. import time 
  6. from db.redisCurd import RedisQueue 
  7. from send_msg.weinxin import Send_msg 
  8. base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 
  9. sys.path.append(base_dir) 
  10. from logger.logger import log_v 
  11. from celery import Task 
  12. from platLogin.login import Login  # 登陸類 
  13. from celery import Celery 
  14.  
  15. randomQueue = RedisQueue("cookie") 
  16.  
  17. celery_app = Celery('task') 
  18. celery_app.config_from_object('celeryConfig') 
  19.  
  20. S = Send_msg() 
  21.  
  22. dl_dict = { 
  23.     'demo': { 
  24.         'cookie': '', 
  25.         'loginClass': 'Login', 
  26.     } 
  27.  
  28. # todo 這是三種運行的狀態(tài) 
  29. class task_status(Task): 
  30.     def on_success(self, retval, task_id, args, kwargs):  
  31.         log_v.info('任務(wù)信息 -> id:{} , arg:{} , successful ..... Done'.format(task_id, args)) 
  32.  
  33.     def on_failure(self, exc, task_id, args, kwargs, einfo):   
  34.         log_v.error('task id:{} , arg:{} , failed ! error : {}'.format(task_id, args, exc)) 
  35.  
  36.     def on_retry(self, exc, task_id, args, kwargs, einfo):  
  37.         log_v.warning('task id:{} , arg:{} , retry !  info: {}'.format(task_id, args, exc)) 
  38.  
  39.  
  40. # todo 隨便找個hash key作為輪詢對象, celery在win10系統(tǒng)可能不太穩(wěn)定,有時候會有連接斷開的情況 
  41. @celery_app.task(base=task_status) 
  42. def get_cookie_status(platName="demo"): 
  43.     try: 
  44.         # log_v.debug(f'[+] 輪詢 {platName} 定時器啟動 ..... Done') 
  45.         randomQueue.get_hash(platName).decode() 
  46.         log_v.debug(f'[+] 輪詢 {platName} 成功 ..... Done') 
  47.         return "Erp 輪詢成功" 
  48.     except: 
  49.         return "Erp 輪詢失敗" 
  50.  
  51.  
  52. @celery_app.task(base=task_status) 
  53. def set_plat_cookie(platName="demo", shopId=None): 
  54.     log_v.debug(f"[+] {platName} 正在登陸") 
  55.     core = eval(dl_dict[platName]['loginClass'])(shopId=shopId) 
  56.     result = core.run() 
  57.     return result 

Celery 配置文件是這樣的

  
 
 
  1. from __future__ import absolute_import 
  2. import datetime 
  3. from kombu import Exchange, Queue 
  4. from celery.schedules import crontab 
  5. from urllib import parse 
  6.  
  7. BROKER_URL = f'redis://root:{parse.quote("你的不規(guī)則密碼")}@主機:6379/15' 
  8.  
  9. # 導(dǎo)入任務(wù),如tasks.py 
  10. CELERY_IMPORTS = ('monitor.tasks',) 
  11.  
  12. # 列化任務(wù)載荷的默認(rèn)的序列化方式 
  13. CELERY_TASK_SERIALIZER = 'json' 
  14.  
  15. # 結(jié)果序列化方式 
  16. CELERY_RESULT_SERIALIZER = 'json' 
  17. CELERY_ACCEPT_CONTENT = ['json'] 
  18.  
  19. CELERY_TIMEZONE = 'Asia/Shanghai'  # 指定時區(qū),不指定默認(rèn)為 'UTC' 
  20. # CELERY_TIMEZONE='UTC' 
  21.  
  22. CELERYBEAT_SCHEDULE = { 
  23.     'add-every-60-seconds': { 
  24.         'task': 'tasks.get_cookie_status', 
  25.         'schedule': datetime.timedelta(minutes=1),  # 每 1 分鐘執(zhí)行一次 
  26.         'args': ()  # 任務(wù)函數(shù)參數(shù) 
  27.     }, 

啟動服務(wù)

  
 
 
  1. celery -A tasks beat -l INFO 
  2. celery -A tasks worker -l INFO -c 2 

以 2 個線程啟動消費者隊列服務(wù)并啟用定時任務(wù),當(dāng)發(fā)現(xiàn)當(dāng)前平臺的 cookie 不可用時,我會向 Celery 發(fā)送一個信號(就是調(diào)用了前面的set_plat_cookie 這個方法),消費者得到這個任務(wù)這個就會執(zhí)行自動化腳本以獲取 cookie 并儲存在 Redis 中,使用時在從 Redis 中獲取就能正常請求到該平臺的數(shù)據(jù)。

在空閑時間,Celery中的 get_cookie_status 方法會每隔一分鐘向 Redis 請求數(shù)據(jù),這就是我們設(shè)置的 1分鐘心跳。

這樣不管我們的 Celery 是否是后臺啟動,都不會出現(xiàn)假死、卡死的狀態(tài),則萬事大吉矣!!

6 總結(jié)

本文為了解決 Celery 在 windows 中的這種弊端,為 Celery 任務(wù)隊列設(shè)置一個心跳時間,比如每一分鐘或者每五分鐘向 Redis 數(shù)據(jù)庫發(fā)送一次數(shù)據(jù)以保證隊列始終是活躍的狀態(tài),這樣只要你的電腦不關(guān)機并保持網(wǎng)絡(luò)暢通(如果是遠(yuǎn)程 Redis),Celery 任務(wù)隊列服務(wù)都不會出現(xiàn)假死、卡死的狀態(tài)。


本文名稱:手把手教你在Windows下設(shè)置分布式隊列Celery的心跳輪詢
分享網(wǎng)址:http://www.5511xx.com/article/cohccsd.html