Gaudi Framework, version v23r0

Home   Generated: Mon Jan 30 2012
Classes | Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes

Gaudi::IODataManager Class Reference

#include <IODataManager.h>

Inheritance diagram for Gaudi::IODataManager:
Inheritance graph
[legend]
Collaboration diagram for Gaudi::IODataManager:
Collaboration graph
[legend]

List of all members.

Classes

struct  Entry

Public Member Functions

 IODataManager (CSTR nam, ISvcLocator *loc)
 Initializing constructor.
virtual ~IODataManager ()
 Standard destructor.
virtual StatusCode initialize ()
 IService implementation: initialize the service.
virtual StatusCode finalize ()
 IService implementation: finalize the service.
virtual StatusCode connectRead (bool keep_open, Connection *ioDesc)
 Open data stream in read mode.
virtual StatusCode connectWrite (Connection *con, IoType mode=Connection::CREATE, CSTR doctype="UNKNOWN")
 Open data stream in write mode.
virtual StatusCode disconnect (Connection *ioDesc)
 Release data stream.
virtual Connection * connection (const std::string &dsn) const
 Retrieve known connection.
virtual Connections connections (const IInterface *owner) const
 Get connection by owner instance (0=ALL)
virtual StatusCode read (Connection *ioDesc, void *const data, size_t len)
 Read raw byte buffer from input stream.
virtual StatusCode write (Connection *con, const void *data, int len)
 Write raw byte buffer to output stream.
virtual long long int seek (Connection *ioDesc, long long int where, int origin)
 Seek on the file described by ioDesc. Arguments as in seek()

Protected Types

typedef const std::string & CSTR
typedef std::map< std::string, Entry * > ConnectionMap
typedef std::map< std::string, std::string > FidMap

Protected Member Functions

StatusCode connectDataIO (int typ, IoType rw, CSTR fn, CSTR technology, bool keep, Connection *con)
StatusCode reconnect (Entry *e)
StatusCode error (CSTR msg, bool rethrow)
StatusCode establishConnection (Connection *con)

Protected Attributes

std::string m_catalogSvcName
 Property: Name of the file catalog service.
int m_ageLimit
 Property: Age limit.
bool m_useGFAL
 Property: Flag for auto gfal data access.
bool m_quarantine
 Property: Flag if inaccessible files should be quarantined in the job.
ConnectionMap m_connectionMap
 Map with I/O descriptors.
SmartIF< IFileCatalog > m_catalog
 Reference to file catalog.
FidMap m_fidMap
 Map of FID to PFN.
SmartIF< IIncidentSvc > m_incSvc
 Reference to the incident service.

Detailed Description

Definition at line 29 of file IODataManager.h.


Member Typedef Documentation

typedef std::map<std::string,Entry*> Gaudi::IODataManager::ConnectionMap [protected]

Definition at line 41 of file IODataManager.h.

typedef const std::string& Gaudi::IODataManager::CSTR [protected]

Definition at line 31 of file IODataManager.h.

typedef std::map<std::string, std::string> Gaudi::IODataManager::FidMap [protected]

Definition at line 42 of file IODataManager.h.


Constructor & Destructor Documentation

IODataManager::IODataManager ( CSTR  nam,
ISvcLocator *  loc 
)

the incident service

Initializing constructor

Parameters:
[in] nam  Name of the service
[in] loc  Pointer to the service locator object
Returns:
Initialized reference to service object

Definition at line 22 of file IODataManager.cpp.

  : base_class(nam, svcloc), m_ageLimit(2)
{
  // Give every option its default value first, then expose the member as a
  // configurable property under its job-option name.
  m_catalogSvcName = "Gaudi::MultiFileCatalog/FileCatalog";
  declareProperty("CatalogType", m_catalogSvcName);

  m_useGFAL = true;
  declareProperty("UseGFAL", m_useGFAL);

  m_quarantine = true;
  declareProperty("QuarantineFiles", m_quarantine);

  m_ageLimit = 2;
  declareProperty("AgeLimit", m_ageLimit);
}
virtual Gaudi::IODataManager::~IODataManager (  ) [inline, virtual]

