
Post on 30-Dec-2015






Page 1: Introduction to Alexander Richards ( Thanks to Patrick Owen, ICL for some of the slides content 6

Alex Richards, ICL

Introduction to Ganga 6

Alexander Richards ([email protected])

Thanks to Patrick Owen, ICL for some of the slides content





Output control

Post processing

Parallel user code

Future work





• Ganga 6 is the first major release of Ganga for ~4 years.
• Several new features have been implemented; these will be introduced in the coming sections.
• There have been some teething problems, but I think we are over them now, and I recommend that people switch to Ganga 6.
• The purpose of this talk is to provide an update on the new functionality that users will find in Ganga 6, complete with examples to make things more obvious.
• If you require a basic introduction to Ganga in general, please see:
  • =10&sessionId=4&resId=1&materialId=slides&confId=159521
• As always, help is available:
  • Interactively, using help(object)
  • Online, via the Ganga manuals or FAQs:
  • Via the LHCb DAST mailing list ([email protected])




Output control



A more generic output definition system

Previously:
• Two separate job attributes, ‘outputsandbox’ and ‘outputdata’.
• Limited scope for automating the output destination for a given job:
  • outputsandbox -> local
  • outputdata -> DIRAC SE / mass storage (depending on backend)
• It was not obvious where to expect any given data.
• Any future expansion would have been a hack around this structure.

Motivation:
• To provide a general output definition system that makes it obvious to the user where their data will end up.
• To provide a system scalable enough to incorporate any future changes/additions to the output specification.
• To give the user more flexibility about where a given job’s data will end up, i.e. to decouple the outputdata location from the backend where the job runs.
• To provide the user with useful metadata about any given piece of data.



The implementation involves:

• The merging of outputsandbox and outputdata into a single job attribute, ‘outputfiles’.

• This outputfiles list attribute can take any type of outputfile object. It is this object that is responsible for making sure that the job’s output turns up in the right place.

• The currently defined outputfile objects include:

• SandboxFile -> return file to the local workspace

• DiracFile -> upload file to DIRAC SE

• LCGSEFile -> upload to grid SE (for LHCb, just use DiracFile)

• MassStorageFile -> upload to mass storage (e.g. CASTOR)

» The copy command etc. can be changed from the config, so that this can also just represent a scratch file on disk; we may make a new file type with this setup as the default
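To make the idea that each outputfile object is responsible for its own destination concrete, here is a small standalone Python sketch. The classes are hypothetical stand-ins written for illustration, not Ganga’s actual outputfile classes, and their ‘put’ only reports where the file would go instead of copying anything.

```python
from abc import ABC, abstractmethod


class OutputFileSketch(ABC):
    """Hypothetical base class: each output file type knows its own destination."""

    def __init__(self, namePattern, localDir=None):
        self.namePattern = namePattern
        self.localDir = localDir

    @abstractmethod
    def put(self):
        """Send the file to wherever this type stores it (here: just describe it)."""


class SandboxFileSketch(OutputFileSketch):
    def put(self):
        return f"{self.namePattern} -> local workspace"


class DiracFileSketch(OutputFileSketch):
    def put(self):
        return f"{self.namePattern} -> DIRAC SE"


def finalise(outputfiles):
    # The job never inspects the file types: each object handles its own output.
    return [f.put() for f in outputfiles]


print(finalise([SandboxFileSketch("stdout"), DiracFileSketch("out.dst")]))
```

With this design, supporting a new destination means adding a new class rather than changing the job itself.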



• All outputfiles should, as a minimum, have a ‘namePattern’ attribute

Outputfile objects

    class DiracFile(IOutputFile):
        namePattern = ''
        localDir = None
        lfn = ''
        guid = ''
        locations = []

        def get(self):
            '''Get the file locally.'''

        def put(self):
            '''Upload the file to DIRAC SE.'''

• The localDir attribute is the local directory which the file will come back to or be uploaded from. The default, None, means the job’s working directory (or the current working directory for standalone use; see later).



• We will see here how to replicate the behaviour of the outputsandbox and outputdata attributes.

Normal job setup here


Automatic type detection

• To ease the user’s entry of these file types, automatic type detection is employed which, by default, sends all *.dst files to DIRAC while returning everything else to the local workspace.

• For example, we can reproduce our previous example in a shorter, quicker form:

Normal job setup here



Using config file to change the auto file type detection


MassStorageFile = {'fileExtensions': ['*.dummy'],
    'backendPostprocess': {'LSF': 'WN', 'LCG': 'client',
                           'CREAM': 'client', 'Localhost': 'WN'},
    'uploadOptions': {'mkdir_cmd': 'nsmkdir', 'cp_cmd': 'rfcp', 'ls_cmd': 'nsls',
                      'path': '/castor/'}}

DiracFile = {'fileExtensions': ['*.dst'],
    'backendPostprocess': {'Dirac': 'WN', 'LSF': 'WN',
                           'LCG': 'WN', 'CREAM': 'WN', 'Localhost': 'WN'},
    'uploadOptions': {}}

• If the namePattern of an outputfile doesn’t match any of the fileExtensions, then it defaults to a SandboxFile.

• Uploading happens either on the WN or, if the backendPostprocess entry for the job’s backend is set to 'client', via a two-step process:

1. First, the file is returned locally

2. The file is then uploaded to its destination from the local client
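The detection and upload-location rules above can be sketched in a few lines of standalone Python. This is an illustration under assumptions, not Ganga’s code: the dictionary copies only the keys needed from the config shown earlier, ‘first matching file type wins’ is assumed, and detect_type/upload_location are hypothetical helper names.

```python
from fnmatch import fnmatchcase

# Hypothetical in-memory subset of the config shown above.
FILE_TYPE_CONFIG = {
    "MassStorageFile": {
        "fileExtensions": ["*.dummy"],
        "backendPostprocess": {"LSF": "WN", "LCG": "client",
                               "CREAM": "client", "Localhost": "WN"},
    },
    "DiracFile": {
        "fileExtensions": ["*.dst"],
        "backendPostprocess": {"Dirac": "WN", "LSF": "WN", "LCG": "WN",
                               "CREAM": "WN", "Localhost": "WN"},
    },
}


def detect_type(name_pattern, config=FILE_TYPE_CONFIG):
    """Return the first file type whose fileExtensions match, else 'SandboxFile'."""
    for file_type, options in config.items():
        if any(fnmatchcase(name_pattern, ext) for ext in options["fileExtensions"]):
            return file_type
    return "SandboxFile"


def upload_location(file_type, backend, config=FILE_TYPE_CONFIG):
    """'WN': upload from the worker node; 'client': return locally, then upload."""
    return config[file_type]["backendPostprocess"][backend]


for name in ["ntuple.dst", "test.dummy", "histos.root"]:
    print(name, "->", detect_type(name))
```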




• Wildcards are fully supported and can be used in a file’s namePattern.

• Once a job has completed, an outputfile with a wildcard in its namePattern is automatically expanded.

e.g. DiracFile('*.root') turns into [DiracFile('a.root'), DiracFile('b.root')]

• For consistency with the original job, a copy of this job will reduce the wildcard outputfile back to its original state, in this case DiracFile('*.root'). When this copy has finished running, it too will be expanded.
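Expansion after completion, and reduction back to the original pattern when a job is copied, can be modelled as follows. WildcardFile, expand and reduce_for_copy are hypothetical names; the sketch assumes the names of the produced files are already known and sorts matches alphabetically, which Ganga may or may not do.

```python
from fnmatch import fnmatchcase

WILDCARD_CHARS = "*?["


class WildcardFile:
    """Hypothetical stand-in for an outputfile whose namePattern may contain wildcards."""

    def __init__(self, namePattern, original=None):
        self.namePattern = namePattern
        self.original = original  # pattern this file was expanded from, if any

    def __repr__(self):
        return f"WildcardFile({self.namePattern!r})"


def expand(outputfiles, produced):
    """After completion: one object per produced file matching a wildcard pattern."""
    result = []
    for f in outputfiles:
        if any(ch in f.namePattern for ch in WILDCARD_CHARS):
            matches = sorted(n for n in produced if fnmatchcase(n, f.namePattern))
            result.extend(WildcardFile(m, original=f.namePattern) for m in matches)
        else:
            result.append(f)
    return result


def reduce_for_copy(outputfiles):
    """On job copy: collapse expanded files back to their original patterns."""
    patterns = []
    for f in outputfiles:
        pattern = f.original or f.namePattern
        if pattern not in patterns:
            patterns.append(pattern)
    return [WildcardFile(p) for p in patterns]


expanded = expand([WildcardFile("*.root")], ["b.root", "a.root", "stdout"])
print(expanded)                   # [WildcardFile('a.root'), WildcardFile('b.root')]
print(reduce_for_copy(expanded))  # [WildcardFile('*.root')]
```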


Getting a job’s files

• Getting a file back locally should just be a case of calling the file object’s ‘get’ method.

If running a DIRAC job:

• Alternatively, you can use the ‘getOutputData’ method of the DIRAC backend to get all DiracFiles locally.


Removing a DIRAC SE file

• If you wish to remove a DiracFile from DIRAC SE, you can use the ‘remove’ method (this method is not available for all outputfile types).

• Note: if you set the config variable AutoRemoveFilesWithJob = True in your .gangarc file (False by default), then removing a job will automatically remove its DiracFiles, as well as any other outputfile types listed in the AutoRemoveFileTypes config variable.

[output]
#AutoRemoveFilesWithJob = False
#AutoRemoveFileTypes = ['DiracFile']


Filtering of job.outputfiles list

• To ease the user’s experience of the outputfiles attribute, and to avoid the need for users to code list filters themselves, a ‘get’ method has been added to the GangaList.

• This method takes one argument and, in the case of the outputfiles list, returns a subset based on the argument given.

• For outputfiles, this argument can be either:
  • A string (including wildcards), which will match the namePattern attribute
  • A class type, which will match the class type
  • A class instance, which will perform an == type match (which should match both the type and the namePattern attribute)
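The three kinds of argument accepted by ‘get’ map naturally onto a small list subclass. FilterableList, SandboxFileStub and DiracFileStub are hypothetical stand-ins, not GangaList or Ganga’s file classes; matching a class type with isinstance (so subclasses also match) is an assumption about the intended behaviour.

```python
from fnmatch import fnmatchcase


class FileStub:
    """Hypothetical outputfile stand-in; == compares type and namePattern."""

    def __init__(self, namePattern):
        self.namePattern = namePattern

    def __eq__(self, other):
        return type(self) is type(other) and self.namePattern == other.namePattern

    def __repr__(self):
        return f"{type(self).__name__}({self.namePattern!r})"


class SandboxFileStub(FileStub):
    pass


class DiracFileStub(FileStub):
    pass


class FilterableList(list):
    """Hypothetical list whose get() mimics the filtering described above."""

    def get(self, selector):
        if isinstance(selector, str):  # string: wildcard match on namePattern
            return FilterableList(f for f in self if fnmatchcase(f.namePattern, selector))
        if isinstance(selector, type):  # class: type match
            return FilterableList(f for f in self if isinstance(f, selector))
        return FilterableList(f for f in self if f == selector)  # instance: == match


files = FilterableList([SandboxFileStub("stdout"), DiracFileStub("a.dst"),
                        DiracFileStub("b.root")])
print(files.get("*.dst"))                    # [DiracFileStub('a.dst')]
print(files.get(DiracFileStub))              # [DiracFileStub('a.dst'), DiracFileStub('b.root')]
print(files.get(SandboxFileStub("stdout")))  # [SandboxFileStub('stdout')]
```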




By way of example, let’s set up the following:

Now we try the filter system:
• Firstly, using a string

• Secondly, using a string with a wildcard



• Thirdly, using a type

• Finally, using an instance of an outputfile


Labouring the point…

• As shown previously, the getOutputData method of the Dirac backend now loops over the job’s DiracFile outputfiles and gets the data back locally.

• Using the outputfiles filters, one can achieve the same thing for any backend in one line with:



Standalone

Outputfile objects can be used standalone as well.

• Here, with localDir = None (the default), the file is returned to and uploaded from the current working directory.

• You can even store these outputfile objects in the box for later downloading, or just for record keeping.

• Note that unless stored in the box, standalone files will not be persisted, so information like an uploaded file’s LFN will be lost between Ganga sessions.



Examples – Uploading to / downloading from CASTOR

• By way of example, let’s look at uploading to CASTOR

• Uploading is done using the ‘put’ method

• Once uploaded, the MassStorageFile should have its metadata filled in

• The file can now be retrieved locally again using the ‘get’ method



Examples – DiracFile

• Using the same syntax, we can upload a file to DIRAC SE

• By default, localDir is the current directory.

• Again, ‘put’ performs the upload

• DiracFile has different metadata from MassStorageFile

• Retrieve the file locally with the ‘get’ method

• Remove the file from DIRAC SE with the ‘remove’ method


Examples – Setting up DiracFile for existing LFN

• If you know the LFN of a file on DIRAC SE, you can manually create a DiracFile object to represent it

• Using the ‘getMetadata’ method, you can automatically populate the DiracFile’s metadata

• Then, using the ‘get’ method, you can download it as normal




Post processing



Post completion job activities

Previously:
• The only thing a user could specify to happen when a job finished was the merging of outputdata.
• This was done using a merger object attached to the job’s merger attribute.
• A job was marked as completed or failed purely on its return status (either exit code or DIRAC status).
• The only way to know if a job had finished was to look at Ganga regularly.

Motivation:
• To provide a more powerful and flexible set of post-job tools that users can use as and when they see fit.
• To provide a more general attribute framework for these tools, which will allow for the easy addition of not only new tools but new types as well.
• To provide tools that allow the final status of a job to be determined in a smarter and more customisable way.
• To provide an email notification system to inform users of job progress.



The implementation involves:

• Replacing the restrictive job.merger attribute with the far more general job.postprocessors.

• job.postprocessors is a list to which you can add multiple postprocessor objects.

• Migrating the mergers to use this new framework.

• Currently there are three types of postprocessors:

1. Mergers -> as before, with the addition of LHCbFileMerger

2. Checkers -> objects which can fail the job based on some criteria

3. Notifier -> an email notification system

• Postprocessors are executed by class in the order merger -> checker -> notifier, and within a class type in their original list order.
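The ordering rule (by class first, then by list position) amounts to a stable sort on a category rank. In this hypothetical sketch, postprocessors are represented as (category, name) pairs rather than real Ganga objects:

```python
# Hypothetical rank per postprocessor category: mergers, then checkers, then notifiers.
CATEGORY_RANK = {"merger": 0, "checker": 1, "notifier": 2}


def execution_order(postprocessors):
    """sorted() is stable, so the original list order is kept within each category."""
    return sorted(postprocessors, key=lambda pp: CATEGORY_RANK[pp[0]])


job_postprocessors = [
    ("notifier", "Notifier"),
    ("checker", "FileChecker"),
    ("merger", "TextMerger"),
    ("checker", "CustomChecker"),
    ("merger", "RootMerger"),
]
for category, name in execution_order(job_postprocessors):
    print(category, name)
```

Because Python’s sort is stable, no explicit position key is needed to preserve the user’s order inside each class.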



Minor changes in the mergers from the user’s point of view:

• Mergers have been rewritten, but the interface stays the same

• MultipleMerger has been removed; just add the separate mergers to the postprocessors list

• DSTMerger has been removed; instead use the new LHCbFileMerger, which should merge all LHCb file types (.dst, .sim etc.)

» For more info use:

• Mergers are now attached to jobs via the postprocessors attribute, as with all postprocessor objects



The following types of checkers are currently in Ganga 6:
• FileChecker -> checks the list of output files and fails the job if a particular string is found (or not found).
• LHCbMetaDataChecker -> can fail the job based on the job’s metadata.
• CustomChecker -> mimicking CustomMerger, gives the user the ability to define custom actions to perform when checking jobs.
• RootFileChecker -> checks that all root files are mergeable and that they all have the same structure, and fails the job when this is not the case.
  » New (not in a release yet), so not really discussed in this talk.

Care is needed, as auto-resubmission will happen if a job is failed, even by a checker. This may not make sense: if a job ran properly but the user fails it in a checker, the resubmitted job will always fail.




• The FileChecker can fail a job based on the presence or absence of a search string.

• Specify the files to search through as a list with the ‘files’ attribute

• The text strings to search for should be defined as a list with the ‘searchStrings’ attribute

• Whether it fails the job on the first appearance of a string or on its first absence can be specified with the Boolean ‘failIfFound’ attribute.
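The FileChecker rules can be pictured as the following standalone function. It is a hypothetical re-implementation, not Ganga’s FileChecker; in particular, checking each search string against each file separately is an assumption.

```python
import os
import tempfile


def file_checker(files, search_strings, fail_if_found=True):
    """Return True if the job should pass, False if it should be failed.

    Hypothetical sketch: with fail_if_found=True the job fails on the first file
    containing a search string; with False it fails on the first file missing one.
    """
    for path in files:
        with open(path) as handle:
            text = handle.read()
        for needle in search_strings:
            if (needle in text) == fail_if_found:
                return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    stdout = os.path.join(tmp, "stdout")
    with open(stdout, "w") as handle:
        handle.write("Event loop finished\nERROR: something went wrong\n")
    print(file_checker([stdout], ["ERROR"], fail_if_found=True))                 # False
    print(file_checker([stdout], ["Event loop finished"], fail_if_found=False))  # True
```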




• LHCb Gaudi-type jobs populate the metadata attribute upon completion.

• The LHCbMetaDataChecker allows the user to check a simple expression involving the job’s metadata.

• The expression uses keywords to represent the metadata items

• Currently only three are understood:
  • 'inputevents' =['input']
  • 'outputevents' =['output']
  • 'lumi' = j.lumi (float of the value)
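A minimal way to evaluate such an expression is to bind the three keywords to values and evaluate it in a restricted namespace. check_metadata is a hypothetical helper; treating a True result as "job passes" and using eval with builtins removed are assumptions of this sketch, and eval should only ever see expressions you wrote yourself.

```python
def check_metadata(expression, metadata):
    """Evaluate a simple expression over the keywords inputevents/outputevents/lumi.

    Hypothetical helper. eval() with builtins removed keeps the namespace small
    but is NOT a security boundary: only evaluate expressions you wrote.
    """
    namespace = {
        "inputevents": metadata["inputevents"],
        "outputevents": metadata["outputevents"],
        "lumi": float(metadata["lumi"]),
    }
    return bool(eval(expression, {"__builtins__": {}}, namespace))


metadata = {"inputevents": 1000, "outputevents": 1000, "lumi": "12.5"}
print(check_metadata("inputevents == outputevents", metadata))  # True
print(check_metadata("lumi > 20", metadata))                    # False
```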



• CustomChecker, much like CustomMerger, allows users to have their own code run as a checker

• It requires that they create a python file that defines a check function taking one argument, which is the job to be checked.

• The return value should be a Boolean representing whether the job should be marked completed (True) or failed (False).

• Then attach the CustomChecker to your job, pointing to your file with the module attribute
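A module for CustomChecker might look like the following. Only the shape (a check function taking the job and returning a Boolean) comes from the slide; the file name, the use of j.outputdir and the searched success line are assumptions for illustration.

```python
# mychecker.py: hypothetical module for a CustomChecker's 'module' attribute.
import os


def check(j):
    """Return True to mark the job completed, False to mark it failed.

    Assumes j.outputdir is the job's local output directory and that the
    application's log was returned there as 'stdout' (both assumptions).
    """
    stdout_path = os.path.join(j.outputdir, "stdout")
    if not os.path.exists(stdout_path):
        return False
    with open(stdout_path) as handle:
        # Example success marker from a Gaudi application log.
        return "Application Manager Terminated successfully" in handle.read()
```

Keeping the check free of side effects makes it safe to run again if a job is re-checked.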



• Attach a Notifier object to your job to receive emails.

• The default behaviour is to email when master jobs have completed/failed, and when subjobs have failed.

• Important: emails will only be sent while Ganga is running, so this is only useful if you keep Ganga running in the background.

• Please don't reply to the emails; if you have questions, email the DAST list instead: [email protected]




Parallel user code


Parallel processing of user tasks


• While monitoring was performed on a separate thread, all user activity happened synchronously.

• This occasionally meant the user having to wait some time for something to complete (something that could have been performed without further user input, e.g. in the background) before being able to interact with Ganga again.

• While the monitoring was threaded, it too acted synchronously when dealing with completed jobs, i.e. downloading outputs and performing actions on these jobs sequentially.


Parallel processing of user tasks


• Provide the user with a convenient way to asynchronously execute commands via Ganga-aware threads.

• Provide the ability for the monitoring to be truly independent of post-job activities, like sandbox downloading, via asynchronous use of Ganga-aware threads.

• Balance performance issues by having a thread pool of a fixed size and the ability to queue up execution tasks.

• Provide a way to asynchronously execute and capture output from shell commands, as well as to run python commands in a separate process within the DIRAC environment (experts only).

• Provide a convenient, user-friendly and familiar way to monitor the status of the thread pool and queues.



The implementation involves:

• The addition of a new command, similar to ‘jobs’ but called ‘queues’, that allows the user to inspect the current state of the threads in the pool as well as the state of the queues.

• Allowing the queues command to also act as the interface for users to define execution blocks (python callables) to be added to the queue for asynchronous execution.

• Allowing the user to specify the number of worker threads in the thread pool via a config option (default is 5)


#NumWorkerThreads = 5
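A fixed-size thread pool fed by a queue can be sketched with Python’s standard threading and queue modules. WorkerPool is a hypothetical class illustrating the idea, not Ganga’s implementation; the default of 5 workers simply mirrors the NumWorkerThreads default above.

```python
import queue
import threading


class WorkerPool:
    """Hypothetical fixed-size pool: workers take callables from one shared queue."""

    def __init__(self, num_workers=5):
        self._queue = queue.Queue()
        self.threads = [threading.Thread(target=self._work, daemon=True)
                        for _ in range(num_workers)]
        for thread in self.threads:
            thread.start()

    def _work(self):
        while True:
            func, args, kwargs = self._queue.get()
            try:
                func(*args, **kwargs)
            except Exception as err:  # keep the worker alive if one task fails
                print(f"task {func!r} failed: {err}")
            finally:
                self._queue.task_done()

    def add(self, func, args=(), kwargs=None):
        """Queue the callable itself; an idle worker will call it later."""
        self._queue.put((func, args, kwargs or {}))

    def join(self):
        """Block until every queued item has been processed."""
        self._queue.join()


results = []
lock = threading.Lock()


def square(x):
    with lock:
        results.append(x * x)


pool = WorkerPool(num_workers=5)
for i in range(10):  # more items than workers: the extras wait in the queue
    pool.add(square, args=(i,))
pool.join()
print(sorted(results))
```

A fixed pool bounds the number of concurrent threads; extra work simply waits in the queue rather than spawning more threads.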



• By using the queues command, we can see the current status of the thread pool and queues for both the user threads and the monitoring threads

• Only the user thread pool can be manipulated via the queues interface.

• The monitoring thread pool is there for information only



Adding code to run asynchronously

• To add a python callable object to the queue, simply pass it to the queues ‘add’ method along with any arguments required

• It doesn’t have to be a user-made callable object; any callable will be fine. This includes methods commonly used with jobs, like remove, submit etc.

• Be careful of the common pitfall of adding the return value of a callable! (not asynchronous)


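queues.add( f( 123 ) )

Here f( 123 ) is evaluated immediately, so it runs synchronously and returns None, which is what gets added to the queue.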
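The same pitfall exists with any executor. This sketch uses the standard library’s concurrent.futures as a stand-in for queues.add (it is not the queues API); it records whether f ran on the main thread to show that the second submission was not asynchronous.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

ran_in_main_thread = []


def f(x):
    time.sleep(0.1)  # pretend this is slow work
    ran_in_main_thread.append(threading.current_thread() is threading.main_thread())


with ThreadPoolExecutor(max_workers=5) as executor:
    # Right: hand over the callable and its argument; f runs on a worker thread.
    good = executor.submit(f, 123)
    good.result()

    # Pitfall: f(123) is evaluated right here, blocking the caller, and only its
    # return value (None) is queued. The worker then fails trying to call None.
    bad = executor.submit(f(123))
    error = bad.exception()

print(ran_in_main_thread)    # [False, True]
print(type(error).__name__)  # TypeError
```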


Busy worker threads

• The last example finishes too quickly to show a worker busy, so we redefine function f

• Now, using the queues command, we can see a worker thread busy processing the function

Example only, DON’T DO THIS!



Queuing up executable items

• Users may add more callables than there are threads available.

• All additions go into the queue, from which threads pick items when idle.

• Here we can see that all workers are busy and there are three extra items in the queue waiting to be processed


Purging the queue

• Unfortunately, threads cannot be interrupted, so make sure that whatever you get them to execute will not get stuck

• The queue, however, can be purged using the queues ‘purge’ method
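Purging only removes work that has not started yet. A minimal sketch on a standard queue.Queue (purge is a hypothetical helper, not the queues method itself):

```python
import queue


def purge(q):
    """Drop every item still waiting in q and return how many were dropped.

    Hypothetical helper: items already taken by a worker are unaffected,
    since running threads cannot be interrupted.
    """
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        q.task_done()  # keep q.join() bookkeeping consistent
        dropped += 1


pending = queue.Queue()
for item in ["job1.submit", "job2.submit", "job3.submit"]:
    pending.put(item)
print(purge(pending), pending.empty())  # 3 True
```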



Exiting Ganga with active workers or full queue

• As the queues’ user threads are Ganga-aware, you will be warned if they are still active when you request that Ganga exit, just as with other Ganga threads.

• Forcing the exit will terminate all the threads immediately, irrespective of what they were doing.

• This is most obvious to the user if jobs are finalising, as they can get stuck in completing.

• Items in the queue will be dropped and lost, just as if purged.

• The exception to this is job_finalisation in the monitoring queue. Here queue items will be dropped at exit but reinstated at start-up, without a call to DIRAC to check the status.




Future work



Future plans

• Implementing the auto-resubmit tool as a dedicated postprocessor

• Unifying the approach to job input with the new output framework

• Adding the possibility of locked ‘named’ templates, which can represent anything from debug cases to best-practice examples.

• The ability for external package authors (e.g. DaVinci) to maintain best-practice / recommended-workflow named templates, which could be imported into Ganga at build time.



For more info on the output system, see:

For more info on postprocessing, see:



( Advanced features )




Getting all user LFNs:

• Users can obtain a list of all their files on DIRAC SE using the ‘getDiracFiles’ command.

• This returns a GangaList of ready-made DiracFile objects

• This uses the DIRAC command dirac-dms-user-lfns, and the time taken will depend on the number of LFNs the user has

DIRAC environment:

• All DIRAC commands are performed in a subprocess exposed to the LHCbDirac environment.

• Users can run python commands directly in this environment using the diracAPI-type commands




For the diracAPI command, the return value is determined by trying, in this order:

• The object passed to output() (unless output is redefined)

• The evaluated stdout

• The stdout as a string

Note the environment contains the definitions:

Along with some Ganga functions which can be seen in:

• GangaDirac/Lib/Server/

• GangaLHCb/Lib/Server/

from DIRAC.Interfaces.API.Dirac import Dirac
from DIRAC.Interfaces.API.DiracAdmin import DiracAdmin
from LHCbDIRAC.Interfaces.API.DiracLHCb import DiracLHCb
dirac = Dirac()
diraclhcb = DiracLHCb()




• As well as the straight diracAPI command, two other variants exist:

1. diracAPI_async

2. diracAPI_interactive

• diracAPI_async allows users to put their command onto the queue for asynchronous execution.

• Note that nothing is returned here, so it is best used with commands where the return is unimportant, e.g. kill

• diracAPI_interactive allows users to interactively supply commands and retrieve output from the subprocess using sockets.

• This is useful for interactive debugging

• In this case a separate prompt appears




Generalised process control:
• The examples above have been provided as more specific cases

• They are, however, just specialisations of the more general control system, accessed via the queues system with the method ‘addProcess’.

• The user can attach to the queue any command-line or python commands, which will be executed within the DIRAC environment as described earlier.

• They will be executed within a separate DIRAC-aware subprocess.

• Note: nothing will be returned, as the result of execution goes to a dedicated callback function.

def addProcess( self, command, timeout, env, cwd, shell, priority,
                callback_func, callback_args, callback_kwargs )




The method’s args are:

• command = the command to execute

• timeout = the command’s timeout (default: config var [DIRAC] Timeout)

• env = the environment (defaults to SetupProject LHCbDIRAC)

• cwd = where to execute the command (default: os.getcwd())

• shell = whether to treat the command as a shell command (True) or as a python command (False), in which case the python DIRAC API is loaded as with diracAPI above (default False)

• priority = the queue system is a priority queue where a lower value means a higher priority (default 5)

• callback_func = a python callable which must take (at least) the return of the command as an argument (optional)

• callback_args = additional args to callback_func, as a tuple (optional)

• callback_kwargs = additional kwargs to callback_func, as a dict (optional)
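To illustrate how priority and the callback arguments fit together, here is a hypothetical, single-threaded ProcessQueue. It is not Ganga’s addProcess: it runs commands with subprocess in priority order (lowest value first, first-in-first-out for equal priorities) and passes each command’s stdout to its callback, followed by callback_args and callback_kwargs.

```python
import itertools
import queue
import subprocess
import sys


class ProcessQueue:
    """Hypothetical sketch: a priority queue of commands whose results go to callbacks."""

    def __init__(self):
        self._queue = queue.PriorityQueue()
        self._counter = itertools.count()  # tie-breaker: FIFO within one priority

    def add_process(self, command, priority=5, callback_func=None,
                    callback_args=(), callback_kwargs=None):
        self._queue.put((priority, next(self._counter), command,
                         callback_func, callback_args, callback_kwargs or {}))

    def run_all(self, timeout=60):
        while not self._queue.empty():
            _, _, command, callback, args, kwargs = self._queue.get()
            completed = subprocess.run(command, capture_output=True,
                                       text=True, timeout=timeout)
            if callback is not None:
                # The command's output is passed first, then the extra args/kwargs.
                callback(completed.stdout, *args, **kwargs)


seen = []


def printer(output, tag):
    seen.append((tag, output.strip()))


pq = ProcessQueue()
pq.add_process([sys.executable, "-c", "print('low priority')"], priority=5,
               callback_func=printer, callback_args=("second",))
pq.add_process([sys.executable, "-c", "print('high priority')"], priority=1,
               callback_func=printer, callback_args=("first",))
pq.run_all()
print(seen)  # [('first', 'high priority'), ('second', 'low priority')]
```

The counter in each queue entry means two commands with the same priority are never compared directly, so commands and callbacks need not be orderable.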


Backup - Examples

In[0]: def printer( x ): print x

In[1]: queues.addProcess( 'ls -lh', shell=True, callback_func = printer)
total 16M
drwxr-xr-x. 19 arichard res0 4.0K Mar 1 14:34 python
drwxr-xr-x. 5 arichard res0 16K Apr 15 11:20 release
-rw-r--r--. 1 arichard res0 58 Mar 26 10:33 roottest.C
-rwxr-xr-x. 1 arichard res0 59 Mar 6 09:11

In[2]: queues.addProcess('print dirac.status([123])', callback_func = printer)
{'OK': True, 'Value': {}}

In[3]: queues.addProcess('dirac-dms-user-lfns --help', shell = True, callback_func = printer)



Get the list of all the user files.


dirac-dms-user-lfns [option|cfgfile] ...

General options:

-o: --option= : Option=value to add


For more information, please type the following at the Ganga prompt:

• help( queues.addProcess )