fixed changed and some docs

This commit is contained in:
Federico Ponchio 2011-11-26 18:08:30 +00:00
parent 059152baba
commit 42208fe186
4 changed files with 71 additions and 53 deletions

View File

@ -4,6 +4,7 @@
#include <iostream> #include <iostream>
#include <limits.h> #include <limits.h>
#include <vector> #include <vector>
#include <list>
#include <QThread> #include <QThread>
#include "provider.h" #include "provider.h"
@ -21,20 +22,24 @@ using namespace std;
template <typename Token> template <typename Token>
class Cache: public Provider<Token> { class Cache: public Provider<Token> {
public: public:
bool final; //true if this is the last cache (the one we use the data from) ///true if this is the last cache (the one we use the data from)
bool quit; //graceful exit bool final;
bool changed; //if true the cache will exit at the first opportunity
bool quit;
///keeps track of changes (if 1 then something was loaded or dropped
QAtomicInt changed;
///data is fetched from here ///data is fetched from here
Provider<Token> *input; Provider<Token> *input;
protected: protected:
///max space available ///max space available
quint64 s_max; quint64 s_max;
///current space used ///current space used
quint64 s_curr; quint64 s_curr;
public: public:
Cache(quint64 _capacity = INT_MAX): Cache(quint64 _capacity = INT_MAX):
final(false), quit(false), changed(false), input(NULL), s_max(_capacity), s_curr(0) {} final(false), quit(false), changed(false), input(NULL), s_max(_capacity), s_curr(0) {}
virtual ~Cache() {} virtual ~Cache() {}
@ -43,15 +48,15 @@ class Cache: public Provider<Token> {
quint64 capacity() { return s_max; } quint64 capacity() { return s_max; }
quint64 size() { return s_curr; } quint64 size() { return s_curr; }
void setCapacity(quint64 c) { s_max = c; } void setCapacity(quint64 c) { s_max = c; }
///return true if the cache is waiting for priority to change ///return true if the cache is waiting for priority to change
bool isChanged() { bool isChanged() {
bool r = changed; bool r = changed.testAndSetOrdered(1, 0); //if changed is 1, r is true
changed = false;
return r; return r;
//return input->check_queue.isWaiting();
} }
///empty the cache. Make sure no resource is locked before calling this. Require pause or stop before. ///empty the cache. Make sure no resource is locked before calling this.
/// Require pause or stop before. Ensure there no locked item
void flush() { void flush() {
std::vector<Token *> tokens; std::vector<Token *> tokens;
{ {
@ -78,7 +83,8 @@ class Cache: public Provider<Token> {
} }
} }
///ensure there no locked item ///empty the cache. Make sure no resource is locked before calling this.
/// Require pause or stop before. Ensure there no locked item
template <class FUNCTOR> void flush(FUNCTOR functor) { template <class FUNCTOR> void flush(FUNCTOR functor) {
std::vector<Token *> tokens; std::vector<Token *> tokens;
{ {
@ -106,15 +112,15 @@ class Cache: public Provider<Token> {
} }
} }
protected: protected:
///return the space used in the cache by the loaded resource ///return the space used in the cache by the loaded resource
virtual int size(Token *token) = 0; virtual int size(Token *token) = 0;
///returns amount of space used in cache -1 for failed transfer ///returns amount of space used in cache -1 for failed transfer
virtual int get(Token *token) = 0; virtual int get(Token *token) = 0;
///return amount removed ///return amount removed
virtual int drop(Token *token) = 0; virtual int drop(Token *token) = 0;
///
virtual Token *ready() { return NULL; }
///called in as first thing in run() ///called in as first thing in run()
virtual void begin() {} virtual void begin() {}
@ -135,7 +141,7 @@ class Cache: public Provider<Token> {
if(this->quit) break; if(this->quit) break;
if(unload() || load()) { if(unload() || load()) {
changed = true; //some modification happened changed.testAndSetOrdered(0, 1); //if not changed, set as changed
input->check_queue.open(); //we signal ourselves to check again input->check_queue.open(); //we signal ourselves to check again
} }
} }
@ -146,7 +152,9 @@ class Cache: public Provider<Token> {
///should be protected /** Checks wether we need to make room in the cache because of:
size() - sizeof(lowest priority item) > capacity()
**/
bool unload() { bool unload() {
Token *remove = NULL; Token *remove = NULL;
//make room int the cache checking that: //make room int the cache checking that:
@ -181,18 +189,18 @@ class Cache: public Provider<Token> {
} }
if(remove) { if(remove) {
int size = drop(remove);
assert(size >= 0);
s_curr -= size;
{ {
QMutexLocker input_locker(&(input->heap_lock)); QMutexLocker input_locker(&(input->heap_lock));
int size = drop(remove);
assert(size >= 0);
s_curr -= size;
input->heap.push(remove); input->heap.push(remove);
} }
return true; return true;
} }
return false; return false;
} }
///should be protected ///should be protected
bool load() { bool load() {
Token *insert = NULL; Token *insert = NULL;
@ -215,7 +223,7 @@ class Cache: public Provider<Token> {
if(input->heap.size()) { //we need something in input to tranfer. if(input->heap.size()) { //we need something in input to tranfer.
Token &first = input->heap.max(); Token &first = input->heap.max();
if(first.count > Token::REMOVE && if(first.count > Token::REMOVE &&
(!last || last->priority < first.priority)) { //if !last we already decided we want a transfer., otherwise check for a swap (!last || last->priority < first.priority)) { //if !last we already decided we want a transfer., otherwise check for a swap
insert = input->heap.popMax(); //remove item from heap, while we transfer it. insert = input->heap.popMax(); //remove item from heap, while we transfer it.
} }
} }

View File

@ -10,10 +10,12 @@
template <class Token> template <class Token>
class Controller { class Controller {
public: public:
///should be private ///tokens waiting to be added, should be private
std::vector<Token *> tokens; //tokens waiting to be added std::vector<Token *> tokens;
bool quit; //gracefully terminate. /// threads still running, but no door is open in caches,
///transfers might still be going on!
bool paused; bool paused;
///all cache threads are stopped
bool stopped; bool stopped;
public: public:
@ -22,7 +24,7 @@ class Controller {
///should be protected ///should be protected
std::vector<Cache<Token> *> caches; std::vector<Cache<Token> *> caches;
Controller(): quit(false), paused(false), stopped(true) {} Controller(): paused(false), stopped(true) {}
~Controller() { finish(); } ~Controller() { finish(); }
///called before the cache is started to add a cache in the chain ///called before the cache is started to add a cache in the chain
@ -35,6 +37,7 @@ class Controller {
assert(cache->input); assert(cache->input);
caches.push_back(cache); caches.push_back(cache);
} }
///insert the token in the cache if not already present (actual insertion is done on updatePriorities) ///insert the token in the cache if not already present (actual insertion is done on updatePriorities)
bool addToken(Token *token) { bool addToken(Token *token) {
if(token->count.testAndSetOrdered(Token::OUTSIDE, Token::CACHE)) { if(token->count.testAndSetOrdered(Token::OUTSIDE, Token::CACHE)) {
@ -110,8 +113,9 @@ class Controller {
void pause() { void pause() {
if(paused) return; if(paused) return;
provider.check_queue.lock(); provider.check_queue.lock();
for(unsigned int i = 0; i < caches.size()-1; i++) for(unsigned int i = 0; i < caches.size()-1; i++) {
caches[i]->check_queue.lock(); caches[i]->check_queue.lock();
}
/* provider.heap_lock.lock(); /* provider.heap_lock.lock();
for(unsigned int i = 0; i < caches.size(); i++) for(unsigned int i = 0; i < caches.size(); i++)
caches[i]->heap_lock.lock(); */ caches[i]->heap_lock.lock(); */
@ -131,11 +135,13 @@ class Controller {
} }
///empty all caches AND REMOVES ALL TOKENS! ///empty all caches AND REMOVES ALL TOKENS!
void flush() { void flush() {
pause(); bool running = !stopped;
stop();
for(int i = (int)caches.size()-1; i >= 0; i--) for(int i = (int)caches.size()-1; i >= 0; i--)
caches[i]->flush(); caches[i]->flush();
provider.heap.clear(); provider.heap.clear();
resume(); if(running)
start();
} }
bool isChanged() { bool isChanged() {
bool c = false; bool c = false;

View File

@ -42,6 +42,7 @@ class QDoor {
QSemaphore _close; QSemaphore _close;
public: public:
QDoor(): _open(0), _close(1) {} //this means closed QDoor(): _open(0), _close(1) {} //this means closed
void open() { void open() {
if(_close.tryAcquire(1)) //check it is not open if(_close.tryAcquire(1)) //check it is not open
_open.release(1); //open _open.release(1); //open
@ -53,21 +54,23 @@ class QDoor {
void enter(bool close = false) { void enter(bool close = false) {
_open.acquire(1); _open.acquire(1);
if(close) if(close)
_close.release(1); //and close door behind _close.release(1); //close door behind
else else
_open.release(1); //and leave door opened _open.release(1); //leave door opened
} }
bool isOpen() { return _open.available() == 1; } bool isOpen() { return _open.available() == 1; }
void lock() { void lock() {
//door might be open or closed, but we might happen just in the middle //door might be open or closed, but we might happen just in the middle
//of someone opening, closing or entering it. //of someone opening, closing or entering it.
while(!_open.tryAcquire(1) && !_close.tryAcquire(1)) {} while(!_open.tryAcquire(1) && !_close.tryAcquire(1)) {}
//no resources left //no resources left, door is locked
} }
void unlock() { void unlock(bool open = false) {
//unlock will not open the door, but allow someone to open it. if(open)
_close.release(1); _open.release(1)
else
_close.release(1);
} }
}; };
@ -118,7 +121,8 @@ class QDoor {
void lock() { //prevend door opening and entering void lock() { //prevend door opening and entering
m.lock(); m.lock();
} }
void unlock() { //reverse effect of lock void unlock(bool open = false) { //reverse effect of lock
doorOpen = open;
m.unlock(); m.unlock();
} }
private: private: