Logo Search packages:      
Sourcecode: dbbalancer version File versions

DBPostgresBackend.cc

#include "DBPostgresBackend.hh"
#include "../config/DBHostConfig.hh"
#include "../config/DBBalancerConfig.hh"

// Constructor: Copy the attributes, and init the backend connection.

DBPostgresBackend::DBPostgresBackend(DBBalancerConfig *config, DBHostConfig *host) {
  
  // Store backend connection data, just in case it's needed.
  _user = host->getDbUser();
  _password = host->getDbPassword();
  _db = host->getDbName();
  _host = host->getName();
  _port = host->getPort();
  _unixSocketBasePath = config->getUnixSocketBasePath();

}


DBPostgresBackend::~DBPostgresBackend() {

  // Destructor: Close the backend connection
  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying and closing DBPostgresBackend(): %s.\n",_id.c_str()));

  char closeCommand[1];
  closeCommand[0]='X';

  send(_be_socket,closeCommand,sizeof(closeCommand),SEND_FLAGS);
  close(_be_socket);


}

/**
 * Starts up the connection with the backend, managing the authentication
 * process as requested by that backend.
 *
 */

bool DBPostgresBackend::startUp() {

  char startup[296];
  char auth_response[16];
  char ready_for_query[1];
  int be_socket;
        
  if ((be_socket = this->getSocketConnection(_host,_port))<0) {
    ACE_DEBUG((LM_ERROR," (%t,%T) Couldn't connect to backend (%s:%d) !!\n",_host.c_str(),_port));
    return false;
  }

  _be_socket = be_socket;

  // Let's create the initial packet.

  memset(startup,0,sizeof(startup));

  // Packet size: Int32(296)
  startup[0]=0;
  startup[1]=0;
  startup[2]=0x1; // 255
  startup[3]=0x28;

  // Protocol version: Int32
  startup[4]=0;
  startup[5]=2;
  startup[6]=0;
  startup[7]=0;

  // Database Name: LimString64
  // Check limits of "db"

  for (unsigned int i=0; i<_db.length(); i++) {
    startup[i+8]=_db[i];
  }

  // Database user: LimString32
  // Check limits of "user"
  
  for (unsigned int i=0; i<_user.length(); i++) {
    startup[i+72]=_user[i];
  }
  
  
  // We'll send the initial packet
  
  if (send(be_socket,startup,sizeof(startup),SEND_FLAGS)==-1) {

    ACE_DEBUG((LM_ERROR," (%t) Error writing into the BE socket: %m !!\n"));
    return false;

  }

  // Wait for a reply (should be a authentication request (if PLAINTEXT_PASSWORD_AUTH) or
  // an authentication confirmation (if TRUSTED_AUTH).

  int auth_response_size;
  if ((auth_response_size = recv(be_socket,auth_response,5,RECV_FLAGS))==-1) {

    ACE_DEBUG((LM_ERROR," (%t) Error reading auth response from BE socket: %m !!\n "));
    return false;

  }
  auth_response[sizeof(auth_response)-1]=0;

  //ACE_LOG_MSG->log_hexdump(LM_DEBUG,auth_response,auth_response_size);

  if (auth_response[0]=='R') {

    switch (auth_response[4]) {


    case TRUSTED_AUTH_MODE: // Trusted auth.

      break;


    case PLAINTEXT_PASSWORD_AUTH_MODE:  // We send the plain text password

      char size[4];
      memset(size,0,sizeof(size));
      // Grandísima chapuza (limita el password a 256 chars), arreglarlo con desplazamientos.
      // Chequear el tamaño del password.

      size[3]=_password.length()+5;

      // Send password;
      send(be_socket,size,4,SEND_FLAGS);
      send(be_socket,_password.c_str(),_password.length()+1,SEND_FLAGS);

      // Receive the new auth_response.
      if ((auth_response_size=recv(be_socket,auth_response,5,RECV_FLAGS))==-1) {
      ACE_DEBUG((LM_ERROR,"(%t, %T) Error reading from socket !!\n "));
      return false;
      }
      auth_response[5]=0;
      break;

    default:
    
      ACE_DEBUG((LM_ERROR," (%t) Received a auth request different from what we expected (plain text password). We received %x (expecting 0x3).\n",auth_response[4]));
      return false;
      
    }
    
  } else {

    // The response wasn't a authentication request. Surely there is an error in the startup (user, dbname)
    // Should read the error and dump it in a more correct way !

    ACE_DEBUG((LM_ERROR," (%t) Didn't receive an auth request. Startup packet must be incorrect: %x.\n",auth_response[0]));
    ACE_LOG_MSG->log_hexdump(LM_DEBUG,auth_response,sizeof(auth_response));
    ACE_DEBUG ((LM_DEBUG,"\n"));
    return false;

  }


  //ACE_LOG_MSG->log_hexdump(LM_DEBUG,auth_response,auth_response_size);
  
  // Here we must have a valid authentication response. If not, maybe the user or user/password are incorrect.

  if ((auth_response[0]!='R') || (auth_response[4]!=0)) {

    ACE_DEBUG((LM_ERROR,"(%t,%T) Error connecting to Back End (probably access denied). Please note that so far we only support TRUST and PASSWORD authentication modes in the backend databases\n"));
    ACE_DEBUG((LM_ERROR,"(%t,%T) Postgres Error: %s\n",auth_response+(sizeof(char))));
    close(be_socket);
    return false;

  }

  // Read the secret key from the server. Used for normally for async querys. 
  // But in DBBalancer is not currently being used (and hence it doesn't suppport async querys).
  // Here a problem appeared with Postgres 7.2, since is doesn't seem to send a zero after they key,
  // amounting only 9 bytes instead of 10.

  //ACE_DEBUG((LM_DEBUG,"(%t,%T) Waiting for SECRET_KEY\n"));

  memset(_secret_key_response,0,sizeof(_secret_key_response)-1);
  if (recv(be_socket,_secret_key_response,sizeof(_secret_key_response)-1,RECV_FLAGS)==-1) {

    ACE_DEBUG((LM_ERROR,"(%t) Error reading secret key from the Back End socket: %m\n"));

  }
  _secret_key_response[sizeof(_secret_key_response)-1]=0;

  // Read the READY_FOR_QUERY confirmation from the server. It's the final indication that everything is OK

  //ACE_DEBUG((LM_DEBUG,"(%t,%T) Waiting for READY_FOR_QUERY\n"));
  if (recv(be_socket,ready_for_query,1,RECV_FLAGS)==-1) {

    ACE_DEBUG((LM_ERROR,"(%t) Error reading from the Back End socket: %m\n"));
    return false;

  }

  if (ready_for_query[0]=='Z') {

    // Everything has gone OK!

    return true;

  } else {

    // A problem happened (it's rare that anything fail at this level).

    ACE_DEBUG((LM_ERROR,"(%t, %T) Ready for Query not received. '%c' instead.\n",ready_for_query[0]));
    close(be_socket);
    return false;

  }


}


