qsieve

qsieve Mercurial Source Tree


Root/src/Client_IO.cc

/*! @file
 * @brief
 * implementation of concepts for client-sided I/O to the server
 */
 
#include "Client_IO.H"
 
// -------------------------------------------------------------------------
 
const int ConnectRetries = 3; // #retries to server before client gives up
const int RetryDelay = 60; // delay in seconds between retries
 
// example 1:
// Abort immediately after detecting that the server is not reachable:
//  ConnectRetries = 0;
 
// example 2:
// Wait up to 5 minutes after detecting that the server is not reachable,
// try to reconnect every minute:
// ConnectRetries = 5; RetryDelay = 60;
 
 
string CClientPolynomFetcher::buffer;
 
void * CClientPolynomFetcher::THREAD_fetch_polynom(void *)
{
  static unsigned int PolyInterval = 1000;
  static time_t prev_time = 0;
  if (PolyInterval>500 && time(NULL)>prev_time+300) PolyInterval=100*(PolyInterval/200);
  else if (time(NULL)<prev_time+120) PolyInterval+=100*(PolyInterval/200);
  prev_time=time(NULL);
 
  int retries_on_err = 0;
try_again:
  try
   {
     unix_io_stream tcp_ServerConnection(communication_name, server_port);
     cout_network << "Fetching mpqs polynomial interval (" << PolyInterval << ")..." << endl;
     tcp_ServerConnection << "Polynom? " << PolyInterval << endl;
     buffer.clear();
     while (!tcp_ServerConnection.eof())
      {
        char c;
        tcp_ServerConnection.get(c);
        buffer+=c;
      }
     cout_network << "background thread: polynomial fetched." << endl;
     return NULL;
   }
  catch (unix_buffer_exception &e)
   {
     cerr << "caught exception in background thread while fetching polynomial:" << endl
          << e.what() << endl;
     if (prev_time!=0 && ++retries_on_err<=ConnectRetries) { sleep(RetryDelay); goto try_again; }
     else throw; // propagate exception
   }
  catch (...)
   {
     cerr << "caught unknown exception in background thread while fetching polynomial." << endl;
     throw; // propagate exception
   }
}
 
void CClientPolynomFetcher::fetch(mpz_t UpperBound_D)
{
  static pthread_t thread_fetch_polynom;
  int retcode;
 
  static bool first_time_first_love = true;
  if (first_time_first_love)
   {
     first_time_first_love=false;
     retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
     if (retcode != 0)
      {
        cerr << "CClientPolynomFetcher: pthread_create failed!" << endl;
        exit(1);
      }
   }
 
  // join thread
  cout_network << "Polynomfetch join..." << endl;
  retcode = pthread_join(thread_fetch_polynom, NULL);
  if (retcode != 0)
   {
     cerr << "CClientPolynomFetcher: Joining thread failed!" << endl;
     exit(1);
   }
 
  cout << "Setting mpqs polynomial interval..." << endl;
 
  // read in polynomial data from internal string buffer
  istringstream is(buffer);
  Polynom.load(is);
  cout << "Old UpperBound_D = " << UpperBound_D << endl;
  is >> UpperBound_D;
  cout << "New UpperBound_D = " << UpperBound_D << endl;
 
  // and fetch the next polynomial (threaded in background)
  retcode = pthread_create(&thread_fetch_polynom, NULL, THREAD_fetch_polynom, NULL);
  if (retcode != 0)
   {
     cerr << "CClientPolynomFetcher::pthread_create failed!" << endl;
     exit(1);
   }
}
 
 
// ----------------------------------------------------------------------
 
 
CMutex CClientDynamicFactorFetcher::Mutex;
queue<int> CClientDynamicFactorFetcher::buffer;
 
