// Source code: dbbalancer
// File: DBPostgresWriterConnection.cc

#include "../conn_pool/DBPooledConnection.hh"

#include "DBPostgresBackend.hh"
#include "DBPostgresFrontend.hh"
#include "DBPostgresWriterConnection.hh"

#include "../config/DBHostConfig.hh"
#include "../config/DBBalancerConfig.hh"


// Copy the configuration attributes, then create and start one backend connection per
// replicated host plus a single frontend (client) connection.
//
// @param config Balancer configuration; supplies the writer-transaction-per-request flag
//               and is passed to every backend and to the frontend.
// @param hosts  Replicated database hosts; one backend connection is created for each.
// @param id     Identifier of this connection; backend ids are derived from it.

DBPostgresWriterConnection::DBPostgresWriterConnection(DBBalancerConfig *config, vector<DBHostConfig *> hosts, string id) {

  _mutex.acquire();
  this->setId(id);
  _writerTransactionPerRequest = config->getWriterTransactionPerRequest();

  // Backend connections (as many as hosts replicated).

  for (unsigned int i=0; i<hosts.size();i++) {

    DBPostgresBackend *be = new DBPostgresBackend(config, hosts[i]);

    // Backend id is ":<id>:<index>". It is built as a std::string because the previous
    // fixed 12-byte sprintf() buffer overflowed as soon as `id` was more than a few chars.
    char index_text[16];
    snprintf(index_text,sizeof(index_text),"%u",i);
    string id_st = ":" + id + ":" + index_text;
    be->setId(id_st);

    be->startUp();
    _be.push_back(be);

  }

  // Frontend connection (just one: the client).

  _fe = new DBPostgresFrontend(config);

  // The frontend needs a key; it is taken from the first backend. Which backend supplies
  // it is arbitrary.
  if (hosts.size()>0) {

    _fe->setKey(_be[0]->getKeyPointer(),_be[0]->getKeySize());

  }

  _valid = this->checkConnection();
  _free = true;

  _mutex.release();

}


// Destroys every backend connection and the frontend connection owned by this object.
DBPostgresWriterConnection::~DBPostgresWriterConnection() {

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying DBPostgresWriterConnection(): %s.\n",this->getId().c_str()));

  // Delete the backend connections. The previous index loop compared a growing index
  // against a size() that shrank on every pop_back(), so it freed only about half of them.
  while (!_be.empty()) {

    DBPostgresBackend *be = _be.back();
    _be.pop_back();
    delete be;

  }
  delete _fe;
}


// ****************************************************************************
// CONNECTION PROCESS
// ****************************************************************************

