日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
源碼剖析-輕量級異步爬蟲框架Ruia

前言

為啥要看?主要是在閱讀 Liuli 的過程中,順手看了一下 ruia 的倉庫,發(fā)現(xiàn)代碼量很少,其宣傳中又強(qiáng)調(diào)除爬蟲核心功能外的所有功能都通過插件的方式實現(xiàn),我便對其插件系統(tǒng)的實現(xiàn)感到好奇,是像 Flask 那種動態(tài)引入呢?還是其他我不知道的方式?

封丘網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司,封丘網(wǎng)站設(shè)計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為封丘1000多家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站建設(shè)要多少錢,請找那個售后服務(wù)好的封丘做網(wǎng)站的公司定做!

老規(guī)矩,看前先理一下興趣點,不然一頭扎入細(xì)節(jié),最后啥也帶不走,我主要對以下 2 點感興趣:

  • ruia 的設(shè)計架構(gòu)是怎么樣的?
  • ruia 如何使用插件系統(tǒng)?

觀前提示,ruia 并沒有實現(xiàn)插件系統(tǒng),而是利用中間件的形式來實現(xiàn)所謂的插件,跟我自己理解的插件有所差異。

ruia 設(shè)計架構(gòu)

如果你熟悉 Scrapy,那么 ruia 的使用方式和架構(gòu)你會非常熟悉,因為我在自己開設(shè)的進(jìn)階爬蟲課里手把手帶領(lǐng)大家剖析過 Scrapy 框架的代碼,所以我比較熟悉Scrapy,如果你不熟悉,本文你可能會比較懵逼。

ruia 相比于 Scrapy 輕量很多,它沒有調(diào)度器相關(guān)的邏輯,而是直接通過 Spider 完成完整的爬取邏輯,先以一個 Demo 為例,看看 ruia 的基本使用,部分代碼如下:

class DoubanSpider(Spider):
# 爬蟲名稱
name = "DoubanSpider"
# 入口url
start_urls = ["https://movie.douban.com/top250"]
# 爬蟲相關(guān)配置
request_config = {"RETRIES": 3, "DELAY": 0, "TIMEOUT": 20}
concurrency = 10
# aiohttp config
aiohttp_kwargs = {}

# 異步方法
async def parse(self, response):
# 異步阻塞等待
html = await response.text()
etree = response.html_etree(html=html)
pages = ["?start=0&filter="] + [
# 通過css選擇器,獲得需要進(jìn)一步爬蟲的url
i.get("href") for i in etree.cssselect(".paginator>a")
]
for index, page in enumerate(pages):
url = self.start_urls[0] + page
# 構(gòu)建新的請求
# 回調(diào)方法為parse_item
yield self.request(
url=url, metadata={"index": index}, callback=self.parse_item
)

async def parse_item(self, response):
async for item in DoubanItem.get_items(html=await response.text()):
yield item

async def process_item(self, item: DoubanItem):
self.logger.info(item)


if __name__ == "__main__":
# 啟動爬蟲
DoubanSpider.start()

從上述代碼中,通過 Spider 的 start 方法啟動爬蟲,該方法代碼如下:

@classmethod
def start(
cls,
middleware: typing.Union[typing.Iterable, Middleware] = None,
loop=None,
after_start=None,
before_stop=None,
close_event_loop=True,
**spider_kwargs,
):
# 獲取事件循環(huán)
loop = loop or asyncio.new_event_loop()
# 實例化當(dāng)前類,即類方法中的spider_ins變量為當(dāng)前類的實例 - Scrapy也是這樣做的
spider_ins = cls(middleware=middleware, loop=loop, **spider_kwargs)

# 啟動事件循環(huán),執(zhí)行爬蟲實例的_start方法
spider_ins.loop.run_until_complete(
spider_ins._start(after_start=after_start, before_stop=before_stop)
)
# 關(guān)閉事件循環(huán)中的異步生成器對象(asynchronous generator)
spider_ins.loop.run_until_complete(spider_ins.loop.shutdown_asyncgens())
if close_event_loop:
# 關(guān)閉事件循環(huán)
spider_ins.loop.close()

