#ifndef __PSTREAMS_H__
#define __PSTREAMS_H__
#ifndef __PPORT_H__
#include "pport.h"
#endif
#ifndef __PTYPES_H__
#include "ptypes.h"
#endif
#ifndef PTYPES_ST
# ifndef __PASYNC_H__
# include "pasync.h"
# endif
#endif
#include <stdarg.h>
#include <errno.h>
#ifdef WIN32
# define _WINSOCKAPI_
# include "windows.h"
#endif
PTYPES_BEGIN
#ifdef _MSC_VER
#pragma pack(push, 4)
#endif
class iobase;
class estream: public exception
{
protected:
int code;
iobase* errstm;
public:
estream(iobase* ierrstm, int icode, const char* imsg);
estream(iobase* ierrstm, int icode, const string& imsg);
virtual ~estream();
int get_code() { return code; }
iobase* get_errstm() { return errstm; }
};
typedef void (*iostatusevent)(iobase* sender, int code);
int unixerrno();
const char* unixerrmsg(int code);
const int IO_CREATED = 1;
const int IO_OPENING = 5;
const int IO_OPENED = 35;
const int IO_READING = 37;
const int IO_WRITING = 38;
const int IO_EOF = 45;
const int IO_CLOSING = 250;
const int IO_CLOSED = 253;
enum ioseekmode
{
IO_BEGIN,
IO_CURRENT,
IO_END
};
const int invhandle = -1;
class iobase: public component
{
friend class fdxoutstm;
protected:
bool active;
bool cancelled;
bool eof;
int handle;
large abspos;
int bufsize;
char* bufdata;
int bufpos;
int bufend;
int stmerrno;
string deferrormsg;
int status;
iostatusevent onstatus;
virtual void bufalloc();
virtual void buffree();
void bufclear() { bufpos = 0; bufend = 0; }
void errstminactive();
void errbufrequired();
void requireactive() { if (!active) errstminactive(); }
void requirebuf() { requireactive(); if (bufdata == 0) errbufrequired(); }
int convertoffset(large);
virtual void doopen() = 0;
virtual void doclose();
virtual large doseek(large newpos, ioseekmode mode);
virtual void chstat(int newstat);
virtual int uerrno();
virtual const char* uerrmsg(int code);
public:
iobase(int ibufsize = -1);
virtual ~iobase();
void open();
void close();
void cancel();
void reopen() { open(); }
large seekx(large newpos, ioseekmode mode = IO_BEGIN);
int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); }
void error(int code, const char* defmsg);
virtual void flush();
virtual string get_errormsg();
virtual string get_errstmname();
virtual string get_streamname() = 0;
bool get_active() { return active; }
void set_active(bool newval);
bool get_cancelled() { return cancelled; }
void set_cancelled(bool newval) { cancelled = newval; }
int get_handle() { return handle; }
int get_bufsize() { return bufsize; }
void set_bufsize(int newval);
int get_stmerrno() { return stmerrno; }
int get_status() { return status; }
iostatusevent get_onstatus() { return onstatus; }
void set_onstatus(iostatusevent newval) { onstatus = newval; }
};
typedef iobase* piobase;
extern int defbufsize;
extern int stmbalance;
const char eofchar = 0;
class instm: public iobase
{
protected:
virtual int dorawread(char* buf, int count);
int rawread(char* buf, int count);
virtual void bufvalidate();
void skipeol();
public:
instm(int ibufsize = -1);
virtual ~instm();
virtual int classid();
bool get_eof();
void set_eof(bool ieof) { eof = ieof; }
bool get_eol();
int get_dataavail();
char preview();
char get();
void putback();
string token(const cset& chars);
string token(const cset& chars, int limit);
int token(const cset& chars, char* buf, int size);
string line();
string line(int limit);
int line(char* buf, int size, bool eateol = true);
int read(void* buf, int count);
int skip(int count);
int skiptoken(const cset& chars);
void skipline(bool eateol = true);
large tellx();
int tell() { return convertoffset(tellx()); }
large seekx(large newpos, ioseekmode mode = IO_BEGIN);
int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); }
};
typedef instm* pinstm;
class outstm: public iobase
{
protected:
bool flusheol;
virtual int dorawwrite(const char* buf, int count);
int rawwrite(const char* buf, int count);
virtual void bufvalidate();
void bufadvance(int delta)
{ bufpos += delta; if (bufend < bufpos) bufend = bufpos; }
bool canwrite();
public:
outstm(bool iflusheol = false, int ibufsize = -1);
virtual ~outstm();
virtual int classid();
bool get_flusheol() { return flusheol; }
void set_flusheol(bool newval) { flusheol = newval; }
virtual void flush();
bool get_eof() { return eof; }
void put(char c);
void put(const char* str);
void put(const string& str);
void vputf(const char* fmt, va_list);
void putf(const char* fmt, ...);
void putline(const char* str);
void putline(const string& str);
void puteol();
int write(const void* buf, int count);
large tellx() { return abspos + bufpos; }
int tell() { return convertoffset(tellx()); }
large seekx(large newpos, ioseekmode mode = IO_BEGIN);
int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); }
};
typedef outstm* poutstm;
extern char* shorttimefmt;
extern char* longtimefmt;
class fdxstm;
class fdxoutstm: public outstm
{
friend class fdxstm;
protected:
fdxstm* in;
virtual void chstat(int newstat);
virtual int uerrno();
virtual const char* uerrmsg(int code);
virtual void doopen();
virtual void doclose();
virtual int dorawwrite(const char* buf, int count);
public:
fdxoutstm(int ibufsize, fdxstm* iin);
virtual ~fdxoutstm();
virtual string get_streamname();
};
typedef fdxstm* pfdxstm;
class fdxstm: public instm
{
friend class fdxoutstm;
protected:
fdxoutstm out;
virtual int dorawwrite(const char* buf, int count);
public:
fdxstm(int ibufsize = -1);
virtual ~fdxstm();
virtual int classid();
void set_bufsize(int newval);
void open();
void close();
void cancel();
virtual void flush();
large tellx(bool);
int tell(bool forin) { return convertoffset(tellx(forin)); }
// output interface: pretend this class is derived both
// from instm and outstm. actually we can't use multiple
// inheritance here, since this is a full-duplex stream,
// hence everything must be duplicated for input and output
void putf(const char* fmt, ...);
void put(char c) { out.put(c); }
void put(const char* str) { out.put(str); }
void put(const string& str) { out.put(str); }
void putline(const char* str) { out.putline(str); }
void putline(const string& str) { out.putline(str); }
void puteol() { out.puteol(); }
int write(const void* buf, int count) { return out.write(buf, count); }
bool get_flusheol() { return out.get_flusheol(); }
void set_flusheol(bool newval) { out.set_flusheol(newval); }
operator outstm&() { return out; }
};
class infilter: public instm
{
protected:
instm* stm;
char* savebuf;
int savecount;
string postponed;
void copytobuf(string& s);
void copytobuf(pconst& buf, int& count);
bool copytobuf(char c);
virtual void freenotify(component* sender);
virtual void doopen();
virtual void doclose();
virtual int dorawread(char* buf, int count);
virtual void dofilter() = 0;
bool bufavail() { return savecount > 0; }
void post(const char* buf, int count);
void post(const char* s);
void post(char c);
virtual void post(string s);
public:
infilter(instm* istm, int ibufsize = -1);
virtual ~infilter();
virtual string get_errstmname();
instm* get_stm() { return stm; }
void set_stm(instm* stm);
};
class outfilter: public outstm
{
protected:
outstm* stm;
virtual void freenotify(component* sender);
virtual void doopen();
virtual void doclose();
public:
outfilter(outstm* istm, int ibufsize = -1);
virtual ~outfilter();
virtual string get_errstmname();
outstm* get_stm() { return stm; }
void set_stm(outstm* stm);
};
class inmemory: public instm
{
protected:
string mem;
virtual void bufalloc();
virtual void buffree();
virtual void bufvalidate();
virtual void doopen();
virtual void doclose();
virtual large doseek(large newpos, ioseekmode mode);
virtual int dorawread(char* buf, int count);
public:
inmemory(const string& imem);
virtual ~inmemory();
virtual int classid();
virtual string get_streamname();
large seekx(large newpos, ioseekmode mode = IO_BEGIN);
int seek(int newpos, ioseekmode mode = IO_BEGIN) { return convertoffset(seekx(newpos, mode)); }
string get_strdata() { return mem; }
void set_strdata(const string& data);
};
class outmemory: public outstm
{
protected:
string mem;
int limit;
virtual void doopen();
virtual void doclose();
virtual large doseek(large newpos, ioseekmode mode);
virtual int dorawwrite(const char* buf, int count);
public:
outmemory(int limit = -1);
virtual ~outmemory();
virtual int classid();
virtual string get_streamname();
large tellx() { return abspos; }
int tell() { return (int)abspos; }
string get_strdata();
};
class outfile;
class infile: public instm
{
protected:
string filename;
int syshandle;
int peerhandle;
virtual void doopen();
virtual void doclose();
public:
infile();
infile(const char* ifn);
infile(const string& ifn);
virtual ~infile();
virtual int classid();
void pipe(outfile&);
virtual string get_streamname();
int get_syshandle() { return syshandle; }
void set_syshandle(int ihandle) { close(); syshandle = ihandle; }
int get_peerhandle() { return peerhandle; }
string get_filename() { return filename; }
void set_filename(const string& ifn) { close(); filename = ifn; }
void set_filename(const char* ifn) { close(); filename = ifn; }
};
class outfile: public outstm
{
protected:
friend class infile;
string filename;
int syshandle;
int peerhandle;
int umode;
bool append;
virtual void doopen();
virtual void doclose();
public:
outfile();
outfile(const char* ifn, bool iappend = false);
outfile(const string& ifn, bool iappend = false);
virtual ~outfile();
virtual int classid();
virtual void flush();
virtual string get_streamname();
int get_syshandle() { return syshandle; }
void set_syshandle(int ihandle) { close(); syshandle = ihandle; }
int get_peerhandle() { return peerhandle; }
string get_filename() { return filename; }
void set_filename(const string& ifn) { close(); filename = ifn; }
void set_filename(const char* ifn) { close(); filename = ifn; }
bool get_append() { return append; }
void set_append(bool iappend) { close(); append = iappend; }
int get_umode() { return umode; }
void set_umode(int iumode) { close(); umode = iumode; }
};
class logfile: public outfile
{
protected:
#ifndef PTYPES_ST
mutex lock;
#endif
public:
logfile();
logfile(const char* ifn, bool iappend = true);
logfile(const string& ifn, bool iappend = true);
virtual ~logfile();
virtual int classid();
void vputf(const char* fmt, va_list);
void putf(const char* fmt, ...);
};
class intee: public infilter {
protected:
outfile file;
virtual void doopen();
virtual void doclose();
virtual void dofilter();
public:
intee(instm* istm, const char* ifn, bool iappend = false);
intee(instm* istm, const string& ifn, bool iappend = false);
virtual ~intee();
outfile* get_file() { return &file; }
virtual string get_streamname();
};
#ifndef WIN32
# define DEF_NAMED_PIPES_DIR "/tmp/"
#endif
#ifdef WIN32
const int DEF_PIPE_TIMEOUT = 20000;
const int DEF_PIPE_OPEN_TIMEOUT = 1000;
const int DEF_PIPE_OPEN_RETRY = 5;
const int DEF_PIPE_SYSTEM_BUF_SIZE = 4096;
#endif
class namedpipe: public fdxstm
{
friend class npserver;
protected:
string pipename;
int svhandle;
#ifdef WIN32
// we use overlapped IO in order to have timed waiting in serve()
// and also to implement timeout error on the client side
OVERLAPPED ovr;
virtual int dorawread(char* buf, int count);
virtual int dorawwrite(const char* buf, int count);
static string realpipename(const string& pipename, const string& svrname = nullstring);
void initovr();
#else
static string realpipename(const string& pipename);
static bool setupsockaddr(const string& pipename, void* sa);
void initovr() {}
#endif
virtual void doopen();
virtual void doclose();
virtual large doseek(large, ioseekmode);
public:
namedpipe();
namedpipe(const string& ipipename);
#ifdef WIN32
namedpipe(const string& ipipename, const string& servername);
#endif
virtual ~namedpipe();
virtual int classid();
virtual void flush();
virtual string get_streamname();
string get_pipename() { return pipename; }
void set_pipename(const string&);
void set_pipename(const char*);
};
class npserver: public unknown
{
string pipename;
int handle;
bool active;
void error(int code, const char* defmsg);
void open();
void close();
#ifdef WIN32
void openinst();
void closeinst();
#endif
public:
npserver(const string& ipipename);
~npserver();
bool serve(namedpipe& client, int timeout = -1);
};
const int md5_digsize = 16;
typedef uchar md5_digest[md5_digsize];
typedef unsigned char md5_byte_t;
typedef unsigned int md5_word_t;
typedef struct md5_state_s
{
md5_word_t count[2];
md5_word_t abcd[4];
md5_byte_t buf[64];
} md5_state_t;
class outmd5: public outfilter
{
protected:
md5_state_s ctx;
md5_digest digest;
virtual void doopen();
virtual void doclose();
virtual int dorawwrite(const char* buf, int count);
public:
outmd5(outstm* istm = nil);
virtual ~outmd5();
virtual string get_streamname();
const unsigned char* get_bindigest() { close(); return digest; }
string get_digest();
};
class outnull: public outstm
{
protected:
virtual int dorawwrite(const char*, int);
virtual void doopen();
virtual void doclose();
public:
outnull();
virtual ~outnull();
virtual string get_streamname();
};
#ifdef _MSC_VER
# pragma warning (disable: 4099)
# pragma warning (disable: 4251)
#endif
class unit_thread;
class unit: public component
{
protected:
friend class unit_thread;
unit* pipe_next;
unit_thread* main_thread;
int running;
void do_main();
public:
compref<instm> uin;
compref<outstm> uout;
unit();
virtual ~unit();
virtual int classid();
// things that may be overridden in descendant classes
virtual void main();
virtual void cleanup();
// service methods
void connect(unit* next);
void run(bool async = false);
void waitfor();
};
typedef unit* punit;
typedef unit CUnit;
extern infile pin;
extern logfile pout;
extern logfile perr;
extern outnull pnull;
#ifdef _MSC_VER
#pragma pack(pop)
#endif
PTYPES_END
#endif