/**
 * This method should be somewhat intelligent.
 * There could be cases in which we'll have to
 * keep part of the buffer because it hasn't been
 * consolidated on the FE yet (pending).
 */

unsigned int DBPostgresBackend::read () {

  int size;
  int result;


  size = recv(_be_socket, _buffer ,sizeof(_buffer),RECV_FLAGS);

  if (size==sizeof(_buffer)) {

    _buffer_filling = size;
    result = BUFF_MORE;

  } else {

    if (size<0) {
    
      ACE_DEBUG ((LM_DEBUG," (%t, %T) Problem receiving data from BE socket (%d): %m.\n ",size));
      // Throw exception!!!!!
      _buffer_filling = 0;
      return DBPostgresBackend::READ_EXCEPTION;

      
    } else {

      _buffer_filling = size;
      result = BUFF_DATA;

    }

  }

  ACE_DEBUG ((LM_DEBUG," (%t, %T) Received %d bytes from BE socket.\n ",size));
  return result;
  
}

// Get quick and short (shorter than the buffer size) responses from 
// the backend. Used with transaction commands. It auto resets the buffer pointer.

unsigned int DBPostgresBackend::syncRead() {

  int size;
  fd_set read_set;
  FD_ZERO(&read_set);
  FD_SET(_be_socket,&read_set);
  
  // Ensure we've got data
  //ACE_DEBUG((LM_DEBUG,"(%t,%T) DBPostgresBackend::syncRead (pre_select) (%d)\n",_be_socket));
  select(_be_socket+1,&read_set,NULL,NULL,NULL);
  //ACE_DEBUG((LM_DEBUG,"(%t,%T) DBPostgresBackend::syncRead (post_select) (%d)\n",_be_socket));
  size = recv(_be_socket, _buffer, sizeof(_buffer), RECV_FLAGS);

  if (size<0) {
    
    ACE_DEBUG ((LM_DEBUG," (%t, %T) DBPostgresBackend::syncRead() Problem receiving data from BE socket (%d): %m.\n ",size));
    // Throw exception!!!!!
    _buffer_filling = 0;
    return DBPostgresBackend::READ_EXCEPTION;
    
  }

  this->resetBuffer();
  return DBPostgresBackend::READ_OK;

}