return spider_ins

從上述代碼可知,爬蟲的完全流程為:

  • 創(chuàng)建事件循環(huán)
  • 實例化爬蟲
  • 將爬蟲爬取邏輯放入事件循環(huán)中異步執(zhí)行
  • 關(guān)閉事件循環(huán)中異步生成器對象,簡單理解即關(guān)閉事件循環(huán)中的任務(wù)
  • 關(guān)閉事件循環(huán)本身

run_until_complete 方法是 asyncio 中比較底層的方法,用于將協(xié)程函數(shù)或任務(wù)對象(task)添加到事件循環(huán)中并啟動,其基本用法如下:

In [1]: import asyncio

In [2]: # 定義協(xié)程函數(shù)

In [3]: async def fun(a):
...: print(a)
...:

In [4]: # 調(diào)用協(xié)程函數(shù),生成一個協(xié)程對象,此時協(xié)程函數(shù)并未執(zhí)行

In [5]: coroutine = fun('hello world')

In [6]: # 創(chuàng)建事件循環(huán)

In [7]: loop = asyncio.get_event_loop()

In [8]: # 將協(xié)程函數(shù)添加到事件循環(huán),并啟動

In [9]: loop.run_until_complete(coroutine)
hello world

關(guān)于 Python 事件循環(huán)更多信息,可以閱讀文檔:https://docs.python.org/zh-cn/3/library/asyncio-eventloop.html

從 start 方法可知,爬蟲主邏輯其實在_start 方法中,該方法代碼比較長,我們拆分來看,首先是添加 signal 的邏輯,相關(guān)代碼如下:

# Add signal
for signal in (SIGINT, SIGTERM):
try:
self.loop.add_signal_handler(
signal, lambda: asyncio.ensure_future(self.stop(signal))
)
except NotImplementedError:
self.logger.warning(
f"{self.name} tried to use loop.add_signal_handler "
"but it is not implemented on this platform."
)

上述代碼中,將 SIGINT 和 SIGTERM 這兩個信號添加當(dāng)前事件循環(huán)中,除了這兩個外,SIGKILL 也是我們常見的信號,注意,這 3 個信號只實現(xiàn)于類 Unix 平臺,即 Linux、MacOS 中有,Windows 沒有,Windows 有自己的一套類似實現(xiàn)。

SIGINT、SIGTERM 和 SIGKILL 作用是啥?

當(dāng)我們按 ctrl+c 來停止進(jìn)程時,Linux 系統(tǒng)其實就向進(jìn)程發(fā)送 SIGINT 信號,進(jìn)程接收到 SIGINT 信號后,會停止當(dāng)前進(jìn)程,也會停止子進(jìn)程,注意,SIGINT 信號只能結(jié)束前臺進(jìn)程。

在 Linux 中,通過 kill 命令可以干掉某個進(jìn)程,如果 kill 命令不加任何參數(shù)的話,該命令的底層,Linux 系統(tǒng)會向進(jìn)程發(fā) SIGTERM 信號,注意,當(dāng)前進(jìn)程會收到該信號,但子進(jìn)程不會,即如果進(jìn)程被 kill 了,那么當(dāng)前進(jìn)程的父進(jìn)程比如是 init,即 PID 為 1 的進(jìn)程,如果是有用戶進(jìn)程創(chuàng)建出的進(jìn)程,SIGTERM 是無法關(guān)閉的。

對于需要強(qiáng)制關(guān)閉的情況,我們會使用 kill -9 來 kill 進(jìn)程,此時 Linux 會向進(jìn)程發(fā)送 SIGKILL 信號,該信號是無法被捕捉的,進(jìn)程收到 SIGKILL 信號后,當(dāng)前進(jìn)程以及相關(guān)的子進(jìn)程都會被 kill 掉。

為了校驗上面我說的是否合理,可以構(gòu)建一段簡單的代碼來判斷一下,代碼如下:

