Sunday, January 25, 2009

Python: Example of Threads and Queue

Just an example of making thread workers, that executes tasks from the task queue. I used it to check if a PC box with multicore processor is really 100% busy with threads. The code multiplies some large, random matrices, so it does not do anything useful. However, it can serve as an example or a template of a simple multithread application.

import threading, Queue, time
from scipy import *

class MultiplyMatrixes(threading.Thread):
def __init__(self,queue,name):
self.queue=queue
threading.Thread.__init__(self,name=name)

def run(self):
while True:
#wait until there is a task
# in queue. Get task from gueue.
n=self.queue.get()

print "Spawning: ",self.getName(),n
if n !=None:
# do the task
p=self.doMultiplication(n)

print "Finished:", self.getName(),n,p

#indicate that the task has been
#complited
self.queue.task_done()

def doMultiplication(self,n):
m1=matrix(rand(n,n)) #some random matrix
m2=matrix(rand(n,n)) #some random matrix
m3=m1*m2
return m3.mean() # return mean



class MainThread():
def __init__(self,noOfThreads):
self.no=noOfThreads

def doTest(self):
t1=time.time()
self.test()
print "Elapsed time",time.time()-t1

def test(self):
#change maxMtxSize for longer or shorter
# execution
minMtxSize,maxMtxSize=100,1200
nT=range(minMtxSize,maxMtxSize,100)

#make task queue
queue=Queue.Queue()

#create number of workers (threads)
for i in range(self.no):
p=MultiplyMatrixes(queue,name=str(i+1))
p.setDaemon(True)
p.start()

#put some tasks into the queue
for n in nT:
queue.put(n)

#wait until the queue is finshed (no more tasks)
queue.join()

#give some time for threads
#to finished before exiting.
time.sleep(1)

#Otherwise the follwing error is more likely to occur:
# Exception in thread 1 (most likely raised during interpreter shutdown):
# Traceback (most recent call last):
# File "/usr/lib/python2.5/threading.py", line 486, in __bootstrap_inner
# File "multithreads.py", line 19, in run
# File "/usr/lib/python2.5/Queue.py", line 165, in get
# File "/usr/lib/python2.5/threading.py", line 209, in wait
# : 'NoneType' object is not callable

print "Finished!!!"

def test():
MP=MainThread(noOfThreads=4)
MP.doTest()


if __name__ == '__main__':
test()
Example result:Spawning: 1 100
Finished: 1 100 25.3551569392
Spawning: 2 200
Spawning: 3 300
Spawning: 1 400
Spawning: 4 500
Finished: 2 200 49.9585540799
Spawning: 2 600
Finished: 3 300 75.163025977
Spawning: 3 700
Finished: 1 400 99.7075107712
Spawning: 1 800
Finished: 4 500 124.849660963
Spawning: 4 900
Finished: 2 600 149.99426813
Spawning: 2 1000
Finished: 3 700 175.035573614
Spawning: 3 1100
Finished: 1 800 199.893719979
Finished: 4 900 225.168134578
Finished: 2 1000 250.157828058
Finished: 3 1100 275.025345136
Finished!!!
Elapsed time 73.96342206