// Serves one client session on fe_socket. It receives the login data, optionally sends
// "begin" to the backends, then loops:
//   - client data is relayed to every backend;
//   - once EVERY backend has data, each backend is read and checkAndSendData() decides
//     what is forwarded to the client.
// The loop only exits through a return.
//
// @param fe_socket Front end (client) socket.
// @return true when the client closed the connection (handleConnectionClose), false on
//         errors (the handle*Error helpers).
bool DBPostgresWriterConnection::handleConnection(int fe_socket) {

  ACE_DEBUG((LM_DEBUG,"(%t, %T) (%s) Handling connection.\n",getId().c_str()));
  ACE_DEBUG((LM_DEBUG,"(%t, %T) Processing new conection. Receiving login data. (socket: %d)\n",fe_socket));

  if (_fe->receiveStartup(fe_socket)) {

    ACE_DEBUG((LM_DEBUG,"(%t, %T) Login data received OK.\n"));

  } else {

    return handleConnectionFrontendError("Error receiving login data.\n",true,fe_socket);

  }

  // select() needs (highest watched descriptor + 1), and we watch the FE socket AND every
  // BE socket. Previously the BE sockets were compared against an uninitialised local and
  // then ignored, so a BE socket numbered above fe_socket was never reported by select().
  int max_descriptor = fe_socket;
  for (vector<DBPostgresBackend *>::iterator p = _be.begin(); p!=_be.end(); ++p) {
    if ((*p)->getSocket() > max_descriptor) max_descriptor = (*p)->getSocket();
  }
  _highest_socket_descriptor = max_descriptor + 1;

  ACE_DEBUG((LM_DEBUG," (%t, %T) Max Descriptor %d.\n",_highest_socket_descriptor));

  // Start the automatic TRANSACTION if needed
  if (_writerTransactionPerRequest)
    this->sendTransactionCommand("begin",fe_socket);

  // Main loop: wait for frontend and backend messages and process them.
  int loop_iterations = 0;
  while (true) {

    ACE_DEBUG((LM_DEBUG,"************** handleConnectionLoop (it: %d) *************\n",++loop_iterations));

    // Rebuild the read set on every iteration (select() modifies it).
    fd_set read_set;
    FD_ZERO(&read_set);
    FD_SET(fe_socket,&read_set);
    for (vector<DBPostgresBackend *>::iterator p = _be.begin(); p!=_be.end(); ++p) {
      FD_SET((*p)->getSocket(),&read_set);
    }

    // No timeout: this blocks until at least one descriptor is readable or select() fails.
    int modified = select(_highest_socket_descriptor,&read_set,NULL,NULL,NULL);
    if (modified<1) {

      ACE_DEBUG((LM_ERROR,"(%t) select() error (modified %d descriptors): %m\n",modified));
      if (errno==EBADF) {
        ACE_DEBUG((LM_ERROR,"(%t) Error: Invalid file descriptor : %m\n"));
        return handleConnectionFrontendError("Invalid file descriptor",true,fe_socket);
      }

      return handleConnectionFrontendError("Error in select()",true,fe_socket);

    }

    ACE_DEBUG ((LM_DEBUG," (%t, %T) (%s) %d descriptors modified.\n",getId().c_str(),modified));

    // Data from the client: relay all of it to every backend.
    if (FD_ISSET(fe_socket,&read_set)) {

      ACE_DEBUG((LM_DEBUG," (%t, %T) Data in FE socket.\n"));
      // Client data starts a new command, so the next backend reply begins with a
      // response code (see checkAndSendData()).
      _new_command = true;

      // Keep reading while the frontend reports more buffered data.
      unsigned int result;
      do {

        result =_fe->read(fe_socket);

        switch (result) {

        case DBPostgresFrontend::CONN_CLOSED:
          return handleConnectionClose(fe_socket);

        case DBPostgresFrontend::READ_EXCEPTION:
          return handleConnectionFrontendError("Exception reading from FE socket.",true,fe_socket);

        default:
          // Write data to all backends.
          for (vector<DBPostgresBackend *>::iterator p = _be.begin(); p!=_be.end(); ++p) {
            if (_fe->writeBack((*p)->getSocket())==DBPostgresFrontend::WRITE_EXCEPTION) {
              return handleConnectionBackendError("Exception writing to a BE.",(*p),fe_socket);
            }
          }
        }

      } while (result==DBPostgresFrontend::BUFF_MORE);

    }

    // Backend replies are consumed only when ALL backends have data. The buffers are then
    // checked for consistency between backends and pumped to the client. NOTE(review): if
    // one backend never answers, the others' data waits indefinitely (there is no timeout).
    unsigned int be_with_data = 0;
    for (unsigned int i=0; i<_be.size(); i++) {
      if (FD_ISSET(_be[i]->getSocket(),&read_set))
        be_with_data++;
    }

    if (be_with_data==_be.size()) {

      for (unsigned int i = 0; i<_be.size(); i++) {

        DBPostgresBackend *be = _be[i];

        ACE_DEBUG((LM_DEBUG,"(%t, %T) Data in BE socket (number %s).\n",be->getId().c_str()));

        switch (be->read()) {

        case DBPostgresBackend::READ_EXCEPTION:
          return handleConnectionBackendError("Exception while reading from a BE.",be,fe_socket);

        default:
          // BUFF_MORE, BUFF_DATA or anything else: the data stays in the backend buffer.
          break;

        }

      }

      // NOTE(review): the result is ignored. It cannot be used as-is, because a sync error
      // (connection kept open) and a frontend error (socket closed) both return false.
      this->checkAndSendData(fe_socket);

    } else {

      ACE_DEBUG((LM_DEBUG,"(%t, %T) Didn't have data in all Backends (%d / %d).\n",(int)be_with_data,(int)_be.size()));

    }

  } // while (true) main loop

  return false;

}