import sys
import asyncio
from signal import SIGINT, SIGTERM
import traceback

# 不同Python版本不同
if sys.version_info >= (3, 9):
async_all_tasks = asyncio.all_tasks
async_current_task = asyncio.current_task
else:
async_all_tasks = asyncio.Task.all_tasks
async_current_task = asyncio.tasks.Task.current_task

async def fun(a):
# 添加信號
for signal in (SIGINT, SIGTERM):
try:
loop.add_signal_handler(
signal, lambda: asyncio.ensure_future(stop(signal))
)
except Exception as e:
traceback.print_exc()
await asyncio.sleep(600)

async def stop(_signal):
print("Stopping async")
# 取消事件循環(huán)中所有的任務(wù)
await cancel_all_tasks()
# 關(guān)停事件循環(huán)
loop.stop()

async def cancel_all_tasks():
tasks = []
for task in async_all_tasks():
if task is not async_current_task():
tasks.append(task)
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

coroutine = fun('hello world')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

上述代碼中,使用 ruia 相似的邏輯弄了一個簡單腳本,在 Linux 中運行,然后通過 ctrl+c 將其關(guān)閉,效果如下圖,可以發(fā)現(xiàn) Stopping async 被打印了出來。

回到_start 方法,添加完信號量后,其代碼如下:

# hook - 爬取前需要執(zhí)行的邏輯
await self._run_spider_hook(after_start)

try:
# 爬蟲主邏輯
await self.start_master()
finally:

# hook - 爬取結(jié)束前要執(zhí)行的邏輯
await self._run_spider_hook(before_stop)
await self.request_session.close()

先看到 start_master 方法,代碼如下:

async def start_master(self):
"""
Actually start crawling
"""
# 獲得Request類實例對象
async for request_ins in self.process_start_urls():
# 完成了請求,獲得了response和callback方法處理的結(jié)果
self.request_queue.put_nowait(self.handle_request(request_ins))
workers = [
# asyncio.ensure_future方法將start_worker協(xié)程方法構(gòu)建成task
asyncio.ensure_future(self.start_worker())
for i in range(self.worker_numbers)
]
for worker in workers:
self.logger.info(f"Worker started: {id(worker)}")

# 阻塞至隊列中所有的元素都被接收和處理完畢
await self.request_queue.join()

if not self.is_async_start:
await self.stop(SIGINT)
else:
if self.cancel_tasks:
await self.cancel_all_tasks()

start_master 方法有兩大塊邏輯,一塊是構(gòu)建異步任務(wù)邏輯,一塊是并發(fā)執(zhí)行構(gòu)建好的異步任務(wù)。一步步看一下,先看 process_start_urls 方法,代碼如下:

async def process_start_urls(self):
for url in self.start_urls:
# 獲得Request類實例
yield self.request(url=url, callback=self.parse, metadata=self.metadata)

process_start_urls 方法的作用就是 start_urls 列表中填寫的 url 構(gòu)建成 Request 類對象,此時并沒有完成對網(wǎng)頁數(shù)據(jù)的請求,這里需要注意的是 callback 參數(shù),直接使用 parse 方法作為 callback 的值。

請求的具體邏輯在 handle_request 方法中,該方法代碼如下(只摘取了關(guān)鍵代碼展示):

async def handle_request(
self, request: Request
) -> typing.Tuple[AsyncGeneratorType, Response]:
callback_result, response = None, None
await self._run_request_middleware(request)
# 完成請求
callback_result, response = await request.fetch_callback(self.sem)
await self._run_response_middleware(request, response)
await self._process_response(request=request, response=response)
# response結(jié)果
return callback_result, response

handle_request 方法中,有中間件效果邏輯的調(diào)用,即請求前,通過_run_request_middleware 方法調(diào)用相關(guān)的中間件邏輯處理請求前的 Request 類實例,請求后是類似的,通過_run_response_middleware 方法處理,最后調(diào)用_process_response 方法處理最終的 Reponse 類實例。

