Python實現線程安全隊列
最近學習spark,我主要使用pyspark api進行編程。
之前使用Python都是現學現用,用完就忘了也沒有理解和記憶,因此這里把Python相關的知識也彌補和記錄下來吧
多線程任務隊列在實際項目中非常有用,關鍵的地方要實現隊列的多線程同步問題,也即保證隊列的多線程安全
例如:可以開多個消費者線程,每個線程上綁定一個隊列,這樣就實現了多個消費者同時處理不同隊列上的任務
同時可以有多個生產者往隊列發送消息,實現異步消息處理
先復習下互斥量和條件變量的概念:
互斥量(mutex)從本質上說是一把鎖,在訪問共享資源前對互斥量進行加鎖,在訪問完成后釋放互斥量上的鎖。對互斥量進行加鎖以后,任何其他試圖再次對互斥鎖加鎖的線程將會阻塞直到當前線程釋放該互斥鎖。如果釋放互斥鎖時有多個線程阻塞,所有在該互斥鎖上的阻塞線程都會變成可運行狀態,第一個變為運行狀態的線程可以對互斥鎖加鎖,其他線程將會看到互斥鎖依然被鎖住,只能回去再次等待它重新變為可用。
條件變量(cond)是在多線程程序中用來實現"等待--》喚醒"邏輯常用的方法。條件變量利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待"條件變量的條件成立"而掛起;另一個線程使“條件成立”。為了防止競爭,條件變量的使用總是和一個互斥鎖結合在一起。線程在改變條件狀態前必須首先鎖住互斥量,函數pthread_cond_wait把自己放到等待條件的線程列表上,然后對互斥鎖解鎖(這兩個操作是原子操作)。在函數返回時,互斥量再次被鎖住
條件變量總是與互斥鎖一起使用的
Python的threading中定義了兩種鎖:threading.Lock和threading.RLock
兩者的不同在于后者是可重入鎖,也就是說在一個線程內重復LOCK同一個鎖不會發生死鎖,這與POSIX中的PTHREAD_MUTEX_RECURSIVE也就是可遞歸鎖的概念是相同的, 互斥鎖的API有三個函數,分別執行分配鎖,上鎖,解鎖操作。

Python的threading中的條件變量默認綁定了一個RLock,也可以在初始化條件變量的時候傳進去一個自己定義的鎖.

最后貼出我自己實現的簡單線程安全任務隊列

測試代碼