/**
 * Implements the flowchart in docs/UML/DBBalancerReplicacionFlowchart.dia.gz.
 * First the response code (first byte) of every backend buffer is compared, then the
 * buffered data is handled according to that code:
 *  - 'P':  backends agree; backend 0's buffer is forwarded to the client.
 *  - 'E':  every backend reported an error; see processBackendExecutionError().
 *  - '\0': the backends returned different codes -> sync error.
 *  - other: unexpected code -> sync error.
 *
 * The code is looked up only at the start of a new client command (_new_command).
 *
 * @param fe_socket Front end (client) socket.
 * @return true when data was forwarded, otherwise the result of the handler called.
 */

int DBPostgresWriterConnection::checkAndSendData(int fe_socket) {

  ACE_DEBUG((LM_DEBUG,"\t***checkAndSendData(%d)\n",fe_socket));

  // Middle of a response: the response code arrived with an earlier chunk. Previously
  // `command` was left uninitialised here (undefined behaviour).
  // NOTE(review): a continuation is assumed to belong to a 'P' response, since 'E'
  // replies are assumed to arrive whole and sync errors abort the response. Confirm;
  // remembering the code across calls would need a member variable.
  char command = 'P';

  if (_new_command) {

    _new_command = false;
    command = this->getBEBuffersFirstCharacter();

    ACE_DEBUG((LM_DEBUG,"(%t,%T) The returned command was (%c)\n",command));

  }

  switch (command) {

  case 'P':
    // If we don't have strict checking, ALL OK.
    ACE_DEBUG((LM_DEBUG,"(%t,%T) The backends executed the command OK.\n"));
    if (_be[0]->writeForward(fe_socket,_be[0]->getBufferFilling())==DBPostgresBackend::WRITE_EXCEPTION) {
      return handleConnectionFrontendError("Exception writing to FE socket",true,fe_socket);
    }
    // NOTE: ideally every backend would return exactly the same data, so that the same
    // number of reads is made from each. A different number could hang the connection:
    // handleConnection() waits for data in EVERY backend after select().

    this->resetBEBuffers();

    break;

  case 'E': {
    // The backends returned an ERROR. Identical messages are sent once; different ones
    // are concatenated. This assumes the buffers hold the WHOLE error string.
    ACE_DEBUG((LM_DEBUG,"(%t,%T) The backends executed the command with ERROR.\n"));
    const bool handled = processBackendExecutionError(fe_socket);

    // Drop the error replies so they are not forwarded again with the next response.
    // This reset used to sit after the return statement and never ran.
    this->resetBEBuffers();
    return handled;
  }

  case 0:
    // ERROR: Commands aren't the same.
    ACE_DEBUG((LM_DEBUG,"(%t,%T) Different command returned by Backends.\n"));
    for (unsigned int i = 0; i<_be.size(); i++) {
      ACE_DEBUG((LM_DEBUG,"\tBuffer %s has %d bytes.\n",_be[i]->getId().c_str(), _be[i]->getBufferFilling()));
      // Dump only the bytes actually held, matching the byte count logged above.
      ACE_LOG_MSG->log_hexdump(LM_DEBUG,_be[i]->getBufferPointer(),_be[i]->getBufferFilling());
    }
    return handleConnectionSyncError("Received different response commands from the backends",fe_socket);

  default:
    ACE_DEBUG((LM_ERROR,"(%t,%T) Received an unexpected response command from the backends (%c)\n",command));
    return handleConnectionSyncError("Received a unexpected response command from the backends",fe_socket);

  }

  for (vector<DBPostgresBackend *>::iterator p = _be.begin(); p!=_be.end(); ++p) {

    ACE_DEBUG((LM_DEBUG,"\tBuffer %s has %d bytes.\n",(*p)->getId().c_str(), (*p)->getBufferFilling()));

  }

  // Previously control fell off the end of this non-void function here.
  return true;

}

