Implementing Multitasking in Python

How multithreading and multiprocessing work

wyatthoho
23 min read · Jul 24, 2022

【0】Programs, Processes, and Threads

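This article introduces two built-in Python modules, multiprocessing and threading, which let you run tasks concurrently and improve computational efficiency. The two modules look similar, but they are built on different mechanisms. Before the demos, we first need to understand the differences between a program, a process, and a thread.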

I found the article above, which explains this clearly, so let me give a brief summary.

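  • Program: code that has not yet been loaded into memory, like the blueprint of a factory
  • Process: a program that has been loaded into memory and is running, like an actual factory
  • Thread: the smallest unit the operating system schedules for execution, like a worker in the factory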

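Each CPU core runs one thread at a time (two with simultaneous multithreading such as Intel Hyper-Threading), and the operating system keeps switching the cores among the threads of all running processes. Processes are independent of one another, each with its own memory space, whereas the threads inside the same process share that process's memory.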
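To make this concrete, here is a minimal sketch that runs the same function once in a thread and once in a child process. The helper name `report_pid` is made up for illustration. The thread reports the parent's process ID, and its append shows up in the parent's list. The child process has a different PID and only appends to its own copy of the list. The `if __name__ == '__main__':` guard is required because on platforms that start processes with spawn (Windows, macOS) the child re-imports the main module.

```python
import multiprocessing
import os
import threading


def report_pid(label, results):
    # Record which OS process this code is running in.
    results.append((label, os.getpid()))


def main():
    parent_pid = os.getpid()

    # A thread lives inside the current process: same PID, and it appends
    # to the very same list object the main thread holds.
    thread_results = []
    t = threading.Thread(target=report_pid, args=('thread', thread_results))
    t.start()
    t.join()

    # A child process gets its own PID and its own copy of the list,
    # so its append never shows up in the parent's list.
    process_results = []
    p = multiprocessing.Process(target=report_pid, args=('process', process_results))
    p.start()
    child_pid = p.pid
    p.join()

    return parent_pid, thread_results, child_pid, process_results


if __name__ == '__main__':
    parent_pid, thread_results, child_pid, process_results = main()
    print('parent PID:', parent_pid)
    print('thread saw:', thread_results)              # same PID as the parent
    print('child PID:', child_pid)                    # a different PID
    print('process list in parent:', process_results)  # still empty
```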

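Take my computer as an example. Its processor is an Intel® Core™ i7-11370H with 4 cores and 2 threads per core, for 8 hardware threads in total. Distributing tasks sensibly across these cores or threads is how we get the most computing throughput.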
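To check this number from Python, `os.cpu_count()` reports logical CPUs (hardware threads), not physical cores. On a 4-core, 8-thread processor like the one above it would be expected to return 8. This is a small sketch. Note that the function may return None if the count cannot be determined.

```python
import os

# Number of logical CPUs (hardware threads), not physical cores.
# It can be None if Python cannot determine the count.
logical_cpus = os.cpu_count()
print('Logical CPUs:', logical_cpus)
```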

【1】Task Types

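Back to Python's multiprocessing and threading modules. You can think of multiprocessing as building many factories to carry out the tasks, while threading hires many workers inside one factory to get the job done faster. So which of the two should you use? That depends on the type of task.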

Tasks are generally classified as CPU-bound or I/O-bound:

  • CPU-bound: the time the task takes depends mainly on how fast the CPU can compute
  • I/O-bound: the time the task takes comes mainly from waiting for input and output

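For example, editing a high-definition video requires the computer to process a huge amount of audio and video data, so that task leans toward CPU-bound. A stock trading program, on the other hand, has to receive orders from many users and keep updating real-time prices, so it leans toward I/O-bound. (I came up with these examples off the top of my head, so they may not be the best ones.)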

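A CPU-bound task is better suited to multiprocessing, and an I/O-bound task is better suited to threading. The reason is CPython's global interpreter lock (GIL). In the standard CPython build, only one thread per process executes Python bytecode at a time. Threads therefore speed things up mainly while they are waiting, because the GIL is released during blocking I/O and sleeps. Separate processes each have their own interpreter and can run on different cores simultaneously. Below are hands-on demonstrations of the multiprocessing and threading modules.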
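Here is a rough way to see why threading suits I/O-bound work. The sketch below uses `time.sleep()` as a stand-in for waiting on I/O. That is an assumption, though real blocking I/O behaves similarly because CPython releases the GIL while waiting. The helper names `fake_io`, `run_sequential` and `run_threaded` are hypothetical. Four 0.2-second waits take about 0.8 s when run one after another, but only about 0.2 s when each runs in its own thread.

```python
import threading
import time


def fake_io(seconds):
    # Stand-in for waiting on I/O; CPython releases the GIL while sleeping.
    time.sleep(seconds)


def run_sequential(n, seconds):
    start = time.perf_counter()
    for _ in range(n):
        fake_io(seconds)
    return time.perf_counter() - start


def run_threaded(n, seconds):
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(seconds,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


seq = run_sequential(4, 0.2)
thr = run_threaded(4, 0.2)
print('sequential: {:.2f} s, threaded: {:.2f} s'.format(seq, thr))
```

If you replace the sleep with a pure-Python computation loop, threads give little or no speedup in the standard CPython build, and that is the case where multiprocessing helps.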

【2】Multithreading in Python

0. The Example Scenario

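Suppose a restaurant on UberEats is open and receives three orders, one after another, for 8, 2, and 5 meals respectively. If every meal takes the same time to cook, how should the cooks be scheduled so that these meals get done efficiently?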

(A painstakingly hand-drawn illustration, sob. Inspired by Ichiraku Ramen from Naruto.)

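For this problem, we can create multiple threads or processes to represent the cooks, and the three orders are the three tasks that need to be completed.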

1. Creating Multiple Threads

First, import the threading module. Then use threading.Thread() to create three threads, that is, three cooks. Two parameters need to be specified:

  1. target: the function the thread runs, i.e. the task
  2. args: the tuple of arguments passed to that function

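Since all three cooks have to cook, their target all points to the CookMeals function. The cooking function takes the cook's number cookIdx and the number of meals meals, so args are set to (1, 8), (2, 2), and (3, 5). This means cook 1 handles eight meals, cook 2 handles two meals, and cook 3 handles five meals.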

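Calling .start() tells the three cooks to start working. Calling .join() makes the main thread wait until the three cooks have finished before it runs the subsequent statements.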

import threading
import time


def CookMeals(cookIdx, meals):
    msg = 'Cook {}: Start to cook.'.format(cookIdx)
    print(msg)

    # Assume each meal takes one second to cook.
    cookTime = meals
    time.sleep(cookTime)

    msg = 'Cook {}: End of cooking. ({} meals)'.format(cookIdx, meals)
    print(msg)


if __name__ == '__main__':
    # Each Thread is one cook: target is the task, args are its arguments.
    cook1 = threading.Thread(target=CookMeals, args=(1, 8))
    cook2 = threading.Thread(target=CookMeals, args=(2, 2))
    cook3 = threading.Thread(target=CookMeals, args=(3, 5))

    cook1.start()
    cook2.start()
    cook3.start()

    # The main thread waits here until all three cooks are done.
    cook1.join()
    cook2.join()
    cook3.join()

The output is as follows:

Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)

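Looking at the output, cooks 1, 2, and 3 start in order, but they finish in the order 2, 3, 1. Cook 2 handles only two meals and needs the least time, while cook 1 handles the most, eight meals, so he finishes last.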
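The same three cooks can also be created in a loop, which scales better than naming cook1, cook2 and cook3 by hand. In this sketch one meal is scaled down to 0.1 s to keep the demo fast (an assumption). Each thread appends its cook number to a shared list when it finishes, and because all threads live in the same process they all write to the same list. Every thread is joined before the list is printed, so the finishing order is complete at that point.

```python
import threading
import time

SECONDS_PER_MEAL = 0.1  # assumption: one meal scaled down from 1 s to 0.1 s
finished = []           # shared by every thread, because threads live in one process


def cook_meals(cook_idx, meals):
    time.sleep(meals * SECONDS_PER_MEAL)
    finished.append(cook_idx)  # record the finishing order


orders = [(1, 8), (2, 2), (3, 5)]
threads = [threading.Thread(target=cook_meals, args=order) for order in orders]

for t in threads:
    t.start()
for t in threads:
    t.join()  # after this loop every cook is done

print(finished)  # expected: [2, 3, 1]
```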

2. Object-Oriented Version

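If you prefer an object-oriented style, see the example below. Here we create a Cook class that inherits from threading.Thread and put its task in the run method. Note that run has a special role: once .start() is called, the new thread automatically executes whatever is inside run.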

import threading
import time


class Cook(threading.Thread):
    def __init__(self, cookIdx, meals):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals

    def run(self):
        msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
        print(msg)

        cookTime = self.meals
        time.sleep(cookTime)

        msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, self.meals)
        print(msg)


if __name__ == '__main__':
    cook1 = Cook(cookIdx=1, meals=8)
    cook2 = Cook(cookIdx=2, meals=2)
    cook3 = Cook(cookIdx=3, meals=5)

    cook1.start()
    cook2.start()
    cook3.start()

    cook1.join()
    cook2.join()
    cook3.join()

The output is as follows:

Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)

The output is the same as in the previous example.
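Here is a small sketch confirming that run() only executes in a new thread when it is invoked through start(). Calling run() directly is just an ordinary method call that runs in the current thread. The `ran_in` attribute is a hypothetical name used here to record which thread executed run().

```python
import threading


class Cook(threading.Thread):
    def __init__(self, cookIdx):
        super().__init__(name='Cook-{}'.format(cookIdx))
        self.cookIdx = cookIdx
        self.ran_in = None  # hypothetical attribute: name of the thread that executed run()

    def run(self):
        self.ran_in = threading.current_thread().name


caller = threading.current_thread().name

cook = Cook(1)
cook.run()              # ordinary method call: runs in the calling thread
direct = cook.ran_in

cook.start()            # start() creates a new thread, which then calls run()
cook.join()
started = cook.ran_in

print(caller, direct, started)  # e.g. MainThread MainThread Cook-1
```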

3. Queue

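The idea in both examples above is simple: there are three orders, so assign them to three cooks. But what if the restaurant has only two cooks? How should the orders be divided then? This is where the queue module helps. First create a queue,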

my_queue = queue.Queue()

then put the orders into the queue,

for meals in orders:
    my_queue.put(meals)

Now the two cooks can take orders from the queue,

self.queue.get()

Whenever a cook is free, he checks whether the queue still holds an unfinished order and, if so, takes it.

import threading
import time
import queue


class Cook(threading.Thread):
    def __init__(self, cookIdx, queue):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue

    def run(self):
        while True:
            # Checking empty() and then calling get() is a race: the other cook could
            # take the last order in between, leaving get() blocked forever.
            # get_nowait() takes an order in one step and raises queue.Empty if none is left.
            try:
                meals = self.queue.get_nowait()
            except queue.Empty:
                break

            msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
            print(msg)

            cookTime = meals
            time.sleep(cookTime)

            msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
            print(msg)


if __name__ == '__main__':
    orders = [8, 2, 5]

    my_queue = queue.Queue()
    for meals in orders:
        my_queue.put(meals)

    cook1 = Cook(cookIdx=1, queue=my_queue)
    cook2 = Cook(cookIdx=2, queue=my_queue)

    cook1.start()
    cook2.start()

    cook1.join()
    cook2.join()

The output is as follows:

Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)

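Cook 1 and cook 2 each take one group of customers first. Cook 2's customers ordered only two meals, so he finishes quickly. Cook 1 is still busy with the first group's eight meals, so cook 2 automatically goes on to take the third group's five meals.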

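As a result, cook 2 gets through the other two groups' meals before cook 1 has finished his. For the restaurant owner this is the most economical arrangement: even with one fewer cook on staff, every order is still done by the 8-second mark, the same as with three cooks.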
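Another common way to tell worker threads that the work is finished is to put one sentinel value per cook into the queue after the real orders. Here the sentinel is None. Each cook then blocks on get() and stops when it receives the sentinel. This is a minimal sketch, assuming one meal is scaled down to 0.1 s. It records which cook handled each order. With two cooks, whoever takes the 2-meal order also ends up taking the 5-meal order, just like cook 2 above.

```python
import queue
import threading
import time

SECONDS_PER_MEAL = 0.1  # assumption: one meal scaled down to 0.1 s
STOP = None             # sentinel value that tells a cook to stop


def cook(cook_idx, orders_queue, handled):
    while True:
        meals = orders_queue.get()  # blocks until an item is available
        if meals is STOP:
            break
        time.sleep(meals * SECONDS_PER_MEAL)
        handled[meals] = cook_idx   # record which cook handled this order


orders = [8, 2, 5]
num_cooks = 2

orders_queue = queue.Queue()
for meals in orders:
    orders_queue.put(meals)
for _ in range(num_cooks):
    orders_queue.put(STOP)          # one sentinel per cook

handled = {}
cooks = [threading.Thread(target=cook, args=(i + 1, orders_queue, handled))
         for i in range(num_cooks)]
for c in cooks:
    c.start()
for c in cooks:
    c.join()

print(handled)  # maps meals -> cook number
```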

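The first group of customers, however, may not be happy. They arrived first, yet they get their food last. The approach introduced next deals with this awkward situation.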

4. Lock

This time we create not only a queue but also a lock,

my_lock = threading.Lock()

Although it is called a lock, you can picture it as a single kitchen. A cook who has been assigned an order asks to use the kitchen,

self.lock.acquire()

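Whoever grabs the kitchen first starts cooking. A cook who doesn't get it has to wait outside until the cook inside finishes and frees up the kitchen,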

self.lock.release()

Then the waiting cook can go in, and the cook who just finished has to wait outside for his next turn.

import threading
import time
import queue


class Cook(threading.Thread):
    def __init__(self, cookIdx, queue, lock):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue
        self.lock = lock

    def run(self):
        while True:
            # Take the next order in one step; stop when the queue is empty.
            try:
                meals = self.queue.get_nowait()
            except queue.Empty:
                break

            # Only the cook holding the lock may use the kitchen.
            self.lock.acquire()
            try:
                msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
                print(msg)

                cookTime = meals
                time.sleep(cookTime)

                msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
                print(msg)
            finally:
                # Always free the kitchen, even if cooking raises an exception.
                self.lock.release()


if __name__ == '__main__':
    orders = [8, 2, 5]

    my_queue = queue.Queue()
    for meals in orders:
        my_queue.put(meals)

    my_lock = threading.Lock()

    cook1 = Cook(cookIdx=1, queue=my_queue, lock=my_lock)
    cook2 = Cook(cookIdx=2, queue=my_queue, lock=my_lock)

    cook1.start()
    cook2.start()

    cook1.join()
    cook2.join()

The output is as follows:

Cook 1: Start to cook.
Cook 1: End of cooking. (8 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 1: Start to cook.
Cook 1: End of cooking. (5 meals)

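The orders are now finished in the order the customers arrived. Keep the trade-off in mind, though. Only one cook can use the kitchen at a time, so the total time grows from 8 seconds to 8 + 2 + 5 = 15 seconds. Also, a Lock does not promise which waiting thread acquires it next, so this ordering is what typically happens rather than something the code strictly guarantees.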
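In the kitchen analogy the lock serializes entire tasks, but a Lock is more often used to protect a small piece of shared data. In the sketch below, four threads increment a shared counter. `counter += 1` is a read-modify-write, so without the lock some updates could be lost. With the lock, the final count is exactly 4 × 100,000. The sketch also uses `with counter_lock:`, which acquires the lock on entry and releases it on exit even if an exception is raised. This is the usual alternative to calling acquire() and release() by hand.

```python
import threading

counter = 0
counter_lock = threading.Lock()


def add_many(n):
    global counter
    for _ in range(n):
        # "with" calls acquire() on entry and release() on exit.
        with counter_lock:
            counter += 1  # read-modify-write: not safe without the lock


threads = [threading.Thread(target=add_many, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```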

【3】Multiprocessing in Python

1. Creating Multiple Processes

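Creating multiple processes is written much like creating multiple threads: just replace threading.Thread() with multiprocessing.Process(). You still specify the target and args parameters, call .start() to run them, and call .join() to wait for them.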

import multiprocessing
import time


def CookMeals(cookIdx, meals):
    msg = 'Cook {}: Start to cook.'.format(cookIdx)
    print(msg)

    cookTime = meals
    time.sleep(cookTime)

    msg = 'Cook {}: End of cooking. ({} meals)'.format(cookIdx, meals)
    print(msg)


if __name__ == '__main__':
    cook1 = multiprocessing.Process(target=CookMeals, args=(1, 8))
    cook2 = multiprocessing.Process(target=CookMeals, args=(2, 2))
    cook3 = multiprocessing.Process(target=CookMeals, args=(3, 5))

    cook1.start()
    cook2.start()
    cook3.start()

    cook1.join()
    cook2.join()
    cook3.join()

The output is as follows:

Cook 3: Start to cook.
Cook 2: Start to cook.
Cook 1: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)

In this run the start messages happened to appear in the order 3, 2, 1. The operating system decides when each process actually begins running, so the order of the .start() calls does not guarantee the start order.
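There is one difference from calling a function directly. Process throws away the target function's return value, and the parent can only see how each child ended through its exitcode, which is 0 for a normal return. The sketch below assumes a hypothetical rule: a cook with more than six meals reports a problem by exiting with code 1 via `sys.exit(1)`. Meals are scaled to 0.05 s each to keep it quick.

```python
import multiprocessing
import sys
import time


def cook_meals(cook_idx, meals):
    time.sleep(meals * 0.05)  # assumption: one meal scaled down to 0.05 s
    if meals > 6:
        # Hypothetical rule: too many meals is reported through a non-zero exit code.
        sys.exit(1)
    return meals  # Process discards this value; the parent never sees it


def main():
    cooks = [multiprocessing.Process(target=cook_meals, args=order)
             for order in [(1, 8), (2, 2), (3, 5)]]
    for c in cooks:
        c.start()
    for c in cooks:
        c.join()
    # exitcode is 0 for a normal return and the sys.exit() code otherwise.
    return [c.exitcode for c in cooks]


if __name__ == '__main__':
    print(main())  # expected: [1, 0, 0]
```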

2. Object-Oriented Version

The object-oriented version works the same way: just change the parent class to multiprocessing.Process.

import multiprocessing
import time


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, meals):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals

    def run(self):
        msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
        print(msg)

        cookTime = self.meals
        time.sleep(cookTime)

        msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, self.meals)
        print(msg)


if __name__ == '__main__':
    cook1 = Cook(cookIdx=1, meals=8)
    cook2 = Cook(cookIdx=2, meals=2)
    cook3 = Cook(cookIdx=3, meals=5)

    cook1.start()
    cook2.start()
    cook3.start()

    cook1.join()
    cook2.join()
    cook3.join()

The output is as follows:

Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
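Because run() executes in the child process, it works on the child's own copy of the Cook object. Here is a minimal sketch showing that an attribute changed inside run() is not visible to the parent afterwards. Setting meals to 0 is purely illustrative.

```python
import multiprocessing


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, meals):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals

    def run(self):
        # This runs in the child process, on the child's own copy of the object.
        self.meals = 0
        print('Cook {}: meals inside the child = {}'.format(self.cookIdx, self.meals))


def main():
    cook = Cook(cookIdx=1, meals=8)
    cook.start()
    cook.join()
    return cook.meals  # the parent's copy was never modified


if __name__ == '__main__':
    print('meals seen by the parent =', main())  # expected: 8
```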

3. Queue

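With multiple processes, multiprocessing can create its own queue, so there is no need to import the queue module separately. A queue.Queue only works between threads inside a single process, while multiprocessing.Queue passes items between processes: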

multiprocessing.Queue()

import multiprocessing
import time


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, queue):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue

    def run(self):
        # multiprocessing.Queue.empty() is documented as unreliable, so instead of
        # checking it, each cook keeps taking orders until it receives a None sentinel.
        while True:
            meals = self.queue.get()
            if meals is None:
                break

            msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
            print(msg)

            cookTime = meals
            time.sleep(cookTime)

            msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
            print(msg)


if __name__ == '__main__':
    orders = [8, 2, 5]

    my_queue = multiprocessing.Queue()
    for meals in orders:
        my_queue.put(meals)
    # One sentinel per cook, so that every cook knows when to stop.
    for _ in range(2):
        my_queue.put(None)

    cook1 = Cook(cookIdx=1, queue=my_queue)
    cook2 = Cook(cookIdx=2, queue=my_queue)

    cook1.start()
    cook2.start()

    cook1.join()
    cook2.join()

The output is as follows:

Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)
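Child processes cannot write to the parent's variables, so a second multiprocessing.Queue is a simple way to send results back. In this sketch, the function and queue names are hypothetical. Each cook puts (cook number, meals) onto `result_queue`, and orders are stopped with one None sentinel per cook. The parent reads one result per order before calling join(). The multiprocessing docs warn that joining a process that still has items buffered in a queue can deadlock, which is why the results are drained first.

```python
import multiprocessing

STOP = None  # sentinel that tells a cook to stop


def cook(cook_idx, task_queue, result_queue):
    while True:
        meals = task_queue.get()
        if meals is STOP:
            break
        # Report back to the parent through the result queue.
        result_queue.put((cook_idx, meals))


def main(orders=(8, 2, 5), num_cooks=2):
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for meals in orders:
        task_queue.put(meals)
    for _ in range(num_cooks):
        task_queue.put(STOP)

    cooks = [multiprocessing.Process(target=cook, args=(i + 1, task_queue, result_queue))
             for i in range(num_cooks)]
    for c in cooks:
        c.start()

    # Drain exactly one result per order *before* joining the children.
    results = [result_queue.get() for _ in orders]

    for c in cooks:
        c.join()
    return results


if __name__ == '__main__':
    print(main())  # e.g. [(1, 8), (2, 2), (1, 5)]; which cook gets what may vary
```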

4. Lock

Likewise, a lock can be created directly through multiprocessing:

my_lock = multiprocessing.Lock()

import multiprocessing
import time


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, queue, lock):
        super().__init__()
        self.cookIdx = cookIdx
        self.queue = queue
        self.lock = lock

    def run(self):
        # Take orders until a None sentinel arrives (Queue.empty() is unreliable here).
        while True:
            meals = self.queue.get()
            if meals is None:
                break

            # Only the cook holding the lock may use the kitchen.
            self.lock.acquire()
            try:
                msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
                print(msg)

                cookTime = meals
                time.sleep(cookTime)

                msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, meals)
                print(msg)
            finally:
                # Always free the kitchen, even if cooking raises an exception.
                self.lock.release()


if __name__ == '__main__':
    orders = [8, 2, 5]

    my_queue = multiprocessing.Queue()
    for meals in orders:
        my_queue.put(meals)
    # One sentinel per cook, so that every cook knows when to stop.
    for _ in range(2):
        my_queue.put(None)

    my_lock = multiprocessing.Lock()

    cook1 = Cook(cookIdx=1, queue=my_queue, lock=my_lock)
    cook2 = Cook(cookIdx=2, queue=my_queue, lock=my_lock)

    cook1.start()
    cook2.start()

    cook1.join()
    cook2.join()

The output is as follows:

Cook 1: Start to cook.
Cook 1: End of cooking. (8 meals)
Cook 2: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 1: Start to cook.
Cook 1: End of cooking. (5 meals)

5. Pipe

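As mentioned at the start, each process has its own independent memory. When different processes need to exchange information, one option is to create a pipe that connects them.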

conn1, conn2 = multiprocessing.Pipe()

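The statement above returns two Connection objects, which you can think of as the two ends of a pipe. By default the pipe is duplex, so both ends can send and receive. Applied to this article's example, a cook can send a message through one end,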

self.conn1.send(obj)

and the other cooks can listen at the other end to check whether there is a message in the pipe,

self.conn2.poll()

If a cook decides to take the message, he runs the following:

self.conn2.recv()

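Here is how the example below works. If a cook's order has too many meals, he sends a call for help through conn1 and hands one meal off. When another cook hears the message on conn2, he takes on that extra meal.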

import multiprocessing
import time


class Cook(multiprocessing.Process):
    def __init__(self, cookIdx, meals, conn1, conn2):
        super().__init__()
        self.cookIdx = cookIdx
        self.meals = meals
        self.conn1 = conn1
        self.conn2 = conn2

    def run(self):
        msg = 'Cook {}: Start to cook.'.format(self.cookIdx)
        print(msg)

        if self.meals > 6:
            # Too many meals: hand one off by sending a message into the pipe.
            sendMeals = 1
            self.meals -= sendMeals
            self.conn1.send({'Cook': self.cookIdx, 'sendMeals': sendMeals})

        elif self.conn2.poll():
            # poll() only checks whether a message is waiting right now,
            # so which cook (if any) picks it up depends on timing.
            recvMeals = self.conn2.recv()['sendMeals']
            self.meals += recvMeals

        cookTime = self.meals
        time.sleep(cookTime)

        msg = 'Cook {}: End of cooking. ({} meals)'.format(self.cookIdx, self.meals)
        print(msg)


if __name__ == '__main__':
    conn1, conn2 = multiprocessing.Pipe()
    cook1 = Cook(cookIdx=1, meals=8, conn1=conn1, conn2=conn2)
    cook2 = Cook(cookIdx=2, meals=2, conn1=conn1, conn2=conn2)
    cook3 = Cook(cookIdx=3, meals=5, conn1=conn1, conn2=conn2)

    cook1.start()
    cook2.start()
    cook3.start()

    cook1.join()
    cook2.join()
    cook3.join()

The output is as follows:

Cook 2: Start to cook.
Cook 1: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (6 meals)
Cook 1: End of cooking. (7 meals)

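Cook 1 had eight meals to handle, which is too many, so he handed one off and ended up making seven. Cook 3 received the call for help, so he made one extra meal, six in total. Which cook picks up the message depends on timing. In this run, cook 2 had already checked the pipe before the message was sent. If no cook polls after the message arrives, the extra meal is never picked up. The official documentation also warns that data in a pipe may become corrupted if two processes read from or write to the same end at the same time, so in real code each end of a pipe is best owned by a single process.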
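For contrast, a pipe is most predictable when each end belongs to exactly one process. In this minimal sketch the parent keeps one end and the child keeps the other. Because Pipe() is duplex by default, the child can receive an order and reply on the same connection. `poll(5)` waits up to five seconds for the reply instead of blocking forever. The cook function and the message format are made up for illustration.

```python
import multiprocessing


def cook(conn):
    # Child end: wait for an order, then reply on the same duplex connection.
    meals = conn.recv()
    conn.send({'Cook': 1, 'done': meals})
    conn.close()


def main():
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex=True by default
    p = multiprocessing.Process(target=cook, args=(child_conn,))
    p.start()

    parent_conn.send(8)  # hand the order to the child
    # poll(5) waits up to 5 seconds for data instead of blocking forever.
    reply = parent_conn.recv() if parent_conn.poll(5) else None

    p.join()
    return reply


if __name__ == '__main__':
    print(main())  # expected: {'Cook': 1, 'done': 8}
```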

6. Process Pool

A process pool can replace repeated calls to multiprocessing.Process() and start several processes at once:

pool = multiprocessing.Pool()

With multiprocessing.Process() you had to specify target and args. The parameters of the pool's starmap method are similar:

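  1. func: the function the processes run
  2. iterable: a collection of args tuples; each tuple holds the arguments for one call of func, and the pool hands these calls out to its worker processes

import multiprocessing
import time


def CookMeals(cookIdx, meals):
    msg = 'Cook {}: Start to cook.'.format(cookIdx)
    print(msg)

    cookTime = meals
    time.sleep(cookTime)

    msg = 'Cook {}: End of cooking. ({} meals)'.format(cookIdx, meals)
    print(msg)


if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    # starmap calls CookMeals(*args) for each tuple and blocks until all calls are done.
    pool.starmap(func=CookMeals, iterable=[(1, 8), (2, 2), (3, 5)])
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit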

The output is as follows:

Cook 1: Start to cook.
Cook 2: Start to cook.
Cook 3: Start to cook.
Cook 2: End of cooking. (2 meals)
Cook 3: End of cooking. (5 meals)
Cook 1: End of cooking. (8 meals)

If your multiprocessing work doesn't involve more complex mechanisms such as queues and pipes, using a pool directly seems a lot more convenient~
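Pool.starmap also collects the function's return values, in the same order as the input tuples, no matter which worker finishes first. The sketch below uses `cook_meals`, a hypothetical variant of CookMeals that returns a string, with meals scaled to 0.05 s. It uses the pool as a context manager. Leaving the with block calls terminate(), which is safe here because starmap has already returned all results.

```python
import multiprocessing
import time


def cook_meals(cook_idx, meals):
    time.sleep(meals * 0.05)  # assumption: one meal scaled down to 0.05 s
    return 'Cook {}: {} meals'.format(cook_idx, meals)


def main():
    with multiprocessing.Pool(3) as pool:
        # Results come back in input order, even though cook 2 finishes first.
        return pool.starmap(cook_meals, [(1, 8), (2, 2), (3, 5)])


if __name__ == '__main__':
    print(main())
    # expected: ['Cook 1: 8 meals', 'Cook 2: 2 meals', 'Cook 3: 5 meals']
```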

In a chaotic universe, I write code to understand order and build stability. wyatthoho@gmail.com