1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
/**
* Clipped for post length
*/
#ifndef _DATABASE_CONNECTION_POOL_
#define _DATABASE_CONNECTION_POOL_
#define DATABASE_CONNECTION_POOL_DEBUG_MODE
#include <list>
using std::list;
#include <string>
using std::string;
#include <boost/thread.hpp>
using boost::thread;
#include <boost/thread/mutex.hpp>
using boost::mutex;
#include <boost/thread/tss.hpp>
using boost::thread_specific_ptr;
#include "EventReporter.hpp" // A (poorly written) virtual logger
#include <boost/lexical_cast.hpp>
using boost::lexical_cast;
template<class DatabaseConnection>
class DatabaseConnectionPool
{
private:
///The number of connections that are created with the pool
unsigned char initialConnections_;
///The maximum number of connections that can be created to accomodate demand
unsigned char maxConnections_;
///The time (in seconds) that connections should remain in the pool if not in use
unsigned int idleConnectionTimer_;
///The time (in seconds) that a thread should be blocked while waiting for a connection before failing
unsigned int threadBlockTimeout_;
///The database host
string host_;
///The username to log into the database with
string user_;
///The password to log into the database with
string pass_;
///The database schema to set
string schema_;
///A mutex for synchronizing pool access
mutex mutex_;
///A list of available database connections
list<DatabaseConnection*>* availableConnections_;
///A list of database connections in use
list<DatabaseConnection*>* connectionsInUse_;
///A thread specific pointer to manage the aquisition timeout.
thread_specific_ptr<unsigned int> timeout_;
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
///A timeout debugging variable to monitor how long threads are waiting for a connection
unsigned int _TIMEOUT_DEBUG_;
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
public:
/*
* Clipped for post length
*/
DatabaseConnectionPool(unsigned char initialConnections, unsigned char maxConnections, unsigned int idleConnectionTimer, unsigned int threadBlockTimeout, string host, string user, string pass, string schema)
: initialConnections_(initialConnections), maxConnections_(maxConnections), idleConnectionTimer_(idleConnectionTimer), threadBlockTimeout_(threadBlockTimeout), host_(host), user_(user), pass_(pass), schema_(schema)
{
this->availableConnections_ = new list<DatabaseConnection*>();
this->connectionsInUse_ = new list<DatabaseConnection*>();
for(auto counter = 0; counter < this->initialConnections_; counter++)
{
DatabaseConnection* databaseConnection = new DatabaseConnection();
if(databaseConnection->connect(this->host_, this->user_, this->pass_) && databaseConnection->setSchema(this->schema_))
{
this->availableConnections_->push_front(databaseConnection);
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
EventReporter::getInstance()->reportEvent(string(">> Added connection ").append(lexical_cast<string>((int)counter + 1)).append(" of ").append(lexical_cast<string>((int)this->initialConnections_)).append(" to the pool"));
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
}
else
{
EventReporter::getInstance()->reportError(string("!! Unable to populate connection pool @ ").append(lexical_cast<string>(__FILE__)).append(lexical_cast<string>(__LINE__)));
//Server::getInstance()->setErrorStatus(_DATABASE_POOL_ERROR_1_); This failure should terminate the application.
}
}
}
/**
* Clipped for post length
*/
DatabaseConnection* aquire(void)
{
if(this->timeout_.get() == nullptr)
{
this->timeout_.reset(new unsigned int(0));
}
else if((*this->timeout_.get())++ > this->threadBlockTimeout_)
{
EventReporter::getInstance()->reportError(string("!! Connection aquisition timeout (").append(lexical_cast<string>(*this->timeout_.get())).append(") @ ").append(lexical_cast<string>(__FILE__)).append(lexical_cast<string>(__LINE__)));
return nullptr;
}
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
else
{
this->_TIMEOUT_DEBUG_ = *this->timeout_.get();
}
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
if(this->mutex_.timed_lock(boost::posix_time::seconds(this->threadBlockTimeout_)))
{
if(this->availableConnections_->size() > 0)
{
DatabaseConnection* availableConnection = this->availableConnections_->front();
this->availableConnections_->pop_front();
this->connectionsInUse_->push_back(availableConnection);
this->mutex_.unlock();
if(availableConnection != nullptr)
{
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
EventReporter::getInstance()->reportEvent(">> Connection taken normally from pool");
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
return availableConnection;
}
else
{
EventReporter::getInstance()->reportError(string("!! A null pointer was encountered @ ").append(lexical_cast<string>(__FILE__)).append(lexical_cast<string>(__LINE__)));
availableConnection = new DatabaseConnection();
return availableConnection;
}
}
else
{
if(this->connectionsInUse_->size() < this->maxConnections_)
{
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
EventReporter::getInstance()->reportEvent("** New connection added to pool");
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
DatabaseConnection* newConnection = new DatabaseConnection();
this->connectionsInUse_->push_front(newConnection);
this->mutex_.unlock();
return newConnection;
}
else
{
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
EventReporter::getInstance()->reportEvent("..Waiting for available connection...");
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
this->mutex_.unlock();
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
return this->aquire();
}
}
} /// I assume that the timed lock automatically unlocks itself upon failure? I don't like hidden operations like this...
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
EventReporter::getInstance()->reportEvent("..Waiting for available connection...");
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
return this->aquire();
}
/**
* Clipped for post length
*/
void release(DatabaseConnection* databaseConnection)
{
mutex::scoped_lock(this->mutex_);
this->connectionsInUse_->remove(databaseConnection);
this->availableConnections_->push_front(databaseConnection);
#ifdef DATABASE_CONNECTION_POOL_DEBUG_MODE
EventReporter::getInstance()->reportEvent("<< Returned connection to pool");
#endif // DATABASE_CONNECTION_POOL_DEBUG_MODE
}
/**
* Clipped for post length
*/
void prune(void)
{
mutex::scoped_lock(this->mutex_);
if(this->availableConnections_ != nullptr && this->availableConnections_->size() > 0)
{
unsigned char erased = 0;
this->availableConnections_->remove_if([=, &erased](const DatabaseConnection* databaseConnection)->bool {
if(databaseConnection != nullptr)
{
if(databaseConnection->lastUsed() > this->idleConnectionTimer_)
{
erased++;
return true;
}
}
return false;
});
EventReporter::getInstance()->reportEvent(string("~~Removed ").append(lexical_cast<string>((int)erased)).append(" inactive database connections from pool"));
}
}
};
#endif //_DATABASE_CONNECTION_POOL_
|