Python Queue examples

In this post, I will discuss how to use the Python Queue module (named Queue in Python 2 and renamed queue in Python 3). This module implements queues for multithreaded programming. Specifically, a Queue object makes it easy to solve the multi-producer, multi-consumer problem, where messages must be exchanged safely between multiple threads. Because the locking semantics are already implemented in the Queue class, you don’t need to handle low-level lock and unlock operations yourself, which can easily lead to deadlocks.

Note

Tip: a queue is one of the most widely used data structures in computer science. It is a First In, First Out (FIFO) data structure: the first element added to the queue is the first one to be removed. There is an excellent article for learning about queues.

Python FIFO Queue

The Python Queue class implements a basic first-in, first-out collection. Items are added to the tail of the queue with put() and removed from the head with get().

The constructor for a FIFO queue is as follows:

class Queue.Queue(maxsize=0)

The parameter maxsize is an integer that limits the number of items that can be held in the queue.

Insertion blocks once the queue is full, until items are removed with get(). The queue size is unbounded if maxsize <= 0.

See the following example for how to use the FIFO queue:
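A minimal single-threaded sketch, written for Python 3 (where the module is named queue); the item values are arbitrary:

```python
import queue  # Python 3 name; the module is called Queue in Python 2

q = queue.Queue(maxsize=0)  # maxsize <= 0 means the queue size is unbounded

# Insert five items in order
for i in range(5):
    q.put(i)

# Remove them; a FIFO queue returns items in insertion order.
# empty() is reliable here only because a single thread uses the queue.
removed = []
while not q.empty():
    removed.append(q.get())

print(removed)  # [0, 1, 2, 3, 4]
```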

In the above example, the queue is used in a single thread. You can see the items removed from the queue are in the same order as they are inserted. 

Python LIFO Queue

The Python Queue module also provides the LifoQueue class, which implements the last-in, first-out (LIFO) data structure, normally called a stack. In a stack, put() adds the item to the top of the container, and get() removes the item from the top as well.

The constructor for a LIFO queue is as follows:

class Queue.LifoQueue(maxsize=0)

maxsize is an integer that limits the number of items that can be held in the queue. Insertion blocks once the queue is full, until items are removed by calling get(); calling task_done() does not free a slot, it only affects join(). The queue size is unbounded if maxsize <= 0.

See the following example of how to use the Python LIFO queue:
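A minimal sketch in Python 3; the string items are arbitrary placeholders:

```python
import queue

stack = queue.LifoQueue()  # no maxsize given, so the size is unbounded

# Push three items; each put() places the item on top of the stack
for item in ["first", "second", "third"]:
    stack.put(item)

# range() evaluates qsize() once, so exactly three items are popped
popped = [stack.get() for _ in range(stack.qsize())]

print(popped)  # ['third', 'second', 'first']
```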

Python PriorityQueue

The order of items in both the FIFO queue and the LIFO queue depends on the order of insertion. However, in many cases the order of items in a queue should be determined by their importance or priority. For instance, a common design for a task scheduler is to serve the most urgent task in the queue first. PriorityQueue is a data structure that retrieves items from the queue based on their priority value.

The constructor of python PriorityQueue is as follows:

class Queue.PriorityQueue(maxsize=0)

maxsize is an integer that limits the number of items that can be placed in the queue. Insertion blocks once the queue is full, until queue items are consumed. The queue size is unbounded if maxsize is zero or negative.

Please note: the lowest-valued entries are retrieved first. A typical pattern for entries is a tuple of the form (the_priority_number, data_value).

In the following example, we use Python PriorityQueue to simulate a task scheduler. After the tasks are put into the queue, they are removed in order of their priority score: the item with the lowest score is removed first.
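A minimal sketch of such a scheduler in Python 3. The task names and scores are hypothetical; entries follow the (priority, data) tuple pattern described above:

```python
import queue

# Hypothetical tasks: (priority_score, task_name); a lower score is more urgent
tasks = [
    (3, "write report"),
    (1, "fix production bug"),
    (5, "clean up inbox"),
    (2, "review pull request"),
]

scheduler = queue.PriorityQueue()
for task in tasks:
    scheduler.put(task)

# get() always returns the entry with the lowest priority score
order = []
while not scheduler.empty():  # safe here: only one thread uses the queue
    priority, name = scheduler.get()
    print(f"priority={priority} task={name}")
    order.append(name)
```

If two entries share the same score, the tuples are compared by their second element, so the data values must themselves be comparable; inserting a tie-breaking counter as the second tuple element is a common workaround.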

Python Queue for multithreaded programming

In this section, I will show how to solve the multiple-producer, multiple-consumer problem using the Python Queue class. First, let’s look at the methods the Queue class provides for multithreaded programming.

Python Queue methods

The most important methods are put(), get(), task_done() and join(). Here are their descriptions:

Queue.put(item[, block[, timeout]])

Put item into the queue. By default, block is true and timeout is None, so put() blocks until a free slot is available.

If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no space was available within that time.

When block is false, put an item on the queue if a free slot is immediately available, else raise the Full exception.

Queue.put_nowait(item)

Equivalent to put(item, False).

Queue.get([block[, timeout]])

Remove and return an item from the queue. By default, block is true and timeout is None, so get() blocks if necessary until an item is available.

If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time.

If block is false, return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

Queue.get_nowait()

Equivalent to get(False).
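The blocking, timeout and non-blocking variants of put() and get() can be sketched in a single thread as follows (Python 3, where the exceptions are named queue.Full and queue.Empty). The 0.1-second timeouts are arbitrary:

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")  # the queue is now full

# Non-blocking put on a full queue raises queue.Full immediately
try:
    q.put_nowait("c")  # same as q.put("c", False)
    nowait_put = "inserted"
except queue.Full:
    nowait_put = "Full"

# Blocking put with a timeout waits up to 0.1 s, then raises queue.Full
try:
    q.put("c", block=True, timeout=0.1)
    timed_put = "inserted"
except queue.Full:
    timed_put = "Full"

# Items come out in FIFO order
first = q.get()          # same as q.get(block=True, timeout=None)
second = q.get_nowait()  # same as q.get(False)

# The queue is now empty, so a get with a timeout raises queue.Empty
try:
    q.get(timeout=0.1)
    timed_get = "got item"
except queue.Empty:
    timed_get = "Empty"

print(nowait_put, timed_put, first, second, timed_get)  # Full Full a b Empty
```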

Queue.task_done()

This method is used by queue consumer threads to indicate that a formerly enqueued task is complete.  

For each item fetched with get(), a subsequent call to task_done() tells the queue that processing of that item is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Queue.join()

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
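The interplay between task_done() and join() can be sketched as follows (Python 3). The squaring "work" is a placeholder, and the None sentinel used to stop the workers is a common convention, not part of the Queue API:

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()


def worker():
    while True:
        item = q.get()
        if item is None:  # hypothetical sentinel: stop this worker
            q.task_done()
            break
        with results_lock:
            results.append(item * item)  # placeholder "processing"
        q.task_done()  # exactly one call per item fetched with get()


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    q.put(n)

q.join()  # blocks until task_done() has been called for all 10 items
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# Shut the workers down with one sentinel each
for _ in threads:
    q.put(None)
for t in threads:
    t.join()

# Every item is already marked done, so one more call raises ValueError
try:
    q.task_done()
    extra_call = "no error"
except ValueError:
    extra_call = "ValueError"
```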

 
Some other methods:

Queue.qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block, nor will qsize() < maxsize guarantee that put() will not block.

Queue.empty()

Return True if the queue is empty, False otherwise. If empty() returns True it doesn’t guarantee that a subsequent call to put() will not block. Similarly, if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

Queue.full()

Return True if the queue is full, False otherwise. If full() returns True it doesn’t guarantee that a subsequent call to get() will not block. Similarly, if full() returns False it doesn’t guarantee that a subsequent call to put() will not block.
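In a single thread these three methods give exact answers, as the sketch below shows (Python 3). In multithreaded code another thread may change the queue between the check and the next call, so a "try, then handle the exception" pattern is safer than "check, then act"; the drain() helper here is hypothetical:

```python
import queue

q = queue.Queue(maxsize=2)
before = (q.qsize(), q.empty(), q.full())
q.put("x")
q.put("y")
after = (q.qsize(), q.empty(), q.full())
print(before, after)  # (0, True, False) (2, False, True)


def drain(q):
    """Hypothetical helper: remove all currently available items.

    Instead of testing empty() first, it simply tries get_nowait()
    and stops when queue.Empty is raised.
    """
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


drained = drain(q)
print(drained)  # ['x', 'y']
```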

exception Queue.Empty

Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty.

exception Queue.Full

Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full. 

Using Python Queue for multithreaded programming

In this section, I show how to use the Python Queue to solve the single-producer, multiple-consumer problem. The producer keeps putting new items into the queue, and the consumers pick up items from the queue, process them, and then call task_done() to tell the queue that an item has been fetched and successfully processed.
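A sketch of such a program in Python 3 that logs in the same format as the output below. The queue size, number of consumers, number of items, processing delay, and the None shutdown sentinel are all assumptions, so the interleaving and timestamps will differ from the log:

```python
import queue
import threading
import time

# Assumed settings: the original program's values are unknown
QUEUE_SIZE = 3
NUM_CONSUMERS = 3
NUM_ITEMS = 5
PROCESS_SECONDS = 1  # simulated work per item


def log(action, q):
    """Print a line in the same format as the sample output."""
    name = threading.current_thread().name
    print(f"Thread: {name} {action} queue[current size = {q.qsize()}] "
          f"at time = {time.strftime('%H:%M:%S')}")


def producer(q, num_items):
    for i in range(num_items):
        log("start put item into", q)
        q.put(i)  # blocks while the queue is full
        log("successfully put item into", q)


def consumer(q, processed, lock, delay):
    while True:
        log("start get item from", q)
        item = q.get()  # blocks while the queue is empty
        if item is None:  # hypothetical shutdown sentinel
            q.task_done()
            break
        time.sleep(delay)  # simulate processing the item
        with lock:
            processed.append(item)
        log("finish process item from", q)
        q.task_done()  # mark this item as fully processed


def run(num_items=NUM_ITEMS, num_consumers=NUM_CONSUMERS,
        maxsize=QUEUE_SIZE, delay=PROCESS_SECONDS):
    q = queue.Queue(maxsize=maxsize)
    processed = []
    lock = threading.Lock()
    consumers = [
        threading.Thread(target=consumer, args=(q, processed, lock, delay),
                         name=f"ConsumerThread-{i}")
        for i in range(num_consumers)
    ]
    for t in consumers:
        t.start()
    prod = threading.Thread(target=producer, args=(q, num_items),
                            name="ProducerThread")
    prod.start()
    prod.join()
    q.join()  # wait until task_done() was called for every item
    for _ in consumers:
        q.put(None)  # one sentinel per consumer
    for t in consumers:
        t.join()
    return processed


if __name__ == "__main__":
    run()
```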

The output is:

Thread: ConsumerThread-0 start get item from queue[current size = 0] at time = 22:22:47
Thread: ConsumerThread-1 start get item from queue[current size = 0] at time = 22:22:47
Thread: ConsumerThread-2 start get item from queue[current size = 0] at time = 22:22:47
Thread: ProducerThread start put item into queue[current size = 0] at time = 22:22:47
Thread: ProducerThread successfully put item into queue[current size = 1] at time = 22:22:47
Thread: ProducerThread start put item into queue[current size = 1] at time = 22:22:47
Thread: ProducerThread successfully put item into queue[current size = 1] at time = 22:22:47
Thread: ProducerThread start put item into queue[current size = 1] at time = 22:22:47
Thread: ProducerThread successfully put item into queue[current size = 2] at time = 22:22:47
Thread: ProducerThread start put item into queue[current size = 2] at time = 22:22:47
Thread: ProducerThread successfully put item into queue[current size = 3] at time = 22:22:47
Thread: ProducerThread start put item into queue[current size = 1] at time = 22:22:47
Thread: ProducerThread successfully put item into queue[current size = 2] at time = 22:22:47
Thread: ConsumerThread-1 finish process item from queue[current size = 2] at time = 22:22:50
Thread: ConsumerThread-1 start get item from queue[current size = 2] at time = 22:22:50
Thread: ConsumerThread-0 finish process item from queue[current size = 2] at time = 22:22:50
Thread: ConsumerThread-0 start get item from queue[current size = 2] at time = 22:22:50
Thread: ConsumerThread-2 finish process item from queue[current size = 0] at time = 22:22:50
Thread: ConsumerThread-2 start get item from queue[current size = 0] at time = 22:22:50
Thread: ConsumerThread-1 finish process item from queue[current size = 0] at time = 22:22:53
Thread: ConsumerThread-0 finish process item from queue[current size = 0] at time = 22:22:53
Thread: ConsumerThread-0 start get item from queue[current size = 0] at time = 22:22:53
Thread: ConsumerThread-1 start get item from queue[current size = 0] at time = 22:22:53

From the output, we can see that when the queue is empty, the consumers wait until the producer puts items into the queue. Once the queue is full, the producer has to wait until a consumer removes an item with get(). Note that task_done() does not free a slot; it only decrements the count of unfinished tasks that join() waits on.

You may want to refer to this article for another example of how to use the Python Queue to build a multithreaded crawler.