/*! @file
* @brief
* implementation of concepts for client-sided I/O to the server
*/
#include "Client_IO.H"
// -------------------------------------------------------------------------
const int ConnectRetries = 3; // #retries to server before client gives up
const int RetryDelay = 60; // delay in seconds between retries
// example 1:
// Abort immediately after detecting that the server is not reachable:
// ConnectRetries = 0;
// example 2:
// Wait up to 5 minutes after detecting that the server is not reachable,
// try to reconnect every minute:
// ConnectRetries = 5; RetryDelay = 60;
string CClientPolynomFetcher::buffer;
void * CClientPolynomFetcher::THREAD_fetch_polynom(void *)
{
static unsigned int PolyInterval = 1000;
static time_t prev_time = 0;
if (PolyInterval>500 && time(NULL)>prev_time+300) PolyInterval=100*(PolyInterval/200);
else if (time(NULL)<prev_time+120) PolyInterval+=100*(PolyInterval/200);
prev_time=time(NULL);
int retries_on_err = 0;
try_again:
try
{
unix_io_stream tcp_ServerConnection(communication_name, server_port);
cout_network << "Fetching mpqs polynomial interval (" << PolyInterval << ")..." << endl;
tcp_ServerConnection << "Polynom? " << PolyInterval << endl;
buffer.clear();
while (!tcp_ServerConnection.eof())
{
char c;
tcp_ServerConnection.get(c);
buffer+=c;
}
cout_network << "background thread: polynomial fetched." << endl;
return NULL;
}
catch (unix_buffer_exception &e)
{
cerr << "caught exception in background thread while fetching polynomial:" << endl
<< e.what() << endl;
if (prev_time!=0 && ++retries_on_err<=ConnectRetries) { sleep(RetryDelay); goto try_again; }
else throw; // propagate exception
}
catch (...)
{
cerr << "caught unknown exception in background thread while fetching polynomial." << endl;
throw; // propagate exception
}
}
void CClientPolynomFetcher::fetch(mpz_t UpperBound_D)
{
static pthread_t thread_fetch_polynom;
int retcode;
static bool first_time_first_love = true;
if (first_time_first_love)
{
first_time_first_love=false;
retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
if (retcode != 0)
{
cerr << "CClientPolynomFetcher: pthread_create failed!" << endl;
exit(1);
}
}
// join thread
cout_network << "Polynomfetch join..." << endl;
retcode = pthread_join(thread_fetch_polynom, NULL);
if (retcode != 0)
{
cerr << "CClientPolynomFetcher: Joining thread failed!" << endl;
exit(1);
}
cout << "Setting mpqs polynomial interval..." << endl;
// read in polynomial data from internal string buffer
istringstream is(buffer);
Polynom.load(is);
cout << "Old UpperBound_D = " << UpperBound_D << endl;
is >> UpperBound_D;
cout << "New UpperBound_D = " << UpperBound_D << endl;
// and fetch the next polynomial (threaded in background)
retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
if (retcode != 0)
{
cerr << "CClientPolynomFetcher::pthread_create failed!" << endl;
exit(1);
}
}
// ----------------------------------------------------------------------
CMutex CClientDynamicFactorFetcher::Mutex;
queue<int> CClientDynamicFactorFetcher::buffer;
void * CClientDynamicFactorFetcher::THREAD_fetch_DynamicFactors(void *)
{
int sleep_secs = 100;
while(true)
{
try
{
unix_io_stream tcp_ServerConnection(communication_name, server_port);
cout << "Fetching dynamic factors from server..." << endl;
static unsigned int dynfac_pos = 0;
tcp_ServerConnection << "DynamicFactors?_ab_index " << dynfac_pos << endl;
int factor = 0;
int counter = 0;
while (true) // fetching dynamic relations
{
//tcp_ServerConnection >> factor;
// cout << factor << "\r" << flush;
char c[4];
if (tcp_ServerConnection.peek()==EOF)
{
// magical mystery tour...
// it seems that EOF is set sometimes although the stream
// is still fully readable...
// for that reason we clear the stream just before the
// real read...
if ( Cpoll(tcp_ServerConnection).readable_chars_within(128,2000) < 4)
cerr << "possibly failing..." << endl;
tcp_ServerConnection.clear();
}
tcp_ServerConnection.read(c,4);
factor=static_cast<unsigned int>(static_cast<unsigned char>(c[0]));
factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[1]))<<8;
factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[2]))<<16;
factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[3]))<<24;
if (factor<=0 || tcp_ServerConnection.fail())
{
MARK; break;
}
Mutex.lock(); buffer.push(factor); Mutex.unlock();
++counter;
}
dynfac_pos+=counter;
cout_network << "background thread: " << counter << " dynamic factors fetched." << endl;
if (counter>200) sleep_secs-=sleep_secs/4;
else if (counter<20 && sleep_secs<3600) sleep_secs+=(sleep_secs+3)/4;
}
catch (unix_buffer_exception &e)
{
cerr << "caught exception in background thread while fetching dynamic factors:" << endl
<< e.what() << endl;
sleep_secs=90; // retry in 90 seconds
}
catch (...)
{
cerr << "caught unknown exception in background thread while fetching dynamic factors." << endl;
throw; // propagate exception
}
cout_network << "next dynamic factor fetch request in " << sleep_secs << " seconds." << endl;
sleep(sleep_secs); // wait a few seconds...
}
return NULL;
}
void CClientDynamicFactorFetcher::fetch()
{
static pthread_t thread_fetch_DynamicFactors;
int retcode;
static bool first_time_first_love = true;
if (first_time_first_love)
{
first_time_first_love=false;
retcode = pthread_create(&thread_fetch_DynamicFactors, NULL, THREAD_fetch_DynamicFactors, NULL);
if (retcode != 0)
{
cerr << "CClientDynamicFactorFetcher: pthread_create failed!" << endl;
exit(1);
}
}
CUnlockMutexAtDestruction Unlocker(Mutex); Mutex.lock();
// Mutex will be automatically unlocked at destructor call of Unlocker!
if (buffer.empty()) return;
#ifdef VERBOSE
cout << "Inserting " << buffer.size() << " dynamic factors..." << endl;
#endif
// read in dynamic factors from internal class buffer
TDynamicFactorRelation relation;
relation.fpos=0; relation.factor=0;
while (!buffer.empty()) // get dynamic relation
{
relation.factor=buffer.front(); buffer.pop();
// cout << relation.factor << "\r" << flush;
relation.append_for_sieving();
DynamicFactorRelations.insert(relation);
}
}
// ---------------------------------------------------------------------------
void CClientRelation_Delivery::init(void)
{
// first check whether the host is reachable
// and get the hostname
#ifdef CYGWIN_COMPAT
struct hostent *hp;
hp = gethostbyname(communication_name.c_str());
if (hp == NULL)
{
cerr << "Unknown host " << communication_name << endl;
exit (1);
}
#else
// refer "man getaddrinfo" and "man socket"
struct addrinfo hints; // our wishes are placed here
memset(&hints,0,sizeof(hints)); // must be zeroed for default options
hints.ai_family=PF_INET; // we want IPv4 as protocol
hints.ai_socktype=SOCK_STREAM; // and we need a stream, not datagram!
struct addrinfo *addrinfo_res = NULL; // here the result will be delivered
const int retval = getaddrinfo(communication_name.c_str(),NULL,&hints,&addrinfo_res);
if ( retval || addrinfo_res==NULL ) // any error?
{
cerr << "can't reach " << "\"" << communication_name << "\"" << endl;
cerr << "Error given by getaddrinfo: " << endl;
cerr << gai_strerror(retval) << endl;
exit(1);
}
if (addrinfo_res->ai_socktype!=SOCK_STREAM) // we got a "stream"-protocol?
{
cerr << "provided protocol doesn't support SOCK_STREAM" << endl;
exit(1);
}
freeaddrinfo(addrinfo_res); // free resources allocated by getaddrinfo
#endif
// now initialize consumer thread, which transmits the relations to the server
pthread_attr_t detached_thread;
pthread_t thread_transmit_relations;
pthread_attr_init(&detached_thread);
pthread_attr_setdetachstate(&detached_thread, PTHREAD_CREATE_DETACHED);
const int retcode = pthread_create(&thread_transmit_relations, &detached_thread,
CClientRelation_Delivery::THREAD_transmit_Relations, NULL);
if (retcode != 0)
{
cerr << "pthread_create failed for THREAD_transmit_Relations!" << endl;
exit(1);
}
}
void * CClientRelation_Delivery::THREAD_transmit_Relations(void *)
{
// important: this thread shall not run concurrently to itself!
string s;
char line[1024];
// if the first attempt to connect the server fails: abort the program
// if the first attempt succeeded, but later attempts fail, this
// indicates that the server is temporarily unavailable.
// Wait a few minutes and try again. If this fails too often, then give up.
unix_io_stream *connection_to_server = NULL;
int err_count = 0;
static bool connection_reachable = false;
bool retried_transmission = false;
ostringstream *temp_relations = NULL;
reconnect:
try
{
if (connection_to_server!=NULL) delete connection_to_server;
// wait until the pipe produces input
PipeInput.get(); PipeInput.unget();
connection_to_server = new unix_io_stream(communication_name, server_port);
connection_reachable=true;
}
catch (unix_buffer_exception &e)
{
connection_to_server=NULL;
cerr << "caught an exception: " << e.what() << endl;
if (connection_reachable && ++err_count<=ConnectRetries)
{
cout << "sleeping " << RetryDelay << " seconds before retrying..." << endl;
sleep(RetryDelay);
goto reconnect;
}
else
{
goto done;
}
}
*connection_to_server << "NewRelations! " << kN << endl;
// verify, that relations belong to our factorization
*connection_to_server >> s;
if (s!="proceed")
{
cerr << "Oops! Server does not accept my relations: " << endl;
cerr << s << endl;
goto done;
}
// if this is omitted, then the network socket name will be used as account name by the server
if (ClientAccountName!="") *connection_to_server << "Account: " << ClientAccountName << endl; // sending Account name for registering statistical data
if (retried_transmission)
{
// previous transmission failed, retry it instead of giving up
MARK;
retried_transmission=false;
cout << "retry sending relations." << endl;
if (temp_relations==NULL || temp_relations->str().empty())
{
cout << "nothing to transmit..." << endl;
}
else
{
*connection_to_server << temp_relations->str() << flush;
if (connection_to_server->fail())
{
MARK;
cerr << "stream state indicates that transmission has failed!" << endl;
retried_transmission=true;
goto reconnect;
}
// synchronize communication with server
static int counter = 0;
*connection_to_server << "Relationsblock_Sync " << flush;
s="(empty)"; *connection_to_server >> s;
if (s!="synced.")
{
cerr << "sync with server failed: " << endl; cerr << s << endl;
retried_transmission=true;
if (++counter > 3)
{
cerr << "too many retries. giving up. sorry." << endl;
goto done;
}
goto reconnect;
}
counter = 0; // reset counter
cout << "retry sending relations succeeded." << endl;
}
}
transmit:
while (PipeInput.peek()!=EOF)
{
delete temp_relations; temp_relations = new ostringstream();
int count=1000+1; // send maximal count-1 relations per block
const int max_duration = 8; // Maximal duration of a transmission for a block in seconds (to prevent life-locks...)
const time_t starttime = time(NULL);
while (PipeInput.peek()!=EOF && --count && time(NULL)-starttime<=max_duration)
{
*connection_to_server << "RL "; // new token for "Relation! "
*temp_relations << "RL ";
while (PipeInput.peek()!='\n')
{
PipeInput.get(line,sizeof(line),'\n');
*connection_to_server << line;
*temp_relations << line;
}
char c; PipeInput.get(c);
*connection_to_server << '\n';
*temp_relations << '\n';
}
*connection_to_server << flush;
if (connection_to_server->fail())
{
MARK;
cerr << "stream state indicates that transmission has failed!" << endl;
retried_transmission=true;
goto reconnect;
}
if (PipeInput.peek()!=EOF)
{
// synchronize communication with server
*connection_to_server << "Relationsblock_Sync " << flush;
s="(empty)"; *connection_to_server >> s;
if (s!="synced.")
{
cerr << "sync with server failed: " << endl; cerr << s << endl;
retried_transmission=true;
goto reconnect;
}
}
}
PipeInput.clear();
// synchronize communication with server
cout << "syncing with server..." << endl;
*connection_to_server << "Relationsblock_Sync " << flush;
s="(empty)"; *connection_to_server >> s;
if (s!="synced.")
{
cerr << "sync with server failed: " << endl; cerr << s << endl;
goto done;
}
sleep(5); // wait to let relations drop into the pipe
if (PipeInput.peek()!=EOF) goto transmit;
*connection_to_server << "Relationsblock_Ende " << flush;
*connection_to_server >> s;
if (s=="ignoriert!")
{
cerr << "Oops! - Transmitted relations were rejected by the server!" << endl;
goto done;
}
if (s!="empfangen.")
{
cerr << "Transmission of relations not acknowledged!" << endl;
goto done;
}
goto reconnect; // end this transmission and respawn a new transmission when new relations arrive...
done:
// we're finished with our job...
cout << "ending transmissions... bye, bye..." << endl;
delete temp_relations;
delete connection_to_server;
exit(1);
}