Logo Search packages:      
Sourcecode: dbbalancer version File versions

DBThreadPool.cc

#include "DBThreadCtlMO.hh"
#include "DBThreadPool.hh"


DBThreadPool::DBThreadPool(int initThreads = 4, 
                     int minThreads = 4, 
                     int maxThreads = 20) {

  ACE_DEBUG((LM_DEBUG,"(%t) Thread Pool created\n"));

  // Thread parameters:

  _nThreads = initThreads;
  _initThreads = initThreads;
  _minThreads = minThreads;
  _maxThreads = maxThreads;

  this->open(_nThreads);

  // Accounting
  _growCount = 0;
  _shrinkCount = 0;
  _roundsCalm = 0;

  // Memory accounting
  /*
  _initialMemory = 0;
  _lastMemory = 0;
  _initialMemory = checkmem("DBThreadPool end constructor");
  _lastMemory = _initialMemory;
  */

}

void DBThreadPool::open(int nthreads) {

  this->activate(THR_NEW_LWP | THR_DETACHED, nthreads, 1);

}


DBThreadPool::~DBThreadPool() {

  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroying DBThreadPool\n"));
  ACE_DEBUG((LM_DEBUG,"(%t,%T) DBThreadPool: Stopping %d threads.\n",_nThreads));

  this->shrinkThreads(_nThreads);
  ACE_OS::sleep(5); // Wait till all the threads are (we expect they are) closed. 
                    // Maybe we should use a more scientific method in the future, like a barrier.
  ACE_DEBUG((LM_DEBUG,"(%t,%T) Destroyed DBThreadPool\n"));
  

}

void DBThreadPool::submit(ACE_Method_Object *t) {

  this->activation_queue_.enqueue(t);

}


int DBThreadPool::svc(void) {

  // The auto pointer deletes the Method_Object on next iteration.
  bool run = true;

  while(run) {

    auto_ptr<ACE_Method_Object> mo (this->activation_queue_.dequeue());
    if (!mo.get() || (mo->call() == -1)) run = false;

  }

  ACE_DEBUG((LM_DEBUG,"(%t %T) DBThreadPool: A thread exited.\n"));
  return 0;

}




// This method will be called by a "manteinance" thread, and will create and
// destroy threads to adapt to the load.
// The parameter informs us about how many threads we can grow or shrink at once.
// This parameter usually will be equal to the number of Connections Pools, so that
// we can grow or shrink one thread for every connection grown or shrinked.

int DBThreadPool::reviewThreads(int threadSet) {

  int objectsInQueue = this->getQueueSize();
  //int track = updateLoadHistory(objectsInQueue);
          
  ACE_DEBUG((LM_DEBUG,"(%t %T) DBThreadPool::reviewThreads. Current: %d, Queue: %d, Calm Rounds: %d.\n", _nThreads,objectsInQueue,_roundsCalm));

  if (objectsInQueue>0 && (_nThreads+threadSet)<=_maxThreads) {
    _roundsCalm=0;
    return growThreads(threadSet);
  } else {
    _roundsCalm++;
  }


  //ACE_OS::sleep(ACE_Time_Value(1));

  if (_roundsCalm>=MAX_ROUNDS_CALM && (_nThreads-threadSet)>=_minThreads) {
    _roundsCalm=0;
    return shrinkThreads(threadSet);
  }

  return 0;

}


// PRIVATE ******************************************************************

// Update queue history and return a value to help us determine if we need
// to grow or to shrink the pools.

/*
int DBThreadPool::updateQueueHistory(int value) {

  int total = 0;

  for (unsigned int i=0;i<QUEUE_HISTORY_SIZE-1;i++) {
    _queueHistory[i] = _queueHistory[i+1];
    total += _queueHistory[i];
  }

  _queueHistory[QUEUE_HISTORY_SIZE-1] = value;
  total += value;

  return ((int)total / QUEUE_HISTORY_SIZE);

}
*/

int DBThreadPool::growThreads(int growth) {

    ACE_DEBUG((LM_NOTICE,"(%t %T) >> DBThreadPool: Grow (%d->%d), %d growths so far \n",_nThreads,_nThreads+growth,++_growCount)); 
    //checkmem("Before growing!!");
    this->open(growth);
    _nThreads += growth;
    //checkmem("After growing!!");

    return growth;

}

int DBThreadPool::shrinkThreads(int shrink) {

    ACE_DEBUG((LM_NOTICE,"(%t %T) << DBThreadPool: Shrink (%d->%d). %d shrinks so far \n",_nThreads,_nThreads-shrink, --_shrinkCount));

    //checkmem("Before shrinking!!");
    for (int i=0;i<shrink;i++)
      this->submit(new DBThreadCtlMO());
    _nThreads -= shrink;
    //checkmem("After shinking!!");

    return -shrink;

}



/*

int DBThreadPool::checkmem(string message) {

  char file[127];
  string tmp;
  int ncampo = 23;

  sprintf(file,"/proc/%d/stat",getpid());
  ifstream f(file);
  for (int i=0;i<ncampo;i++) f >> tmp;
  f.close();

  int mem = atoi(tmp.c_str());

  ACE_DEBUG((LM_NOTICE,"DBBalancerMEM: (%s) (pid= %d, mem=#%d#,%d,%d) \n",message.c_str(),getpid(),mem,mem-_lastMemory,mem-_initialMemory));
  _lastMemory = mem;

  return mem;

}

*/

Generated by  Doxygen 1.6.0   Back to index