DRDE/ACTK1_0/session/SessionPoolImpl.cpp

393 lines
9.0 KiB
C++

#include "session/SessionPoolImpl.hpp"
#include "session/SessionLimitExceededException.hpp"
#include "session/SessionFactory.hpp"
#include "session/SessionManagerProperties.hpp"
#include "session/StatsManager.hpp"
#include "session/CommandFailedException.hpp"
#include "session/Timer.hpp"
#include "common/AutoMutex.hpp"
#include "common/SystemProperties.hpp"
#include "common/Logger.hpp"
#include <sys/time.h>
#include <sstream>
#include <set>
#include <memory>
#include <typeinfo> // for 'bad_cast'
#include <errno.h>
#include <climits>
#include <limits>
using namespace std;
const float SessionPoolImpl::PI_STO_FRACT = 0.4;
const string SessionPoolImpl::pname("biz.meansofproduction.dnseppus.session");
SessionPoolImpl::SessionPoolImpl(SessionPoolProperties* props)
: lastSession(NULL), sessionsInUse(0), maximumSize(props->getMaximumPoolSize()),
pollInterval((long)(PI_STO_FRACT * (float)props->getServerTimeout())),
waitTimeout(props->getWaitTimeout()), clientTimeout(props->getClientTimeout()),
properties(props),
debugLogger(Logger::getLogger(pname + ".debug")),
userLogger(Logger::getLogger(pname + ".user"))
{
pthread_mutex_init(&mtx, NULL);
pthread_cond_init(&cond, NULL);
ostringstream minPollStr;
minPollStr << MIN_ACCEPTABLE_POLL_INTERVAL;
istringstream configMinPollStr(
SystemProperties::getProperty("client.pollinterval.min", minPollStr.str()));
long minPollInterval;
configMinPollStr >> minPollInterval;
pollInterval = max<long>(pollInterval, minPollInterval);
// wait for lesser of 200ms and 10% command limit interval,
// but no less that 50ms if command limit exceeded
limitExceededWaitTimeout =
max(min(props->getCommandLimitInterval() / 10, 200L),
50L);
}
SessionPoolImpl::~SessionPoolImpl()
{
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
}
const Greeting* SessionPoolImpl::getLastGreeting()
throw (SessionConfigurationException, SessionOpenException, EPPInterruptedException)
{
if (lastSession == NULL) init();
return lastSession->getGreeting();
}
void SessionPoolImpl::init()
throw (SessionConfigurationException, SessionOpenException, EPPInterruptedException)
{
releaseSession(getSession());
}
long SessionPoolImpl::keepAlive() throw (EPPIOException)
{
//debugLogger->LOG_FINEST("enter");
for (PoolIter session = pool.begin(); session != pool.end(); ++session)
{
long mruInterval = (*session)->getStatsManager()->getMruInterval();
if (pollInterval < mruInterval && mruInterval < clientTimeout)
{
(*session)->keepAlive();
}
}
//debugLogger->LOG_FINEST("exit");
return pollInterval;
}
/// Synchronised.
void SessionPoolImpl::empty()
{
AutoMutex lock(&mtx);
if (pool.size() == 0) return;
for (PoolIter session = pool.begin(); session != pool.end(); ++session)
{
if (*session != NULL)
{
(*session)->close();
delete *session;
}
}
pool.clear();
}
/// Synchronised.
void SessionPoolImpl::clean() throw (SessionConfigurationException, SessionOpenException)
{
AutoMutex lock(&mtx);
int count = pool.size();
empty();
for (int i=0; i < count; i++)
{
try
{
openSession();
}
catch (SessionOpenException& e)
{
try
{
dynamic_cast<SessionLimitExceededException&>(e);
continue;
}
catch (bad_cast& e)
{
throw e;
}
}
}
}
Session* SessionPoolImpl::getSession()
throw (SessionConfigurationException, SessionOpenException, EPPInterruptedException)
{
return getSession(NULL);
}
Session* SessionPoolImpl::getSession(const CommandType* type)
throw (SessionConfigurationException, SessionOpenException, EPPInterruptedException)
{
debugLogger->LOG_FINEST("enter");
Session* acquiredSession = NULL;
int failCount = 0;
do
{
acquiredSession = getBestAvailableSession(type);
if (acquiredSession == NULL)
{
if (pool.size() < maximumSize)
{
try
{
openSession();
}
catch (SessionOpenException& soe)
{
try
{
dynamic_cast<SessionLimitExceededException&>(soe);
vector<string> args;
args.push_back("<<total>>");
args.push_back("<<cutoff>>");
vector<string> vals;
ostringstream v;
v << maximumSize;
vals.push_back(v.str());
v.str("");
v << pool.size();
vals.push_back(v.str());
userLogger->warning(
ErrorPkg::getMessage("epp.session.open.fail.limit_exceeded",
args, vals));
waitForRelease();
}
catch (bad_cast&)
{
if (isRepeatableFailedLogin(soe, failCount))
{
ostringstream fc;
fc << failCount;
userLogger->warning(soe.getMessage());
userLogger->warning(ErrorPkg::getMessage(
"epp.server.failure.retry",
"<<count>>",
fc.str()));
failCount++;
}
else throw soe;
}
}
}
else
{
waitForRelease();
}
}
}
while (acquiredSession == NULL);
debugLogger->LOG_FINEST("exit");
return acquiredSession;
}
bool SessionPoolImpl::isRepeatableFailedLogin(const EPPException& e, int count)
{
try
{
dynamic_cast<const LoginException&>(e);
dynamic_cast<const CommandFailedException&>(e);
return count < MAX_ACCEPTABLE_FAIL_COUNT;
}
catch (bad_cast&)
{
return false;
}
}
/// Synchronized.
Session* SessionPoolImpl::getBestAvailableSession(const CommandType* type)
throw (EPPInterruptedException)
{
debugLogger->LOG_FINEST("enter");
Session* bestSession = NULL;
long min = LONG_MAX;
AutoMutex lock(&mtx);
for (PoolIter session = pool.begin(); session != pool.end(); ++session)
{
if ((*session)->isAvailable()) {
int cutoff = INT_MAX;
long cc;
if (type == NULL) {
cc = (*session)->getStatsManager()->getCommandCount();
} else {
cutoff = properties->getCommandLimit(type);
cc = (*session)->getStatsManager()->getCommandCount(type);
}
if (cc <= cutoff && cc < min) {
min = cc;
bestSession = *session;
}
}
}
if (bestSession != NULL)
{
try
{
// A previous user of this session to re-acquire, so this may timeout.
bestSession->acquire();
sessionsInUse++;
}
catch (EPPTimeoutException& e)
{
userLogger->warning("Timeout acquiring 'best' session.");
bestSession = NULL;
}
}
debugLogger->LOG_FINEST("exit");
return bestSession;
}
void SessionPoolImpl::openSession()
throw (SessionConfigurationException, SessionOpenException)
{
Session * newSession = SessionFactory::newInstance(properties);
newSession->open();
{
AutoMutex lock(&mtx);
lastSession = newSession;
pool.insert(newSession);
pthread_cond_signal(&cond);
}
}
void SessionPoolImpl::waitForRelease()
{
AutoMutex lock(&mtx);
if (sessionsInUse < pool.size())
{
// There are only inappropriate session available.
struct timespec until(Timer::msOffset2abs(limitExceededWaitTimeout));
if (pthread_cond_timedwait(&cond, &mtx, &until) != ETIMEDOUT)
{
userLogger->severe("This thread should not be interupted.");
}
}
while (sessionsInUse >= pool.size() && pool.size() >= maximumSize)
{
struct timespec until(Timer::msOffset2abs(waitTimeout));
if (pthread_cond_timedwait(&cond, &mtx, &until) != ETIMEDOUT)
{
userLogger->severe("This thread should not be interupted.");
}
}
}
void SessionPoolImpl::releaseSession(Session * session) {
if (session == NULL) {
return;
}
if (session->isInvalid()) {
session->close();
session->release();
{
AutoMutex lock(&mtx);
pool.erase(session);
delete session;
sessionsInUse--;
pthread_cond_signal(&cond);
}
}
else if (!session->isOpen())
{
session->release();
{
pool.erase(session);
AutoMutex lock(&mtx);
sessionsInUse--;
pthread_cond_signal(&cond);
}
}
else
{
session->release();
{
AutoMutex lock(&mtx);
sessionsInUse--;
pthread_cond_signal(&cond);
}
}
}
int SessionPoolImpl::getCommandCount(const CommandType* type)
{
int res = 0;
for(PoolIter session = pool.begin(); session != pool.end(); ++session)
{
res += (*session)->getStatsManager()->getCommandCount(type);
}
return res;
}
int SessionPoolImpl::getCommandCount()
{
int res = 0;
for(PoolIter session = pool.begin(); session != pool.end(); ++session)
{
res += (*session)->getStatsManager()->getCommandCount();
}
return res;
}
long SessionPoolImpl::getMruInterval() const
{
long minInterval = numeric_limits<long>::max();
for(PoolIter session = pool.begin(); session != pool.end(); ++session)
{
long mruInterval = (*session)->getStatsManager()->getMruInterval();
minInterval = min<long>(mruInterval,minInterval);
}
return minInterval;
}
int SessionPoolImpl::getResultCodeCount(int resultCode)
{
int res = 0;
for(PoolIter session = pool.begin(); session != pool.end(); ++session)
{
res += (*session)->getStatsManager()->getResultCodeCount(resultCode);
}
return res;
}