unsigned int DBPostgresBackend::writeForward (int fe_socket, int size) {

  ACE_LOG_MSG->log_hexdump(LM_DEBUG,_buffer,(size>_buffer_filling)?_buffer_filling:size);
  ACE_DEBUG ((LM_DEBUG,"\n"));

  int result = send(fe_socket, _buffer ,(size>_buffer_filling)?_buffer_filling:size, SEND_FLAGS);
  _buffer_filling = 0;

  if (result<0) {

    ACE_DEBUG ((LM_DEBUG," (%t, %T) Problem sending data to FE socket: %m.\n ",result));
    // Throw exception!!!!!
    return DBPostgresBackend::WRITE_EXCEPTION;

  } else {

    ACE_DEBUG ((LM_DEBUG," (%t, %T) Sent %d bytes to FE socket.\n ",result));
    return DBPostgresBackend::BUFF_DATA;

  }

}

// Synchronous command executed to the backend.

unsigned int DBPostgresBackend::writeSyncCommand(string command) {

  char message[128];
  int size;

  memset(message,0,sizeof(message));
  snprintf(message,127,"Q%s",command.c_str());
       
  ACE_DEBUG((LM_DEBUG,"(%t) DBPostgresBackend: Command to BackEnd: %s\n", message));
  
  if ((size=send(_be_socket,message,strlen(message)+1,SEND_FLAGS))==-1) 
    ACE_DEBUG((LM_ERROR,"(%t) DBPostgresBackend: Error writing command: %m\n"));

  ACE_LOG_MSG->log_hexdump(LM_DEBUG,message,strlen(message)+1);
  ACE_DEBUG((LM_DEBUG,"\n"));
  
  fd_set read_set;
  FD_ZERO(&read_set);
  FD_SET(_be_socket,&read_set);
  
  //Check the modified descriptors
  select(_be_socket+1,&read_set,NULL,NULL,NULL);
  
  
  if ((size=recv(_be_socket,message,sizeof(message)-1,RECV_FLAGS))==-1) 
    ACE_DEBUG((LM_ERROR,"(%t) DBPostgresBackend: Error receiving command result: %m\n"));

  ACE_LOG_MSG->log_hexdump(LM_DEBUG,message,sizeof(message)-1);
  ACE_DEBUG((LM_DEBUG,"\n"));

  if (message[0]!='C') 
    ACE_DEBUG((LM_ERROR,"(%t) DBPostgresBackend: Command didn't succeed: %s\n",message));
  else
    ACE_DEBUG((LM_ERROR,"(%t) DBPostgresBackend: Command  succeed: %s (%d)\n",message,size));

  //ACE_OS::sleep(1);

}

// Asynchronous command executed to the backend.

unsigned int DBPostgresBackend::writeAsyncCommand(string command) {

  char message[128];
  int size;

  memset(message,0,sizeof(message));
  snprintf(message,127,"Q%s;",command.c_str());
       
  ACE_DEBUG((LM_DEBUG,"(%t) DBPostgresBackend: Command to BackEnd: %s\n", message));
  
  if ((size=send(_be_socket,message,strlen(message)+1,SEND_FLAGS))==-1) 
    ACE_DEBUG((LM_ERROR,"(%t) DBPostgresBackend: Error writing command: %m\n"));

  ACE_LOG_MSG->log_hexdump(LM_DEBUG,message,strlen(message)+1);
  ACE_DEBUG((LM_DEBUG,"\n"));
  
}


// WARNING:
// This part, specially the recv return values maybe could be
// different in different platforms. I havent used the EAGAIN
// et al values because they seem to be overridden by ACE_OS.
// Have to think a little about all this.....

bool DBPostgresBackend::testConnection() {

  ACE_DEBUG((LM_DEBUG,"(%T,%t) Testing a connection().\n"));

  fcntl(_be_socket,F_SETFL,O_NONBLOCK);

  int size = recv(_be_socket, _buffer ,sizeof(_buffer),RECV_FLAGS);  
  
  switch (size) {

  case DBPostgresBackend::NO_DATA_IN_SOCKET:
    ACE_DEBUG((LM_DEBUG,"(%t,%T) Testing: OK (nodata).\n"));
    return true;

  case DBPostgresBackend::BROKEN_SOCKET_CONNECTION:
    ACE_DEBUG((LM_DEBUG,"(%t,%T) Testing: 0 (closed).\n"));
    return false;

  }
  
  ACE_DEBUG((LM_DEBUG,"(%t,%T) Testing: Extrange value of result: %d: %m.\n",size));
  return false; 

}