看到 fetch_callback 方法,該方法完成了請求并獲得了相應(yīng)的結(jié)果。

async def fetch_callback(
self, sem: Semaphore
) -> Tuple[AsyncGeneratorType, Response]:
try:
async with sem:
# 請求邏輯
response = await self.fetch()
except Exception as e:
response = None
self.logger.error(f"")

if self.callback is not None:
if iscoroutinefunction(self.callback):
# 調(diào)用callback方法處理后的結(jié)果
callback_result = await self.callback(response)
else:
callback_result = self.callback(response)
else:
callback_result = None
return callback_result, response

fetch_callback 方法其實就一層封裝,該方法內(nèi)通過 fetch 方法完成對網(wǎng)頁的請求,通過 callback 方法完成對 response 的處理。

看到 fetch 方法,它調(diào)用了_make_request 方法實現(xiàn)請求,然后將請求的結(jié)果封裝成 Response 類實例,相關(guān)代碼如下(只展示相關(guān)代碼):

async def fetch(self, delay=True) -> Response:
async with async_timeout.timeout(timeout):
# 發(fā)送請求
resp = await self._make_request()
try:
resp_encoding = resp.get_encoding()
except:
resp_encoding = self.encoding
# 使用請求獲得的結(jié)果構(gòu)建Response對象
response = Response(
url=str(resp.url),
method=resp.method,
encoding=resp_encoding,
metadata=self.metadata,
cookies=resp.cookies,
headers=resp.headers,
history=resp.history,
status=resp.status,
aws_json=resp.json,
aws_text=resp.text,
aws_read=resp.read,
)

關(guān)鍵的_make_request 方法其實就是利用 aiohttp 相關(guān)的方實現(xiàn)了異步請求,代碼如下:

async def _make_request(self):
self.logger.info(f"<{self.method}: {self.url}>")
if self.method == "GET":
request_func = self.current_request_session.get(
self.url, headers=self.headers, ssl=self.ssl, **self.aiohttp_kwargs
)
else:
request_func = self.current_request_session.post(
self.url, headers=self.headers, ssl=self.ssl, **self.aiohttp_kwargs
)
resp = await request_func
return resp

fetch 方法看完后,來看 callback 相關(guān)的邏輯,在 spider.py 的 process_start_urls 方法中,callback 參數(shù)的入?yún)?parse 方法,所以這里會調(diào)用 parse 方法的邏輯來處理網(wǎng)頁返回的數(shù)據(jù),而 parse 方法通常會實現(xiàn)在最外部的子類中(我們自己實現(xiàn)對網(wǎng)頁的解析邏輯)。

回到 start_master 方法,該方法第一步的邏輯就講完了,這部分邏輯的效果就是將 callback_reuslt 和 response 添加到了 request_queue 隊列中,但需要注意,因為這里是異步操作,所以 requests_queue 是異步對象實例,而不是 callback_reuslt 與 response。

然后看到其第二部分的邏輯,因為第一部分邏輯都是異步操作,第二部分邏輯,主要是并發(fā)執(zhí)行第一部分異步邏輯,然后獲得 callback_reuslt 與 response,其主要邏輯實現(xiàn)在 start_worker 方法中。

start_worker 方法代碼如下:

async def start_worker(self):
"""
Start spider worker
:return:
"""
while True:
# 獲得Request類實例
request_item = await self.request_queue.get()
# 添加到任務(wù)列表中
self.worker_tasks.append(request_item)
if self.request_queue.empty():
# 并發(fā)運行 self.worker_tasks 序列中的可等待對象
results = await asyncio.gather(
*self.worker_tasks, return_exceptions=True
)
for task_result in results:
if not isinstance(task_result, RuntimeError) and task_result:
callback_results, response = task_result
if isinstance(callback_results, AsyncGeneratorType):
await self._process_async_callback(
callback_results, response
)
self.worker_tasks = []
self.request_queue.task_done()

這部分邏輯沒啥好講的,主要就是通過 asyncio.gather 并發(fā)執(zhí)行 worker_tasks 中的任務(wù),然后獲得 callback_results 和 response。

