Logo Search packages:      
Sourcecode: dbbalancer version File versions  Download package

DBBalancerDaemon.cc

#include <netinet/in.h> 
#include <sys/socket.h>
#include <ace/Log_Msg.h>
#include <ace/Get_Opt.h>
#include "DBBalancer.hh"
#include "DBBalancerDaemon.hh"
#include "DBSignalHandler.hh"
#include "config/DBBalancerTxtConfig.hh"


DBBalancerDaemon::DBBalancerDaemon(int argc, char **argv) {

  this->processCommandLineParameters(argc,argv);

  _it = 0;
  _run = true;

}

void DBBalancerDaemon::init() {

  // Create the config object and feed it with the
  // attributes read from the command line options.
  _config = new DBBalancerTxtConfig(_config_file);
  _config->setUnixSocketBasePath(_unix_socket_path);
  _config->setPidFileBasePath(_pid_file_path);
  _config->setMode(_mode);

  this->initLogger(_config->getDaemonLogFile());  


  // Create the DBBalancer object.
  _db = new DBBalancer(_config);

  // Fill other attributes.
  _port = (_mode==DBBalancer::READER_PROCESS)?_config->getDaemonReaderPort():_config->getDaemonWriterPort();
  _host = _config->getDaemonHost();

  this->savePid(_config);

}


DBBalancerDaemon::~DBBalancerDaemon() {

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying DBBalancerDaemon.\n"));
  delete _db;
  delete _config;
  // Security wait to let things end!
  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroyed DBBalancerDaemon.\n"));
  //ACE_OS::sleep(5);

}


void DBBalancerDaemon::hello() {

  ACE_DEBUG((LM_INFO,"\nDBBalancer version %d.%d.%d %s started.\n",_db->getMajVersion(),_db->getMinVersion(),_db->getPatchLevel(), _db->getStatus().c_str()));

}

void DBBalancerDaemon::savePid(DBBalancerConfig *config) {

  char pidfile[1024];
  snprintf(pidfile,sizeof(pidfile)-1,"%s/dbbalancer-%d.pid",config->getPidFileBasePath().c_str(),_port);
  pidfile[sizeof(pidfile)-1]=0;

  ofstream of(pidfile);
                
  if (of.is_open()) {
    of << getpid() << endl;
    of.close();
  } else {
    ACE_DEBUG((LM_CRITICAL,"(%t,%T) Impossible to write pid file (%s): %m \n",pidfile));
    this->shutdown(-1);
  }

}

void DBBalancerDaemon::shutdown(int code) {

  // Autodestroy!
  delete this;
  ACE_OS::exit(code);

}

void DBBalancerDaemon::run () {

  int          sd, sd_current;
  unsigned int addrlen;
  struct       sockaddr cin;

  // Get the socket while creating sockaddr structures for INET or UNIX sockets.

  sd = this->createServerSocket(_host,_port);


  // Main request accepting loop.
  while (_run) { 
    
    ACE_DEBUG((LM_DEBUG,"(%t, %T) Waiting for a connection to server socket.\n"));
    
    // Wait for a client to talk to us
    if ((sd_current = accept(sd, (struct sockaddr *)  &cin, &addrlen)) == -1) {
      ACE_ERROR((LM_ERROR,"(%t) Error accepting client: %m"));
      break;
    }

    ACE_DEBUG((LM_DEBUG,"(%t, %T) Connection received.\n"));

    _db->processConnection(sd_current);

    _it++;

  }

  close(sd);                    
}

int DBBalancerDaemon::createServerSocket(string host, int port) {

  if (host=="localhost" || host=="local") {
    return createUnixServerSocket(port);
  } else {
    return createInetServerSocket(host,port);
  }
  
}