Standard destructor.

Definition at line 76 of file IODataManager.h.

// Empty body: no statements are executed here.
// NOTE(review): Entry objects created in connectDataIO() are deleted only in
// disconnect(); confirm none can still be held by m_connectionMap at this point.
{}

Member Function Documentation

StatusCode IODataManager::connectDataIO ( int  typ,
IoType  rw,
CSTR  fn,
CSTR  technology,
bool  keep,
Connection *  con 
) [protected]

Definition at line 234 of file IODataManager.cpp.

                                                                                                                     {
  // Resolve the dataset name (optionally prefixed with "FID:", "LFN:" or
  // "PFN:") and open the corresponding connection. The function recurses:
  // UNKNOWN and FID requests end up as PFN requests, LFN requests as FID
  // requests. Every failure path fires a FailInputFile incident and returns
  // IDataConnection::BAD_DATA_CONNECTION (or S_ERROR for an already known FID).
  // Note: this body names the parameters dataset/keep_open/connection, while
  // the declaration shown for this member lists them as fn/keep/con.
  MsgStream log(msgSvc(),name());
  std::string dsn = dataset;
  try  {
    StatusCode sc(StatusCode::SUCCESS,true);
    // A recognised prefix (compared case-insensitively) overrides the caller's
    // 'typ' and is stripped from the working name 'dsn'.
    if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
      dsn = dataset.substr(4), typ = FID;
    else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
      dsn = dataset.substr(4), typ = LFN;
    else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
      dsn = dataset.substr(4), typ = PFN;
    else if ( typ == UNKNOWN )
      // No prefix and no type given: treat the name as a physical file name.
      return connectDataIO(PFN, rw, dsn, technology, keep_open, connection);

    // Names already recorded as bad are rejected without another attempt.
    // NOTE(review): this is a linear std::find; the insert(dsn) calls below
    // suggest s_badFiles is an associative container whose own find() could
    // be used instead -- confirm against its declaration.
    if(std::find(s_badFiles.begin(),s_badFiles.end(),dsn) != s_badFiles.end())  {
      m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
      return IDataConnection::BAD_DATA_CONNECTION;
    }
    if ( typ == FID )  {
      ConnectionMap::iterator fi = m_connectionMap.find(dsn);
      if ( fi == m_connectionMap.end() )  {
        // FID not connected yet: ask the catalog for its physical file names.
        IFileCatalog::Files files;
        m_catalog->getPFN(dsn,files);
        if ( files.size() == 0 ) {
          if ( !m_useGFAL )   {
            if ( m_quarantine ) s_badFiles.insert(dsn);
            m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
            error("connectDataIO> failed to resolve FID:"+dsn,false).ignore();
            return IDataConnection::BAD_DATA_CONNECTION;
          }
          // With UseGFAL set, a name that is 36 characters long with '-' at
          // positions 8 and 13 is retried as "gfal:guid:<name>".
          else if ( dsn.length() == 36 && dsn[8]=='-' && dsn[13]=='-' )  {
            std::string gfal_name = "gfal:guid:" + dsn;
            m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[gfal_name] = dsn;
            sc = connectDataIO(PFN, rw, gfal_name, technology, keep_open, connection);
            if ( sc.isSuccess() ) return sc;
            // NOTE(review): redundant, the same insert is repeated right after this block.
            if ( m_quarantine ) s_badFiles.insert(dsn);
          }
          if ( m_quarantine ) s_badFiles.insert(dsn);
          m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
          error("connectDataIO> Failed to resolve FID:"+dsn,false).ignore();
          return IDataConnection::BAD_DATA_CONNECTION;
        }
        // Only the first catalog entry is tried.
        std::string pfn = files[0].first;
        m_fidMap[dsn] = m_fidMap[dataset] = m_fidMap[pfn] = dsn;
        sc = connectDataIO(PFN, rw, pfn, technology, keep_open, connection);
        if ( !sc.isSuccess() )  {
          if ( m_quarantine ) s_badFiles.insert(pfn);
          m_incSvc->fireIncident(Incident(pfn,IncidentType::FailInputFile));
          return IDataConnection::BAD_DATA_CONNECTION;
        }
        
        return sc;
      }
      // The FID is already present in the connection map: reported as an error;
      // the former reconnect logic is kept below, commented out.
      return S_ERROR;
      //Connection* c = (*fi).second->connection;
      //sc = connectDataIO(PFN, rw, c->pfn(), technology, keep_open, connection);
      //if ( !sc.isSuccess() && m_quarantine ) s_badFiles.insert(c->pfn());
      //return sc;
    }
    // LFN or PFN request: determine the file ID, from the local cache if possible.
    std::string fid;
    FidMap::iterator j = m_fidMap.find(dsn);
    if ( j == m_fidMap.end() )  {
      IFileCatalog::Files files;
      switch(typ)  {
      case LFN:
        fid = m_catalog->lookupLFN(dsn);
        if ( fid.empty() )  {
          m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
          log << MSG::ERROR << "Failed to resolve LFN:" << dsn
              << " Cannot access this dataset." << endmsg;
          return IDataConnection::BAD_DATA_CONNECTION;
        }
        break;
      case PFN:
        fid = m_catalog->lookupPFN(dsn);
        if ( !fid.empty() ) m_catalog->getPFN(fid, files);
        if ( files.empty() )   {
          // Unknown to the catalog: register it when writing a new file,
          // otherwise fall back to using the physical name as the file ID.
          if ( rw == Connection::CREATE || rw == Connection::RECREATE )  {
            if ( fid.empty() ) fid = m_catalog->createFID();
            m_catalog->registerPFN(fid,dsn,technology);
            log << MSG::INFO << "Referring to dataset " << dsn
                << " by its file ID:" << fid << endmsg;
          }
          else  {
            fid = dsn;
          }
        }
        break;
      }
    }
    else {
      fid = (*j).second;
    }
    if ( typ == PFN )  {
      // Open PFN
      ConnectionMap::iterator fi = m_connectionMap.find(fid);
      if ( fi == m_connectionMap.end() )  {
        // First use of this file ID: create a map entry for the connection.
        connection->setFID(fid);
        connection->setPFN(dsn);
        Entry* e = new Entry(technology, keep_open, rw, connection);
        // Here we open the file!
        if ( !reconnect(e).isSuccess() )   {
          delete e;
          if ( m_quarantine ) s_badFiles.insert(dsn);
          m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
          error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
          return IDataConnection::BAD_DATA_CONNECTION;
        }
        // The file ID is re-read from the connection after opening and the
        // original name, the stripped name and the file ID are all cached.
        fid = connection->fid();
        m_fidMap[dataset] = m_fidMap[dsn] = m_fidMap[fid] = fid;
        if (  !(rw==Connection::CREATE || rw==Connection::RECREATE) )  {
          if ( strcasecmp(dsn.c_str(),fid.c_str()) == 0 )  {
            log << MSG::ERROR << "Referring to existing dataset " << dsn
                << " by its physical name." << endmsg;
            // NOTE(review): no severity is streamed for this second message.
            log << "You may not be able to navigate back to the input file"
                << " -- processing continues" << endmsg;
          }
        }
        // NOTE(review): the trailing '.first' result is discarded.
        m_connectionMap.insert(std::make_pair(fid,e)).first;
        return S_OK;
      }
      // Here we open the file!
      if ( !reconnect((*fi).second).isSuccess() )   {
        if ( m_quarantine ) s_badFiles.insert(dsn);
        m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
        error("connectDataIO> Cannot connect to database: PFN="+dsn+" FID="+fid,false).ignore();
        return IDataConnection::BAD_DATA_CONNECTION;
      }
      return S_OK;
    }
    // LFN request (or any remaining type): continue with the resolved file ID.
    sc = connectDataIO(FID, rw, fid, technology, keep_open, connection);
    // NOTE(review): when the call fails while m_quarantine is false, control
    // reaches the 'else if' and the LFN mapping is cached despite the failure.
    if ( !sc.isSuccess() && m_quarantine ) {
      s_badFiles.insert(fid);
    }
    else if ( typ == LFN ) {
      m_fidMap[dataset] = fid;
    }
    return sc;
  }
  catch (std::exception& e)  {
    error(std::string("connectDataIO> Caught exception:")+e.what(), false).ignore();
  }
  catch(...) {
    error(std::string("connectDataIO> Caught unknown exception"), false).ignore();
  }
  // Only reached after an exception was caught above.
  // NOTE(review): unlike the other failure paths, the name is added to
  // s_badFiles here without consulting m_quarantine.
  m_incSvc->fireIncident(Incident(dsn,IncidentType::FailInputFile));
  error("connectDataIO> The dataset "+dsn+" cannot be opened.",false).ignore();
  s_badFiles.insert(dsn);
  return IDataConnection::BAD_DATA_CONNECTION;
}
virtual Connection* Gaudi::IODataManager::connection ( const std::string &  dsn ) const [virtual]

