<
html
>
<
head
>
<
title
>PTypes: multithreading: msgqueue</
title
>
<
meta
http-equiv
=
"Content-Type"
content
=
"text/html; charset=iso-8859-1"
>
<
link
rel
=
"stylesheet"
href
=
"styles.css"
>
</
head
>
<
body
bgcolor
=
"#FFFFFF"
leftmargin
=
"40"
marginwidth
=
"40"
>
<
p
><
a
href
=
"../index.html"
><
img
src
=
"title-21.png"
width
=
"253"
height
=
"39"
alt
=
"C++ Portable Types Library (PTypes) Version 2.1"
border
=
"0"
></
a
>
<
hr
size
=
"1"
noshade>
<
blockquote
>
<
pre
class
=
"lang"
>#include <
pasync.h
>
class msgqueue {
msgqueue(int limit = DEF_QUEUE_LIMIT);
<
span
class
=
"comment"
> // functions called from the owner thread:</
span
>
void processone();
void processmsgs();
void run();
<
span
class
=
"comment"
> // functions called from any thread:</
span
>
void post(message* msg);
void post(int id, pintptr param = 0);
void posturgent(message* msg);
void posturgent(int id, pintptr param = 0);
pintptr send(message* msg);
pintptr send(int id, pintptr param = 0);
int get_count();
int get_limit();
<
span
class
=
"comment"
> // message handlers</
span
>
virtual void msghandler(message& msg) = 0;
void defhandler(message& msg);
};
</
pre
>
</
blockquote
>
<
p
>The <
span
class
=
"lang"
>msgqueue</
span
> class implements a queue of <
a
href
=
"async.message.html"
>message</
a
>
objects and is typically used to exchange data between threads in a synchronized
manner.</
p
>
<
p
>The thread which created the <
span
class
=
"lang"
>msgqueue</
span
> object can
retrieve messages from the queue using one of <
span
class
=
"lang"
>run()</
span
>,
<
span
class
=
"lang"
>processmsgs()</
span
> or <
span
class
=
"lang"
>processone()</
span
>.
Applications must define a class derived from <
span
class
=
"lang"
>msgqueue</
span
>
and override the pure virtual method <
span
class
=
"lang"
>msghandler()</
span
>. The
overridden method receives <
span
class
=
"lang"
>message</
span
> structures and performs
appropriate actions depending on the message ID. Any other thread or even multiple
threads in your application can send or post messages to the given queue using
<
span
class
=
"lang"
>post()</
span
>, <
span
class
=
"lang"
>posturgent()</
span
> or <
span
class
=
"lang"
>send()</
span
>.</
p
>
<
p
><
span
class
=
"lang"
>Msgqueue</
span
> can serve as a synchronization object between
threads. Unlike semaphores, where both the sending and the receiving thread can "hang"
while waiting for the peer to become ready, <
span
class
=
"lang"
>msgqueue</
span
> allows
the sender to post data and continue executing immediately. The receiver, in
turn, processes messages one by one in the order in which they were posted.
</
p
>
<
p
>Threads can not only exchange data through a message queue, but also send simple
notifications about various events. Message queues can also be used in single-threaded
applications with event-driven logic.</
p
>
<
p
>A simple example of using <
span
class
=
"lang"
>msgqueue</
span
> could be a server
application with multiple threads, each serving one client; the server maintains
a log file or a database table in which it records various events. To serialize
these records, the client threads post the appropriate messages to the main
thread. The client threads do not wait for the record to be written: they post
their messages and immediately continue their work.</
p
>
<
p
><
b
>IMPORTANT NOTES</
b
>: (1) a <
span
class
=
"lang"
>message</
span
> object should
always be constructed dynamically, i.e. using operator <
span
class
=
"lang"
>new</
span
>;
(2) a <
span
class
=
"lang"
>message</
span
> object is always destroyed by the queue
manager after it has been processed; (3) a <
span
class
=
"lang"
>message</
span
> object
can be sent and processed only once.</
p
>
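<p>The ownership rules in these notes can be illustrated with a small self-contained
model. The sketch below does not use PTypes: <span class="lang">demo_message</span>,
<span class="lang">demo_queue</span> and <span class="lang">lifecycle_demo()</span> are
hypothetical stand-ins built on the C++ standard library, and they assume only the
behavior described above (messages are allocated with <span class="lang">new</span>
and destroyed by the queue once handled).</p>

```cpp
#include <cassert>
#include <deque>

// Hypothetical stand-in types for illustration; these are NOT the PTypes classes.
static int destroyed = 0;              // counts demo_message destructor calls

struct demo_message {
    int id;
    explicit demo_message(int i) : id(i) {}
    virtual ~demo_message() { ++destroyed; }
};

class demo_queue {
    std::deque<demo_message*> items;
public:
    int handled = 0;

    // Frees anything that was posted but never processed.
    ~demo_queue() { for (demo_message* m : items) delete m; }

    // Takes ownership of a heap-allocated message.
    void post(demo_message* msg) { items.push_back(msg); }

    // Handles every queued message exactly once, then destroys it.
    void processmsgs() {
        while (!items.empty()) {
            demo_message* m = items.front();
            items.pop_front();
            ++handled;          // stands in for a call to msghandler(*m)
            delete m;           // the queue, not the sender, frees the message
        }
    }
};

// Posts two messages and returns how many the queue destroyed.
int lifecycle_demo() {
    destroyed = 0;
    demo_queue q;
    q.post(new demo_message(1));
    q.post(new demo_message(2));
    q.processmsgs();
    return destroyed;
}
```

<p>In this model the sender never calls <span class="lang">delete</span> and never
touches a message after posting it, which mirrors notes (2) and (3).</p>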
<
p
>A slower but more universal alternative to the message queue is a local pipe
(see <
a
href
=
"streams.infile.html"
>infile::pipe()</
a
>).</
p
>
<
p
><
span
class
=
"def"
>msgqueue::msgqueue(int limit = DEF_QUEUE_LIMIT)</
span
> constructs
a message queue object. Any thread may create the object, but afterwards only
one thread can process the queue and handle messages. <
span
class
=
"lang"
>Limit</
span
>
specifies the maximum number of unhandled messages this queue can hold. If the
limit is reached, the next thread that posts a message blocks until the queue
has room again. In this version the default for <
span
class
=
"def"
>limit</
span
>
is 5000.</
p
>
<
p
><
span
class
=
"def"
>void msgqueue::processone() </
span
>processes one message
from the queue. If no messages are available, this method blocks until one arrives.
<
span
class
=
"lang"
>processone()</
span
> calls the overridden <
span
class
=
"lang"
>msghandler()</
span
>
and then destroys the message object.</
p
>
<
p
><
span
class
=
"def"
>void msgqueue::processmsgs()</
span
> processes all messages
in the queue and returns to the caller. If there are no messages in the queue,
<
span
class
=
"lang"
>processmsgs()</
span
> returns immediately. Each message is processed
as described for <
span
class
=
"lang"
>processone()</
span
>.</
p
>
<
p
><
span
class
=
"def"
>void msgqueue::run()</
span
> enters a message-processing loop
that can be terminated only by sending or posting the special message
<
span
class
=
"lang"
>MSG_QUIT</
span
> (e.g. <
span
class
=
"lang"
>post(MSG_QUIT)</
span
>).
Each message is processed as described for <
span
class
=
"lang"
>processone()</
span
>.</
p
>
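<p>The relationship between <span class="lang">run()</span>,
<span class="lang">processone()</span> and the quit message can be sketched with a
single-threaded model. Everything here is an assumption made for illustration:
<span class="lang">demo_queue</span>, <span class="lang">DEMO_QUIT</span> and its value
are hypothetical and are not taken from the PTypes headers, and the model returns
where a real queue would block.</p>

```cpp
#include <cassert>
#include <deque>

// Hypothetical quit ID; the real value of MSG_QUIT is not assumed here.
const int DEMO_QUIT = -1;

// Hypothetical single-threaded model of a message loop; NOT the PTypes class.
class demo_queue {
    std::deque<int> ids;
    bool quit = false;
public:
    int handled = 0;

    void post(int id) { ids.push_back(id); }

    // Handles one message; the quit ID only sets the termination flag.
    void processone() {
        if (ids.empty()) return;      // a real queue would block here instead
        int id = ids.front();
        ids.pop_front();
        if (id == DEMO_QUIT) quit = true;
        else ++handled;
    }

    // Repeats processone() until the quit message has been seen.
    void run() {
        quit = false;
        while (!quit && !ids.empty())
            processone();
    }
};

// Returns the number of ordinary messages handled before the loop ended.
int run_demo() {
    demo_queue q;
    q.post(10);
    q.post(11);
    q.post(DEMO_QUIT);
    q.post(12);   // in this model it stays queued: run() has already returned
    q.run();
    return q.handled;
}
```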
<
p
><
span
class
=
"def"
>void msgqueue::post(message* msg)</
span
> adds a message to
the queue. <
span
class
=
"lang"
>Msg</
span
> can be an object of class <
span
class
=
"lang"
>message</
span
>
or any derivative class. The <
span
class
=
"lang"
>message</
span
> object should always
be created dynamically using operator <
span
class
=
"lang"
>new</
span
>. Messages
in the queue are processed in the order in which they were posted, i.e. on a first-in-first-out
basis. <
span
class
=
"lang"
>post()</
span
> can be called from any thread, including
the thread owning the queue.</
p
>
<
p
><
span
class
=
"def"
>void msgqueue::post(int id, pintptr param = 0)</
span
> creates
a message object using <
span
class
=
"lang"
>id</
span
> and <
span
class
=
"lang"
>param</
span
>
and calls <
span
class
=
"lang"
>post(message*)</
span
>.</
p
>
<
p
><
span
class
=
"def"
>void msgqueue::posturgent(message* msg)</
span
> posts a message
object "out of turn", i.e. it is processed before any messages already in the queue.
Messages posted through this method are processed on a last-in-first-out basis.
<
span
class
=
"lang"
>post()</
span
> and <
span
class
=
"lang"
>posturgent()</
span
> can
be used alternately on the same queue. Like <
span
class
=
"lang"
>post()</
span
>,
this method can be called from any thread.</
p
>
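<p>The ordering rules for <span class="lang">post()</span> and
<span class="lang">posturgent()</span> can be modelled with a double-ended queue. This
is a sketch under the assumption that normal messages join the back and urgent
messages jump to the front; <span class="lang">demo_queue</span> and
<span class="lang">ordering_demo()</span> are hypothetical names, and the code does not
reflect how PTypes implements its queue internally.</p>

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical ordering model built on std::deque; NOT the PTypes class.
class demo_queue {
    std::deque<int> ids;
public:
    void post(int id)       { ids.push_back(id); }   // normal: joins the back
    void posturgent(int id) { ids.push_front(id); }  // urgent: jumps to the front

    // Drains the queue and returns the IDs in processing order.
    std::vector<int> processmsgs() {
        std::vector<int> order(ids.begin(), ids.end());
        ids.clear();
        return order;
    }
};

// Mixes normal and urgent posts on one queue.
std::vector<int> ordering_demo() {
    demo_queue q;
    q.post(1);
    q.post(2);
    q.posturgent(90);
    q.posturgent(91);   // posted after 90, but processed before it
    q.post(3);
    return q.processmsgs();
}
```

<p>In this model the processing order is 91, 90, 1, 2, 3: the urgent messages come
first in reverse posting order, followed by the normal messages in posting order.</p>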
<
p
><
span
class
=
"def"
>void msgqueue::posturgent(int id, pintptr param = 0)</
span
>
creates a message object using <
span
class
=
"lang"
>id</
span
> and <
span
class
=
"lang"
>param</
span
>
and calls <
span
class
=
"lang"
>posturgent(message*)</
span
>.</
p
>
<
p
><
span
class
=
"def"
>pintptr msgqueue::send(message* msg)</
span
> calls the message
handler directly, bypassing the queue. If the sender is the same as the thread
owning the queue, <
span
class
=
"lang"
>send()</
span
> simply calls <
span
class
=
"lang"
>msghandler()</
span
>.
Otherwise, if the sender is a concurrent thread, <
span
class
=
"lang"
>send()</
span
>
enters an efficient wait state until the message is processed by the owner thread.
The return value is the value of <
span
class
=
"lang"
>result</
span
> in the message
object. In both cases the message is destroyed upon return from <
span
class
=
"lang"
>send()</
span
>.</
p
>
<
p
><
span
class
=
"def"
>pintptr msgqueue::send(int id, pintptr param = 0)</
span
>
creates a message object using <
span
class
=
"lang"
>id</
span
> and <
span
class
=
"lang"
>param</
span
>
and calls <
span
class
=
"lang"
>send(message*)</
span
>. The return value is the value
of <
span
class
=
"lang"
>result</
span
> in the message object.</
p
>
<
p
><
span
class
=
"def"
>int msgqueue::get_count()</
span
> returns the number of messages
currently in the queue.</
p
>
<
p
><
span
class
=
"def"
>int msgqueue::get_limit()</
span
> returns the queue limit
set by the constructor.</
p
>
<
p
><
span
class
=
"def"
>virtual void msgqueue::msghandler(message& msg)</
span
> is a
pure virtual method that must be overridden to provide application-specific message
handling functionality. <
span
class
=
"lang"
>msghandler()</
span
> usually checks
the message ID through a <
span
class
=
"lang"
>switch</
span
> statement. If the message
ID is unknown to the application, <
span
class
=
"lang"
>defhandler()</
span
> should
be called. The message object <
span
class
=
"lang"
>msg</
span
> must not be reused
with <
span
class
=
"lang"
>post()</
span
>, <
span
class
=
"lang"
>posturgent()</
span
>
or <
span
class
=
"lang"
>send()</
span
>, nor may it be destroyed within the message
handler. The message handler can assign some value to <
span
class
=
"lang"
>msg.result</
span
>
to return a simple answer to the caller of <
span
class
=
"lang"
>send()</
span
>.</
p
>
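<p>A handler of the shape described above might look like the following sketch. It
models only the same-thread case of <span class="lang">send()</span>, where the
handler is called directly. All names (<span class="lang">demo_message</span>,
<span class="lang">demo_queue</span>, <span class="lang">calc_queue</span>,
<span class="lang">DEMO_DOUBLE</span>, <span class="lang">DEMO_NEGATE</span>) are
hypothetical, <span class="lang">std::intptr_t</span> stands in for
<span class="lang">pintptr</span>, and the default handler here merely counts unknown
IDs.</p>

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a message with id, param and result fields.
struct demo_message {
    int id;
    std::intptr_t param;
    std::intptr_t result;
    demo_message(int i, std::intptr_t p) : id(i), param(p), result(0) {}
};

// Hypothetical base class modelling same-thread send(); NOT the PTypes class.
class demo_queue {
public:
    int unknown = 0;   // messages that fell through to the default handler
    virtual ~demo_queue() {}

    // Creates a message, calls the handler directly, returns its result.
    std::intptr_t send(int id, std::intptr_t param = 0) {
        demo_message* msg = new demo_message(id, param);
        msghandler(*msg);
        std::intptr_t r = msg->result;
        delete msg;                    // destroyed on return from send()
        return r;
    }
protected:
    virtual void msghandler(demo_message& msg) = 0;
    void defhandler(demo_message&) { ++unknown; }
};

const int DEMO_DOUBLE = 1;   // hypothetical application message IDs
const int DEMO_NEGATE = 2;

class calc_queue : public demo_queue {
protected:
    // Dispatches on the message ID and answers through msg.result.
    void msghandler(demo_message& msg) override {
        switch (msg.id) {
        case DEMO_DOUBLE: msg.result = msg.param * 2; break;
        case DEMO_NEGATE: msg.result = -msg.param;    break;
        default:          defhandler(msg);            break;
        }
    }
};
```

<p>With this model, <span class="lang">calc_queue q; q.send(DEMO_DOUBLE, 21)</span>
yields 42, and an unrecognized ID leaves <span class="lang">result</span> at 0 while
incrementing <span class="lang">unknown</span>.</p>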
<
p
><
span
class
=
"def"
>void msgqueue::defhandler(message& msg)</
span
> is called
from within the user-defined <
span
class
=
"lang"
>msghandler()</
span
> when the message
ID is unknown to the application. <
span
class
=
"lang"
>defhandler()</
span
> processes
messages that the library uses internally, e.g. MSG_QUIT.</
p
>
<
p
class
=
"seealso"
>See also: <
a
href
=
"async.message.html"
>message</
a
>, <
a
href
=
"async.jobqueue.html"
>jobqueue</
a
></
p
>
<
hr
size
=
"1"
>
<
a
href
=
"../index.html"
class
=
"ns"
>PTypes home</
a
>
</
body
>
</
html
>