// [web listing header] Logo Search packages:
// [web listing header] Sourcecode: dbbalancer version File versions
//
// DBPoolContainer.cc

#include "../DBBalancer.hh"
#include "../config/DBBalancerConfig.hh"
#include "../config/DBHostConfig.hh"
#include "../conn_pool/DBPooledConnection.hh"
#include "DBPool.hh"
#include "DBPoolContainer.hh"

// Stores the configuration and the process mode, then builds the initial
// set of pools: writer-style pools when `mode` equals
// DBBalancer::WRITER_PROCESS, reader-style pools for any other value.
DBPoolContainer::DBPoolContainer(DBBalancerConfig* config, unsigned int mode) {

  _mode = mode;
  _config = config;

  // Decide once which flavour of process we are.
  const bool isWriter = (mode==DBBalancer::WRITER_PROCESS);

  if (!isWriter) {
    initAsReader();
  } else {
    initAsWriter();
  }

}

// Destroys every pool held by the container, last one first, logging the
// pool count before and after the teardown.
DBPoolContainer::~DBPoolContainer() {

  // Remember the initial count; both log lines report it.
  int n = _pools.size();

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying DBPoolContainer(): %d pools\n",n));

  // Drain the vector from the back, deleting each pool after it has been
  // detached from the container.
  while (!_pools.empty()) {
    DBPool* last = _pools.back();
    _pools.pop_back();
    delete last;
  }

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroyed DBPoolContainer(): %d pools\n",n));

}

// This method serves a client connection.

void DBPoolContainer::processConnection(int socket) {

 DBPooledConnection* dbpc = getPooledConnection(_current_strategy);
 if (dbpc) {
   dbpc->handleConnection(socket);
 } else {
   ACE_DEBUG((LM_WARNING,"(%t, %T) We have not been able to get a connection. Closing client connection.\n"));   
   close(socket);
 }

}


// This method returns a free connection, scanning through all the DBPools.
// If "test" is false, the connection will be free and valid; if "test" is
// true, it is only guaranteed to be free.

// Scans the pools looking for a connection to hand out.
//
// strategy: selection strategy. Only 0 (round robin) is implemented; any
//           other value returns NULL immediately.
// test:     when true each pool is asked via getPooledConnectionForTest(),
//           otherwise via getPooledConnection().
//
// Returns the connection supplied by the first pool that provides one, or
// NULL when every try failed (up to 30 tries per pool) or when the strategy
// is unknown. With no pools at all the loop is skipped and NULL is returned.
DBPooledConnection* DBPoolContainer::getPooledConnection (int strategy, bool test) {

  DBPooledConnection *dbpc;

  int pools_size = _pools.size();
  int tries=0;
  int max_tries = pools_size*30;
  int pool;


  switch (strategy) {

  case 0: //DBBalancer::ST_ROUND_ROBIN:

    // Here we'll use a ROUND_ROBIN strategy.
    // We'll go trying with each pool successively.

    while (tries<max_tries) {

      // Advance the shared round-robin cursor and take a private snapshot of
      // it while still holding the mutex. Everything below uses the snapshot,
      // so a concurrent caller moving the cursor cannot make us ask one pool
      // and report another (the shared index used to be re-read unlocked).
      _used_pool_mutex.acquire();
      _used_pool_index=(_used_pool_index<(pools_size-1))?(_used_pool_index+1):0;
      pool = _used_pool_index;
      _used_pool_mutex.release();

      //ACE_DEBUG((LM_DEBUG,"(%t, %T) Pool %d is going to be asked for a connection... \n",pool));

      dbpc = test ? _pools[pool]->getPooledConnectionForTest(strategy)
                  : _pools[pool]->getPooledConnection(strategy);
      if (dbpc)
        return dbpc;

      ACE_DEBUG((LM_DEBUG,"(%t, %T) Pool %d won't give us a connection (try %d)!!! \n",pool,tries));

      // This "sleep" is fundamental to avoid quickly consuming all tries
      // without giving DBManagementMO the opportunity to create more...
      // The delay grows by 10 ms for every complete round over the pools;
      // the first round does not sleep at all.
      int usecs = (tries/pools_size)*10000;
      if (usecs>0) {
        ACE_Time_Value delay(0,usecs);
        ACE_OS::sleep(delay);
        // msec() does not return an int; cast so the value matches "%d".
        ACE_DEBUG((LM_DEBUG,"(%t, %T) Sleeping (request) %d msecs\n",static_cast<int>(delay.msec())));
      }
      tries++;

    }

    // We couldn't get a connection: report it and let the caller handle NULL.
    ACE_ERROR((LM_ERROR,"(%t, %T) Impossible to get a connection after %d tries!!! \n",tries));
    return NULL;


  default:
    return NULL;


  }

}

// Creates or destroys connections to follow a change in the number of
// threads. A positive threadGain grows, anything else shrinks by
// -threadGain. How the change is applied depends on the process mode.
void DBPoolContainer::adjustConnections(int threadGain) {

  const bool growing = (threadGain>0);

  // Writer (any mode that is not READER_PROCESS): every pool is a single
  // multi-connection, so pools themselves are created or destroyed.
  if (_mode!=DBBalancer::READER_PROCESS) {
    if (growing) {
      growWriterMode(threadGain);
    } else {
      shrinkWriterMode(-threadGain);
    }
    return;
  }

  // Reader: the connections are distributed between the existing DBPools.
  if (growing) {
    growReaderMode(threadGain);
  } else {
    shrinkReaderMode(-threadGain);
  }

}