Retrieve known connection.

IODataManager::Connections IODataManager::connections ( const IInterface *  owner ) const [virtual]

Get connection by owner instance (0=ALL)

Definition at line 73 of file IODataManager.cpp.

                                                                                  {
  // Gather the connection of every registered entry that belongs to 'owner';
  // a null 'owner' selects all of them.
  Connections selected;
  ConnectionMap::const_iterator entry = m_connectionMap.begin();
  const ConnectionMap::const_iterator last = m_connectionMap.end();
  while ( entry != last )  {
    IDataConnection* candidate = entry->second->connection;
    const bool wanted = (0 == owner) || (candidate->owner() == owner);
    if ( wanted )  {
      selected.push_back(candidate);
    }
    ++entry;
  }
  return selected;
}
StatusCode IODataManager::connectRead ( bool  keep_open,
Connection *  ioDesc 
) [virtual]

Open data stream in read mode.

Connect data file for reading.

Definition at line 84 of file IODataManager.cpp.

                                                                      {
  // Open the data stream described by 'con' for reading.
  //
  // establishConnection() is still called first, even for a null descriptor,
  // so its own diagnostic for a missing connection object is issued as before.
  StatusCode established = establishConnection(con);

  // Not connected yet: resolve the name and open the file. The null check
  // prevents dereferencing a missing descriptor, which the failure branch
  // did previously via con->name().
  if ( 0 != con && !established.isSuccess() )  {
    return connectDataIO(UNKNOWN,Connection::READ,con->name(),"UNKNOWN",keep_open,con);
  }

  // Either no descriptor was given or the connection is already established:
  // both cases are reported as an error.
  std::string dsn = con ? con->name() : std::string("Unknown");
  return error("Failed to connect to data:"+dsn,false);
}
StatusCode IODataManager::connectWrite ( Connection *  con,
IoType  mode = Connection::CREATE,
CSTR  doctype = "UNKNOWN" 
) [virtual]

