#ifndef __PASYNC_H__
#define __PASYNC_H__
#ifdef WIN32
# define _WINSOCKAPI_
# include <windows.h>
#else
# include <pthread.h>
# ifndef __bsdi__
# include <semaphore.h>
# endif
#endif
#ifndef __PPORT_H__
#include "pport.h"
#endif
#ifndef __PTYPES_H__
#include "ptypes.h"
#endif
PTYPES_BEGIN
#ifdef _MSC_VER
#pragma pack(push, 4)
#endif
#ifdef WIN32
// Windows: thread ids are plain integers, and pthread_t is aliased to HANDLE
// so that code further down (e.g. thread::handle) can use the same type name
// on every platform.
typedef int pthread_id_t;
typedef HANDLE pthread_t;
#else
// Other platforms: the native pthread_t is also used as the thread id type.
typedef pthread_t pthread_id_t;
#endif
void psleep(uint milliseconds)
bool pthrequal(pthread_id_t id)
pthread_id_t pthrself()
#ifdef WIN32
struct mutex: public noncopyable
{
protected:
CRITICAL_SECTION critsec
public:
mutex() { InitializeCriticalSection(&
~mutex() { DeleteCriticalSection(&critsec); }
void enter() { EnterCriticalSection(&critsec); }
void leave() { LeaveCriticalSection(&critsec); }
void lock() { enter(); }
void unlock() { leave(); }
};
#else
// Mutual-exclusion lock built on pthread_mutex_t.
// The pthread mutex is created with default attributes in the constructor and
// destroyed in the destructor. enter()/leave() and lock()/unlock() are two
// spellings of the same pair of operations.
struct mutex: public noncopyable
{
protected:
    pthread_mutex_t mtx;

public:
    mutex()
    {
        pthread_mutex_init(&mtx, 0);
    }

    ~mutex()
    {
        pthread_mutex_destroy(&mtx);
    }

    // Acquire the mutex.
    void lock()
    {
        pthread_mutex_lock(&mtx);
    }

    // Release the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&mtx);
    }

    // Alternative names for lock()/unlock().
    void enter()    { lock(); }
    void leave()    { unlock(); }
};
#endif
// Scope guard for a mutex: the constructor locks the given mutex and the
// destructor unlocks it again.
class scopelock: public noncopyable
{
protected:
    mutex* mtx;     // the guarded mutex; not owned

public:
    scopelock(mutex& imtx)
        : mtx(&imtx)
    {
        imtx.lock();
    }

    ~scopelock()
    {
        mtx->unlock();
    }
};
// Number of entries in the _mtxtable lock table declared below.
#define _MUTEX_HASH_SIZE 29
// pmemlock is the lock type stored in the table; pmementer/pmemleave acquire
// and release one such lock through a pointer to it.
#ifdef WIN32
// Windows: table entries are mutex objects, used through lock()/unlock().
# define pmemlock mutex
# define pmementer(m) (m)->lock()
# define pmemleave(m) (m)->unlock()
#else
// Other platforms: table entries are raw pthread mutexes; _MTX_INIT is the
// static initializer for one entry.
# define _MTX_INIT PTHREAD_MUTEX_INITIALIZER
# define pmemlock pthread_mutex_t
# define pmementer pthread_mutex_lock
# define pmemleave pthread_mutex_unlock
#endif
// The lock table itself; it is defined outside this header.
extern pmemlock _mtxtable[_MUTEX_HASH_SIZE];
// Yields a pointer to the table entry selected by pintptr(addr) modulo the
// table size, so a given address always maps to the same lock.
// NOTE(review): if pintptr is a signed type, a negative value would produce a
// negative index here -- confirm pintptr's signedness.
#define pgetmemlock(addr) (_mtxtable + pintptr(addr) % _MUTEX_HASH_SIZE)
#ifdef WIN32
// Event object wrapping a Win32 event HANDLE.
// The constructor is defined outside this header; the destructor closes the
// handle. signal() is an alias for post().
class trigger: public noncopyable
{
protected:
    HANDLE handle;

public:
    trigger(bool autoreset, bool state);

