fixed flushing problems.

This commit is contained in:
Federico Ponchio 2011-11-29 12:41:28 +00:00
parent 42208fe186
commit 1a00f09ac0
3 changed files with 146 additions and 47 deletions

View File

@ -19,6 +19,8 @@ using namespace std;
/** Cache virtual base class. You are required to implement the pure virtual functions get, drop and size. /** Cache virtual base class. You are required to implement the pure virtual functions get, drop and size.
*/ */
template <typename Token> class Transfer;
template <typename Token> template <typename Token>
class Cache: public Provider<Token> { class Cache: public Provider<Token> {
@ -29,10 +31,15 @@ public:
bool quit; bool quit;
///keeps track of changes (if 1 then something was loaded or dropped ///keeps track of changes (if 1 then something was loaded or dropped
QAtomicInt changed; QAtomicInt changed;
///callback for changed
void (*callback)(void *data);
///data is fetched from here ///data is fetched from here
Provider<Token> *input; Provider<Token> *input;
///threads running over cache...
std::vector<Transfer<Token> *> transfers;
protected: protected:
///max space available ///max space available
quint64 s_max; quint64 s_max;
@ -58,15 +65,16 @@ public:
///empty the cache. Make sure no resource is locked before calling this. ///empty the cache. Make sure no resource is locked before calling this.
/// Require pause or stop before. Ensure there no locked item /// Require pause or stop before. Ensure there no locked item
void flush() { void flush() {
std::vector<Token *> tokens; //std::vector<Token *> tokens;
{ {
for(int i = 0; i < this->heap.size(); i++) { for(int i = 0; i < this->heap.size(); i++) {
Token *token = &(this->heap[i]); Token *token = &(this->heap[i]);
tokens.push_back(token); //tokens.push_back(token);
s_curr -= drop(token); s_curr -= drop(token);
assert(!(token->count >= Token::LOCKED)); assert(!(token->count >= Token::LOCKED));
if(final) if(final)
token->count.testAndSetOrdered(Token::READY, Token::CACHE); token->count.testAndSetOrdered(Token::READY, Token::CACHE);
input->heap.push(token);
} }
this->heap.clear(); this->heap.clear();
} }
@ -76,11 +84,11 @@ public:
} }
//assert(s_curr == 0); //assert(s_curr == 0);
{ /* {
for(unsigned int i = 0; i < tokens.size(); i++) { for(unsigned int i = 0; i < tokens.size(); i++) {
input->heap.push(tokens[i]); input->heap.push(tokens[i]);
} }
} }*/
} }
///empty the cache. Make sure no resource is locked before calling this. ///empty the cache. Make sure no resource is locked before calling this.
@ -112,6 +120,7 @@ public:
} }
} }
virtual void abort() {}
protected: protected:
///return the space used in the cache by the loaded resource ///return the space used in the cache by the loaded resource
virtual int size(Token *token) = 0; virtual int size(Token *token) = 0;
@ -119,6 +128,8 @@ protected:
virtual int get(Token *token) = 0; virtual int get(Token *token) = 0;
///return amount removed ///return amount removed
virtual int drop(Token *token) = 0; virtual int drop(Token *token) = 0;
///make sure the get function do not access token after abort is returned.
@ -136,16 +147,15 @@ protected:
2) make room until eliminating an element would leave space. */ 2) make room until eliminating an element would leave space. */
begin(); begin();
while(!this->quit) { while(!this->quit) {
input->check_queue.enter(true); //wait for cache below to load something or priorities to change input->check_queue.enter(); //wait for cache below to load something or priorities to change
if(this->quit) break; if(this->quit) break;
if(unload() || load()) { if(unload() || load()) {
changed.testAndSetOrdered(0, 1); //if not changed, set as changed changed.testAndSetOrdered(0, 1); //if not changed, set as changed
input->check_queue.open(); //we signal ourselves to check again input->check_queue.open(); //we signal ourselves to check again
} }
input->check_queue.leave();
} }
flush();
this->quit = false; //in case someone wants to restart; this->quit = false; //in case someone wants to restart;
end(); end();
} }
@ -255,4 +265,19 @@ protected:
} }
}; };
template<typename Token>
class Transfer: public QThread {
public:
Transfer(Cache<Token> *_cache): cache(_cache) {}
private:
Cache<Token> *cache;
void run() {
cache->loop();
//end();
}
};
#endif // GCACHE_H #endif // GCACHE_H

View File

