vcglib/apps/nexus/nxsdispatcher.cpp

193 lines
4.8 KiB
C++
Raw Normal View History

2004-11-18 19:30:15 +01:00
#include "nxsdispatcher.h"
#include "fragment.h"
#include "decimate.h"
#include <iostream>
#include <ptypes/ptypes.h>
using namespace std;
using namespace vcg;
using namespace nxs;
using namespace pt;
2004-12-02 21:16:13 +01:00
void SaveFragment(Nexus &nexus, VChain &chain,
2004-11-18 19:30:15 +01:00
Fragment &fragin,
Fragment &fragout);
void Opener::execute() {
2004-12-03 22:19:00 +01:00
// cerr << "Trying to connect to: " << server->get_host() << endl;
2004-11-18 19:30:15 +01:00
server->reading.lock();
server->writing.lock();
while(1) {
if(get_signaled())
return;
2004-12-03 22:19:00 +01:00
// cerr << "Trying to connect to: " << server->get_host() << endl;
2004-11-18 19:30:15 +01:00
try {
server->open();
server->connected = true;
2004-11-28 02:23:26 +01:00
server->queue = 0;
2004-12-03 22:19:00 +01:00
// cerr << "Connected to: " << server->get_host() << endl;
2004-11-18 19:30:15 +01:00
break;
} catch(...) {
}
2004-11-28 05:16:19 +01:00
psleep(4000);
2004-11-18 19:30:15 +01:00
}
server->reading.unlock();
server->writing.unlock();
}
void FragIO::execute() {
2004-11-28 02:23:26 +01:00
pincrement(&(server->queue));
2004-11-18 19:30:15 +01:00
server->writing.lock();
// cerr << "Writing frag...: " << fragin->id << "\n";
outmemory outm;
outm.open();
fragin->Write(&outm);
pt::string a = outm.get_strdata();
try {
server->write((const char *)a, length(a));
server->flush();
} catch (estream *e) {
2004-11-28 02:23:26 +01:00
perr.putf("Error reading: %s\n", pconst(e->get_message()));
2004-11-18 19:30:15 +01:00
delete e;
2004-11-28 02:23:26 +01:00
server->close();
server->connected = false;
server->writing.unlock();
2004-11-18 19:30:15 +01:00
message *msg = new message(MSG_FAIL, (int)fragin);
dispatcher->post(msg);
2004-12-04 14:22:55 +01:00
server->opener.start();
2004-11-18 19:30:15 +01:00
return;
}
server->reading.lock();
server->writing.unlock();
Fragment *out = new Fragment;
2004-11-28 02:23:26 +01:00
if(!server->waitfor(10000) || (!out->Read(server))) {
perr.putf("Error reading!!\n");
server->close();
server->connected = false;
server->reading.unlock();
2004-11-18 19:30:15 +01:00
message *msg = new message(MSG_FAIL, (int)fragin);
dispatcher->post(msg);
2004-12-04 14:22:55 +01:00
server->opener.start();
2004-11-18 19:30:15 +01:00
return;
}
server->reading.unlock();
2004-11-28 02:23:26 +01:00
pdecrement(&(server->queue));
2004-11-18 19:30:15 +01:00
// cerr << "Received frag: " << out->id << endl;
message *msg = new message(MSG_RECEIVE, (int)fragin);
msg->result = (int)out;
dispatcher->post(msg);
}
// Reads the server list from `file` and starts connecting to each server.
//
// Each entry is parsed as "<host> <port>". For every entry a Server is
// created, its opener thread is started and the server is appended to
// `servers`. Parsing stops at the first entry that does not match.
//
// Returns false if the file cannot be opened or no server entry was read,
// true otherwise.
bool Dispatcher::Init(const std::string &file) {
  FILE *fp = fopen(file.c_str(), "rb");
  if(!fp) return false;

  char host[256];
  int port;
  // %255s bounds the host token to the buffer (255 chars + terminator);
  // an unbounded %s would overflow `host` on an over-long name.
  while(fscanf(fp, "%255s %d\n", host, &port) == 2) {
    cerr << "Host: " << host << " port: " << port << endl;
    Server *server = new Server(host, port);
    server->opener.start();
    servers.push_back(server);
  }
  fclose(fp);

  if(servers.size() == 0) {
    cerr << "Empty server file!\n";
    return false;
  }
  return true;
}
// Tears down every server in order: signals its opener thread, closes the
// connection, then frees the Server object.
Dispatcher::~Dispatcher() {
  unsigned int idx = 0;
  while(idx < servers.size()) {
    Server *srv = servers[idx];
    ++idx;
    srv->opener.signal();
    srv->close();
    delete srv;
  }
}
// Tags `frag` with the next sequential id and queues it for dispatch by
// posting a MSG_SEND message carrying the fragment pointer.
void Dispatcher::SendFragment(Fragment *frag) {
  // WARNING: this handles no more than 1<<31 fragments!
  frag->id = count;
  count++;
  post(new message(MSG_SEND, (int)frag));
}
// Picks the connected server with the smallest queue, ignoring servers whose
// queue exceeds `maxqueue`. On ties the earliest server in `servers` wins.
// Returns NULL when no server qualifies.
Server *Dispatcher::BestServer() {
  Server *chosen = NULL;
  for(unsigned int k = 0; k < servers.size(); ++k) {
    // Skip servers that are not currently connected.
    if(!servers[k]->connected)
      continue;
    // Skip servers that are already over the allowed backlog.
    if(!(servers[k]->queue <= maxqueue))
      continue;
    // Keep the current choice unless this one has a strictly shorter queue.
    if(chosen && !(servers[k]->queue < chosen->queue))
      continue;
    chosen = servers[k];
    //    cerr << "best: " << k << " queue: " << chosen->queue << endl;
  }
  return chosen;
}
// Stores a processed fragment and releases everything tied to it: saves the
// in/out pair through SaveFragment(), drops the FragIO registered under the
// fragment id (if any), and finally frees both fragments.
void Dispatcher::ReceiveFragment(Fragment *in, Fragment *out) {
  // lock nexus if run in thread.
  //  cerr << "Saving: " << in->id << endl;
  SaveFragment(*nexus, *chain, *in, *out);

  if(frags.count(in->id) != 0) {
    delete frags[in->id];
    frags.erase(in->id);
  }

  delete in;
  delete out;
}
// Message dispatch for the Dispatcher.
//
//   MSG_RECEIVE         : param/result carry the input and output fragments;
//                         they are handed to ReceiveFragment().
//   MSG_SEND / MSG_FAIL : param carries the fragment to process. If no
//                         server is available (or mode == CLUSTER) the
//                         fragment is processed locally: Join, Decimate,
//                         Split, then ReceiveFragment(). Otherwise a FragIO
//                         is created for the best server, recorded in
//                         `frags` under the fragment id and started.
//   anything else       : forwarded to defhandler().
void Dispatcher::msghandler(message &msg) {
  if(msg.id == MSG_RECEIVE) {
    ReceiveFragment((Fragment *)(msg.param), (Fragment *)(msg.result));
    return;
  }

  if(msg.id != MSG_FAIL && msg.id != MSG_SEND) {
    defhandler(msg);
    return;
  }

  // get server!
  Server *target = BestServer();
  Fragment *fragin = (Fragment *)(msg.param);

  if(target && mode != CLUSTER) {
    //    cerr << "Server: " << fragin->id << endl;
    FragIO *io = new FragIO(target, this, fragin);
    // A first-time send must not already have an entry for this id.
    if(msg.id == MSG_SEND) {
      assert(!frags.count(fragin->id));
    }
    frags[fragin->id] = io;
    io->start();
    return;
  }

  // no server: process locally....
  //  cerr << "Local: " << fragin->id << endl;
  vector<Point3f> verts;
  vector<unsigned int> faces;
  vector<BigLink> borders;
  Join(*fragin, verts, faces, borders);

  unsigned int wanted = (unsigned int)((faces.size()/3) * scaling);
  float error = Decimate(mode, wanted, verts, faces, borders);

  Fragment *fragout = new Fragment;
  fragout->error = error;
  fragout->id = fragin->id;
  fragout->seeds = fragin->seeds;
  fragout->seeds_id = fragin->seeds_id;
  Split(*fragout, verts, faces, borders);

  ReceiveFragment(fragin, fragout);
}