    ~trigger()
    {
        CloseHandle(handle);
    }

    // Block, with no timeout, until the event handle is signaled.
    void wait()
    {
        WaitForSingleObject(handle, INFINITE);
    }

    // Set the event.
    void post()
    {
        SetEvent(handle);
    }

    void signal()
    {
        post();
    }

    // Reset the event.
    void reset()
    {
        ResetEvent(handle);
    }
};
#else
// Event object for non-Windows platforms, holding a pthread mutex, a
// condition variable and a state value. Everything except signal() is
// defined outside this header.
class trigger: public noncopyable
{
protected:
    pthread_mutex_t mtx;
    pthread_cond_t cond;
    int state;
    bool autoreset;

public:
    trigger(bool autoreset, bool state);
    ~trigger();

    void wait();
    void post();
    void reset();

    // Alias for post().
    void signal()
    {
        post();
    }
};
#endif
// Select which rwlock declaration below is compiled:
//   __PTYPES_RWLOCK__  - the library's own implementation
//   __POSIX_RWLOCK__   - a wrapper around pthread_rwlock_t
// WIN32, __DARWIN__ and __bsdi__ use the library's own implementation.
#if defined(WIN32) || defined(__DARWIN__) || defined(__bsdi__)
# define __PTYPES_RWLOCK__
#elif defined(linux)
// On linux the POSIX wrapper is selected only when _GNU_SOURCE or
// __USE_UNIX98 is defined; otherwise neither macro is set and rwlock,
// scoperead and scopewrite are not declared at all.
# if defined(_GNU_SOURCE) || defined(__USE_UNIX98)
# define __POSIX_RWLOCK__
# endif
#else
// Every other platform uses the POSIX wrapper.
# define __POSIX_RWLOCK__
#endif
#ifdef __PTYPES_RWLOCK__
// Read/write lock, library implementation (compiled when __PTYPES_RWLOCK__
// is defined). All operations except lock() are defined outside this header,
// so the data members below must keep their order.
struct rwlock: protected mutex
{
protected:
#ifdef WIN32
    HANDLE reading;
    HANDLE finished;
    int readcnt;
    int writecnt;
#else
    // NOTE(review): this member hides the mtx inherited from the POSIX mutex
    // base -- confirm that is intended.
    pthread_mutex_t mtx;
    pthread_cond_t readcond;
    pthread_cond_t writecond;
    int locks;
    int writers;
    int readers;
#endif

public:
    rwlock();
    ~rwlock();

    void rdlock();
    void wrlock();
    void unlock();

    // lock() is the same as taking the write lock.
    void lock()
    {
        wrlock();
    }
};
#elif defined(__POSIX_RWLOCK__)
// Read/write lock wrapping pthread_rwlock_t (compiled when __POSIX_RWLOCK__
// is defined). The constructor is defined outside this header; the destructor
// destroys the underlying lock.
struct rwlock: public noncopyable
{
protected:
    pthread_rwlock_t rw;

public:
    rwlock();

    ~rwlock()
    {
        pthread_rwlock_destroy(&rw);
    }

    // Acquire for reading.
    void rdlock()
    {
        pthread_rwlock_rdlock(&rw);
    }

    // Acquire for writing.
    void wrlock()
    {
        pthread_rwlock_wrlock(&rw);
    }

    // Release a lock taken by rdlock() or wrlock().
    void unlock()
    {
        pthread_rwlock_unlock(&rw);
    }

    // lock() is the same as taking the write lock.
    void lock()
    {
        wrlock();
    }
};
#endif
#if defined(__PTYPES_RWLOCK__) || defined(__POSIX_RWLOCK__)
// Scope guard for shared access: the constructor takes the read lock on the
// given rwlock and the destructor releases it.
class scoperead: public noncopyable
{
protected:
    rwlock* rw;     // the guarded lock; not owned

public:
    scoperead(rwlock& irw)
        : rw(&irw)
    {
        irw.rdlock();
    }

