Analysis Tutorial #8: Parallel processing

Typically, data analysis jobs in particle physics can be very well parallelised. This is because the same operation is performed on each event, so there is no need to process all the events one after another. If one has multiple cores (or computers) available, one can split the job into multiple sub-processes (programs) where each process analyses part of the data. At the end of the run the results from sub-jobs (i.e. in our case histograms) are combined.

In this section we will show how to parallelise our simple code. The design outlined here is very simple and will work on any computer that has multiple cores. For more powerful computing resources like LHC computing grid or batch systems, different designs must used. However, they are our of scope of this tutorial.

In our implementation, the parallelisation is done as follows:

  • Each event loop will be executed as one job. So we will have one job per data sample. It is possible (and not that hard) to split one event loop into multiple jobs but we will not attempt that.
  • Jobs are executed as sub-processes from the main steering script “runMe.py” until maximum number of jobs (set by user) is reached. Then the submission stops and waits until some running jobs finish so that more can be submitted. One should not submit (much) more jobs that available cores because in that case things get actually slower.
  • In our previous example, we have always run the event loop for just one sample (Higgs produced in gluon-gluon fusion). In such case, parallelisation has no sense because we would only submit one job. Therefore, we have to add more samples to make use of multiple core processing.

We will start by creating a new python module “Jobs.py”:

from time import sleep
import subprocess

class Jobs(object):
 """ Execute event loops in parallel processes
 """
 def __init__(self, eventLoops):
  """ Constructor. Generates parallel jobs for the provided eventloops
  """
  self.jobNames = []
  for eventLoop in eventLoops:
   self.createExecutable(eventLoop)
 
 def createExecutable(self, eventLoop):
  """ for a given event loop, generate stand-alone python script
  """
  jobName = "job.{}.py".format(eventLoop.name)
 
  # open py file for writing
  with open(jobName, "w") as f:
   # create common steering script preamble
   f.write('from ROOT import gSystem, TH1\n')
   f.write('TH1.AddDirectory(False)\n')
   f.write('gSystem.Load("Analysis.so")\n')
   f.write('from Samples import *\n')
   f.write('from Algorithms import * \n')
 
   # create EventLoop class
   className = eventLoop.__class__.__name__
   f.write('eventLoop = {}()\n'.format(className))

   # create algorithm classes
   f.write('algs = []\n')
   for alg in eventLoop.algs:
    algClassName = alg.__class__.__name__
    f.write('algs += [ {}() ]\n'.format(algClassName))
    f.write('eventLoop.addAlgorithms( algs )\n')
 
   # execute and save calls
   f.write('eventLoop.execute()\n')
   f.write('eventLoop.save()\n')
   f.write('print "all OK"\n')
 
   # save the job file name for further use
   print "Generated job executable {}".format(jobName)
   self.jobNames += [ jobName ]

 def execute(self, nProcesses=5, niceness=10):
  """ Executes parallel jobs
      nProcesses - max number of parallel processes
      niceness - how nice we are to other users of the computer.
  """  
  jobsToSubmit = self.jobNames[:] # copy the list of jobs
  runningProcesses = []
  msg = ""
  while True:
   # printout
   newMsg = "{} jobs out of {} running...".format(len(runningProcesses), len(self.jobNames))
   if newMsg!=msg:
    print newMsg
    msg = newMsg

   # execute new job. Will only happen if number of running jobs is less than nProcesses
   if len(jobsToSubmit) > 0 and len(runningProcesses) < nProcesses:
    command = "python {} > {}.log 2>&1".format(jobsToSubmit[0], jobsToSubmit[0])
    print "Submitting job '{}'".format(jobsToSubmit[0])
    proc = subprocess.Popen("nice -n {} {}".format(niceness,command), shell=True)
    runningProcesses += [ proc ]
    jobsToSubmit.remove(jobsToSubmit[0])

   # check how many jobs running
   for proc in runningProcesses[:]:
    if proc.poll() != None:
     # this process has terminated. Remove from the running
     runningProcesses.remove(proc)

   # exit the loop if all is submitted/done
   if len(jobsToSubmit)+len(runningProcesses)==0:
    print "Done" 
    break

   # Wait if not in the submission stage
   if len(jobsToSubmit)==0 or len(runningProcesses) >= nProcesses:
    sleep(1)

Let’s look at the code:

  • At the beginning, module “subprocess” is imported. This is a Python module for subprocess management. We use it to spawn parallel processes.
  • The job submission is done using the “Jobs” class. We pass in the list of instances of the EventLoop class. The Jobs class takes the list and for each event loop it generates a new executable python steering script (a minimal version of the “runMe.py”). This is done in the constructor:  a loop over “eventLoops” list and then the “createExecutable” is called for each eventLoop.
  • In “createExecutable” the actual job script is created. The script is named “job.LoopName.py”. So for our ggH event loop the name will be “job.ggH.py”. The actual python source file is created using the “with open(jobName, “w”) as f” construct. Each line of the script is then written into the file using “f.write”. Note that there has to be ‘\n’ symbol (end of the line) written at the end of each write statement.
  • When we generate the sub-job script, we need to know the name of the event loop class (class, not instance!). Do we create instance of “SampleGGH” class or “SampleVBF” class or somethings else? Here we make use of another cool python feature: we can get a string with name of the class by acceding attribute “__class__.__name__” of its instance. In other words
    className = eventLoop.__class__.__name__
    will be “SampleGGH” is the eventLoop variable is an instance of the SampleGGH class. We then use this “className” in our generated sub-job python script.
  • We use the same trick again when we generate the code which adds algorithms into the event loop.
  • The main actual submission is performed in the “execute” method.  Here we create a list of all python executables:
    jobsToSubmit = self.jobNames[:]
    The “[:]” symbol is important! It means that the content of the “self.jobNames” list is copied item-by-item into the new list. So if we modify “jobsToSubmit” the original list will stay unchanged. If we had just used “jobsToSubmit = self.jobNames” it would mean that “jobsToSubmit” and “self.jobNames” represent the same list in memory. Modification of “”jobsToSubmit” would also modify “self.jobNames”. This is a general python feature you should be aware of!
  • Now that the sub-job script names are stored in a new list, we have a loop where the actual submission happens. If there are still jobs to submit (“jobsToSubmit” list is not empty), we execute them using “subprocess.Popen” command. The actual command that is executed looks for example like this:
    nice -n 10 python job.ggH.py > job.ggH.py.log 2>&1

    Note that we do not execute the python script directly but we do it through the “nice” command. It just means the jobs run with lower priority being nice to other users of the computer (generally a good idea if you do not want to swamp the computer). Also, we redirect output from the sub-job into the log file (“>” and “2>&1” means redirection of standard output and error output into the same log file).

  • After the submission, the “jobsToSubmit” is shortened and the running process information is stored in the “runningProcesses” list.
  • Status of the running processes is checked next. “runningProcesses” list and for each we check its “poll()” method. If it returns “None” we know process has terminated. We remove it from the list of  “runningProcesses”.
  • Finally, when all jobs are submitted and no processes are running, we can terminate the loop.
  • Other stuff in the loop just deals with the printouts. Also, some delay (“sleep” calls) is needed between the submission processes so that the master process does not use too much CPU while just waiting for the jobs to finish.

Now we have all that is needed to make our little program parallelised. Let’s open the main script “runMe.py” and do the needed modifications:

# here we load the shared library created using the Makefile
from ROOT import gSystem, TH1, TStopwatch
gSystem.Load("Analysis.so")
TH1.AddDirectory(False)

# now we can create instance of the class EventLoop
from Samples import *
eventLoops = []
eventLoops += [ SampleGGH() ]
eventLoops += [ SampleVBFH() ]

