Producer-Consumer Problem in Python

Using Semaphore

When the buffer size is one, two semaphores will solve the problem.

import threading
import random
import time

# Shared single-slot buffer handed from the producer to the consumer.
queue = []
# Starts at 1: the slot is free, so the producer may write immediately.
queueIsAvailable = threading.Semaphore(value=1)
# Starts at 0: there is nothing to read until the producer has appended.
dataIsAvailable = threading.Semaphore(value=0)

def producer():
    """Produce random integers forever, one per free buffer slot.

    Each iteration picks a value from 0-4, waits on ``queueIsAvailable``
    until the slot is free, appends the value to the shared ``queue`` and
    then releases ``dataIsAvailable`` so the consumer can proceed.
    The loop never returns.
    """
    candidates = range(5)
    while True:
        item = random.choice(candidates)

        # Blocks while the previously produced item is still unconsumed.
        queueIsAvailable.acquire()
        queue.append(item)
        print("Produced", item, queue)
        # Signal the consumer that an item is ready.
        dataIsAvailable.release()

        # Pause for 0, 1 or 2 seconds before producing again.
        time.sleep(random.randrange(3))

def consumer():
    """Consume items from the shared ``queue`` forever.

    Each iteration waits on ``dataIsAvailable`` until the producer has
    appended an item, removes the oldest item from the front of ``queue``
    and then releases ``queueIsAvailable`` so the producer may write again.
    The loop never returns.
    """
    while True:
        # Blocks until the producer signals that an item is ready.
        dataIsAvailable.acquire()
        item = queue.pop(0)
        print("Consumed", item, queue)
        # Signal the producer that the slot is free again.
        queueIsAvailable.release()

        # Pause for 0, 1 or 2 seconds before consuming again.
        time.sleep(random.randrange(3))

# Launch the producer first, then the consumer. Both targets loop forever
# and the threads are not joined here.
producerThread = threading.Thread(target=producer)
producerThread.start()

consumerThread = threading.Thread(target=consumer)
consumerThread.start()

When the buffer size is greater than one, this solution will not work because there is a race condition: multiple threads can access the buffer (queue) at the same time.

Adding a mutex (I used Python’s Lock primitive) will solve the problem.

import threading
import random
import time

# Shared bounded buffer passed from the producer to the consumer.
queue = []
# Counts free slots: starts at 5, so up to five items may be outstanding.
queueIsAvailable = threading.Semaphore(value=5)
# Counts filled slots: starts at 0 because the buffer begins empty.
dataIsAvailable = threading.Semaphore(value=0)
# Serialises the list operations performed by the producer and consumer.
mutex = threading.Lock()

def producer():
    """Produce random integers forever into the bounded shared ``queue``.

    Each iteration picks a value from 0-4, waits on ``queueIsAvailable``
    for a free slot, appends the value while holding ``mutex`` and then
    releases ``dataIsAvailable`` to announce the new item.
    The loop never returns.
    """
    nums = range(5)
    while True:
        num = random.choice(nums)

        # Blocks while the buffer is full (no free-slot permits left).
        queueIsAvailable.acquire()

        # ``with`` releases the mutex even if append/print raises, so a
        # failure here cannot leave the consumer blocked on a held lock.
        with mutex:
            queue.append(num)
            print("Produced", num, queue)

        # One more item is available to the consumer.
        dataIsAvailable.release()

        # Pause for 0, 1 or 2 seconds before producing again.
        time.sleep(random.randrange(0, 3))

def consumer():
    """Consume items forever from the bounded shared ``queue``.

    Each iteration waits on ``dataIsAvailable`` for an item, removes the
    oldest item from the front of ``queue`` while holding ``mutex`` and
    then releases ``queueIsAvailable`` to hand the slot back.
    The loop never returns.
    """
    while True:
        # Blocks while the buffer is empty (no filled-slot permits left).
        dataIsAvailable.acquire()

        # ``with`` releases the mutex even if pop/print raises, so a
        # failure here cannot leave the producer blocked on a held lock.
        with mutex:
            num = queue.pop(0)
            print("Consumed", num, queue)

        # One more slot is free for the producer.
        queueIsAvailable.release()

        # Pause for 0, 1 or 2 seconds before consuming again.
        time.sleep(random.randrange(0, 3))

# Launch the producer first, then the consumer. Both targets loop forever
# and the threads are not joined here.
producerThread = threading.Thread(target=producer)
producerThread.start()

consumerThread = threading.Thread(target=consumer)
consumerThread.start()

Using Condition

We can also solve the problem with a condition variable, which Python provides as threading.Condition. Note that the same condition variable is used to suspend both the producer thread and the consumer thread.

import threading
import random
import time

# Upper bound on the number of items the buffer may hold.
MAX_SIZE = 5
# Shared buffer passed from the producer to the consumer.
queue = []
# Single condition variable that both threads wait on and notify.
cv = threading.Condition()

def producer():
    """Produce random integers forever, guarded by the condition ``cv``.

    Each iteration picks a value from 0-4, then, while holding ``cv``,
    waits until ``queue`` holds fewer than ``MAX_SIZE`` items, appends the
    value and notifies one waiter. The loop never returns.
    """
    nums = range(5)
    while True:
        num = random.choice(nums)

        # ``with`` releases the condition's lock even if the body raises,
        # so a failure here cannot leave the consumer blocked forever.
        with cv:
            # Re-check in a loop: a wake-up does not by itself guarantee
            # that there is room in the buffer.
            while len(queue) >= MAX_SIZE:
                cv.wait()

            queue.append(num)
            print("Produced", num, queue)

            # Wake a thread waiting on ``cv`` now that an item is available.
            cv.notify()

        # Pause for 0, 1 or 2 seconds before producing again.
        time.sleep(random.randrange(0, 3))

def consumer():
    """Consume items forever, guarded by the condition ``cv``.

    Each iteration, while holding ``cv``, waits until ``queue`` is
    non-empty, removes the oldest item from the front and notifies one
    waiter. The loop never returns.
    """
    while True:
        # ``with`` releases the condition's lock even if the body raises,
        # so a failure here cannot leave the producer blocked forever.
        with cv:
            # Re-check in a loop: a wake-up does not by itself guarantee
            # that an item is present.
            while not queue:
                cv.wait()

            num = queue.pop(0)
            print("Consumed", num, queue)

            # Wake a thread waiting on ``cv`` now that a slot is free.
            cv.notify()

        # Pause for 0, 1 or 2 seconds before consuming again.
        time.sleep(random.randrange(0, 3))

# Launch the producer first, then the consumer. Both targets loop forever
# and the threads are not joined here.
producerThread = threading.Thread(target=producer)
producerThread.start()

consumerThread = threading.Thread(target=consumer)
consumerThread.start()

Comments