void * CClientDynamicFactorFetcher::THREAD_fetch_DynamicFactors(void *)
{
 int sleep_secs = 100;
 while(true)
  {
    try
     {
       unix_io_stream tcp_ServerConnection(communication_name, server_port);
       cout << "Fetching dynamic factors from server..." << endl;
       static unsigned int dynfac_pos = 0;
       tcp_ServerConnection << "DynamicFactors?_ab_index " << dynfac_pos << endl;
 
       int factor = 0;
       int counter = 0;
       while (true) // fetching dynamic relations
        {
          //tcp_ServerConnection >> factor;
          // cout << factor << "\r" << flush;
          char c[4];
          if (tcp_ServerConnection.peek()==EOF)
           {
             // magical mystery tour...
             // it seems that EOF is set sometimes although the stream
             // is still fully readable...
             // for that reason we clear the stream just before the
             // real read...
             if ( Cpoll(tcp_ServerConnection).readable_chars_within(128,2000) < 4)
               cerr << "possibly failing..." << endl;
             tcp_ServerConnection.clear();
           }
          tcp_ServerConnection.read(c,4);
          factor=static_cast<unsigned int>(static_cast<unsigned char>(c[0]));
          factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[1]))<<8;
          factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[2]))<<16;
          factor|=static_cast<unsigned int>(static_cast<unsigned char>(c[3]))<<24;
          if (factor<=0 || tcp_ServerConnection.fail())
           {
             MARK; break;
           }
          Mutex.lock(); buffer.push(factor); Mutex.unlock();
          ++counter;
        }
       dynfac_pos+=counter;
       cout_network << "background thread: " << counter << " dynamic factors fetched." << endl;
       if (counter>200) sleep_secs-=sleep_secs/4;
       else if (counter<20 && sleep_secs<3600) sleep_secs+=(sleep_secs+3)/4;
     }
    catch (unix_buffer_exception &e)
     {
       cerr << "caught exception in background thread while fetching dynamic factors:" << endl
            << e.what() << endl;
       sleep_secs=90; // retry in 90 seconds
     }
    catch (...)
     {
       cerr << "caught unknown exception in background thread while fetching dynamic factors." << endl;
       throw; // propagate exception
     }
 
    cout_network << "next dynamic factor fetch request in " << sleep_secs << " seconds." << endl;
    sleep(sleep_secs); // wait a few seconds...
  }
 return NULL;
}
 
 
void CClientDynamicFactorFetcher::fetch()
{
  static pthread_t thread_fetch_DynamicFactors;
 
  int retcode;
  static bool first_time_first_love = true;
 
  if (first_time_first_love)
   {
     first_time_first_love=false;
     retcode = pthread_create(&thread_fetch_DynamicFactors, NULL, THREAD_fetch_DynamicFactors, NULL);
     if (retcode != 0)
      {
        cerr << "CClientDynamicFactorFetcher: pthread_create failed!" << endl;
        exit(1);
      }
   }
 
  CUnlockMutexAtDestruction Unlocker(Mutex); Mutex.lock();
  // Mutex will be automatically unlocked at destructor call of Unlocker!
 
  if (buffer.empty()) return;
 
#ifdef VERBOSE
  cout << "Inserting " << buffer.size() << " dynamic factors..." << endl;
#endif
 
  // read in dynamic factors from internal class buffer
  TDynamicFactorRelation relation;
  relation.fpos=0; relation.factor=0;
  while (!buffer.empty()) // get dynamic relation
   {
     relation.factor=buffer.front(); buffer.pop();
     // cout << relation.factor << "\r" << flush;
     relation.append_for_sieving();
     DynamicFactorRelations.insert(relation);
   }
}
 
 
// ---------------------------------------------------------------------------
 
 
void CClientRelation_Delivery::init(void)
{
  // first check whether the host is reachable
  // and get the hostname
 
#ifdef CYGWIN_COMPAT
  struct hostent *hp;
  hp = gethostbyname(communication_name.c_str());
  if (hp == NULL)
    {
      cerr << "Unknown host " << communication_name << endl;
      exit (1);
    }
#else
  // refer "man getaddrinfo" and "man socket"
  struct addrinfo hints; // our wishes are placed here
  memset(&hints,0,sizeof(hints)); // must be zeroed for default options
  hints.ai_family=PF_INET; // we want IPv4 as protocol
  hints.ai_socktype=SOCK_STREAM; // and we need a stream, not datagram!
  struct addrinfo *addrinfo_res = NULL; // here the result will be delivered
  const int retval = getaddrinfo(communication_name.c_str(),NULL,&hints,&addrinfo_res);
  if ( retval || addrinfo_res==NULL ) // any error?
   {
     cerr << "can't reach " << "\"" <<  communication_name << "\"" << endl;
     cerr << "Error given by getaddrinfo: " << endl;
     cerr << gai_strerror(retval) << endl;
     exit(1);
   }
  if (addrinfo_res->ai_socktype!=SOCK_STREAM) // we got a "stream"-protocol?
   {
     cerr << "provided protocol doesn't support SOCK_STREAM" << endl;
     exit(1);
   }
 
  freeaddrinfo(addrinfo_res); // free resources allocated by getaddrinfo
#endif
 
 
 // now initialize consumer thread, which transmits the relations to the server
  pthread_attr_t detached_thread;
  pthread_t thread_transmit_relations;
 
  pthread_attr_init(&detached_thread);
  pthread_attr_setdetachstate(&detached_thread, PTHREAD_CREATE_DETACHED);
 
  const int retcode = pthread_create(&thread_transmit_relations, &detached_thread,
                       CClientRelation_Delivery::THREAD_transmit_Relations, NULL);
 
  if (retcode != 0)
   {
     cerr << "pthread_create failed for THREAD_transmit_Relations!" << endl;
     exit(1);
   }
 
}
 
 
 