# create algorithm and add them into the event loops
from Algorithms import * 
for eventLoop in eventLoops:
 algs = []
 algs += [ AlgDefault() ]
 algs += [ AlgSF() ]
 algs += [ AlgDF() ]
 eventLoop.addAlgorithms( algs )

# execute parallel jobs
timer = TStopwatch()
from Jobs import Jobs
jobs = Jobs(eventLoops)
jobs.execute()
timer.Stop()
print "The processing took {} s".format(timer.RealTime())
  • Note that we have added a new event loop “SampleVBFH” which we have prepared in the previous example. We need at least two event loops to be able to talk about parallel processing. In real data analyses, many more samples need to be processed.
  • Because we now have more than one eventLoop, we need to add Algorithm class instances into all of them. Here we add the same algorithms into both event loops, but in principle it doesn’t need to be the case.
  • Finally, we pass the “eventLoops” list to the “Jobs” class and execute.
  • We have added a timer (TStopwatch class) to monitor how long the execution takes.

Now try to execute the program. On my computer the output looks like this:

> python runMe.py

Generated job executable job.ggH.py
Generated job executable job.VBFH.py
0 jobs out of 2 running...
Submitting job 'job.ggH.py'
1 jobs out of 2 running...
Submitting job 'job.VBFH.py'
2 jobs out of 2 running...
1 jobs out of 2 running...
Done
The processing took 14.5645718575 s

You see how the two sub-job scripts “job.ggH.py” and “job.VBFH.py” were generated, executed and then then finished, one sooner than the other. The whole thing took about 15 s.

We can now try to run the same thing without parallelisation. Just comment out the Jobs-related lines and replace them with simple event loop submits in the main script:

for eventLoop in eventLoops:
 eventLoop.execute()

You will now see the event loops executed one after another. The total time this is about 22 s. The running time was not cut in half because the VBF event loop takes longer than the ggH (there are more events in the vector-boson-fusion Higgs sample). However, you can imagine that if we included more samples the gain would become more and more significant.

Finally, let’s have a look at the files created by the “Jobs” class:

> ls -l job.*

-rw-r--r-- 1 scheirich users  314 Jul 16 11:22 job.VBFH.py
-rw-r--r-- 1 scheirich users 1929 Jul 16 11:23 job.VBFH.py.log
-rw-r--r-- 1 scheirich users  313 Jul 16 11:22 job.ggH.py
-rw-r--r-- 1 scheirich users 1273 Jul 16 11:22 job.ggH.py.log

The “job.ggH.py” and “job.VBFH.py” files are executable python scripts generated by the “Jobs” class:

from ROOT import gSystem, TH1
TH1.AddDirectory(False)
gSystem.Load("Analysis.so")
from Samples import *
from Algorithms import * 
eventLoop = SampleGGH()
algs = []
algs += [ AlgDefault() ]
algs += [ AlgSF() ]
algs += [ AlgDF() ]
eventLoop.addAlgorithms( algs )
eventLoop.execute()
eventLoop.save()
print "all OK"

Files “job.ggH.py.log” and “job.VBFH.py.log” are log files for the corresponding sub-jobs. You should check them to make sure that there were no errors in the sub-jobs.

The output root files have the same names as before:

> ls -l *.root

-rw-r--r-- 1 scheirich users 4437 Jul 16 11:23 histograms.VBFH.AlgDF.root
-rw-r--r-- 1 scheirich users 4466 Jul 16 11:23 histograms.VBFH.AlgDefault.root
-rw-r--r-- 1 scheirich users 4399 Jul 16 11:23 histograms.VBFH.AlgSF.root
-rw-r--r-- 1 scheirich users 4426 Jul 16 11:22 histograms.ggH.AlgDF.root
-rw-r--r-- 1 scheirich users 4454 Jul 16 11:22 histograms.ggH.AlgDefault.root
-rw-r--r-- 1 scheirich users 4387 Jul 16 11:22 histograms.ggH.AlgSF.root

When you checked all the sub-jobs, you can delete all the generated files like this:

> rm job.*

Next section →