// Source code: dbbalancer
//
// DBPool.cc

#include "DBPool.hh"

#include <cstdio>

#include "DBPooledConnection.hh"
#include "../config/DBHostConfig.hh"
#include "../config/DBBalancerConfig.hh"
#include "../postgres/DBPostgresPooledConnection.hh"
#include "../postgres/DBPostgresWriterConnection.hh"

using namespace std;

/**
 * R-O connection, handled by DBPostgresPooledConnection.
 * This kind of pool has SEVERAL connections with ONE host.
 */

DBPool::DBPool(DBBalancerConfig* config, DBHostConfig* host, string id) {

  // Reader pool: opens config->getInitDbConnections() connections, all of
  // them against the single host `host`.
  //   config  balancer configuration (stored; grow() reuses it).
  //   host    database host (stored; grow() reuses it).
  //   id      pool identifier, used as the prefix of every connection id.

  // We save this data because maybe we'll need to make more connections.
  _config = config;
  _host = host;

  _used_conn_index=0;
  _tested_conn_index=0;

  _id = id;

  // We create the minimum number of connections
  // Firstly they will be DBPostgresPooledConnection classes, but
  // could be any class that extends DBPooledConnection.

  ACE_DEBUG((LM_NOTICE,"\nPostgres Pool in %s:%d/%s as %s: ",host->getName().c_str(),host->getPort(),host->getDbName().c_str(),host->getDbUser().c_str()));

  for (int i=0; i< config->getInitDbConnections(); i++) {

    // Zero-padded connection number ("00", "01", ...). The buffer is large
    // enough for any int and snprintf bounds the write, so an index of 100
    // or more can no longer run past the end of the buffer.
    char c[16];
    snprintf(c,sizeof(c),"%.2d",i);
    string id2(c);

    DBPostgresPooledConnection *post = new DBPostgresPooledConnection(config,host,id2);

    // The start-up line shows the index of each valid connection and an "X"
    // for each invalid one. The connection is kept in the pool either way.
    if (post->isValid()) {
      ACE_DEBUG((LM_NOTICE," %d",i));
    } else {
      ACE_DEBUG((LM_NOTICE," X"));
    }

    // Full connection id is "<pool id>::<connection number>".
    post->setId(id+"::"+id2);
    _connections.push_back(post);

  }
  ACE_DEBUG((LM_NOTICE,"\n"));
  _nConns = _connections.size();

}


/**
 * R-W connection, handled by DBPostgresWriterConnection.
 * Here we're creating a Pool that has ONE connection to EACH of the different hosts. The writes
 * are done in parallel between the client and all the hosts.
 */

DBPool::DBPool(DBBalancerConfig *config, vector<DBHostConfig *> hosts, string id) {

  // Writer pool: holds exactly one DBPostgresWriterConnection, which is
  // built from the whole list of hosts.
  //   config  balancer configuration (stored).
  //   hosts   hosts handed to the writer connection.
  //   id      identifier of the pool and of its single connection.

  _config = config;

  // This pool is not bound to a single host. Clear _host explicitly so the
  // member never holds an indeterminate pointer (grow() reads it).
  _host = NULL;

  // Conn index: Always 0, because this kind of pool only has one (though multiplexed) virtual connection.
  _used_conn_index=0;
  _tested_conn_index=0;

  _id = id;

  cout << "    Postgres Writing Pool in: ";

  for (vector<DBHostConfig *>::iterator p = hosts.begin(); p!=hosts.end(); ++p) {
    cout << (*p)->getName() << " ";
  }

  DBPostgresWriterConnection *post = new DBPostgresWriterConnection(config, hosts, id);

  // Log the pool id when the connection is valid, an "X" otherwise. The
  // connection is stored in the pool in both cases.
  if (post->isValid()) {
    ACE_DEBUG((LM_NOTICE," %s\n",id.c_str()));
  } else {
    ACE_DEBUG((LM_NOTICE," X\n"));
  }

  post->setId(id);
  _connections.push_back(post);
  _nConns = _connections.size();

}


// Destroys the pool: logs how many connections it holds, then deletes every
// connection, taking them from the back of the container one at a time.
DBPool::~DBPool() {

  const int total = _connections.size();

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying DBPool(): %s. %d connections.\n",_id.c_str(),total));

  // Detach each pointer from the container before deleting it, so the
  // vector never holds a dangling entry.
  while (!_connections.empty()) {
    DBPooledConnection *last = _connections.back();
    _connections.pop_back();
    delete last;
  }

}

/** 
 * Here we've got to define the algorithm to choose a connection for test.
 * This method will be called by the connection reaper thread, which searches
 * for invalid or stuck connections.
 * There could be three cases:
 * a) !valid: urgent check and recover
 * b) valid && !free && (time_use > timeout): check and free.
 * c) valid && free: routine check.
 */

DBPooledConnection *DBPool::getPooledConnectionForTest(int strategy) {

  // Advances the round-robin test index and returns whatever testable()
  // yields for the connection at that position.
  //   strategy  not consulted by this implementation.
  // Returns NULL when there is no connection at the selected position
  // (empty pool, or the index is beyond the current container size).

  // Lock while computing next connection. (Just needed
  // if we've got several reapers).
  // NOTE(review): no lock is actually taken here; confirm a single reaper
  // thread is the only caller.

  _tested_conn_index=((_tested_conn_index+1)>=_nConns)?0:(_tested_conn_index+1);

  const unsigned int connection = _tested_conn_index;
  // Unlock

  // Indexing an empty vector, or past its end, is undefined behaviour, so
  // verify the slot exists before touching it.
  if (connection >= _connections.size()) {
    return NULL;
  }

  ACE_DEBUG((LM_DEBUG,"(%t %T) DBPool::getConnectionForTest(): Lets see if pool %s, Conn %d... \n ",_id.c_str(),connection));

  return _connections[connection]->testable();

}