int DBBalancerDaemon::createInetServerSocket(string host,int port) {

  int sd;
  
  // Get an internet domain socket
  if ((sd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error creating socket: %m"));
    this->shutdown(-1);
  }
  setsockopt(sd,SO_REUSEADDR,0,0,0);

  // Complete the socket structure
  _s_in.sin_family = AF_INET;
  _s_in.sin_addr.s_addr = INADDR_ANY;
  _s_in.sin_port = htons(port);
  
  // Bind the socket to the port number
  if (bind(sd, (struct sockaddr *) &_s_in, sizeof(_s_in)) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error binding to socket: %m\n"));
    //throw this;
    this->shutdown(-1);
  }

  ACE_ERROR((LM_DEBUG,"(%t,%T) Server TCP socket at %s:%d.\n",host.c_str(),port));
  
  // Show that we are willing to listen
  if (listen(sd, 5) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error when starting to listen: %m\n"));
    this->shutdown(-1);
  }

  return sd;

}

int DBBalancerDaemon::createUnixServerSocket(int port) {

  int sd;
  char path[sizeof(_s_un.sun_path)];
  
  // Get an internet domain socket
  if ((sd = socket(PF_UNIX, SOCK_STREAM, 0)) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error creating socket: %m"));
    this->shutdown(-1);
  }

  snprintf(path,sizeof(path)-1,"%s/.s.PGSQL.%d",_db->getConfig()->getUnixSocketBasePath().c_str(),port);
  path[sizeof(path)-1]=0;

  if (!testUnixSocketPath(path)) {
    ACE_ERROR((LM_CRITICAL,"(%t) Didn't try to bind to socket due to an error: %m\n"));
    //throw this;
    this->shutdown(-1);
  };

  // Complete the socket structure
  _s_un.sun_family = AF_UNIX;
  strncpy(_s_un.sun_path,path,sizeof(path));

  // Bind the socket to the address.
  if (bind(sd, (struct sockaddr *) &_s_un, sizeof(_s_un)) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error binding to socket: %m\n"));
    //throw this;
    this->shutdown(-1);
  }
  
  ACE_DEBUG((LM_DEBUG,"(%t, %T) Server UNIX socket at %s.\n",path));

  // Show that we are willing to listen
  if (listen(sd, 5) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error when starting to listen: %m\n"));
    this->shutdown(-1);
  }

  // Change permissions to allow everybody to connect
  if (chmod(path, S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH | S_IWOTH | S_IXOTH) == -1) {
    ACE_ERROR((LM_CRITICAL,"(%t) Error when changing permissions to %s: %m\n",path));
    this->shutdown(-1);
  }

  return sd;

}

bool DBBalancerDaemon::testUnixSocketPath(char *path) {

  struct stat s;
  int result;

  result = stat(path,&s);
  if (result<0) {
      ACE_DEBUG((LM_DEBUG,"(%t,%T) Checking stat for old UNIX socket %s (not found is good ;-): %m\n",path));
      return true;
  } else {

    if (S_ISSOCK(s.st_mode)) {
      ACE_DEBUG((LM_DEBUG,"(%t,%T) Already exists an UNIX socket in %s. Deleting.\n",path));
    } else {
      ACE_DEBUG((LM_DEBUG,"(%t,%T) Already exists a file with the same name as the UNIX socket we wanted to create in %s. Deleting.\n",path));
    }

    if (remove(path)<0) {
      ACE_DEBUG((LM_DEBUG,"(%t,%T) Error deleting a previous UNIX server socket in %s: %m.\n",path));
      return false;
    } else {
      return true;
    }

  }

  
  return true;

}

void DBBalancerDaemon::processCommandLineParameters(int argc, char *argv[]) {

  char c;
  bool ok = true;
  
  // Default config values
  _config_file="etc/DBBalancerConfig.conf";
  _unix_socket_path="/tmp";
  _pid_file_path="/var/run";
  _mode = DBBalancer::READER_PROCESS;

  ACE_Get_Opt get_opt(argc,argv,"?drwf:k:p:",1,1);

  while ((c=get_opt())!=EOF) 

    switch (c) {

    case 'd':

      _debug = true;
      break;

    case 'r':
      // This is a reader process.
      _mode = DBBalancer::READER_PROCESS;
      break;

    case 'w':

      // This is a writer process;
      _mode = DBBalancer::WRITER_PROCESS;
      break;

    case 'f':

      // The path to the config file;
      _config_file = get_opt.optarg;
      break;

    case 'k':

      // The base path for the UNIX socket connection
      _unix_socket_path = get_opt.optarg;
      break;

    case 'p':

      // The base path for the PID file
      _pid_file_path = get_opt.optarg;
      break;

    case '?':

      // Heeelp!
      help();
      ACE_OS::exit(-1);
    

    default:

      ok = false;
      
    }
  
  if (!ok) {

    this->usage();
    ACE_OS::exit(-1);

  }


  //ACE_DEBUG((LM_DEBUG,"Config File: %s\n",_config_file.c_str()));

  
}

void DBBalancerDaemon::usage() {

  ACE_DEBUG((LM_NOTICE,"Usage: DBBalancerDaemon [-r|-w] [-d] [-f config_file] [-k unix_sockets_path]\n"));

}

void DBBalancerDaemon::help() {

  this->usage();

  ACE_DEBUG((LM_NOTICE,"'dbbalancerd' is the DBBalancer daemon. Client requests should be directed to it, which redirects them to the proper backend.\n"));
  ACE_DEBUG((LM_NOTICE,"\t-r\tThis is a READER process. [default]\n"));
  ACE_DEBUG((LM_NOTICE,"\t-w\tThis is a WRITER process.\n"));
  ACE_DEBUG((LM_NOTICE,"\t-d\tShow extended debug messages (quite verbose).\n"));
  ACE_DEBUG((LM_NOTICE,"\t-f\tComplete path to the config file.[etc/DBBalancerConfig.conf]\n"));
  ACE_DEBUG((LM_NOTICE,"\t-k\tBase path for UNIX sockets. [/tmp]\n"));
  ACE_DEBUG((LM_NOTICE,"\t-p\tPath where to write the pid files. [/var/run]\n"));
  ACE_DEBUG((LM_NOTICE,"\t-?\tShow this message.\n"));

}

void DBBalancerDaemon::initLogger(string logFile) {

 
  if (logFile=="syslog") {
#ifdef ACE_HAS_SYSLOG   
    // Use syslog, if possible, or else "sderr"
    if (ACE_LOG_MSG->open ("dbbalancerd", ACE_Log_Msg::SYSLOG,"dbbalancerd") == -1)
      ACE_ERROR ((LM_ERROR, "(%t,%T) Cannot open syslog logger!!!(%m)\n"));
#else
    // Send log to standard error.
    ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR);
#endif

  } else if (logFile=="stdout") {

    // Send log to standard output.
    ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR);
    ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
    ACE_LOG_MSG->msg_ostream (&cout);      

  } else if (logFile=="stderr") {

    // Send log to standard error.
    ACE_LOG_MSG->set_flags(ACE_Log_Msg::STDERR);
    //ACE_LOG_MSG->msg_ostream (&cerr); 

  } else {

    // Send log to file.
    _logFile = new ofstream();
    _logFile->open(logFile.c_str(), ios::out | ios::app);
    if (_logFile->bad()) {
      ACE_DEBUG((LM_CRITICAL,"(%t,%T) Couldn't open log file (%s): %m\n",logFile.c_str()));
      ACE_OS::exit(-1);
    } else {
      ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
      ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR);
      ACE_LOG_MSG->msg_ostream (_logFile); 
    }
  }

  // Disable LM_TRACE and LM_DEBUG messages if not in debug mode.
  if (!_debug) {
    ACE_LOG_MSG->priority_mask((LM_TRACE|LM_DEBUG)^ACE_LOG_MSG->priority_mask(ACE_Log_Msg::PROCESS),ACE_Log_Msg::PROCESS);
  }
  ACE_DEBUG((LM_INFO,"(%t,%T) Started DBBalancer logger (logfile=%s,debug=%d)\n",logFile.c_str(),_debug));

  
}

int DBBalancerDaemon::daemonize() {
  
  if (_debug) return 0;

  pid_t pid;

  pid = fork();

  if (pid == (pid_t) -1) {

    // Boink!

    ACE_DEBUG((LM_CRITICAL,"(%t,%T) Failed to fork dbbalancerd: %m"));
    ACE_OS::exit(-1);
    return -1;

  } else if (pid) {                             

    // The parent. Just quit.

    ACE_OS::exit(0);

  }

  // The child. Redirect to /dev/null inputs and outputs.

  int i = open("/dev/null", O_RDWR | O_BINARY);
  dup2(i, 0);
  dup2(i, 1);
  dup2(i, 2);
  close(i);

  return getpid();

}

int main(int argc, char **argv) {

  DBBalancerDaemon *d = new DBBalancerDaemon(argc,argv);
  DBSignalHandler *sh = new DBSignalHandler(d);

  d->daemonize(); // Really only does it if not in debug mode.

  d->init();
  d->hello();
  d->run();

  delete d;
  ACE_DEBUG((LM_NOTICE,"(%t,%T) Goodbye!\n"));

}



Generated by  Doxygen 1.6.0   Back to index