bool DBPostgresBackend::recoverConnection() {

  ACE_DEBUG((LM_DEBUG,"(%t) Connection %s is trying to be recovered!!.\n",getId().c_str()));

  if (_buffer_filling>0)  ACE_DEBUG((LM_DEBUG,"(%t) Connection %s HAD %d BYTES in his buffers!!.\n",getId().c_str(),_buffer_filling));

  resetBuffer();
  return startUp();


}


void DBPostgresBackend::dump() {



  ACE_DEBUG((LM_DEBUG,"\nDBPostgresBackend: \t#%P#\n",this));


  ACE_DEBUG((LM_DEBUG,"User: \t#%s#\n",_user.c_str()));
  ACE_DEBUG((LM_DEBUG,"Password: \t#%s#\n",_password.c_str()));
  ACE_DEBUG((LM_DEBUG,"Db: \t#%s#\n",_db.c_str()));
  ACE_DEBUG((LM_DEBUG,"Host: \t#%s#\n",_host.c_str()));
  ACE_DEBUG((LM_DEBUG,"Port: \t#%d#\n",_port));


  // Connection state
  //ACE_DEBUG((LM_DEBUG,"Buffer: \t#%P#\n",_buffer));
  //ACE_LOG_MSG->log_hexdump(LM_DEBUG,_buffer,BUFFER_SIZE);

  //ACE_DEBUG((LM_DEBUG,"Key: \t#%P#\n",_secret_key_response));
  //ACE_LOG_MSG->log_hexdump(LM_DEBUG,_secret_key_response,9);

  ACE_DEBUG((LM_DEBUG,"Buff fill: \t#%d#\n",_buffer_filling));

  ACE_DEBUG((LM_DEBUG,"State: \t#%s#\n",_state));

}

/**
 * Returns an open connection to a backend host.
 */


int DBPostgresBackend::getSocketConnection(string host, int port) {
  
  if (host=="localhost" || host=="local")
    return getUnixSocketConnection(_unixSocketBasePath, port);
  else
    return getInetSocketConnection(host,port);


}

/**
 * Returns an open TCP connection to a backend host.
 */

int DBPostgresBackend::getInetSocketConnection(string host, int port) {

  int be_socket;
  struct sockaddr_in  sock_addr;
  struct hostent *he;

  // Socket connection to the BE.

  he = gethostbyname(host.c_str());
   if (!he) {
     ACE_DEBUG((LM_ERROR," (%t,%T) Wrong hostname (%s): %m\n",host.c_str()));
     return -1;
   }
     
   sock_addr.sin_family = he->h_addrtype;
   sock_addr.sin_port = htons(port);
   sock_addr.sin_addr.s_addr = ((unsigned long *) (he->h_addr_list[0]))[0];   
  

  if ((be_socket = socket(AF_INET,SOCK_STREAM,0))<0) {

    ACE_DEBUG((LM_ERROR," (%t,%T) Error creating socket: %m !!!\n"));
    return -1;

  }

  if (connect(be_socket,(struct sockaddr *)&sock_addr, sizeof(sock_addr))<0) {
    
    ACE_DEBUG((LM_ERROR," (%t) Error connecting socket: %m!!!\n"));
    return -1;

  }

  // Nonblocking socket
  //fcntl(_be_socket,F_SETFL,O_NONBLOCK);

  // The socket is connected!!

  return be_socket;

}

/**
 * Returns an open UNIX socket connection to a backend host.
 */

int DBPostgresBackend::getUnixSocketConnection(string unixSocketBasePath, int port) {

  int be_socket;
  struct sockaddr_un sock_addr;

  // Socket connection to the BE.

  if ((be_socket = socket(PF_UNIX,SOCK_STREAM,0))<0) {

    ACE_DEBUG((LM_ERROR," (%t,%T) Error creating UNIX socket: %m !!!\n"));
    return -1;

  }

   sock_addr.sun_family = AF_UNIX;
   snprintf(sock_addr.sun_path,sizeof(sock_addr.sun_path)-1,"%s/.s.PGSQL.%d",unixSocketBasePath.c_str(),port);

  if (connect(be_socket,(struct sockaddr *)&sock_addr, sizeof(sock_addr))<0) {
    
    ACE_DEBUG((LM_ERROR," (%t) Error connecting UNIX socket (%s): %m!!!\n",sock_addr.sun_path));
    return -1;

  }

  // The socket is connected!!


  return be_socket;


}

Generated by  Doxygen 1.6.0   Back to index