@ -25,7 +25,7 @@ class Controller {
std::vector<Cache<Token> *> caches; std::vector<Cache<Token> *> caches;
Controller(): paused(false), stopped(true) {} Controller(): paused(false), stopped(true) {}
~Controller() { finish(); } ~Controller() { if(!stopped) finish(); }
///called before the cache is started to add a cache in the chain ///called before the cache is started to add a cache in the chain
/** The order in which the caches are added is from the lowest to the highest. */ /** The order in which the caches are added is from the lowest to the highest. */
@ -81,7 +81,7 @@ class Controller {
///start the various cache threads. ///start the various cache threads.
void start() { void start() {
if(!stopped) return; assert(stopped);
assert(!paused); assert(!paused);
assert(caches.size() > 1); assert(caches.size() > 1);
caches.back()->final = true; caches.back()->final = true;
@ -89,11 +89,25 @@ class Controller {
caches[i]->start(); caches[i]->start();
stopped = false; stopped = false;
} }
///stops the ache threads
///stops the cache threads
void stop() { void stop() {
if(stopped) return; assert(!paused);
if(paused) resume(); assert(!stopped);
//stop threads
//signal al caches to quit
for(int i = 0; i < caches.size(); i++)
caches[i]->quit = true;
//abort current gets
for(int i = 0; i < caches.size(); i++)
caches[i]->abort();
//make sure all caches actually run a cycle.
for(unsigned int i = 0; i < caches.size(); i++)
caches[i]->input->check_queue.open();
for(int i = 0; i < caches.size(); i++)
caches[i]->wait();
/* //stop threads
for(int i = caches.size()-1; i >= 0; i--) { for(int i = caches.size()-1; i >= 0; i--) {
caches[i]->quit = true; //hmmmmmmmmmmmmmm not very clean. caches[i]->quit = true; //hmmmmmmmmmmmmmm not very clean.
if(i == 0) if(i == 0)
@ -101,48 +115,57 @@ class Controller {
else else
caches[i-1]->check_queue.open(); //cache i listens on queue i-1 caches[i-1]->check_queue.open(); //cache i listens on queue i-1
caches[i]->wait(); caches[i]->wait();
} } */
stopped = true; stopped = true;
} }
void finish() { void finish() {
flush();
stop(); stop();
flush();
} }
void pause() { void pause() {
if(paused) return; assert(!stopped);
provider.check_queue.lock(); assert(!paused);
for(unsigned int i = 0; i < caches.size()-1; i++) {
caches[i]->check_queue.lock(); //lock all doors.
} for(unsigned int i = 1; i < caches.size(); i++)
/* provider.heap_lock.lock(); caches[i]->input->check_queue.lock();
for(unsigned int i = 0; i < caches.size(); i++)
caches[i]->heap_lock.lock(); */ //abort all pending calls
for(unsigned int i = 1; i < caches.size(); i++)
caches[i]->abort();
//make sure no cache is running (must be done after abort! otherwise we have to wait for the get)
for(unsigned int i = 0; i < caches.size()-1; i++)
caches[i]->input->check_queue.room.lock();
paused = true; paused = true;
} }
void resume() { void resume() {
if(!paused) return; assert(!stopped);
provider.check_queue.unlock(); assert(paused);
for(unsigned int i = 0; i < caches.size()-1; i++)
caches[i]->check_queue.unlock(); //unlock and open all doors
for(unsigned int i = 1; i < caches.size(); i++) {
caches[i]->input->check_queue.unlock();
caches[i]->input->check_queue.open();
}
//allow all cache to enter again.
for(unsigned int i = 0; i < caches.size()-1; i++)
caches[i]->input->check_queue.room.unlock();
/* provider.heap_lock.unlock();
for(unsigned int i = 0; i < caches.size(); i++)
caches[i]->heap_lock.unlock(); */
paused = false; paused = false;
} }
///empty all caches AND REMOVES ALL TOKENS! ///empty all caches AND REMOVES ALL TOKENS!
void flush() { void flush() {
bool running = !stopped;
stop();
for(int i = (int)caches.size()-1; i >= 0; i--) for(int i = (int)caches.size()-1; i >= 0; i--)
caches[i]->flush(); caches[i]->flush();
provider.heap.clear(); provider.heap.clear();
if(running)
start();
} }
bool isChanged() { bool isChanged() {
bool c = false; bool c = false;
for(int i = (int)caches.size() -1; i >= 0; i--) { for(int i = (int)caches.size() -1; i >= 0; i--) {
@ -150,6 +173,7 @@ class Controller {
} }
return c; return c;
} }
bool isWaiting() { bool isWaiting() {
bool waiting = true; bool waiting = true;
for(int i = (int)caches.size() -1; i >= 0; i--) { for(int i = (int)caches.size() -1; i >= 0; i--) {

View File

@ -26,21 +26,62 @@
#define CACHE_DOOR_H #define CACHE_DOOR_H
#include <QDebug> #include <QDebug>
#include <QMutex>
#include <QSemaphore>
#include <QAtomicInt>
#include <QWaitCondition>
//#define USE_SEMAPHORES
#ifdef USE_SEMAPHORES #define METHOD_2
#ifdef METHOD_1
class QDoor {
private:
QSemaphore door;
QMutex room; //lock when entering. unlock when exiting
QAtomicInt key; //keep tracks of door status
public:
QDoor():key(0) {}
void open() {
if(key.testAndSetOrdered(0, 1))
door.release(1);
}
void enter() {
door.acquire(1); //here I am sure that key is 1
//if here a open appends will have no effect.
key.testAndSetOrdered(1, 0);
room.lock();
}
void leave() {
room.unlock();
}
void lock() {
int r = key.fetchAndStoreOrdered(-1);
if(r == 1) //if the door was open
door.tryAcquire(1); //might file if whe are between enter acquire and key = 0.
}
void unlock() {
key = 0;
}
};
#endif
#ifdef METHOD_2
//a door needs to be open for the thread to continue, //a door needs to be open for the thread to continue,
//if it is open the thread enter and closes the door //if it is open the thread enter and closes the door
//this mess is to avoid [if(!open.available()) open.release(1)] //this mess is to avoid [if(!open.available()) open.release(1)]
#include <QSemaphore>
class QDoor { class QDoor {
private: private:
QSemaphore _open; QSemaphore _open;
QSemaphore _close; QSemaphore _close;
public: public:
QMutex room;
QDoor(): _open(0), _close(1) {} //this means closed QDoor(): _open(0), _close(1) {} //this means closed
void open() { void open() {
@ -48,7 +89,7 @@ class QDoor {
_open.release(1); //open _open.release(1); //open
} }
void close() { void close() {
if(_open.tryAcquire(1)) //check not already cloed if(_open.tryAcquire(1)) //check not already closed
_close.release(1); _close.release(1);
} }
void enter(bool close = false) { void enter(bool close = false) {
@ -57,8 +98,9 @@ class QDoor {
_close.release(1); //close door behind _close.release(1); //close door behind
else else
_open.release(1); //leave door opened _open.release(1); //leave door opened
room.lock();
} }
bool isOpen() { return _open.available() == 1; } void leave() { room.unlock(); }
void lock() { void lock() {
//door might be open or closed, but we might happen just in the middle //door might be open or closed, but we might happen just in the middle
@ -68,18 +110,24 @@ class QDoor {
} }
void unlock(bool open = false) { void unlock(bool open = false) {
if(open) if(open)
_open.release(1) _open.release(1);
else else
_close.release(1); _close.release(1);
} }
bool isWaiting() {
if(_open.tryAcquire(1)) {
_close.release(1);
return false;
}
return true;
}
}; };
#else #endif
#include <QMutex>
#include <QWaitCondition>
#ifdef METHOD_3
/** /**
A wait condition class that works as a door. A wait condition class that works as a door.
Should check if the semaphore version is faster. Should check if the semaphore version is faster.
@ -99,8 +147,8 @@ class QDoor {
} }
///attempt to enter the door. if the door is closed the thread will wait until the door is opened. ///attempt to enter the door. if the door is closed the thread will wait until the door is opened.
/** if close is true, the door will be closed after the thread is awakened, this allows to /// if close is true, the door will be closed after the thread is awakened, this allows to
have only one thread entering the door each time open() is called */ /// have only one thread entering the door each time open() is called
void enter(bool close = false) { void enter(bool close = false) {
m.lock(); m.lock();
waiting = true; waiting = true;
@ -112,6 +160,7 @@ class QDoor {
waiting = false; waiting = false;
m.unlock(); m.unlock();
} }
void leave() {}
bool isWaiting() { bool isWaiting() {
m.lock(); m.lock();
bool w = waiting; bool w = waiting;
@ -132,6 +181,7 @@ class QDoor {
bool waiting; bool waiting;
}; };
#endif //ifdef USE_SEMAPHORES #endif
#endif //CACHE_DOOR_H #endif //CACHE_DOOR_H