Logo Search packages:      
Sourcecode: dbbalancer version File versions

DBPostgresPooledConnection.cc

#include "../conn_pool/DBPooledConnection.hh"
#include "../config/DBHostConfig.hh"
#include "../config/DBBalancerConfig.hh"
#include "DBPostgresBackend.hh"
#include "DBPostgresFrontend.hh"
#include "DBPostgresPooledConnection.hh"

// Constructor: Copy the attributes, and init the backend connection.

DBPostgresPooledConnection::DBPostgresPooledConnection(DBBalancerConfig* config, DBHostConfig* host, string id) {

  _mutex.acquire();
  this->setId(id);

  // Create Backend with the host where to connect configuration.
  _be = new DBPostgresBackend(config, host);
  _be->setId(this->getId());
  // Create Frontend with his "connection accept" parameters.
  _fe = new DBPostgresFrontend(config);

  // Open up connection to the BE.

  _valid = _be->startUp();
  
  // We assign the secret key to the FE from the BE.
  _fe->setKey(_be->getKeyPointer(),_be->getKeySize());

  _free = true;
  _mutex.release();

}


DBPostgresPooledConnection::~DBPostgresPooledConnection() {

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying DBPostgresPooledConnection(): %s.\n",this->getId().c_str()));

  delete _fe;
  delete _be;

}


// Interfaz DBPooledConnection.

bool DBPostgresPooledConnection::handleConnection(int fe_socket) {

  int nmesg;
  int modified;
  int max_descriptor;

  int be_socket = _be->getSocket();

  ACE_DEBUG((LM_DEBUG,"(%t, %T) (%s) Handling connection.\n",getId().c_str()));
  ACE_DEBUG((LM_DEBUG,"(%t, %T) Processing new conection. Receiving login data. (socket: %d)\n",fe_socket));

  if (_fe->receiveStartup(fe_socket)) {

    ACE_DEBUG((LM_DEBUG,"(%t, %T) Login data received OK.\n"));

  } else {

    ACE_DEBUG((LM_ERROR,"(%t, %T) Login process failed.\n"));
    return handleConnectionFrontendError("Login process failed.",true,fe_socket);

  }


  // Now we'll loop to process backend and frontend messages. The only protocol
  // command that's relevant to us is the connection closing. If we receive it, we
  // have to filter it, close the socket, and mark the connection as free.
  
  nmesg=0;
  if (fe_socket>be_socket) {
    max_descriptor = fe_socket+1;
  } else {
    max_descriptor = be_socket+1;
  }

  ACE_DEBUG((LM_DEBUG," (%t, %T) Max Descriptor %d.\n",max_descriptor));
  
  unsigned int loop_iterations = 0;

  while (true) {

    ACE_DEBUG((LM_DEBUG,"************** handleConnectionLoop (it: %d) *************\n",++loop_iterations));

    fd_set read_set;
    FD_ZERO(&read_set);
    FD_SET(fe_socket,&read_set);
    FD_SET(be_socket,&read_set);

    if ((modified=select(max_descriptor,&read_set,NULL,NULL,NULL))<1) {

      ACE_DEBUG((LM_ERROR,"(%t) select(2) error (modified %d descriptors): %m\n",modified));

      // Throw exception

      if (errno==EBADF) {
      ACE_DEBUG((LM_ERROR,"(%t) Error: Invalid file descriptor (%d,%d): %m\n",fe_socket,be_socket));
      return handleConnectionFrontendError("Invalid file descriptor",true,fe_socket);
      }

      return handleConnectionFrontendError("Error in select()",true,fe_socket);

    } else {

      ACE_DEBUG ((LM_DEBUG," (%t, %T) %d descriptors modified.\n",modified));

      // Data in the Front End socket.

      if (FD_ISSET(fe_socket,&read_set)) {
      
      ACE_DEBUG((LM_DEBUG," (%t, %T) Data in FE socket.\n"));

      // Now we'll loop till we read all data in FE socket
      unsigned int result;
      do {

        // Read data.
        result =_fe->read(fe_socket);

        switch (result) {

        case DBPostgresFrontend::CONN_CLOSED:
          
          return handleConnectionClose(fe_socket);
          
        case DBPostgresFrontend::READ_EXCEPTION:

          return handleConnectionFrontendError("Exception reading from FE socket.",true,fe_socket);

        default:

          // Write data to the backend.
          if (_fe->writeBack(be_socket)==DBPostgresFrontend::WRITE_EXCEPTION) 
            return handleConnectionBackendError("Exception writing to the BE.",0,fe_socket);

        }
        
      } while (result==DBPostgresFrontend::BUFF_MORE);

      // Detect situations where clients hold the connection and don't send data.
      // That happens when suddenly closing the DB...
      if ((loop_iterations>MAX_LOOP_ITERATIONS) && (_fe->getBufferFilling()==0))
        return handleConnectionFrontendError("Client hung and not sending data?.",0,fe_socket);

      
      }

      // If is there data in the "back end" it's pushed right to "front end"

      if (FD_ISSET(be_socket,&read_set)) {

      //ACE_DEBUG((LM_DEBUG," (%t, %T) Data in BE socket.\n"));

      // Now we'll loop till we read all data in BE socket
      unsigned int result = 0;
      do {

        // Read data.
        if (_be->read()==DBPostgresBackend::READ_EXCEPTION) {

          // Throw exception (it should have been thrown by _be).
          ACE_DEBUG ((LM_ERROR," (%t, %T) Just %d bytes read from BE socket: %m. !!\n",result));
          return handleConnectionBackendError("Exception reading from BE.",0,fe_socket);

        } else {

          if (_be->writeForward(fe_socket,_be->getBufferFilling()) == DBPostgresBackend::WRITE_EXCEPTION)
            return handleConnectionFrontendError("Exception writing to FE.",true,fe_socket);
      
        }
        
      } while (result==DBPostgresBackend::BUFF_MORE);

      // Detect situations where clients hold the connection and don't send data.
      // That happens when suddenly closing the DB...
      if ((loop_iterations>MAX_LOOP_ITERATIONS) && (_be->getBufferFilling()==0))
        return handleConnectionBackendError("Client hung and not sending data?.",0,fe_socket);



      }

    } // while(1)
    

  }

  return 0;

}
// What to do in case of an error.