/**
 * Called when every backend answered a command with an error ('E').
 * If all backend buffers have the same length and the same bytes after byte 0, one copy
 * is sent to the client (processBackendExecutionErrorWhenEqual). Otherwise the messages
 * are concatenated (processBackendExecutionErrorWhenNotEqual). Assumes each error message
 * fits within its backend buffer.
 *
 * @param fe_socket Front end socket for this request.
 * @return the result of the chosen handler.
 */


bool DBPostgresWriterConnection::processBackendExecutionError(int fe_socket) {

  const int reference_length = _be[0]->getBufferFilling();

  // A different amount of buffered data already means the messages differ.
  for (unsigned int b = 1; b < _be.size(); ++b) {
    if (_be[b]->getBufferFilling() != reference_length)
      return processBackendExecutionErrorWhenNotEqual(fe_socket);
  }

  // Same length: compare the contents. Byte 0 (the response code, already compared by
  // checkAndSendData() through getBEBuffersFirstCharacter()) is skipped.
  if (reference_length > 1) {
    for (unsigned int b = 1; b < _be.size(); ++b) {
      if (memcmp(_be[0]->getBufferPointer() + 1, _be[b]->getBufferPointer() + 1, reference_length - 1) != 0)
        return processBackendExecutionErrorWhenNotEqual(fe_socket);
    }
  }

  return processBackendExecutionErrorWhenEqual(fe_socket);

}

/**
 * All backends returned the same error message, so backend 0's buffer is sent to the
 * client as-is.
 *
 * @param fe_socket Front end (client) socket.
 * @return true on success, false (via handleConnectionFrontendError) if the write fails.
 */

bool DBPostgresWriterConnection::processBackendExecutionErrorWhenEqual(int fe_socket) {


  ACE_DEBUG((LM_DEBUG,"(%t,%T) DBPostgresWriterConnection::processBackendExecutionErrorWhenEqual\n"));

  // This is a WRITE to the client; the old message wrongly said "reading".
  if (_be[0]->writeForward(fe_socket,_be[0]->getBufferFilling())==DBPostgresBackend::WRITE_EXCEPTION) {
    return handleConnectionFrontendError("Exception writing to FE socket",true,fe_socket);
  }

  return true;

}

/**
 * Combines the error messages of all backends into one and sends it to the client.
 * Different errors are a symptom of the backends being out of sync, so a notice is logged.
 *
 * The reply is written as: 'E', then for each backend a " (From backend N):  " label
 * followed by that backend's buffer minus its last 3 bytes, then a single '\0' and 'Z'.
 *
 * @param fe_socket Front end (client) socket.
 * @return true on success, false (via handleConnectionFrontendError) if a write fails.
 */

