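Implementing Multitasking in Python
【0】Programs, Processes, and Threads
This article introduces two of Python's built-in modules, multiprocessing and threading. Both can be used to run tasks concurrently and improve throughput. They look similar on the surface, but they rely on different mechanisms underneath. Before the demonstrations, it helps to understand the differences between a program, a process, and a thread.
I found an article that explains these concepts clearly; here is a brief summary.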
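- Program: code that has not yet been loaded into memory, like the blueprint of a factory
- Process: a program that has been loaded into memory and is running, like an actual factory
- Thread: the smallest unit of execution that can be scheduled, like a worker in the factory
Roughly speaking, each CPU core works on only one process at a time, and each process has its own independent memory, whereas the threads inside one process share that process's memory.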
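Take my own computer as an example: its processor is an Intel® Core™ i7-11370H with 4 cores and 2 hardware threads per core, for 8 hardware threads in total. Tasks can therefore be distributed across these cores and threads to make the best use of the processor.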
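As a quick check, the number of logical CPUs reported by the operating system can be read from Python. This is only a minimal sketch: the variable name logical_cpus is made up for illustration, and the value printed depends on the hardware and OS (on the machine described above it would be expected to be 8).

```python
import os

# Number of logical CPUs (hardware threads) reported by the OS.
# os.cpu_count() may return None if the count cannot be determined.
logical_cpus = os.cpu_count()
print('Logical CPUs:', logical_cpus)
```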
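【1】Task Types
Coming back to the multiprocessing and threading modules: multiprocessing can be pictured as building several factories to carry out the tasks, while threading hires several workers inside one factory to get more done. Which of the two should you choose? That depends on the type of task.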
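Tasks are generally classified as CPU bound or I/O bound:
- CPU bound: the time the task takes is determined mainly by how fast the CPU can compute
- I/O bound: the time the task takes is spent mainly waiting for input and output
For example, editing a high-resolution video is closer to CPU bound, because the computer has to process a huge amount of audio and video data. A stock trading program, which receives orders from many users and keeps refreshing live prices, is closer to I/O bound. (I made these examples up on the spot, so they may not be the best ones.)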
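For a CPU-bound task, multiprocessing is the better fit; for an I/O-bound task, threading is the better fit. The reason is that in the standard CPython interpreter the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads help mainly when tasks spend their time waiting, while separate processes can compute on several cores at once. The sections below demonstrate both modules in practice.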
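To make the I/O-bound case concrete, here is a rough sketch that compares running three simulated waits one after another with running them in three threads. The helper names fake_io, run_sequential, and run_threaded are invented for this illustration, and time.sleep stands in for real I/O (a sleeping thread does not hold the GIL, so other threads can proceed). Exact timings will vary by machine.

```python
import threading
import time


def fake_io(seconds):
    """Stand-in for an I/O wait such as a network or disk request."""
    time.sleep(seconds)


def run_sequential(waits):
    """Run the waits one after another and return the elapsed time."""
    start = time.perf_counter()
    for w in waits:
        fake_io(w)
    return time.perf_counter() - start


def run_threaded(waits):
    """Run each wait in its own thread and return the elapsed time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(w,)) for w in waits]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


waits = [0.2, 0.2, 0.2]
sequential = run_sequential(waits)
threaded = run_threaded(waits)
print('sequential: {:.2f}s, threaded: {:.2f}s'.format(sequential, threaded))
```

The sequential run should take roughly the sum of the waits, while the threaded run should take roughly as long as the longest single wait.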
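【2】Multithreading in Python
0. Example scenario
Suppose a restaurant on UberEats is open for business and receives three orders in a row, for 8, 2, and 5 meals. If every meal takes the same time to prepare, how should the cooks be organized to finish the orders efficiently?
To model this, we can create several threads or processes to represent the cooks, and the three orders are the three tasks to be completed.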
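1. Creating threads
First, import the threading module. Then use threading.Thread() to create three threads, that is, three cooks. Two parameters need to be specified:
- target: the function the thread will run, in other words the task
- args: the arguments passed to that function
All three cooks have to cook, so their target is the same function, CookMeals. The function takes the cook's number cookIdx and the number of meals to prepare, meals, so args is set to (1, 8), (2, 2), and (3, 5) in turn: cook 1 handles eight meals, cook 2 handles two, and cook 3 handles five.
Calling .start() tells a cook to start working. Calling .join() makes the main thread wait until that cook has finished before moving on to the instructions that follow.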
import threading
import time


def CookMeals(cookIdx, meals):
    """Simulate one cook preparing an order; each meal takes one second."""
    msg = 'Cook {}: Start to cook.'.format(cookIdx)
    print(msg)
    cookTime = meals
    time.sleep(cookTime)
    msg = 'Cook {}: End of cooking. ({} meals)'.format(cookIdx, meals)
    print(msg)


if __name__ == '__main__':
    # One thread per cook; args is (cookIdx, meals).
    cook1 = threading.Thread(target=CookMeals, args=(1, 8))
    cook2 = threading.Thread(target=CookMeals, args=(2, 2))
    cook3 = threading.Thread(target=CookMeals, args=(3, 5))
    # Tell all three cooks to start working.
    cook1.start()
    cook2.start()
    cook3.start()
    # Make the main thread wait until every cook has finished.
    cook1.join()
    cook2.join()
    cook3.join()
The output is as follows:
Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
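The output shows that cooks 1, 2, and 3 start in order, while the finishing order is 2, 3, 1. Cook 2 has only two meals to prepare and therefore needs the least time, whereas cook 1 has the most, eight meals, and finishes last.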
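When there are more than a handful of cooks, writing cook1, cook2, cook3 by hand gets tedious. The following sketch creates the threads in a loop instead. The names cook_meals, seconds_per_meal, and finished are assumptions made for this illustration (the sleep is shortened so the demo runs quickly); they are not part of the original example.

```python
import threading
import time

finished = []  # cook indices, appended in the order the cooks finish


def cook_meals(cook_idx, meals, seconds_per_meal=0.02):
    # seconds_per_meal is shortened here only to keep the demo fast.
    time.sleep(meals * seconds_per_meal)
    finished.append(cook_idx)


orders = [8, 2, 5]
# One thread per order; cooks are numbered from 1.
cooks = [
    threading.Thread(target=cook_meals, args=(idx, meals))
    for idx, meals in enumerate(orders, start=1)
]
for cook in cooks:
    cook.start()
for cook in cooks:
    cook.join()

print(finished)
```

On an unloaded machine this would typically print the cooks in the order 2, 3, 1, matching the finishing order seen above, although thread timing is never strictly guaranteed.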
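2. Object-oriented style
If you prefer an object-oriented style, see the following example. Here a Cook class is defined that inherits from threading.Thread, and the task is written in its run method. Note that the name run is fixed: after .start() is called, the new thread automatically executes whatever is in run.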
import threading
import time


class Cook(threading.Thread):
    def __init__(self, cookIdx, meals):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals

    def run(self):
        # Executed in the new thread once .start() has been called.
        msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
        print(msg)
        cookTime = self.meals
        time.sleep(cookTime)
        msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, self.meals)
        print(msg)


if __name__ == '__main__':
    cook1 = Cook(cookIdx=1, meals=8)
    cook2 = Cook(cookIdx=2, meals=2)
    cook3 = Cook(cookIdx=3, meals=5)
    cook1.start()
    cook2.start()
    cook3.start()
    cook1.join()
    cook2.join()
    cook3.join()
The output is as follows:
Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
The output is the same as in the previous example.
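One detail worth seeing in isolation is the difference between calling run() yourself and calling .start(). The sketch below is a stripped-down illustration, not the article's Cook class: the attribute ran_in and the variables direct and started are invented here to record which thread actually executed run.

```python
import threading


class Cook(threading.Thread):
    def __init__(self):
        super().__init__()
        self.ran_in = None  # identifier of the thread that executed run()

    def run(self):
        self.ran_in = threading.get_ident()


main_id = threading.get_ident()

direct = Cook()
direct.run()      # plain method call: executes in the calling thread
print(direct.ran_in == main_id)   # True

started = Cook()
started.start()   # creates a new thread, which then invokes run()
started.join()
print(started.ran_in == main_id)  # False
```

Calling run() directly therefore gives no concurrency at all; only .start() hands the work to a separate thread.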
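3. Queue
The idea behind the two examples above is simple: there are three orders, so assign them to three cooks. But what if the restaurant has only two cooks? In that case the queue module can help. First create a queue,
my_queue = queue.Queue()
then put the orders into it,
for meals in orders:
    my_queue.put(meals)
so that the two cooks can take orders from the queue,
self.queue.get()
Whenever a cook becomes idle, they check whether the queue still holds an unfinished order, and if so, take it and start on it.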
import threading
import time
import queue


class Cook(threading.Thread):
    def __init__(self, cookIdx, queue):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue

    def run(self):
        while True:
            try:
                # Non-blocking get: checking empty() and then calling a blocking
                # get() could hang if the other cook takes the last order in between.
                meals = self.queue.get(block=False)
            except queue.Empty:
                break  # no orders left
            msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
            print(msg)
            cookTime = meals
            time.sleep(cookTime)
            msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
            print(msg)


if __name__ == '__main__':
    orders = [8, 2, 5]
    my_queue = queue.Queue()
    for meals in orders:
        my_queue.put(meals)
    cook1 = Cook(cookIdx=1, queue=my_queue)
    cook2 = Cook(cookIdx=2, queue=my_queue)
    cook1.start()
    cook2.start()
    cook1.join()
    cook2.join()
The output is as follows:
Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
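Cook 1 and cook 2 each take one order first. Cook 2's customer ordered only two meals, so that order is finished quickly. Cook 1 is still busy with the first customer's eight meals at that point, so cook 2 goes on to take the third customer's five meals.
As a result, cook 2 finishes both of the other orders before cook 1 is done with the first one. For the restaurant owner this is the most economical arrangement: even with one cook fewer, everything is still finished in about 8 seconds.
The first customer, however, may not be pleased: they arrived first but received their food last. The lock-based approach in the next section is meant to deal with this awkward situation.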
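A common variation on the queue pattern above is to let the cooks block on get() and tell them to stop with a sentinel value, which is handy when orders keep arriving after the cooks have started. The following is a sketch under assumptions: the names STOP, cook_worker, order_queue, served, and seconds_per_meal are invented for this illustration, and the sleep is shortened to keep it fast.

```python
import queue
import threading
import time

STOP = None  # sentinel: tells a cook that no more orders are coming


def cook_worker(cook_idx, order_queue, served, seconds_per_meal=0.01):
    while True:
        meals = order_queue.get()  # blocks until an item is available
        if meals is STOP:
            break
        time.sleep(meals * seconds_per_meal)
        served.append((cook_idx, meals))


order_queue = queue.Queue()
served = []
cooks = [
    threading.Thread(target=cook_worker, args=(idx, order_queue, served))
    for idx in (1, 2)
]
for cook in cooks:
    cook.start()

# Orders may arrive after the cooks have started waiting.
for meals in [8, 2, 5]:
    order_queue.put(meals)
for _ in cooks:
    order_queue.put(STOP)  # one sentinel per cook

for cook in cooks:
    cook.join()
print(sorted(meals for _, meals in served))  # [2, 5, 8]
```

Which cook ends up with which order depends on timing, but every order is served exactly once and each cook exits after reading its own sentinel.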
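4. Lock
This time we create not only a queue but also a lock,
my_lock = threading.Lock()
Despite the name, it may help to picture the lock as the kitchen. A cook who has been assigned an order asks to use the kitchen,
self.lock.acquire()
The cook who gets the kitchen first starts cooking, and the other cook has to wait outside until the one occupying the kitchen finishes and hands it back,
self.lock.release()
At that point the waiting cook can go in, and the cook who has just finished has to wait outside in turn.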
import threading
import time
import queue


class Cook(threading.Thread):
    def __init__(self, cookIdx, queue, lock):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            try:
                # Non-blocking get avoids hanging if the other cook
                # takes the last order first.
                meals = self.queue.get(block=False)
            except queue.Empty:
                break  # no orders left
            self.lock.acquire()  # wait for the kitchen
            msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
            print(msg)
            cookTime = meals
            time.sleep(cookTime)
            msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
            print(msg)
            self.lock.release()  # hand the kitchen back


if __name__ == '__main__':
    orders = [8, 2, 5]
    my_queue = queue.Queue()
    for meals in orders:
        my_queue.put(meals)
    my_lock = threading.Lock()
    cook1 = Cook(cookIdx=1, queue=my_queue, lock=my_lock)
    cook2 = Cook(cookIdx=2, queue=my_queue, lock=my_lock)
    cook1.start()
    cook2.start()
    cook1.join()
    cook2.join()
The output is as follows:
Cook 1: Start to cook.
Cook 1: End of cooking. (8 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 1: Start to cook.
Cook 1: End of cooking. (5 meals)
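In the end the orders are served in the order they arrived. The price is that only one cook works at a time, so the three orders take 8 + 2 + 5 = 15 seconds in total instead of about 8 seconds in the previous example. Also note that threading.Lock makes no fairness guarantee, so the exact order may differ between runs or platforms.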
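Besides controlling who may use the kitchen, the more typical job of a lock is to protect shared data. The sketch below is an illustration under assumptions (the names meals_served and serve are invented here): four threads increment one shared counter, and the with statement is used as shorthand for the acquire()/release() pair.

```python
import threading

lock = threading.Lock()
meals_served = 0


def serve(times):
    global meals_served
    for _ in range(times):
        with lock:             # acquire() on entry, release() on exit
            meals_served += 1  # read-modify-write, protected by the lock


cooks = [threading.Thread(target=serve, args=(10000,)) for _ in range(4)]
for cook in cooks:
    cook.start()
for cook in cooks:
    cook.join()
print(meals_served)  # 40000
```

The with form also releases the lock if the body raises an exception, which a bare acquire()/release() pair does not do on its own.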
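【3】Multiprocessing in Python
1. Creating processes
Creating processes looks almost the same as creating threads: just replace threading.Thread() with multiprocessing.Process(). As before, specify the target and args parameters, call .start() to run, and call .join() to wait.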
import multiprocessing
import time


def CookMeals(cookIdx, meals):
    """Simulate one cook preparing an order; each meal takes one second."""
    msg = 'Cook {}: Start to cook.'.format(cookIdx)
    print(msg)
    cookTime = meals
    time.sleep(cookTime)
    msg = 'Cook {}: End of cooking. ({} meals)'.format(cookIdx, meals)
    print(msg)


if __name__ == '__main__':
    # One process per cook; args is (cookIdx, meals).
    cook1 = multiprocessing.Process(target=CookMeals, args=(1, 8))
    cook2 = multiprocessing.Process(target=CookMeals, args=(2, 2))
    cook3 = multiprocessing.Process(target=CookMeals, args=(3, 5))
    cook1.start()
    cook2.start()
    cook3.start()
    # Wait for all three processes to finish.
    cook1.join()
    cook2.join()
    cook3.join()
Cook 3: Start to cook.
Cook 2: Start to cook.
Cook 1: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
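Although the code looks nearly identical to the threaded version, the memory model is different: a thread changes the data of the process it lives in, while a child process works on its own copy. The sketch below tries to show this with a module-level list. The names orders, add_order, run_in_thread, and run_in_process are invented for this illustration.

```python
import multiprocessing
import threading

orders = []  # module-level state


def add_order(meals):
    orders.append(meals)


def run_in_thread(meals):
    worker = threading.Thread(target=add_order, args=(meals,))
    worker.start()
    worker.join()


def run_in_process(meals):
    worker = multiprocessing.Process(target=add_order, args=(meals,))
    worker.start()
    worker.join()


if __name__ == '__main__':
    run_in_thread(8)
    print('after thread: ', orders)  # [8]: the thread changed this process's list
    run_in_process(2)
    print('after process:', orders)  # still [8]: the child changed only its own copy
```

This is why the later sections rely on multiprocessing.Queue and multiprocessing.Pipe to move data between processes.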
2. Object-oriented style
The object-oriented version is also the same, except that the class now inherits from multiprocessing.Process.
import multiprocessing
import time


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, meals):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals

    def run(self):
        # Executed in the child process once .start() has been called.
        msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
        print(msg)
        cookTime = self.meals
        time.sleep(cookTime)
        msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, self.meals)
        print(msg)


if __name__ == '__main__':
    cook1 = Cook(cookIdx=1, meals=8)
    cook2 = Cook(cookIdx=2, meals=2)
    cook3 = Cook(cookIdx=3, meals=5)
    cook1.start()
    cook2.start()
    cook3.start()
    cook1.join()
    cook2.join()
    cook3.join()
Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
3. Queue
With multiple processes, multiprocessing can create the queue itself, so there is no need to use queue.Queue:
multiprocessing.Queue()
import multiprocessing
import time
from queue import Empty  # exception raised by multiprocessing.Queue.get() on timeout


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, queue):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue

    def run(self):
        while True:
            try:
                # multiprocessing.Queue.empty() is not reliable across processes,
                # so wait briefly for an order and stop when none arrives.
                meals = self.queue.get(timeout=1)
            except Empty:
                break
            msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
            print(msg)
            cookTime = meals
            time.sleep(cookTime)
            msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
            print(msg)


if __name__ == '__main__':
    orders = [8, 2, 5]
    my_queue = multiprocessing.Queue()
    for meals in orders:
        my_queue.put(meals)
    cook1 = Cook(cookIdx=1, queue=my_queue)
    cook2 = Cook(cookIdx=2, queue=my_queue)
    cook1.start()
    cook2.start()
    cook1.join()
    cook2.join()
Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
4. Lock
A lock can likewise be created directly through multiprocessing:
my_lock = multiprocessing.Lock()
import multiprocessing
import time
from queue import Empty  # exception raised by multiprocessing.Queue.get() on timeout


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, queue, lock):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            try:
                # empty() is not reliable across processes, so wait briefly
                # for an order and stop when none arrives.
                meals = self.queue.get(timeout=1)
            except Empty:
                break
            self.lock.acquire()  # wait for the kitchen
            msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
            print(msg)
            cookTime = meals
            time.sleep(cookTime)
            msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
            print(msg)
            self.lock.release()  # hand the kitchen back


if __name__ == '__main__':
    orders = [8, 2, 5]
    my_queue = multiprocessing.Queue()
    for meals in orders:
        my_queue.put(meals)
    my_lock = multiprocessing.Lock()
    cook1 = Cook(cookIdx=1, queue=my_queue, lock=my_lock)
    cook2 = Cook(cookIdx=2, queue=my_queue, lock=my_lock)
    cook1.start()
    cook2.start()
    cook1.join()
    cook2.join()
Cook 1: Start to cook.
Cook 1: End of cooking. (8 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 1: Start to cook.
Cook 1: End of cooking. (5 meals)
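5. Pipe
As mentioned at the beginning, each process has its own memory. When different processes need to exchange information, one option is to create a pipe that connects them.
conn1, conn2 = multiprocessing.Pipe()
This call returns two connection objects, which can be thought of as the two ends of the pipe. In our example, a cook can send a message through one end,
self.conn1.send()
while the other cooks can check at the other end whether a message is waiting in the pipe,
self.conn2.poll()
and, if they decide to take the message, call
self.conn2.recv()
In the example below, a cook whose order contains too many meals sends a call for help through conn1 and hands one meal off. When another cook notices the message on conn2, that cook takes over the extra meal.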
import multiprocessing
import time


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, meals, conn1, conn2):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals
        self.conn1 = conn1  # end used for sending
        self.conn2 = conn2  # end used for receiving

    def run(self):
        msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
        print(msg)
        if self.meals > 6:
            # Too many meals: hand one off and announce it through the pipe.
            sendMeals = 1
            self.meals -= sendMeals
            self.conn1.send({'Cook': self.cookIdx, 'sendMeals': sendMeals})
        elif self.conn2.poll():
            # poll() returns immediately; True only if a message is already waiting.
            recvMeals = self.conn2.recv()['sendMeals']
            self.meals += recvMeals
        cookTime = self.meals
        time.sleep(cookTime)
        msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, self.meals)
        print(msg)


if __name__ == '__main__':
    conn1, conn2 = multiprocessing.Pipe()
    cook1 = Cook(cookIdx=1, meals=8, conn1=conn1, conn2=conn2)
    cook2 = Cook(cookIdx=2, meals=2, conn1=conn1, conn2=conn2)
    cook3 = Cook(cookIdx=3, meals=5, conn1=conn1, conn2=conn2)
    cook1.start()
    cook2.start()
    cook3.start()
    cook1.join()
    cook2.join()
    cook3.join()
The output is as follows:
Cook 2: Start to cook.
Cook 1: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (6 meals)
Cook 1: End of cooking. (7 meals)
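Cook 1 had eight meals to prepare, which was too many, so one of them was handed off and cook 1 ended up making seven. Cook 3 received the call for help and therefore had to make one extra meal, six in total. Which cook picks up the message depends on timing: poll() returns immediately, so a cook who checks before the message has arrived simply cooks their own order, as cook 2 did in this run.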
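The three connection calls can also be tried in isolation, without starting any child process, because both ends of a pipe can be used from the same process. This is a minimal sketch for experimenting with send(), poll(), and recv(); the variable names has_message and message are invented for the illustration.

```python
import multiprocessing

conn1, conn2 = multiprocessing.Pipe()  # duplex by default: both ends can send and receive

print(conn2.poll())                    # False: nothing has been sent yet
conn1.send({'Cook': 1, 'sendMeals': 1})
has_message = conn2.poll(1)            # wait up to 1 second for data to arrive
message = conn2.recv()                 # the object arrives as a copy
print(has_message, message)            # True {'Cook': 1, 'sendMeals': 1}

conn1.close()
conn2.close()
```

Passing a timeout to poll(), as in the second call, is one way to make a receiver less sensitive to the timing issue described above.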
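6. Pool
Creating a process pool is a quick way to launch several processes without calling multiprocessing.Process() over and over:
pool = multiprocessing.Pool()
Where multiprocessing.Process() needs the target and args parameters, the pool's starmap() method takes similar ones:
- func: the function the worker processes will run
- iterable: a collection of args tuples, each holding the arguments for one call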
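import multiprocessing
import time


def CookMeals(cookIdx, meals):
    """Simulate one cook preparing an order; each meal takes one second."""
    msg = 'Cook {}: Start to cook.'.format(cookIdx)
    print(msg)
    cookTime = meals
    time.sleep(cookTime)
    msg = 'Cook {}: End of cooking. ({} meals)'.format(cookIdx, meals)
    print(msg)


if __name__ == '__main__':
    pool = multiprocessing.Pool(3)  # three worker processes
    # starmap unpacks each tuple into CookMeals(cookIdx, meals)
    # and blocks until every call has finished.
    pool.starmap(func=CookMeals, iterable=[(1, 8), (2, 2), (3, 5)])
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit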
The output is as follows:
Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
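A pool can also hand back return values, which the example above does not use. The sketch below is a small variation under assumptions: cook_time and jobs are hypothetical names, and the function returns a value instead of sleeping. starmap() returns the results in the same order as the input tuples.

```python
import multiprocessing


def cook_time(cook_idx, meals):
    """Return (cook_idx, seconds needed), assuming one second per meal."""
    return cook_idx, meals * 1


if __name__ == '__main__':
    jobs = [(1, 8), (2, 2), (3, 5)]
    # starmap blocks until all results are in; leaving the with-block
    # then shuts the worker processes down.
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(cook_time, jobs)
    print(results)  # [(1, 8), (2, 2), (3, 5)]
```

Collecting results this way avoids the need for a queue or pipe when each task only has to report a value back to the parent.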
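If your multiprocessing work does not involve more elaborate mechanisms such as queues and pipes, using a pool directly looks like the more convenient choice.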