Open data stream in write mode.

Connect data file for writing.

Definition at line 93 of file IODataManager.cpp.

                                                                                {
  // Open the data stream described by 'con' for writing in the given 'mode';
  // 'doctype' is forwarded as the technology and the file is always kept open.
  //
  // establishConnection() is still called first, even for a null descriptor,
  // so its own diagnostic for a missing connection object is issued as before.
  StatusCode established = establishConnection(con);

  // Not connected yet: resolve the name and open the file. The null check
  // prevents dereferencing a missing descriptor, which the failure branch
  // did previously via con->name().
  if ( 0 != con && !established.isSuccess() )  {
    return connectDataIO(UNKNOWN,mode,con->name(),doctype,true,con);
  }

  // Either no descriptor was given or the connection is already established:
  // both cases are reported as an error.
  std::string dsn = con ? con->name() : std::string("Unknown");
  return error("Failed to connect to data:"+dsn,false);
}
StatusCode IODataManager::disconnect ( Connection *  ioDesc ) [virtual]

Release data stream.

Definition at line 116 of file IODataManager.cpp.

                                                       {
  // Disconnect 'con' and remove every bookkeeping entry that refers to its
  // dataset. Returns the status of con->disconnect(), or S_ERROR when no
  // connection object was passed.
  if ( con )  {
    std::string dataset = con->name();
    std::string dsn = dataset;
    StatusCode sc = con->disconnect();
    // Strip a FID:/LFN:/PFN: prefix; 'dsn' is only used in the log message
    // below, the map lookup uses the unmodified 'dataset' name.
    if ( ::strncasecmp(dsn.c_str(),"FID:",4)==0 )
      dsn = dataset.substr(4);
    else if ( ::strncasecmp(dsn.c_str(),"LFN:",4)==0 )
      dsn = dataset.substr(4);
    else if ( ::strncasecmp(dsn.c_str(),"PFN:",4)==0 )
      dsn = dataset.substr(4);

    FidMap::iterator j = m_fidMap.find(dataset);
    if ( j != m_fidMap.end() )  {
      // Copy the file ID before erasing the iterator it came from.
      std::string fid = (*j).second;
      std::string gfal_name = "gfal:guid:" + fid;
      ConnectionMap::iterator i=m_connectionMap.find(fid);
      // Remove the aliases cached for this file: dataset name, file ID and
      // the gfal name built from the file ID.
      m_fidMap.erase(j);
      if ( (j=m_fidMap.find(fid)) != m_fidMap.end() )
        m_fidMap.erase(j);
      if ( (j=m_fidMap.find(gfal_name)) != m_fidMap.end() )
        m_fidMap.erase(j);
      if ( i != m_connectionMap.end() ) {
        // NOTE(review): a map slot holding a null Entry pointer is left in
        // m_connectionMap, because the erase sits inside this check.
        if ( (*i).second )  {
          IDataConnection* c = (*i).second->connection;
          // The physical file name registered for the connection is an alias too.
          std::string pfn = c->pfn();
          if ( (j=m_fidMap.find(pfn)) != m_fidMap.end() )
            m_fidMap.erase(j);
          if ( c->isConnected() )  {
            MsgStream log(msgSvc(),name());
            c->disconnect();
            log << MSG::INFO << "Disconnect from dataset " << dsn
                << " [" << fid << "]" << endmsg;
          }
          // The Entry object is deleted here and its map slot removed.
          delete (*i).second;
          m_connectionMap.erase(i);
        }
      }
    }
    return sc;
  }
  return S_ERROR;
}
StatusCode IODataManager::error ( CSTR  msg,
bool  rethrow 
) [protected]

