Using Semaphore
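When the buffer size is one, two semaphores are enough to solve the problem: one tracks whether the buffer slot is free, and the other tracks whether data is available.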
import threading
import random
import time
# Shared buffer that the producer appends to and the consumer pops from.
queue = []
# Counts free buffer slots; starts at 1, so a single acquire succeeds
# before a release is needed.
queueIsAvailable = threading.Semaphore(value=1)
# Counts items ready for consumption; starts at 0, so the first acquire
# blocks until something has been released.
dataIsAvailable = threading.Semaphore(value=0)
def producer():
    """Endlessly add random values to the shared buffer.

    Each round picks a value from 0-4, waits on ``queueIsAvailable`` for a
    free slot, appends the value to ``queue``, prints the new state,
    signals ``dataIsAvailable`` and then sleeps for 0, 1 or 2 seconds.
    Never returns.
    """
    candidates = range(5)
    while True:
        item = random.choice(candidates)
        # Blocks until a buffer slot has been released.
        queueIsAvailable.acquire()
        queue.append(item)
        print("Produced", item, queue)
        # Announce that one more item can be consumed.
        dataIsAvailable.release()
        pause = random.randrange(0, 3)
        time.sleep(pause)
def consumer():
    """Endlessly remove values from the front of the shared buffer.

    Each round waits on ``dataIsAvailable`` for an item, pops the oldest
    entry of ``queue``, prints it with the remaining contents, releases
    ``queueIsAvailable`` and then sleeps for 0, 1 or 2 seconds.
    Never returns.
    """
    while True:
        # Blocks until the producer has signalled a new item.
        dataIsAvailable.acquire()
        item = queue.pop(0)
        print("Consumed", item, queue)
        # Hand the freed slot back to the producer.
        queueIsAvailable.release()
        pause = random.randrange(0, 3)
        time.sleep(pause)
# Create both worker threads before starting either of them.
producerThread, consumerThread = (
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
)
# Start the producer first, then the consumer.
producerThread.start()
consumerThread.start()
When the buffer size is greater than one, this solution no longer works, because there is a race condition: multiple threads can access the buffer (the queue) at the same time.
Adding a mutex (I used Python’s Lock primitive) to guard every access to the buffer solves the problem.
import threading
import random
import time
# Shared buffer that the producer appends to and the consumer pops from.
queue = []
# Counts free buffer slots; starts at 5, so five acquires succeed before
# a release is needed.
queueIsAvailable = threading.Semaphore(value=5)
# Counts items ready for consumption; starts at 0, so the first acquire
# blocks until something has been released.
dataIsAvailable = threading.Semaphore(value=0)
# Mutual-exclusion lock held around every read or write of the buffer.
mutex = threading.Lock()
def producer():
    """Endlessly add random values to the shared bounded buffer.

    Each round picks a value from 0-4, waits on ``queueIsAvailable`` for a
    free slot, appends the value to ``queue`` while holding ``mutex``,
    signals ``dataIsAvailable`` and then sleeps for 0, 1 or 2 seconds.
    Never returns.
    """
    nums = range(5)
    while True:
        num = random.choice(nums)
        # Blocks while no slot is free.
        queueIsAvailable.acquire()
        # The with-statement releases the mutex even if the append or the
        # print raises, so the consumer can never be left blocked on a
        # lock that nobody will release.
        with mutex:
            queue.append(num)
            print("Produced", num, queue)
        # Announce that one more item can be consumed.
        dataIsAvailable.release()
        time.sleep(random.randrange(0, 3))
def consumer():
    """Endlessly remove values from the front of the shared bounded buffer.

    Each round waits on ``dataIsAvailable`` for an item, pops the oldest
    entry of ``queue`` while holding ``mutex``, releases
    ``queueIsAvailable`` and then sleeps for 0, 1 or 2 seconds.
    Never returns.
    """
    while True:
        # Blocks until at least one item has been produced.
        dataIsAvailable.acquire()
        # The with-statement releases the mutex even if the pop or the
        # print raises, so the producer can never be left blocked on a
        # lock that nobody will release.
        with mutex:
            num = queue.pop(0)
            print("Consumed", num, queue)
        # Hand the freed slot back to the producer.
        queueIsAvailable.release()
        time.sleep(random.randrange(0, 3))
# Create both worker threads before starting either of them.
producerThread, consumerThread = (
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
)
# Start the producer first, then the consumer.
producerThread.start()
consumerThread.start()
Using Condition
We can also solve the problem with a condition variable, which Python provides as threading.Condition. Note that the same condition variable is used to suspend both the producer thread and the consumer thread.
import threading
import random
import time
# Upper bound on the number of items the buffer may hold.
MAX_SIZE = 5
# Shared buffer that the producer appends to and the consumer pops from.
queue = []
# Condition variable (with its own internal lock) on which both the
# producer and the consumer wait and notify.
cv = threading.Condition()
def producer():
    """Endlessly add random values to the shared bounded buffer.

    Each round picks a value from 0-4, waits on ``cv`` while ``queue``
    already holds ``MAX_SIZE`` items, appends the value, notifies one
    waiter and then sleeps for 0, 1 or 2 seconds. Never returns.
    """
    nums = range(5)
    while True:
        num = random.choice(nums)
        # The with-statement acquires the condition's lock and guarantees
        # it is released even if the body raises, so the other thread can
        # never be left blocked on a lock that nobody will release.
        with cv:
            # Re-check the predicate after every wake-up: wait() gives up
            # the lock while sleeping and re-acquires it before returning.
            while len(queue) >= MAX_SIZE:
                cv.wait()
            queue.append(num)
            print("Produced", num, queue)
            # Wake one thread waiting on cv (the consumer, when this
            # script's single producer/consumer pair is used).
            cv.notify()
        time.sleep(random.randrange(0, 3))
def consumer():
    """Endlessly remove values from the front of the shared bounded buffer.

    Each round waits on ``cv`` while ``queue`` is empty, pops the oldest
    entry, notifies one waiter and then sleeps for 0, 1 or 2 seconds.
    Never returns.
    """
    while True:
        # The with-statement acquires the condition's lock and guarantees
        # it is released even if the body raises, so the other thread can
        # never be left blocked on a lock that nobody will release.
        with cv:
            # Re-check the predicate after every wake-up: wait() gives up
            # the lock while sleeping and re-acquires it before returning.
            while len(queue) < 1:
                cv.wait()
            num = queue.pop(0)
            print("Consumed", num, queue)
            # Wake one thread waiting on cv (the producer, when this
            # script's single producer/consumer pair is used).
            cv.notify()
        time.sleep(random.randrange(0, 3))
# Create both worker threads before starting either of them.
producerThread, consumerThread = (
    threading.Thread(target=producer),
    threading.Thread(target=consumer),
)
# Start the producer first, then the consumer.
producerThread.start()
consumerThread.start()
Comments