ptypes

ptypes Mercurial Source Tree


Root/doc/async.examples.html

<html><!-- #BeginTemplate "/Templates/tmpl.dwt" --><!-- DW6 -->
<head>
<!-- #BeginEditable "doctitle" --> 
<title>PTypes: multithreading: examples</title>
<!-- #EndEditable --> 
<meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1">
<link rel="stylesheet" href="styles.css">
</head>
<body bgcolor="#FFFFFF" leftmargin="40" marginwidth="40">
<p><a href="../index.html"><img src="title-21.png" width="253" height="39" alt="C++ Portable Types Library (PTypes) Version 2.1" border="0"></a> 
<hr size="1" noshade>
<!-- #BeginEditable "body" --> 
<p class="hpath"><a href="index.html">Top</a>: <a href="async.html">Multithreading</a>: 
Examples </p>
<p><b>Example 1</b>. This simple example shows the use of mutex objects to safely 
perform a calculation that involves more than one global variable. Note that using 
the <span class="lang">rwlock</span> class instead of <span class="lang">mutex</span> 
can improve performance by allowing multiple threads to call <span class="lang">avg_get()</span> 
at once.</p>
<blockquote> 
<pre>int avg_sum = 0;    // sum of all values passed to avg_add()
int avg_cnt = 0;    // number of values passed to avg_add()

// locked by avg_add() and avg_get() around every use of avg_sum and avg_cnt
mutex avg_lock;

void avg_add(int v)
{
    scopelock lock(avg_lock);
    avg_sum += v;
    avg_cnt++;
}

int avg_get()
{
    int result;
    {
        scopelock lock(avg_lock);
        if (avg_cnt == 0)
            result = 0;
        else
            result = avg_sum / avg_cnt;
    }
    return result;
}
</pre>
</blockquote>
<p><br>
<b>Example 2</b>. A multithreaded TCP server that uses the <span class="lang">jobqueue</span> 
class to maintain a thread pool - a fixed list of reusable threads that receive 
job "assignments" from a queue. The code below can be used as a template for a 
multithreaded network server.</p>
<blockquote> 
<pre>#include <ptypes.h>

#include <ptime.h>
#include <pasync.h>
#include <pinet.h>

USING_PTYPES


const int testport = 8085;      // TCP port the server binds to
const int maxthreads = 30;      // number of worker threads created for the pool
const int maxtoken = 4096;      // limit passed to line() when reading a request

// id carried by every myjob message
const int MSG_MYJOB = MSG_USER + 1;


class myjobthread: public thread
{
protected:
    int id;
    jobqueue* jq;
    virtual void execute();
public:
    myjobthread(int iid, jobqueue* ijq)
        : thread(false), id(iid), jq(ijq)  {}
    ~myjobthread()  { waitfor(); }
};


class myjob: public message
{
public:
    ipstream* client;
    myjob(ipstream* iclient)
        : message(MSG_MYJOB), client(iclient)  {}
    ~myjob()  { delete client; }
};


void myjobthread::execute()
{
    bool quit = false;
    while (!quit)
    {
        <span class="comment">// get the next message from the queue</span>
        message* msg = jq->getmessage();

        try
        {
            switch (msg->id)
            {
            case MSG_MYJOB:
                {
                    ipstream* client = ((myjob*)msg)->client;

                    <span class="comment">// read the request line</span>
                    string req = lowercase(client->line(maxtoken));
                    if (req == "hello")
                    {
                        <span class="comment">// send our greeting to the client</span>
                        client->putline("Hello, " + iptostring(client->get_ip()) + ", nice to see you!");
                        client->flush();

                        <span class="comment">// log this request</span>
                        pout.putf("%t  greeting received from %a, handled by thread %d\n",
                            now(), long(client->get_ip()), id);
                    }
                    client->close();
                }
                break;

            case MSG_QUIT:
                <span class="comment">// MSG_QUIT is not used in our example</span>
                quit = true;
                break;
            }
        }
        catch(exception*)
        {
            <span class="comment">// the message object must be freed!</span>
            delete msg;
            throw;
        }
        delete msg;
    }
}


void servermain(ipstmserver& svr)
{
    jobqueue jq;
    tobjlist<myjobthread> threads(true);

    <span class="comment">// create the thread pool</span>
    int i;
    for(i = 0; i < maxthreads; i++)
    {
        myjobthread* j = new myjobthread(i + 1, &jq);
        j->start();
        threads.add(j);
    }

    ipstream* client = new ipstream();

    pout.putf("Ready to answer queries on port %d\n", testport);

    while(true)
    {
        svr.serve(*client);

        if (client->get_active())
        {
            <span class="comment">// post the job to the queue; the client object will be freed
            // automatically by the job object</span>
            jq.post(new myjob(client));
            client = new ipstream();
        }
    }
}


int main()
{
    ipstmserver svr;

    try
    {
        <span class="comment">// bind to all local addresses on port 8085</span>
        svr.bindall(testport);

        <span class="comment">// enter an infinite loop of serving requests</span>
        servermain(svr);
    }
    catch(estream* e)
    {
        perr.putf("FATAL: %s\n", pconst(e->get_message()));
        delete e;
    }

    return 0;
}
</pre>
</blockquote>
<p class="seealso">See also: <a href="async.thread.html">thread</a>, <a href="async.mutex.html">mutex</a>, 
<a href="async.rwlock.html">rwlock</a>, <a href="async.jobqueue.html">jobqueue</a>, 
<a href="async.message.html">message</a>, <a href="inet.examples.html">Networking 
examples</a></p>
<!-- #EndEditable -->
<hr size="1">
<a href="../index.html" class="ns">PTypes home</a>
</body>
<!-- #EndTemplate --></html>

Archive Download this file

Branches

Tags

Page rendered in 0.73991s using 11 queries.