bool DBPostgresWriterConnection::processBackendExecutionErrorWhenNotEqual(int fe_socket) {

  ACE_DEBUG((LM_DEBUG,"(%t,%T) DBPostgresWriterConnection::processBackendExecutionErrorWhenNotEqual\n"));
  ACE_DEBUG((LM_NOTICE,"(%t,%T) ERROR: The backends returned different error messages. Please check DB sync.\n"));

  // Large enough for the label with any unsigned backend index (the old 22-byte buffer only
  // just fitted one- and two-digit indexes).
  char message[64];

  message[0]='E';

  if (send(fe_socket,message,1, DBPostgresFrontend::SEND_FLAGS)<0) {
    return handleConnectionFrontendError("Exception writing to FE socket",true,fe_socket);
  }

  for (unsigned int i=0;i<_be.size();i++) {

    // Send exactly the characters snprintf() produced, instead of a fixed byte count.
    int label_length = snprintf(message,sizeof(message)," (From backend %u):  ",i);
    if (label_length < 0) label_length = 0;
    if (label_length >= (int)sizeof(message)) label_length = (int)sizeof(message) - 1;

    ACE_LOG_MSG->log_hexdump(LM_DEBUG, message, label_length);

    if (send(fe_socket,message,label_length,DBPostgresFrontend::SEND_FLAGS)<0) {
      return handleConnectionFrontendError("Exception writing to FE socket",true,fe_socket);
    }

    // The last 3 bytes of each buffer (the trailing "\n\0Z", per the original design) are
    // not forwarded; a single "\0Z" terminates the combined reply below. A buffer too short
    // to hold anything else is skipped, where it previously produced a negative length.
    int filling = _be[i]->getBufferFilling();
    if (filling > 3 &&
        _be[i]->writeForward(fe_socket,filling-3)==DBPostgresBackend::WRITE_EXCEPTION) {
      return handleConnectionFrontendError("Exception writing to FE socket",true,fe_socket);
    }

  }

  // Terminate the error string and append the 'Z' byte.
  message[0]=0;
  message[1]='Z';

  if (send(fe_socket,message,2, DBPostgresFrontend::SEND_FLAGS)<0) {
    return handleConnectionFrontendError("Exception writing to FE socket",true,fe_socket);
  }


  return true;


}




// *************************************************************************************************
// ERROR HANDLING
// *************************************************************************************************


/**
 * Handles a failure while talking to the front end (client).
 * When `state` is true the connection is first marked free. The message is then logged
 * together with the current errno text (%m), and the client socket is closed.
 *
 * @param message Description of the failure, used in the log line.
 * @param state   Whether to mark this connection free again.
 * @param socket  Client socket to close.
 * @return always false.
 */

bool DBPostgresWriterConnection::handleConnectionFrontendError(string message, bool state, int socket) {

  if (state) _free = true;

  ACE_DEBUG((LM_ERROR,"(%t, %T) DBPostgresWriterConnection: %s (%m)\n",message.c_str()));

  // Drop the client side of the session.
  close(socket);
  return false;

}

/**
 * Handles a failure while talking to a back end.
 * Sends an error message to the client, closes the client socket, marks this connection
 * free but no longer valid, clears every backend buffer and logs the error.
 *
 * @param message Description of the failure (sent to the client and logged).
 * @param be      Backend that failed. Currently unused.
 * @param socket  Front end (client) socket.
 * @return always false.
 */

bool DBPostgresWriterConnection::handleConnectionBackendError(string message, DBPostgresBackend *be, int socket) {

  // Keep the errno of the failure that brought us here. writeMessage() and close() below
  // can overwrite it, and the "%m" in the log line should describe the original error.
  const int saved_errno = errno;

  // Tell the client what happened.
  _fe->writeMessage(socket,"DBBalancer (replication mode) Backend error: ", message);

  // Connection State
  close(socket);
  _free = true;
  _valid = false;
  // TODO: remove `be` from the list of valid backend connections.

  // Clear all the Backend buffers
  this->resetBEBuffers();

  errno = saved_errno;
  ACE_DEBUG((LM_ERROR,"(%t, %T) DBPostgresWriterConnection: %s (%m)\n",message.c_str()));

  return false;

}

/**
 * Called when the backend databases appear to be out of sync (their responses differ).
 * Reports the problem to the client. In transaction-per-request mode it then sends a
 * ROLLBACK to EVERY backend. Finally it clears the backend buffers and logs the message.
 * This handler itself does not close the client socket or change the state flags.
 *
 * @param message Description of the mismatch (sent to the client and logged).
 * @param socket  Front end (client) socket.
 * @return always false.
 */

bool DBPostgresWriterConnection::handleConnectionSyncError(string message, int socket) {

  _fe->writeMessage(socket,"DBBalancer (replication mode) sync error: ", message);

  if (_writerTransactionPerRequest) {
    // Undo the current request's work on all backends.
    this->sendTransactionCommand("rollback", socket);
  }

  // Discard whatever the backends had buffered.
  this->resetBEBuffers();

  ACE_DEBUG((LM_ERROR,"(%t, %T) DBPostgresWriterConnection: %s\n",message.c_str()));

  // The connection stays open and keeps its state (close/_free/_valid are deliberately
  // left untouched here).
  return false;

}