    ~scoperead()
    {
        rw->unlock();
    }
};
// Scope guard for exclusive access: the constructor takes the write lock on
// the given rwlock and the destructor releases it.
class scopewrite: public noncopyable
{
protected:
    rwlock* rw;     // the guarded lock; not owned

public:
    scopewrite(rwlock& irw)
        : rw(&irw)
    {
        irw.wrlock();
    }

    ~scopewrite()
    {
        rw->unlock();
    }
};
#endif
// On WIN32, __DARWIN__ and __bsdi__ there is no separate semaphore class:
// when __SEM_TO_TIMEDSEM__ is defined, semaphore becomes an alias for timedsem.
#if defined(WIN32) || defined(__DARWIN__) || defined(__bsdi__)
# define __SEM_TO_TIMEDSEM__
#endif
#ifdef __SEM_TO_TIMEDSEM__
// timedsem is declared further down; forward-declare it so that semaphore
// can be made an alias for it here.
class timedsem;
typedef timedsem semaphore;
#else
// Semaphore holding a sem_t. Used only when __SEM_TO_TIMEDSEM__ is not
// defined. Everything except signal() is defined outside this header.
class semaphore: public unknown
{
protected:
    sem_t handle;

public:
    semaphore(int initvalue);
    virtual ~semaphore();

    void wait();
    void post();

    // Alias for post().
    void signal()
    {
        post();
    }
};
#endif
// Semaphore whose wait() takes a timeout. It holds a HANDLE on Windows and a
// counter with a pthread mutex and condition variable elsewhere. Everything
// except signal() is defined outside this header.
class timedsem: public unknown
{
protected:
#ifdef WIN32
    HANDLE handle;
#else
    int count;
    pthread_mutex_t mtx;
    pthread_cond_t cond;
#endif

public:
    timedsem(int initvalue);
    virtual ~timedsem();

    // NOTE(review): the default of -1 presumably means "no timeout", and the
    // result presumably tells whether the semaphore was acquired -- confirm
    // against the implementation.
    bool wait(int msecs = -1);
    void post();

    // Alias for post().
    void signal()
    {
        post();
    }
};
// Abstract base class for threads. A subclass supplies execute();
// _threadproc and _threadepilog are friends so they can reach the protected
// members. The data members must keep their order because the
// implementation lives outside this header.
class thread: public unknown
{
protected:
#ifdef WIN32
    unsigned id;
#endif
    pthread_t handle;
    int autofree;
    int running;
    int signaled;
    int finished;
    int freed;
    int reserved;
    timedsem relaxsem;

    // The thread body; must be provided by the subclass.
    virtual void execute() = 0;
    virtual void cleanup();

    // Waits on relaxsem for up to msecs and returns the result of that wait.
    bool relax(int msecs)
    {
        return relaxsem.wait(msecs);
    }

    friend void _threadepilog(thread* thr);
#ifdef WIN32
    friend unsigned __stdcall _threadproc(void* arg);
#else
    friend void* _threadproc(void* arg);
#endif

public:
    thread(bool iautofree);
    virtual ~thread();

    // Thread id: the numeric id on Windows, the pthread handle elsewhere.
#ifdef WIN32
    pthread_id_t get_id()
    {
        return static_cast<int>(id);
    }
#else
    pthread_id_t get_id()
    {
        return handle;
    }
#endif

    // The state flags are stored as ints; the getters report them as bools.
    bool get_running()
    {
        return running != 0;
    }

    bool get_finished()
    {
        return finished != 0;
    }

    bool get_signaled()
    {
        return signaled != 0;
    }

    void start();
    void signal();
    void waitfor();
};
// Predefined message ids.
// NOTE(review): MSG_USER is presumably the first id available to
// applications and MSG_QUIT the id that ends a message loop -- confirm
// against the queue implementation.
const int MSG_USER          = 0;
const int MSG_QUIT          = -1;

// Default value of the ilimit constructor argument of the queue classes.
const int DEF_QUEUE_LIMIT   = 5000;
// A queue item: a public id with a parameter and a result slot. The next and
// sync members are accessible to jobqueue and msgqueue, which are friends.
class message: public unknown
{
protected:
    message* next;
    semaphore* sync;

