新聞中心
Threading模塊從 Python 1.5.2 版開始出現(xiàn),用于增強(qiáng)底層的多線程模塊thread。Threading 模塊讓操作多線程變得更簡單,并且支持程序同時運(yùn)行多個操作。

成都網(wǎng)站制作、成都網(wǎng)站建設(shè)的關(guān)注點不是能為您做些什么網(wǎng)站,而是怎么做網(wǎng)站,有沒有做好網(wǎng)站,給創(chuàng)新互聯(lián)一個展示的機(jī)會來證明自己,這并不會花費您太多時間,或許會給您帶來新的靈感和驚喜。面向用戶友好,注重用戶體驗,一切以用戶為中心。
注意,Python 中的多線程最好用于處理有關(guān) I/O 的操作,如從網(wǎng)上下載資源或者從本地讀取文件或者目錄。如果你要做的是 CPU 密集型操作,那么你需要使用 Python 的multiprocessing模塊。這樣做的原因是,Python 有一個全局解釋器鎖 (GIL),使得所有子線程都必須運(yùn)行在同一個主線程中。正因為如此,當(dāng)你通過多線程來處理多個 CPU 密集型任務(wù)時,你會發(fā)現(xiàn)它實際上運(yùn)行的更慢。因此,我們將重點放在那些多線程最擅長的領(lǐng)域:I/O 操作!
線程簡介
多線程能讓你像運(yùn)行一個獨立的程序一樣運(yùn)行一段長代碼。這有點像調(diào)用子進(jìn)程(subprocess),不過區(qū)別是你調(diào)用的是一個函數(shù)或者一個類,而不是獨立的程序。在我看來,舉例說明更有助于解釋。下面來看一個簡單的例子:
- import threading
- def doubler(number):
- """
- 可以被線程使用的一個函數(shù)
- """
- print(threading.currentThread.getName + '\n')
- print(number * 2)
- if __name__ == '__main__':
- for i in range(5):
- my_thread = threading.Thread(target=doubler, args=(i,))
- my_thread.start
這里,我們導(dǎo)入 threading 模塊并且創(chuàng)建一個叫 doubler的常規(guī)函數(shù)。這個函數(shù)接受一個值,然后把這個值翻一番。它還會打印出調(diào)用這個函數(shù)的線程的名稱,并在最后打印一行空行。然后在代碼的最后一塊,我們創(chuàng)建五個線程并且依次啟動它們。在我們實例化一個線程時,你會注意到,我們把 doubler 函數(shù)傳給target參數(shù),同時也給 doubler 函數(shù)傳遞了參數(shù)。Args參數(shù)看起來有些奇怪,那是因為我們需要傳遞一個序列給 doubler 函數(shù),但它只接受一個變量,所以我們把逗號放在尾部來創(chuàng)建只有一個參數(shù)的序列。
需要注意的是,如果你想等待一個線程結(jié)束,那么需要調(diào)用 join方法。
當(dāng)你運(yùn)行以上這段代碼,會得到以下輸出內(nèi)容:
- Thread-1
- 0
- Thread-2
- 2
- Thread-3
- 4
- Thread-4
- 6
- Thread-5
- 8
當(dāng)然,通常情況下你不會希望輸出打印到標(biāo)準(zhǔn)輸出。如果不幸真的這么做了,那么最終的顯示效果將會非?;靵y。你應(yīng)該使用 Python 的 logging 模塊。它是線程安全的,并且表現(xiàn)出色。讓我們用 logging模塊修改上面的例子并且給我們的線程命名。代碼如下:
- import logging
- import threading
- def get_logger:
- logger = logging.getLogger("threading_example")
- logger.setLevel(logging.DEBUG)
- fh = logging.FileHandler("threading.log")
- fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
- formatter = logging.Formatter(fmt)
- fh.setFormatter(formatter)
- logger.addHandler(fh)
- return logger
- def doubler(number, logger):
- """
- 可以被線程使用的一個函數(shù)
- """
- logger.debug('doubler function executing')
- result = number * 2
- logger.debug('doubler function ended with: {}'.format(
- result))
- if __name__ == '__main__':
- logger = get_logger
- thread_names = ['Mike', 'George', 'Wanda', 'Dingbat', 'Nina']
- for i in range(5):
- my_thread = threading.Thread(
- target=doubler, name=thread_names[i], args=(i,logger))
- my_thread.start
代碼中最大的改變就是加入了 get_logger函數(shù)。這段代碼將創(chuàng)建一個被設(shè)置為調(diào)試級別的日志記錄器。它將日志保存在當(dāng)前目錄(即腳本運(yùn)行所在的目錄)下,然后設(shè)置每行日志的格式。格式包括時間戳、線程名、日志記錄級別以及日志信息。
在 doubler 函數(shù)中,我們把 print語句換成 logging 語句。你會注發(fā)現(xiàn),在創(chuàng)建線程時,我們給 doubler 函數(shù)傳入了 logger 對象。這樣做的原因是,如果在每個線程中實例化 logging 對象,那么將會產(chǎn)生多個 logging 單例(singleton),并且日志中將會有很多重復(fù)的內(nèi)容。
最后,創(chuàng)建一個名稱列表,然后使用 name關(guān)鍵字參數(shù)為每一個線程設(shè)置具體名稱,這樣就可以為線程命名。運(yùn)行以上代碼,將會得到包含以下內(nèi)容的日志文件:
- 2016-07-24 20:39:50,055 - Mike - DEBUG - doubler function executing
- 2016-07-24 20:39:50,055 - Mike - DEBUG - doubler function ended with: 0
- 2016-07-24 20:39:50,055 - George - DEBUG - doubler function executing
- 2016-07-24 20:39:50,056 - George - DEBUG - doubler function ended with: 2
- 2016-07-24 20:39:50,056 - Wanda - DEBUG - doubler function executing
- 2016-07-24 20:39:50,056 - Wanda - DEBUG - doubler function ended with: 4
- 2016-07-24 20:39:50,056 - Dingbat - DEBUG - doubler function executing
- 2016-07-24 20:39:50,057 - Dingbat - DEBUG - doubler function ended with: 6
- 2016-07-24 20:39:50,057 - Nina - DEBUG - doubler function executing
- 2016-07-24 20:39:50,057 - Nina - DEBUG - doubler function ended with: 8
輸出結(jié)果不言自明,所以繼續(xù)介紹其他內(nèi)容。在本節(jié)中再多說一點,即通過繼承 threading.Thread實現(xiàn)多線程。舉最后一個例子,通過繼承 threading.Thread 創(chuàng)建子類,而不是直接調(diào)用 Thread 函數(shù)。
更新后的代碼如下:
- import logging
- import threading
- class MyThread(threading.Thread):
- def __init__(self, number, logger):
- threading.Thread.__init__(self)
- self.number = number
- self.logger = logger
- def run(self):
- """
- 運(yùn)行線程
- """
- logger.debug('Calling doubler')
- doubler(self.number, self.logger)
- def get_logger:
- logger = logging.getLogger("threading_example")
- logger.setLevel(logging.DEBUG)
- fh = logging.FileHandler("threading_class.log")
- fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
- formatter = logging.Formatter(fmt)
- fh.setFormatter(formatter)
- logger.addHandler(fh)
- return logger
- def doubler(number, logger):
- """
- 可以被線程使用的一個函數(shù)
- """
- logger.debug('doubler function executing')
- result = number * 2
- logger.debug('doubler function ended with: {}'.format(
- result))
- if __name__ == '__main__':
- logger = get_logger
- thread_names = ['Mike', 'George', 'Wanda', 'Dingbat', 'Nina']
- for i in range(5):
- thread = MyThread(i, logger)
- thread.setName(thread_names[i])
- thread.start
這個例子中,我們只是創(chuàng)建一個繼承于 threading.Thread的子類。像之前一樣,傳入一個需要翻一番的數(shù)字,以及 logging 對象。但是這次,設(shè)置線程名稱的方式有點不太一樣,變成了通過調(diào)用 thread 對象的setName方法來設(shè)置。不過仍然需要調(diào)用start來啟動線程,不過你可能注意到我們并不需要在子類中定義該方法。當(dāng)調(diào)用start時,它會通過調(diào)用run方法來啟動線程。在我們的類中,我們調(diào)用 doubler 函數(shù)來做處理。輸出結(jié)果中除了一些添加的額外信息內(nèi)容幾乎差不多。運(yùn)行下這個腳本,看看你會得到什么。
線程鎖與線程同步
當(dāng)你有多個線程,就需要考慮怎樣避免線程沖突。我的意思是說,你可能遇到多個線程同時訪問同一資源的情況。如果不考慮這些問題并且制定相應(yīng)的解決方案,那么在開發(fā)產(chǎn)品過程中,你總會在最糟糕的時候遇到這些棘手的問題。
解決辦法就是使用線程鎖。鎖由 Python 的 threading 模塊提供,并且它最多被一個線程所持有。當(dāng)一個線程試圖獲取一個已經(jīng)鎖在資源上的鎖時,該線程通常會暫停運(yùn)行,直到這個鎖被釋放。來讓我們看一個非常典型沒有卻應(yīng)具備鎖功能的例子:
- import threading
- total = 0
- def update_total(amount):
- """
- Updates the total by the given amount
- """
- global total
- total += amount
- print (total)
- if __name__ == '__main__':
- for i in range(10):
- my_thread = threading.Thread(
- target=update_total, args=(5,))
- my_thread.start
如果往以上代碼添加 time.sleep函數(shù)并給出不同長度的時間,可能會讓這個例子更有意思。無論如何,這里的問題是,一個線程可能已經(jīng)調(diào)用update_total函數(shù)并且還沒有更新完成,此時另一個線程也有可能調(diào)用它并且嘗試更新內(nèi)容。根據(jù)操作執(zhí)行順序的不同,該值可能只被增加一次。
讓我們給這個函數(shù)添加鎖。有兩種方法可以實現(xiàn)。第一種方式是使用 try/finally,從而確保鎖肯定會被釋放。下面是示例:
- import threading
- total = 0
- lock = threading.Lock
- def update_total(amount):
- """
- Updates the total by the given amount
- """
- global total
- lock.acquire
- try:
- total += amount
- finally:
- lock.release
- print (total)
- if __name__ == '__main__':
- for i in range(10):
- my_thread = threading.Thread(
- target=update_total, args=(5,))
- my_thread.start
如上,在我們做任何處理之前就獲取鎖。然后嘗試更新 total 的值,最后釋放鎖并打印出 total 的當(dāng)前值。事實上,我們可以使用 Python 的 with語句避免使用 try/finally 這種較為繁瑣的語句:
- import threading
- total = 0
- lock = threading.Lock
- def update_total(amount):
- """
- Updates the total by the given amount
- """
- global total
- with lock:
- total += amount
- print (total)
- if __name__ == '__main__':
- for i in range(10):
- my_thread = threading.Thread(
- target=update_total, args=(5,))
- my_thread.start
正如你看到的那樣,我們不再需要 try/finally作為上下文管理器,而是由with語句作為替代。
當(dāng)然你也會遇到要在代碼中通過多個線程訪問多個函數(shù)的情況。當(dāng)你第一次編寫并發(fā)代碼時,代碼可能是這樣的:
- import threading
- total = 0
- lock = threading.Lock
- def do_something:
- lock.acquire
- try:
- print('Lock acquired in the do_something function')
- finally:
- lock.release
- print('Lock released in the do_something function')
- return "Done doing something"
- def do_something_else:
- lock.acquire
- try:
- print('Lock acquired in the do_something_else function')
- finally:
- lock.release
- print('Lock released in the do_something_else function')
- return "Finished something else"
- if __name__ == '__main__':
- result_one = do_something
- result_two = do_something_else
這樣的代碼在上面的情況下能夠正常工作,但假設(shè)你有多個線程都調(diào)用這兩個函數(shù)呢。當(dāng)一個線程正在運(yùn)行這兩個函數(shù),然后另外一個線程也可能會修改這些數(shù)據(jù),最后得到的就是不正確的結(jié)果。問題是,你甚至可能沒有馬上意識到結(jié)果錯了。有什么解決辦法呢?讓我們試著找出答案。
通常首先想到的就是在調(diào)用這兩個函數(shù)的地方上鎖。讓我們試著修改上面的例子,修改成如下所示:
- import threading
- total = 0
- lock = threading.RLock
- def do_something:
- with lock:
- print('Lock acquired in the do_something function')
- print('Lock released in the do_something function')
- return "Done doing something"
- def do_something_else:
- with lock:
- print('Lock acquired in the do_something_else function')
- print('Lock released in the do_something_else function')
- return "Finished something else"
- def main:
- with lock:
- result_one = do_something
- result_two = do_something_else
- print (result_one)
- print (result_two)
- if __name__ == '__main__':
- main
當(dāng)你真正運(yùn)行這段代碼時,你會發(fā)現(xiàn)它只是掛起了。究其原因,是因為我們只告訴 threading 模塊獲取鎖。所以當(dāng)我們調(diào)用第一個函數(shù)時,它發(fā)現(xiàn)鎖已經(jīng)被獲取,隨后便把自己掛起了,直到鎖被釋放,然而這將永遠(yuǎn)不會發(fā)生。
真正的解決辦法是使用重入鎖(Re-Entrant Lock)。threading 模塊提供的解決辦法是使用RLock函數(shù)。即把lock = threading.lock替換為lock = threading.RLock,然后重新運(yùn)行代碼,現(xiàn)在代碼就可以正常運(yùn)行了。
如果你想在線程中運(yùn)行以上代碼,那么你可以用以下代碼取代直接調(diào)用 main函數(shù):
- if __name__ == '__main__':
- for i in range(10):
- my_thread = threading.Thread(
- target=main)
- my_thread.start
每個線程都會運(yùn)行 main 函數(shù),main 函數(shù)則會依次調(diào)用另外兩個函數(shù)。最終也會產(chǎn)生 10 組結(jié)果集。
定時器
Threading 模塊有一個優(yōu)雅的 Timer類,你可以用它來實現(xiàn)在指定時間后要發(fā)生的動作。它們實際上會啟動自己的自定義線程,通過調(diào)用常規(guī)線程上的start方法即可運(yùn)行。你也可以調(diào)用它的cancel方法停止定時器。值得注意的是,你甚至可以在開始定時器之前取消它。
有一天,我遇到一個特殊的情況:我需要與已經(jīng)啟動的子進(jìn)程通信,但是我需要它有超時處理。雖然處理這種特殊問題有很多不同的方法,不過我最喜歡的解決方案是使用 threading 模塊的 Timer 類。
在下面這個例子中,我們將使用 ping指令作為演示。在 Linux 系統(tǒng)中,ping 命令會一直運(yùn)行下去直到你手動殺死它。所以在 Linux 世界里,Timer 類就顯得非常方便。示例如下:
- import subprocess
- from threading import Timer
- kill = lambda process: process.kill
- cmd = ['ping', 'www.google.com']
- ping = subprocess.Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- my_timer = Timer(5, kill, [ping])
- try:
- my_timer.start
- stdout, stderr = ping.communicate
- finally:
- my_timer.cancel
- print (str(stdout))
這里我們在 lambda 表達(dá)式中調(diào)用 kill 殺死進(jìn)程。接下來啟動 ping 命令,然后創(chuàng)建 Timer 對象。你會注意到,第一個參數(shù)就是需要等待的秒數(shù),第二個參數(shù)是需要調(diào)用的函數(shù),緊跟其后的參數(shù)是要調(diào)用函數(shù)的入?yún)ⅰT诒纠校覀兊暮瘮?shù)是一個 lambda 表達(dá)式,傳入的是一個只有一個元素的列表。如果你運(yùn)行這段代碼,它應(yīng)該會運(yùn)行 5 秒鐘,然后打印出 ping 的結(jié)果。
其他線程組件
Threading 模塊包含對其他功能的支持。例如,你可以創(chuàng)建信號量(Semaphore),這是計算機(jī)科學(xué)中最古老的同步原語之一。基本上,一個信號量管理一個內(nèi)置的計數(shù)器。當(dāng)你調(diào)用acquire時計數(shù)器就會遞減,相反當(dāng)你調(diào)用release時就會遞增。根據(jù)其設(shè)計,計數(shù)器的值無法小于零,所以如果正好在計數(shù)器為零時調(diào)用 acquire 方法,該方法將阻塞線程。
譯者注:通常使用信號量時都會初始化一個大于零的值,如 semaphore = threading.Semaphore(2)
另一個非常有用的同步工具就是事件(Event)。它允許你使用信號(signal)實現(xiàn)線程通信。在下一節(jié)中我們將舉一個使用事件的實例。
最后,在 Python 3.2 中加入了 Barrier對象。Barrier 是管理線程池中的同步原語,在線程池中多條線程需要相互等待對方。如果要傳遞 barrier,每一條線程都要調(diào)用wait方法,在其他線程調(diào)用該方法之前線程將會阻塞。全部調(diào)用之后將會同時釋放所有線程。
線程通信
某些情況下,你會希望線程之間互相通信。就像先前提到的,你可以通過創(chuàng)建 Event對象達(dá)到這個目的。但更常用的方法是使用隊列(Queue)。在我們的例子中,這兩種方式都會有所涉及。下面讓我們看看到底是什么樣子的:
- import threading
- from queue import Queue
- def creator(data, q):
- """
- 生成用于消費的數(shù)據(jù),等待消費者完成處理
- """
- print('Creating data and putting it on the queue')
- for item in data:
- evt = threading.Event
- q.put((item, evt))
- print('Waiting for data to be doubled')
- evt.wait
- def my_consumer(q):
- """
- 消費部分?jǐn)?shù)據(jù),并做處理
- 這里所做的只是將輸入翻一倍
- """
- while True:
- data, evt = q.get
- print('data found to be processed: {}'.format(data))
- processed = data * 2
- print(processed)
- evt.set
- q.task_done
- if __name__ == '__main__':
- q = Queue
- data = [5, 10, 13, -1]
- thread_one = threading.Thread(target=creator, args=(data, q))
- thread_two = threading.Thread(target=my_consumer, args=(q,))
- thread_one.start
- thread_two.start
- q.join
讓我們掰開揉碎分析一下。首先,我們有一個創(chuàng)建者(creator)函數(shù)(亦稱作生產(chǎn)者(producer)),我們用它來創(chuàng)建想要操作(或者消費)的數(shù)據(jù)。然后用另外一個函數(shù) my_consumer來處理剛才創(chuàng)建出來的數(shù)據(jù)。Creator 函數(shù)使用 Queue 的put方法向隊列中插入數(shù)據(jù),消費者將會持續(xù)不斷的檢測有沒有更多的數(shù)據(jù),當(dāng)發(fā)現(xiàn)有數(shù)據(jù)時就會處理數(shù)據(jù)。Queue 對象處理所有的獲取鎖和釋放鎖的過程,這些不用我們太關(guān)心。
在這個例子中,先創(chuàng)建一個列表,然后創(chuàng)建兩個線程,一個用作生產(chǎn)者,一個作為消費者。你會發(fā)現(xiàn),我們給兩個線程都傳遞了 Queue 對象,這兩個線程隱藏了關(guān)于鎖處理的細(xì)節(jié)。隊列實現(xiàn)了數(shù)據(jù)從第一個線程到第二個線程的傳遞。當(dāng)?shù)谝粋€線程把數(shù)據(jù)放入隊列時,同時也傳遞一個 Event 事件,緊接著掛起自己,等待該事件結(jié)束。在消費者側(cè),也就是第二個線程,則做數(shù)據(jù)處理工作。當(dāng)完成數(shù)據(jù)處理后就會調(diào)用 Event 事件的 set方法,通知第一個線程已經(jīng)把數(shù)據(jù)處理完畢了,可以繼續(xù)生產(chǎn)了。
最后一行代碼調(diào)用了 Queue 對象的 join方法,它會告知 Queue 等待所有線程結(jié)束。當(dāng)?shù)谝粋€線程把所有數(shù)據(jù)都放到隊列中,它也就運(yùn)行結(jié)束了。
結(jié)束語
以上涵蓋了關(guān)于線程的諸多方面,主要包括:
- 線程基礎(chǔ)知識
- 鎖的工作方式
- 什么是事件以及如何使用
- 如何使用定時器
- 通過 Queues/Events 實現(xiàn)線程間通信
現(xiàn)在你們知道如何使用線程以及線程擅長什么了,希望在你們的代碼中能有它們的用武之地。
標(biāo)題名稱:一篇文章讀懂Python多線程
網(wǎng)頁網(wǎng)址:http://www.5511xx.com/article/copipce.html


咨詢
建站咨詢