/**
 * Called when the client closes its side of the connection.
 * In transaction-per-request mode the open transaction is first committed on every
 * backend. Then the client socket is closed and the connection is marked free.
 *
 * NOTE(review): if the commit fails, sendTransactionCommand() goes through
 * handleConnectionBackendError(), which already closes `socket`. The close() below then
 * runs a second time on the same descriptor number.
 *
 * @param socket Front end (client) socket.
 * @return always true.
 */

bool DBPostgresWriterConnection::handleConnectionClose(int socket) {

  if (_writerTransactionPerRequest) {
    // Make the request's work permanent before letting go of the client.
    this->sendTransactionCommand("commit", socket);
  }

  close(socket);
  _free = true;

  ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection: %s IS FREE!\n",getId().c_str()));
  return true;

}

/**
 * Checks every backend connection, trying to recover any that fails its test.
 * Unlike DBPostgresPooledConnection there are SEVERAL backends here. The whole connection
 * is invalid if ANY backend is bad, or if there are no backends at all.
 * When valid, the frontend key is refreshed from backend 0.
 * The connection is marked free in either case.
 *
 * @return the new validity state (also stored in _valid).
 */

bool DBPostgresWriterConnection::checkConnection() {

  // With no backends there is nothing to replicate to, and backend 0 (used for the key
  // below and by checkAndSendData()) does not exist. Previously this path indexed an
  // empty vector.
  bool all_ok = !_be.empty();

  for (vector<DBPostgresBackend *>::iterator b = _be.begin(); b!=_be.end(); ++b) {

    // A backend is bad only if it fails its test AND cannot be recovered.
    if (!(*b)->testConnection() && !(*b)->recoverConnection())
      all_ok = false;
  }

  if (all_ok) {
    ACE_DEBUG((LM_DEBUG,"(%t,%T) Connection  Valid\n"));
    _fe->setKey(_be[0]->getKeyPointer(),_be[0]->getKeySize());
  } else {
    ACE_DEBUG((LM_DEBUG,"(%t,%T) Connection  inValid\n"));
  }

  _valid = all_ok;
  _free = true;
  return _valid;

}

bool DBPostgresWriterConnection::isFree() {

  return _free;

}

bool DBPostgresWriterConnection::isValid() {

  return _valid;

}

// Atomically claims this connection for use: if it is both valid and free it is marked
// busy and returned. Otherwise the reason is logged and NULL is returned.
DBPooledConnection *DBPostgresWriterConnection::usable() {

  _mutex.acquire();

  if (!_valid || !_free) {

    // Not available: report why, then release the lock.
    if (!_valid) {
      ACE_DEBUG((LM_WARNING,"(%t, %T) (%s) Invalid Connection!!!.\n",getId().c_str()));
    }
    if (!_free) ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection %s is NOT free!!!.\n",getId().c_str()));

    _mutex.release();
    return NULL;

  }

  // Mark busy while still holding the lock, so no other caller can claim it.
  _free = false;
  _mutex.release();
  ACE_DEBUG((LM_DEBUG,"(%t) Connection %s is going to be used!!.\n",getId().c_str()));

  return this;

}

// Atomically claims this connection for testing: if it is free (valid or not) it is
// marked busy and returned. Otherwise NULL is returned.
DBPooledConnection *DBPostgresWriterConnection::testable() {

  _mutex.acquire();

  if (!_free) {
    ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection %s is NOT free for TEST!!!.\n",getId().c_str()));
    _mutex.release();
    return NULL;
  }

  // Mark busy while still holding the lock.
  _free = false;
  _mutex.release();
  ACE_DEBUG((LM_DEBUG,"(%t) Connection %s is going to be tested!!.\n",getId().c_str()));

  return this;

}