在 start_master 方法中,通過 join 方法等待 request_queue 隊列中的任務(wù)都完成執(zhí)行。

至此,ruia 中的主邏輯就閱讀完了,不太復(fù)雜,是學(xué)習(xí) Python asyncio 很好的參考代碼。

ruia 插件系統(tǒng)

ruia 其實沒有設(shè)計所謂的插件系統(tǒng),而是利用中間件的概念來作為所謂的插件,與 scrapy 類似,ruia 中間件會在請求前以及請求后兩個時間點調(diào)用,在上文的代碼中也提及了:_run_request_middleware 方法與_run_response_middleware 方法。

為了進(jìn)一步理解 ruia 的插件,這里拉取 ruia-ua 項目的代碼,該項目是 ruia 作者實現(xiàn)的用于替換請求頭中 User-Agent 的插件,使用 ruia-ua 以及 ruia 來爬取 HackerNews 網(wǎng)站數(shù)據(jù),代碼如下:

from ruia import AttrField, TextField, Item, Spider
from ruia_ua import middleware


class HackerNewsItem(Item):
target_item = TextField(css_select='tr.athing')
title = TextField(css_select='a.storylink')
url = AttrField(css_select='a.storylink', attr='href')


class HackerNewsSpider(Spider):
start_urls = ['https://news.ycombinator.com/news?p=1', 'https://news.ycombinator.com/news?p=2']

async def parse(self, response):
# Do something...
print(response.url)


if __name__ == '__main__':
# 使用ruia-ua插件
HackerNewsSpider.start(middleware=middleware)

從上述代碼可知,所謂使用 ruia-ua 插件,其實就是使用 ruia-ua 提供的中間件。

翻閱 ruia-ua 代碼,其中關(guān)鍵的邏輯如下:

middleware = Middleware()


@middleware.request
async def add_random_ua(spider_ins, request):
ua = await get_random_user_agent()
if request.headers:
request.headers.update({'User-Agent': ua})
else:
request.headers = {
'User-Agent': ua
}

上述代碼很簡單,就是用 middleware.request 裝飾器處理了 add_random_ua 方法,而 add_random_ua 方法實現(xiàn)了替換 User-Agent 的邏輯。

middleware.request 裝飾器的代碼在 ruia/middleware.py 中,相關(guān)代碼如下:

def request(self, *args, **kwargs):
"""
Define a Decorate to be called before a request.
eg: @middleware.request
"""
middleware = args[0]

@wraps(middleware)
def register_middleware(*args, **kwargs):
self.request_middleware.append(middleware)
return middleware

return register_middleware()

middleware.request 方法的邏輯也非常簡單,就是向 request_middleware 列表中添加相應(yīng)的方法對象,然后在_run_request_middleware 方法中被調(diào)用。

_run_request_middleware 方法的代碼如下:

async def _run_request_middleware(self, request: Request):
if self.middleware.request_middleware:
# 順序調(diào)用中間件
for middleware in self.middleware.request_middleware:
if callable(middleware):
try:
aws_middleware_func = middleware(self, request)
if isawaitable(aws_middleware_func):
await aws_middleware_func
else:
self.logger.error(
f" )
except Exception as e:
self.logger.exception(f"

這便是 ruia 中所謂的插件,即利用裝飾器,將需要使用的方法添加到某個 list 對象中,在 ruia 請求網(wǎng)站的過程中,在請求前和請求后都給相應(yīng) list 對象中的方法調(diào)用的機(jī)會,從而實現(xiàn)所謂的插件。

結(jié)尾

總的來說,ruia 是很好的 Python Asyncio 學(xué)習(xí)資料,里面關(guān)于 asyncio 的用法是可以直接抄,至于其插件系統(tǒng),其實是中間件的換一種說法,離真正的插件還有差距。


分享名稱:源碼剖析-輕量級異步爬蟲框架Ruia
網(wǎng)站鏈接:http://www.5511xx.com/article/djipids.html