// We should do one pass over every connection in the DBPool to
// get one that is in state "usable()==true".

DBPooledConnection *DBPool::getPooledConnection(int strategy) {

  // Makes at most one pass over the pool, in round-robin order, and returns
  // the first connection whose usable() yields a non-NULL pointer.
  //   strategy  only 0 (round robin) is implemented; any other value
  //             returns NULL.
  // Returns NULL when the full pass finds nothing.

  if (strategy != 0) {
    return NULL;
  }

  for (unsigned int i=0; i<_nConns; i++) {

    // Advance the shared index under the mutex and keep a private copy of
    // the result. Reading _used_conn_index again after release() could
    // observe a value written by another thread in the meantime.
    _used_conn_mutex.acquire();
    _used_conn_index=((_used_conn_index+1)>=_nConns)?0:(_used_conn_index+1);
    const unsigned int current = _used_conn_index;
    _used_conn_mutex.release();

    ACE_DEBUG((LM_DEBUG,"(%t %T) DBPool::getPooledConnection(): Checking Pool %s, Conn %d, NConns %d. \n ",_id.c_str(),current, _nConns));

    DBPooledConnection* db = _connections[current]->usable();
    if (db) return db;

  }

  return NULL;

}

// This method increases the number of connections (used only when READER DBPool)

int DBPool::grow(int growConns) {

  // Opens growConns additional DBPostgresPooledConnection objects using the
  // stored _config and _host, and appends them to the pool.
  //   growConns  number of connections to add; values <= 0 add nothing.
  // Returns the resulting connection count (_nConns).

  // A non-positive request must not touch _nConns: adding a negative number
  // would lower the counter without removing any connection.
  if (growConns <= 0) {
    return _nConns;
  }

  // The sizes are cast to int because the format string reads them as %d.
  const int before = static_cast<int>(_connections.size());
  ACE_DEBUG((LM_NOTICE,"(%t,%T) >> DBPool(%s): Growing (%d->%d).\n",_id.c_str(),before,before+growConns));

  const unsigned int firstNew = _nConns;
  const unsigned int endNew = firstNew + growConns;

  for (unsigned int i=firstNew; i<endNew; i++) {

    // Buffer sized for any unsigned int and written with snprintf, so
    // connection numbers of 100 and above cannot overflow it.
    char c[16];
    snprintf(c,sizeof(c),"%u",i);

    DBPostgresPooledConnection *post = new DBPostgresPooledConnection(_config,_host, _id+"::"+c);

    // The connection is appended whether or not it is valid; only the
    // debug trace differs.
    if (post->isValid()) {
      ACE_DEBUG((LM_DEBUG," %d",i));
    } else {
      ACE_DEBUG((LM_DEBUG," X"));
    }

    _connections.push_back(post);

  }
  ACE_DEBUG((LM_DEBUG,"\n"));
  _nConns += growConns;
  return _nConns;

}

// This method decreases the number of connections (used only when READER DBPool)
// We have to find a way to make sure the conns are not being used when we delete them (dbc->usable())
// and to make sure the conn number remains stable.

int DBPool::shrink(int shrinkConns) {

  // Removes up to abs(shrinkConns) connections from the back of the pool.
  // A connection is closed and deleted only when its usable() call returns
  // non-NULL; otherwise the method sleeps one second and retries the same
  // connection.
  //   shrinkConns  number of connections to remove; the sign is ignored.
  // Returns the resulting connection count (_nConns).
  // NOTE(review): if the last connection never becomes usable (the log text
  // mentions "invalid"), this loop does not terminate -- confirm with the
  // usable() contract.

  const int toRemove = abs(shrinkConns);

  // Log the same magnitude the loop uses; sizes are cast to int for %d.
  const int before = static_cast<int>(_connections.size());
  ACE_DEBUG((LM_NOTICE,"(%t,%T) << DBPool(%s): Shrinking (%d->%d).\n",_id.c_str(),before,before-toRemove));

  for(int removed=0; removed<toRemove;) {

    // back() on an empty vector is undefined behaviour; stop once nothing
    // is left to remove.
    if (_connections.empty()) {
      ACE_DEBUG((LM_DEBUG,"(%t,%T) DBPool::shrink Pool is empty, nothing left to remove.\n"));
      break;
    }

    DBPooledConnection* dbc = _connections.back();
    if (dbc->usable()) {
      // Counter and container are updated together under the mutex that
      // getPooledConnection() uses for its index computation.
      _used_conn_mutex.acquire();
      _nConns -= 1;
      dbc->closeConnection();
      _connections.pop_back();
      delete dbc;
      _used_conn_mutex.release();
      removed++;
    } else {
      ACE_DEBUG((LM_DEBUG,"(%t,%T) DBPool::shrink Could not delete connection (still in use or invalid). Trying again in 1 second.\n"));
      ACE_OS::sleep(1);
    }

  }
  return _nConns;

}



// Generated by Doxygen 1.6.0