bool DBPostgresPooledConnection::handleConnectionFrontendError(string message, bool state, int socket) {

  if (state) {
    _free = true;
  }

  ACE_DEBUG((LM_ERROR,"(%t, %T) DBPostgresPooledConnection: %s\n",message.c_str()));
  close(socket);

  return false;

}

bool DBPostgresPooledConnection::handleConnectionBackendError(string message, int be_number, int socket) {

  
  // We should send some error to the client....
  close(socket);
  // Connection State
  _free = true;
  _valid = false;
  // We should delete this BE conn from the list of valid conns.

  ACE_DEBUG((LM_ERROR,"(%t, %T) DBPostgresPooledConnection: %s (%m)\n",message.c_str()));
  close(socket);

  return false;

}


// What to do when a client wants to close the connection.

bool DBPostgresPooledConnection::handleConnectionClose(int socket) {

  close(socket);
  _free = true;
  ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection: %s has finished. Now FREE!\n",getId().c_str()));
  return true;

}

// This code is executed by the reaper thread to make sure our connection is OK.

bool DBPostgresPooledConnection::checkConnection() {

  if (!_valid || !(_valid=_be->testConnection()) ) {
    
    ACE_DEBUG((LM_DEBUG,"(%t, %T) checkConnection: Going to recover conn %s.\n",getId().c_str()));
    if (_valid = _be->recoverConnection()) {
      ACE_DEBUG((LM_NOTICE,"(%t, %T) Connection %s has been RECOVERED!!\n",getId().c_str()));;
      _fe->setKey(_be->getKeyPointer(),_be->getKeySize());
    }

  } else {
    
    ACE_DEBUG((LM_DEBUG,"(%t, %T) checkConnection: wasOk %s \n",getId().c_str()));

  }
 
  _free = true;
  return _valid;

}




bool DBPostgresPooledConnection::isFree() {

  return _free;

}

bool DBPostgresPooledConnection::isValid() {

  return _valid;

}

DBPooledConnection *DBPostgresPooledConnection::usable() {

  _mutex.acquire();

  if (_valid && _free) {

    _free = false;
    _mutex.release();
    ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection %s is going to be used!!.\n",getId().c_str()));
    
    return this;

  }
  
  if (!_valid) {

    ACE_DEBUG((LM_WARNING,"(%t, %T) Invalid Connection %s!!!.\n",getId().c_str()));

  }

  if (!_free) ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection %s is NOT free!!!.\n",getId().c_str()));


  _mutex.release();
  return NULL;

}

DBPooledConnection *DBPostgresPooledConnection::testable() {
  
  // Here we've got to define the algorithm to choose a connection for test.
  // There could be three cases:
  // a) !valid && free
  // b) valid && !free && (time_use > timeout)
  // c) valid && free: routine check.

  // The algorithm could live here or in DBPool. Now lives here but maybe
  // later I'll move it.

  _mutex.acquire();

  if (_free) {

    _free = false;
    _mutex.release();
    ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection %s is going to be tested !!.\n",getId().c_str()));
    
    return this;

  }

  _mutex.release();
  return NULL;

}


void DBPostgresPooledConnection::closeConnection() {

  //ACE_DEBUG((LM_WARNING,"(%t) closeConnection() still not implemented in DBPostgresPooledConnection().\n"));
  return ;

}


// Private Methods

// This method checks the things we want to intercept like ends of connection (always), 
// querys (in r/w mode, to update the tables and possible executing updates), and 
// updates (in r/w mode, to update the tables).

/*
bool DBPostgresPooledConnection::interceptFrontEndMessage(int fe_socket,int be_socket, char *message, int size, bool first) {


  //  if (first && message[0]=='X' && message[1]==0) {
  if (first && message[0]=='X') {

    ACE_DEBUG((LM_DEBUG," (%t) End of connection intercepted.\n"));
    return false;

  } else {
  
    ACE_DEBUG((LM_DEBUG," (%t) Writing %d bytes to BE.\n",size));
    send(be_socket,message,size,SEND_FLAGS);

  }
    
  return true;


}
*/

Generated by  Doxygen 1.6.0   Back to index