    friend class jobqueue;
    friend class msgqueue;

public:
    int id;
    pintptr param;
    pintptr result;

    message(int iid, pintptr iparam = 0);
    virtual ~message();
};
// Queue of message objects with a size limit. Only the two count/limit
// getters are inline; all other members are defined outside this header.
class jobqueue: public noncopyable
{
private:
    int limit;
    message* head;
    message* tail;
    int qcount;
    timedsem sem;
    timedsem ovrsem;
    mutex qlock;

protected:
    bool enqueue(message* msg, int timeout = -1);
    bool push(message* msg, int timeout = -1);
    message* dequeue(bool safe = true, int timeout = -1);
    void purgequeue();

public:
    jobqueue(int ilimit = DEF_QUEUE_LIMIT);
    virtual ~jobqueue();

    // Current value of the queue's message counter.
    int get_count() const
    {
        return qcount;
    }

    // The limit stored in this queue.
    int get_limit() const
    {
        return limit;
    }

    // Each operation takes either an existing message object or an id/param
    // pair.
    bool post(message* msg, int timeout = -1);
    bool post(int id, pintptr param = 0, int timeout = -1);
    bool posturgent(message* msg, int timeout = -1);
    bool posturgent(int id, pintptr param = 0, int timeout = -1);
    message* getmessage(int timeout = -1);

#ifdef PTYPES19_COMPAT
    // Compatibility name for get_count().
    int msgsavail() const
    {
        return get_count();
    }
#endif
};
template <class T> class tjobqueue: protected jobqueue
{
public:
tjobqueue(int ilimit = DEF_QUEUE_LIMIT);
int get_count() const { return jobqueue::get_count(); }
int get_limit() const { return jobqueue::get_limit(); }
bool post(T* msg, int timeout = -1) { return jobqueue::post(msg, timeout); }
bool posturgent(T* msg, int timeout = -1) { return jobqueue::posturgent(msg, timeout); }
T* getmessage(int timeout = -1) { return (T*)jobqueue::getmessage(timeout); }
};
// Message queue built on jobqueue. A subclass supplies msghandler(). The
// inline members below only forward to jobqueue; everything else is defined
// outside this header.
class msgqueue: protected jobqueue
{
private:
    mutex thrlock;
    pthread_id_t owner;

    pintptr finishmsg(message* msg);
    void handlemsg(message* msg);
    void takeownership();

protected:
    bool quit;

    void defhandler(message& msg);
    // Must be provided by the subclass.
    virtual void msghandler(message& msg) = 0;

public:
    msgqueue(int ilimit = DEF_QUEUE_LIMIT);
    virtual ~msgqueue();

    // functions called from the owner thread:
    void processone();
    void processmsgs();
    void run();

    // functions called from any thread:
    int get_count() const
    {
        return jobqueue::get_count();
    }

    int get_limit() const
    {
        return jobqueue::get_limit();
    }

    bool post(message* msg, int timeout = -1)
    {
        return jobqueue::post(msg, timeout);
    }

    bool post(int id, pintptr param = 0, int timeout = -1)
    {
        return jobqueue::post(id, param, timeout);
    }

    bool posturgent(message* msg, int timeout = -1)
    {
        return jobqueue::posturgent(msg, timeout);
    }

    bool posturgent(int id, pintptr param = 0, int timeout = -1)
    {
        return jobqueue::posturgent(id, param, timeout);
    }

    pintptr send(message* msg);
    pintptr send(int id, pintptr param = 0);

#ifdef PTYPES19_COMPAT
    // Compatibility name for get_count().
    int msgsavail() const
    {
        return get_count();
    }
#endif
};
#ifdef _MSC_VER
#pragma pack(pop)
#endif
PTYPES_END
#endif