Definition at line 64 of file IODataManager.cpp.

                                                       {
  // Print 'msg' at ERROR level, optionally break execution (when 'rethrow'
  // is set), and hand back an error status in either case.
  MsgStream out(msgSvc(),name());
  out << MSG::ERROR << "Error: " << msg << endmsg;
  if ( !rethrow )  {
    return S_ERROR;
  }
  System::breakExecution();
  return S_ERROR;
}
StatusCode IODataManager::establishConnection ( Connection *  con ) [protected]

Definition at line 211 of file IODataManager.cpp.

                                                              {
  // Make sure 'con' is usable: an open connection just has its age reset,
  // a closed one is reopened if it is registered under its name.

  // Without a connection object nothing can be established.
  if ( 0 == con )  {
    return error("Severe logic bug: No connection object avalible.",true);
  }

  // Already open: only refresh the age counter.
  if ( con->isConnected() )  {
    con->resetAge();
    return S_OK;
  }

  // Closed: the connection must already be known under its name.
  ConnectionMap::const_iterator known = m_connectionMap.find(con->name());
  if ( known == m_connectionMap.end() )  {
    return S_ERROR;
  }

  // A different object registered under the same name is a fatal inconsistency.
  Connection* registered = known->second->connection;
  if ( registered != con )  {
    m_incSvc->fireIncident(Incident(con->name(),IncidentType::FailInputFile));
    return error("Severe logic bug: Twice identical connection object for DSN:"+con->name(),true);
  }

  // Same object: try to reopen it.
  if ( !reconnect(known->second).isSuccess() )  {
    return S_ERROR;
  }
  return S_OK;
}
StatusCode IODataManager::finalize (  ) [virtual]