// ************************************************
// Private Methods

// Reader initialisation: resets the strategy and the pool cursors, then
// creates one DBPool per configured host. Each pool receives the global
// configuration, its host and a two-character, zero-padded id ("00", "01"...).
void DBPoolContainer::initAsReader() {

  // We'll create as many Pools as backends, passing
  // each one the number of connections it
  // has to have.


  vector<DBHostConfig *> hosts = _config->getHosts();

  _current_strategy = 0;//ST_ROUND_ROBIN;
  _used_pool_index = 0;
  _tested_pool_index = 0;

  ACE_DEBUG((LM_DEBUG, " DBPoolContainer (reader): \n"));
  for (unsigned int i=0; i<hosts.size(); i++) {

    // Bounded formatting: the old sprintf() wrote past the 3-byte buffer as
    // soon as the index needed three digits. snprintf() truncates to two
    // characters plus the terminator, which is what the explicit id[2]=0
    // used to enforce after the fact.
    // NOTE(review): ids are not unique beyond 100 pools (only the first two
    // digits are kept) - confirm whether DBPool relies on unique ids.
    char id[3];
    snprintf(id,sizeof(id),"%.2u",i);
    string id2(id);

    _pools.push_back(new DBPool(_config,
                        hosts[i],
                        id2));

  }

}

// Writer initialisation: resets the strategy and the pool cursors, then
// creates as many DBPools as _config->getInitDbConnections() reports. Each
// pool receives the global configuration, the full host list and a
// two-character, zero-padded id ("00", "01", ...).
void DBPoolContainer::initAsWriter() {

  // We'll create as many Pools as connections we should have,
  // passing each Pool all DB's data for making a connection
  // to each one.
  // We get the connection number data from the first machine,
  // because it has to be the same

 _current_strategy = 0;//ST_ROUND_ROBIN;
 _used_pool_index = 0;
 _tested_pool_index = 0;

 ACE_DEBUG((LM_DEBUG," DBPoolContainer (writer): \n"));

 // Create the pools

 for (unsigned int i=0; i<_config->getInitDbConnections(); i++) {

   // Bounded formatting: the old sprintf() wrote past the 3-byte buffer as
   // soon as the index needed three digits. snprintf() truncates to two
   // characters plus the terminator, which is what the explicit id[2]=0
   // used to enforce after the fact.
   // NOTE(review): ids are not unique beyond 100 pools (only the first two
   // digits are kept) - confirm whether DBPool relies on unique ids.
   char id[3];
   snprintf(id,sizeof(id),"%.2u",i);
   string id2(id);

   _pools.push_back(new DBPool(_config,
                         _config->getHosts(),
                         id2));

 }

}


void DBPoolContainer::growReaderMode(int conns) {

  unsigned int connsPerPool = conns / _pools.size();

  ACE_DEBUG((LM_NOTICE,"(%t,%T) >> DBPoolContainer: Growing %d connections, %d per pool.\n",conns,connsPerPool));
  for (unsigned int i=0; i<_pools.size(); i++) {
    _pools[i]->grow(connsPerPool);
  }
 

}

void DBPoolContainer::shrinkReaderMode(int conns) {

  unsigned int connsPerPool = conns / _pools.size();
  ACE_DEBUG((LM_NOTICE,"(%t,%T) << DBPoolContainer: Shrinking %d connections, %d per pool.\n",conns,connsPerPool));

  for (unsigned int i=0; i<_pools.size(); i++) {
    _pools[i]->shrink(connsPerPool);
    
  }
}

// Writer mode growth: appends `conns` new DBPools. Each new pool receives
// the global configuration, the full host list and its position in the
// container, formatted as a decimal string, as id.
void DBPoolContainer::growWriterMode(int conns) {

  unsigned int _npools = _pools.size();

  for (unsigned int i=_npools; i<(_npools+conns); i++) {

    // The buffer used to be 2 bytes, which sprintf() overflowed for every
    // index >= 10. 16 bytes hold any unsigned int plus the terminator, and
    // snprintf() keeps the write bounded regardless.
    // NOTE(review): ids created here are not zero-padded, while
    // initAsWriter() uses "%.2d" ("05" vs "5") - confirm which one DBPool
    // expects before unifying them.
    char id[16];
    snprintf(id,sizeof(id),"%u",i);

    _pools.push_back(new DBPool(_config,
                        _config->getHosts(),
                        id));

  }


}

void DBPoolContainer::shrinkWriterMode(int conns) {

  // Mutex everything till shrunken. That should not be a problem to
  // performance, given that we're shrinking due to LOW load :-)

  for(int i=0; i<abs(conns); i++) {

    vector<DBPool*>::iterator pool = _pools.end();
    _pools.erase(pool);

    // Hey, this should be done in a more civilized way..., shouldn't it??? (DV, talking to himself...)
    delete *pool;

  }

}

// [web listing footer] Generated by Doxygen 1.6.0 - Back to index