void * CClientRelation_Delivery::THREAD_transmit_Relations(void *)
{
  // important: this thread shall not run concurrently to itself!
  string s;
  char line[1024];
 
  // if the first attempt to connect the server fails: abort the program
  // if the first attempt succeeded, but later attempts fail, this
  // indicates that the server is temporarily unavailable.
  // Wait a few minutes and try again. If this fails too often, then give up.
 
  unix_io_stream *connection_to_server = NULL;
  int err_count = 0;
  static bool connection_reachable = false;
  bool retried_transmission = false;
  ostringstream *temp_relations = NULL;
 
reconnect:
  try
   {
     if (connection_to_server!=NULL) delete connection_to_server;
  
     // wait until the pipe produces input
     PipeInput.get(); PipeInput.unget();
 
     connection_to_server = new unix_io_stream(communication_name, server_port);
     connection_reachable=true;
   }
  catch (unix_buffer_exception &e)
   {
     connection_to_server=NULL;
     cerr << "caught an exception: " << e.what() << endl;
     if (connection_reachable && ++err_count<=ConnectRetries)
      {
        cout << "sleeping " << RetryDelay << " seconds before retrying..." << endl;
        sleep(RetryDelay);
        goto reconnect;
      }
     else
      {
        goto done;
      }
   }
 
 
  *connection_to_server << "NewRelations! " << kN << endl;
  // verify, that relations belong to our factorization
  *connection_to_server >> s;
  if (s!="proceed")
   {
     cerr << "Oops! Server does not accept my relations: " << endl;
     cerr << s << endl;
     goto done;
   }
 
  // if this is omitted, then the network socket name will be used as account name by the server
  if (ClientAccountName!="") *connection_to_server << "Account: " << ClientAccountName << endl; // sending Account name for registering statistical data
 
  if (retried_transmission)
   {
     // previous transmission failed, retry it instead of giving up
     MARK;
     retried_transmission=false;
     cout << "retry sending relations." << endl;
     if (temp_relations==NULL || temp_relations->str().empty())
      {
        cout << "nothing to transmit..." << endl;
      }
     else
      {
        *connection_to_server << temp_relations->str() << flush;
        if (connection_to_server->fail())
         {
           MARK;
           cerr << "stream state indicates that transmission has failed!" << endl;
           retried_transmission=true;
           goto reconnect;
         }
 
        // synchronize communication with server
         
        static int counter = 0;
        *connection_to_server << "Relationsblock_Sync " << flush;
        s="(empty)"; *connection_to_server >> s;
        if (s!="synced.")
         {
           cerr << "sync with server failed: " << endl; cerr << s << endl;
           retried_transmission=true;
           if (++counter > 3)
            {
              cerr << "too many retries. giving up. sorry." << endl;
              goto done;
            }
           goto reconnect;
         }
        counter = 0; // reset counter
        cout << "retry sending relations succeeded." << endl; 
      }
   }
 
transmit:
  while (PipeInput.peek()!=EOF)
   {
     delete temp_relations; temp_relations = new ostringstream();
 
     int count=1000+1; // send maximal count-1 relations per block
     const int max_duration = 8; // Maximal duration of a transmission for a block in seconds (to prevent life-locks...)
     const time_t starttime = time(NULL);
     while (PipeInput.peek()!=EOF && --count && time(NULL)-starttime<=max_duration)
      {
        *connection_to_server << "RL "; // new token for "Relation! "
        *temp_relations << "RL ";
    while (PipeInput.peek()!='\n')
     {
       PipeInput.get(line,sizeof(line),'\n');
       *connection_to_server << line;
       *temp_relations << line;
     }
    char c; PipeInput.get(c);
    *connection_to_server << '\n';
    *temp_relations << '\n';
      }
       
     *connection_to_server << flush;
      
     if (connection_to_server->fail())
      {
    MARK;
        cerr << "stream state indicates that transmission has failed!" << endl;
        retried_transmission=true;
        goto reconnect;
      }
 
     if (PipeInput.peek()!=EOF)
      {
        // synchronize communication with server
        *connection_to_server << "Relationsblock_Sync " << flush;
        s="(empty)"; *connection_to_server >> s;
        if (s!="synced.")
         {
           cerr << "sync with server failed: " << endl; cerr << s << endl;
           retried_transmission=true;
           goto reconnect;
         }
      }
   }
  PipeInput.clear();
 
  // synchronize communication with server
  cout << "syncing with server..." << endl;
  *connection_to_server << "Relationsblock_Sync " << flush;
  s="(empty)"; *connection_to_server >> s;
  if (s!="synced.")
   {
     cerr << "sync with server failed: " << endl; cerr << s << endl;
     goto done;
   }
 
  sleep(5); // wait to let relations drop into the pipe
  if (PipeInput.peek()!=EOF) goto transmit;
 
  *connection_to_server << "Relationsblock_Ende " << flush;
  *connection_to_server >> s;
  if (s=="ignoriert!")
   {
     cerr << "Oops! - Transmitted relations were rejected by the server!" << endl;
     goto done;
   }
  if (s!="empfangen.")
   {
     cerr << "Transmission of relations not acknowledged!" << endl;
     goto done;
   }
 
  goto reconnect; // end this transmission and respawn a new transmission when new relations arrive...
 
done:
  // we're finished with our job...
  cout << "ending transmissions... bye, bye..." << endl;
  delete temp_relations;
  delete connection_to_server;
  exit(1);
}
Source at commit 3c9c236610dc created 11 years 9 months ago.
By Nathan Adams, fixing access issue

Archive Download this file

Branches

Tags

Page rendered in 0.77138s using 11 queries.