// Not implemented for writer connections: only logs a warning and does nothing else.
void DBPostgresWriterConnection::closeConnection() {
  ACE_DEBUG((LM_WARNING,"(%t) closeConnection() still not implemented in DBPostgresWriterConnection().\n"));
}


// ***********************************************************************************************

/**
 * Sends a transaction command (BEGIN, COMMIT or ROLLBACK) to the backends.
 * The function is synchronous. The command is first written to every backend; then one
 * response is read from each, and every response must start with 'C'.
 *
 * @param command SQL text to broadcast (e.g. "begin", "commit", "rollback").
 * @param socket  Front end (client) socket, passed to the error handler on failure.
 * @return 0. NOTE(review): failures also yield 0 (the false returned by
 *         handleConnectionBackendError), so callers cannot tell them apart.
 */

unsigned int DBPostgresWriterConnection::sendTransactionCommand(string command, int socket) {

  ACE_DEBUG((LM_DEBUG,"(%T,%t) Sending Transaction Command: %s\n",command.c_str()));

  // Broadcast the command to all the backends.
  for (unsigned int i=0;i<_be.size();i++) {

    _be[i]->resetBuffer();
    unsigned int result = _be[i]->writeAsyncCommand(command);
    // The old test compared against READ_EXCEPTION twice. A failed write is presumably
    // reported as WRITE_EXCEPTION (as writeForward() does), so both codes are accepted.
    if (result==DBPostgresBackend::WRITE_EXCEPTION || result==DBPostgresBackend::READ_EXCEPTION)
      return handleConnectionBackendError("Error while broadcasting transaction command to Backends.",_be[i],socket);

  }


  // Now read one response from every backend.
  for (vector<DBPostgresBackend *>::iterator p = _be.begin(); p!=_be.end(); ++p) {

    if ((*p)->syncRead()==DBPostgresBackend::READ_EXCEPTION) {
      return handleConnectionBackendError("Error reading response to transaction command.",(*p),socket);
    }
  }

  // Check the 'C' return command. An empty buffer is rejected, because byte 0 would then be
  // stale data left over from before resetBuffer().
  for (vector<DBPostgresBackend *>::iterator p = _be.begin(); p!=_be.end(); ++p) {

    if ((*p)->getBufferFilling() < 1 || (*p)->getBufferPointer()[0]!='C') {
      ACE_LOG_MSG->log_hexdump(LM_DEBUG,(*p)->getBufferPointer(),(*p)->getBufferFilling());
      return handleConnectionBackendError("Didn't read the right response from transaction.",(*p),socket);
    }
    (*p)->resetBuffer();
  }

  return 0;


}

/**
 * Compares the first byte of every backend buffer. That byte is the code the backend
 * sends in response to a SQL command.
 * Byte 0 is read whether or not a buffer currently holds data.
 *
 * @return the common first byte, or '\0' when the backends disagree or there are none.
 */

char DBPostgresWriterConnection::getBEBuffersFirstCharacter() {

  ACE_DEBUG((LM_DEBUG,"(%t,%T) getBEBuffersFirstCharacter() \n"));

  // No backend means no command; guards the _be[0] access below.
  if (_be.empty())
    return '\0';

  const char command = *(_be[0]->getBufferPointer());

  for (unsigned int i=1;i<_be.size();i++) {

    ACE_DEBUG((LM_DEBUG,"(%t,%T) Content of buffer %d\n",(int)i));
    ACE_LOG_MSG->log_hexdump(LM_DEBUG,_be[i]->getBufferPointer(),_be[i]->getBufferFilling());

    // Use '\0' rather than (char)NULL: NULL is a null-pointer constant, and casting it to
    // char is not portable.
    if (*(_be[i]->getBufferPointer()) != command)
      return '\0';

  }

  return command;

}

void DBPostgresWriterConnection::resetBEBuffers() { 
  
  for (int i=0;i<_be.size(); i++) _be[i]->resetBuffer(); 

}

// Generated by Doxygen 1.6.0