Job Merging Support in Ganga

Introduction

Users often partition jobs, either manually, or automatically with a splitter, so that the output files from several jobs needs to be combined before an analysis can done. This page seeks to clarify the main use cases and GPI syntax for defining a merge functionality allowing users to combine lists of jobs in an automatic way.

Use Case 1: Merging subjobs

A user wants to take a Job object that has previously had a splitter run on it, so that it contains subjobs, and merge the output of the subjobs when it completes.

rm = RootMerge()
rm.filelist = ['hist.root','tuple.root']
rm.ignorefailed = True

#j is a Job that contains subjobs
j.merger = rm

The merge will be done when the status of the master job changes to completed. If the ignorefailed flag has been set, then it will also be run when the master jobs status is set to failed.

The output of the merge will be put in the master jobs outputdir unless the merger tool has its outputdir variable set, in which case the specified location will be used. Any files output will be decorated with the master job ID to avoid clashes.

As the merge is done automatically, there is no need to have a merge method on the Job class. If a user wants to run merge on the job, they can use the methods presented in use case 2.

Use Case 2: Merging a collection of Jobs

rm = RootMerge()
rm.filelist = 'hist.root'
rm.outputdir = '~/files'

joblist = []
for j in jobs:
     #do something here
     joblist.append(j)

rm.merge(joblist)

The merge method will accept any list of Jobs, or a single Job object. This includes both a simple list and a job slice (jobs[0:20]). If any of the Jobs in the list contain subjobs then the merge object will first be set on the Job and then merge called.

The outputdir must be specified, eventually having a sensible default in .gangarc. No name decoration will be done, so if an operation would have caused overwriting, then an error will be raised.

If the user wants to force overwriting, they can set the overwrite flag on the Merge object:

rm.overwrite = True

The user can use the same merge instance to merge multiple lists of jobs.

Proposed Merge Objects

* A UML diagram of the Mergers framework:
merge.jpeg

RootMerge

Will use hadd or the Atlas variant of it to combine root files. Should be able to support histograms and ntuples. The exact functionality will depend on the hadd variant chosen. I would prefer to just use the vanilla hadd to avoid the maintenance burden of a similar tool.

TextMerge

This will basically just cat the selected text files together, inserting some sort of separator between files. The concatenation will be done in essentially random order (i.e. the order in which jobs are processed) and no pattern matching will be done. It will feature a compress= flag, which if set will gzip the output files.

MuiltpleMerge

Will allow multiple types of merge to be done at the same time. This will be most useful for user case 1.

mm = MultipleMerge()
rm = RootMerge()
tm = TextMerge()

#do some config
mm.addMerge(rm)
mm.addMerge(tm)

j.merger = mm

SmartMerge

There is clearly scope for a smart merge that uses the same sort of file extension registering as used for the Jobs.peek functionality. This would be similar to the MultipleMerge. This is of lower priority and so will be considered when the initial merge implementation has been completed.

Getting the Code

A fully featured prototype is now in CVS on the branch ganga-4-4-dev-branch-will-mergers. To use this:

  • Set up a development area as per HowToSetupDevelopmentArea.
  • cvs co -r Ganga-4-3-6 ganga
  • cvs update -dj ganga-4-4-dev-branch-will-mergers
  • ./ganga/bin/ganga --test Ganga.test.GPI.Mergers.*

When I ran the tests on lxplus just now, I got 21 passes and 0 failures, although I did have to rerun one test that timed out.

The bulk of the code is in Ganga/Lib/Mergers/Mergers.py.

Implementing a new Merger

The framework is designed to allow the addition of new merger objects without too much pain. To take full advantage of the framework, and to get a consistent user experiance between different merger objects, you need to define two objects - a GPI object and a merge tool. Here are some details on how to add a basic and stand alone merger object.

MergeTools

A MergeTool is the class that actually performs the merge. There is an interface, IMergeTool, which defines the interface. MergeTools provide a mergefiles(self, file_list, output_file) method. The file_list is a list of fully resolved files paths as strings. Details of the algorithm used to produce this can be seen in the AbstractMerger class. The output_file is a path to a file where the merge tool should put the resultant file. It can be assumed that any over-writing issues have already been resolved (i.e. the MergeTool should always overwrite without asking) and that any directory structure has already been created if appropriate. If the merger fails, the tool can communicate this back to the Merger object via a MergeError. The message from this will be passed to the user, and so should be helpfull!

Below is an edited MergerTool based on the _TextMergerTool:

class _TextMergeTool(IMergeTool):
    _category = 'merge_tools'
    _hidden = 1
    _name = '_TextMergeTool'
    _schema = IMergeTool._schema.inherit_copy()

    def mergefiles(self, file_list, output_file):

        #merge files
        for f in file_list:
             #append to output_file

Merger Objects

These are the GPI objects, and in most cases will mostly be documentation rather than code! They should inherit from AbstractMerger, which takes care of merging and file issues such as overwriting and missing files. The class should define an exported merge method, and then call super. Doing this ensures that every Merger object gets the same consistent merge algorithm. A merge tool instance should be passed to the AbstractMerger by calling super in the constructor - this will then be used to perform the actual merge.

Below is an example, again based on the TextMerger:

class TextMerger(AbstractMerger):
    """Merger class for text

    Some documentation for this type of Merger.

    """
    _category = 'mergers'
    _exportmethods = ['merge']
    _name = 'TextMerger'
    _schema = AbstractMerger._schema.inherit_copy()
         
    def __init__(self):
        super(TextMerger,self).__init__(_TextMergeTool())

    def merge(self, jobs, outputdir = None, ignorefailed = None, overwrite = None):
        self.merge_tool.compress = self.compress
        #needed as exportmethods doesn't seem to cope with inheritance
        return super(TextMerger,self).merge(jobs, outputdir, ignorefailed, overwrite)

Registering the Classes

Finally, you have to register your new classes as plugins:

#configure the plugins        
from Ganga.Utility.Plugin import allPlugins
allPlugins.add(_TextMergeTool,'merge_tools','_TextMergeTool')

As a final note, there is a Mergers section of the config object which is used for configuring the existing Merge objects.

-- Main.wreece - 12 Apr 2007

Topic attachments
I Attachment History Action Size Date Who Comment
JPEGjpeg merge.jpeg r1 manage 53.0 K 2007-07-13 - 15:47 UnknownUser A UML diagram of the Mergers framework
Edit | Attach | Watch | Print version | History: r5 < r4 < r3 < r2 < r1 | Backlinks | Raw View | WYSIWYG | More topic actions
Topic revision: r5 - 2007-09-17 - WillReece
 
    • Cern Search Icon Cern Search
    • TWiki Search Icon TWiki Search
    • Google Search Icon Google Search

    ArdaGrid All webs login

This site is powered by the TWiki collaboration platform Powered by PerlCopyright &© 2008-2024 by the contributing authors. All material on this collaboration platform is the property of the contributing authors.
or Ideas, requests, problems regarding TWiki? use Discourse or Send feedback