IService implementation: finalize the service.

Reimplemented from Service.

Definition at line 58 of file IODataManager.cpp.

                                    {
  // Drop the file catalog reference, then finalize the base class and
  // return its status.
  // NOTE(review): m_incSvc, which initialize() also acquires, is not reset
  // here -- confirm whether that is intended.
  m_catalog = 0; // release
  return Service::finalize();
}
StatusCode IODataManager::initialize (  ) [virtual]

IService implementation: initialize the service.

IService implementation: Db event selector override.

Reimplemented from Service.

Definition at line 32 of file IODataManager.cpp.

                                      {
  // Initialize the base class, then locate the two services this manager
  // depends on: the file catalog and the incident service. Returns the base
  // class status on base failure, StatusCode::FAILURE if either service
  // cannot be located, and the (successful) base status otherwise.

  // Initialize base class
  StatusCode status = Service::initialize();
  MsgStream log(msgSvc(), name());
  if ( !status.isSuccess() )    {
    log << MSG::ERROR << "Error initializing base class Service!" << endmsg;
    return status;
  }
  // Retrieve the file catalog service configured via m_catalogSvcName
  m_catalog = serviceLocator()->service(m_catalogSvcName);
  if( !m_catalog.isValid() ) {
    log << MSG::ERROR
        << "Unable to localize interface IFileCatalog from service:"
        << m_catalogSvcName << endmsg;
    return StatusCode::FAILURE;
  }
  // Retrieve the incident service used to report failing input files
  m_incSvc = serviceLocator()->service("IncidentSvc");
  if( !m_incSvc.isValid() ) {
    log << MSG::ERROR << "Error initializing IncidentSvc Service!" << endmsg;
    // 'status' still holds the successful base-class result at this point,
    // so the failure has to be reported explicitly.
    return StatusCode::FAILURE;
  }

  return status;
}
StatusCode IODataManager::read ( Connection *  ioDesc,
void *const   data,
size_t  len 
) [virtual]

Read raw byte buffer from input stream.

Definition at line 102 of file IODataManager.cpp.

                                                                             {
  // Forward the read to the connection, but only once it is established.
  if ( !establishConnection(con).isSuccess() )  {
    return S_ERROR;
  }
  return con->read(data,len);
}
StatusCode IODataManager::reconnect ( Entry *  e ) [protected]

Definition at line 160 of file IODataManager.cpp.

                                             {
  // (Re)open the connection held by entry 'e' according to its I/O type.
  // After a successful open in READ mode, all other open connections that
  // are not flagged keepOpen are aged, and those whose age exceeds
  // m_ageLimit are disconnected. Returns the status of the open call, or
  // S_ERROR for a missing entry/connection or an unhandled I/O type.
  StatusCode sc = S_ERROR;
  if ( e && e->connection )  {
    switch(e->ioType)  {
    case Connection::READ:
      sc = e->connection->connectRead();
      break;
    // All writing modes share the same call; the mode itself is passed on.
    case Connection::UPDATE:
    case Connection::CREATE:
    case Connection::RECREATE:
      sc = e->connection->connectWrite(e->ioType);
      break;
    default:
      return S_ERROR;
    }
    if ( sc.isSuccess() && e->ioType == Connection::READ )  {
      // Candidates are collected first and disconnected after the scan.
      std::vector<Entry*> to_retire;
      e->connection->resetAge();
      for(ConnectionMap::iterator i=m_connectionMap.begin(); i!=m_connectionMap.end();++i) {
        // NOTE(review): (*i).second is dereferenced without a null check here.
        IDataConnection* c = (*i).second->connection;
        // Skip the connection just opened, closed ones and those pinned open.
        if ( e->connection != c && c->isConnected() && !(*i).second->keepOpen )  {
          c->ageFile();
          if ( c->age() > m_ageLimit ) {
            to_retire.push_back((*i).second);
          }
        }
      }
      if ( !to_retire.empty() )  {
        MsgStream log(msgSvc(),name());
        // Retired connections are only disconnected; their entries stay in
        // m_connectionMap.
        for(std::vector<Entry*>::iterator j=to_retire.begin(); j!=to_retire.end();++j)  {
          IDataConnection* c = (*j)->connection;
          c->disconnect();
          log << MSG::INFO << "Disconnect from dataset " << c->pfn()
              << " [" << c->fid() << "]" << endmsg;
        }
      }
    }
  }
  return sc;
}
long long int IODataManager::seek ( Connection *  ioDesc,
long long int  where,
int  origin 
) [virtual]

Seek on the file described by ioDesc. Arguments as in seek()

Definition at line 112 of file IODataManager.cpp.

                                                                                   {
  // Forward the seek to the connection once it is established;
  // -1 signals that no connection could be established.
  if ( !establishConnection(con).isSuccess() )  {
    return -1;
  }
  return con->seek(where,origin);
}
StatusCode IODataManager::write ( Connection *  con,
const void *  data,
int  len 
) [virtual]

Write raw byte buffer to output stream.

Definition at line 107 of file IODataManager.cpp.

                                                                           {
  // Forward the write to the connection, but only once it is established.
  if ( !establishConnection(con).isSuccess() )  {
    return S_ERROR;
  }
  return con->write(data,len);
}

Member Data Documentation

int Gaudi::IODataManager::m_ageLimit [protected]

Property: Age limit.

Definition at line 47 of file IODataManager.h.

SmartIF<IFileCatalog> Gaudi::IODataManager::m_catalog [protected]

Reference to file catalog.

Definition at line 56 of file IODataManager.h.

std::string Gaudi::IODataManager::m_catalogSvcName [protected]

Property: Name of the file catalog service.

Definition at line 45 of file IODataManager.h.

ConnectionMap Gaudi::IODataManager::m_connectionMap [protected]

Map with I/O descriptors.

Definition at line 54 of file IODataManager.h.

FidMap Gaudi::IODataManager::m_fidMap [protected]

Map of FID to PFN.

Definition at line 58 of file IODataManager.h.

SmartIF<IIncidentSvc> Gaudi::IODataManager::m_incSvc [protected]

Definition at line 64 of file IODataManager.h.

bool Gaudi::IODataManager::m_quarantine [protected]

Property: Flag if inaccessible files should be quarantined in the job.

Definition at line 51 of file IODataManager.h.

bool Gaudi::IODataManager::m_useGFAL [protected]

Property: Flag for auto gfal data access.

Definition at line 49 of file IODataManager.h.


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Defines

Generated at Mon Jan 30 2012 13:53:29 for Gaudi Framework, version v23r0 by Doxygen version 1.7.2 written by Dimitri van Heesch, © 1997-2004