12 #include <sys/types.h> 14 #define __WVSTREAM_UNIT_TEST 1 16 #include "wvtimeutils.h" 18 #include "wvstreamsdebugger.h" 20 #include "wvistreamlist.h" 21 #include "wvlinkerhack.h" 22 #include "wvmoniker.h" 25 #define ENOBUFS WSAENOBUFS 27 #define errno GetLastError() 29 #include <sys/socket.h> 46 # define TRACE(x, y...) fprintf(stderr, x, ## y); fflush(stderr); 52 # define TRACE(x, y...) 58 WvStream *WvStream::globalstream = NULL;
66 static map<WSID, WvStream*> *wsid_map;
67 static WSID next_wsid_to_try;
82 IWvStream *s = wvcreate<IWvStream>(moniker, obj);
86 s->seterr_both(EINVAL,
"Unknown moniker '%s'", moniker);
93 static bool is_prefix_insensitive(
const char *str,
const char *prefix)
95 size_t len = strlen(prefix);
96 return strlen(str) >= len && strncasecmp(str, prefix, len) == 0;
100 static const char *strstr_insensitive(
const char *haystack,
const char *needle)
102 while (*haystack !=
'\0')
104 if (is_prefix_insensitive(haystack, needle))
112 static bool contains_insensitive(
const char *haystack,
const char *needle)
114 return strstr_insensitive(haystack, needle) != NULL;
118 static const char *list_format =
"%6s%s%2s%s%3s%s%3s%s%6s%s%20s%s%s";
119 static inline const char *Yes_No(
bool val)
121 return val?
"Yes":
"No";
125 void WvStream::debugger_streams_display_header(
WvStringParm cmd,
126 WvStreamsDebugger::ResultCallback result_cb)
129 result.append(list_format,
"--WSID",
"-",
"RC",
"-",
"-Ok",
"-",
"-Cs",
"-",
"-AlRem",
"-",
130 "----------------Type",
"-",
"Name--------------------");
131 result_cb(cmd, result);
136 static WvString friendly_ms(time_t ms)
142 else if (ms < 60*1000)
144 else if (ms < 60*60*1000)
145 return WvString(
"%sm", ms/(60*1000));
146 else if (ms <= 24*60*60*1000)
147 return WvString(
"%sh", ms/(60*60*1000));
149 return WvString(
"%sd", ms/(24*60*60*1000));
152 void WvStream::debugger_streams_display_one_stream(
WvStream *s,
154 WvStreamsDebugger::ResultCallback result_cb)
158 unsigned refcount = s->
release();
159 result.append(list_format,
162 Yes_No(s->
isok()),
" ",
167 result_cb(cmd, result);
171 void WvStream::debugger_streams_maybe_display_one_stream(
WvStream *s,
174 WvStreamsDebugger::ResultCallback result_cb)
176 bool show = args.isempty();
177 WvStringList::Iter arg(args);
178 for (arg.rewind(); arg.next(); )
181 bool is_num = wvstring_to_num(*arg, wsid);
185 if (s->wsid() == wsid)
193 if ((s->wsname() && contains_insensitive(s->wsname(), *arg))
194 || (s->wstype() && contains_insensitive(s->wstype(), *arg)))
202 debugger_streams_display_one_stream(s, cmd, result_cb);
208 WvStreamsDebugger::ResultCallback result_cb,
void *)
210 debugger_streams_display_header(cmd, result_cb);
213 map<WSID, WvStream*>::iterator it;
215 for (it = wsid_map->begin(); it != wsid_map->end(); ++it)
216 debugger_streams_maybe_display_one_stream(it->second, cmd, args,
220 return WvString::null;
226 WvStreamsDebugger::ResultCallback result_cb,
void *)
229 return WvString(
"Usage: %s <WSID>", cmd);
232 if (!wvstring_to_num(wsid_str, wsid))
233 return WvString(
"Invalid WSID '%s'", wsid_str);
234 IWvStream *s = WvStream::find_by_wsid(wsid);
238 return WvString::null;
242 void WvStream::add_debugger_commands()
244 WvStreamsDebugger::add_command(
"streams", 0, debugger_streams_run_cb, 0);
245 WvStreamsDebugger::add_command(
"close", 0, debugger_close_run_cb, 0);
250 read_requires_writable(NULL),
251 write_requires_readable(NULL),
252 uses_continue_select(false),
253 personal_stack_size(131072),
254 alarm_was_ticking(false),
258 readcb(
wv::bind(&
WvStream::legacy_callback, this)),
260 outbuf_delayed_flush(false),
266 alarm_time(wvtime_zero),
267 last_alarm_check(wvtime_zero)
269 TRACE(
"Creating wvstream %p\n",
this);
271 static bool first =
true;
275 WvStream::add_debugger_commands();
280 wsid_map =
new map<WSID, WvStream*>;
281 WSID first_wsid_tried = next_wsid_to_try;
284 if (wsid_map->find(next_wsid_to_try) == wsid_map->end())
287 }
while (next_wsid_to_try != first_wsid_tried);
288 my_wsid = next_wsid_to_try++;
289 bool inserted = wsid_map->insert(make_pair(my_wsid,
this)).second;
294 int result = WSAStartup(MAKEWORD(2,0), &wsaData);
301 IWvStream::IWvStream()
306 IWvStream::~IWvStream()
311 WvStream::~WvStream()
313 TRACE(
"destroying %p\n",
this);
324 wsid_map->erase(my_wsid);
325 if (wsid_map->empty())
335 WvIStreamList::globallist.unlink(
this);
337 TRACE(
"done destroying %p\n",
this);
343 TRACE(
"flushing in wvstream...\n");
345 TRACE(
"(flushed)\n");
351 IWvStreamCallback cb = closecb;
364 setcallback(wv::bind(autoforward_callback, wv::ref(*
this), wv::ref(s)));
381 len = input.
read(buf,
sizeof(buf));
382 output.
write(buf, len);
408 alarm_time = wvtime_zero;
416 #define TEST_CONTINUES_HARSHLY 0 417 #if TEST_CONTINUES_HARSHLY 419 # warning "Using WvCont for *all* streams for testing!" 464 size_t free = outbuf.
free();
469 unsigned char *buf = tmp.
alloc(count);
470 size_t len =
read(buf, count);
480 size_t avail = inbuf.
used();
483 const unsigned char *buf = inbuf.
get(count);
484 size_t len =
write(buf, count);
485 inbuf.
unget(count - len);
492 assert(!count || buf);
495 unsigned char *newbuf;
498 if (bufu < queue_min)
500 newbuf = inbuf.
alloc(queue_min - bufu);
502 i =
uread(newbuf, queue_min - bufu);
503 inbuf.
unalloc(queue_min - bufu - i);
508 if (bufu < queue_min)
516 bufu =
uread(buf, count);
523 memcpy(buf, inbuf.
get(bufu), bufu);
526 TRACE(
"read obj 0x%08x, bytes %d/%d\n", (
unsigned int)
this, bufu, count);
534 assert(!count || buf);
535 if (!
isok() || !buf || !count || stop_write)
return 0;
538 if (!outbuf_delayed_flush && !outbuf.
used())
540 wrote =
uwrite(buf, count);
542 buf = (
const unsigned char *)buf + wrote;
545 if (max_outbuf_size != 0)
547 size_t canbuffer = max_outbuf_size - outbuf.
used();
548 if (count > canbuffer)
553 outbuf.put(buf, count);
592 return isok() &&
select(0,
true,
false,
false);
598 return !stop_write &&
isok() &&
select(0,
false,
true,
false);
605 assert(separator >= 0);
606 assert(separator <= 255);
612 timeout_time = msecadd(wvtime(), wait_msec);
624 if (inbuf.
strchr(separator) > 0)
635 wait_msec = msecdiff(timeout_time, wvtime());
646 hasdata =
select(wait_msec,
true,
false);
655 unsigned char *buf = tmp.
alloc(readahead);
657 size_t len =
uread(buf, readahead);
659 inbuf.put(tmp.
get(len), len);
666 if (!hasdata && wait_msec == 0)
674 i = inbuf.
strchr(separator);
677 assert(eol && *eol == separator);
679 return const_cast<char*
>((
const char *)inbuf.
get(i));
684 inbuf.
alloc(1)[0] = 0;
685 return const_cast<char *
>((
const char *)inbuf.
get(inbuf.
used()));
693 assert(
false &&
"not implemented, come back later!");
703 read(buf,
sizeof(buf));
709 if (is_flushing)
return false;
711 TRACE(
"%p flush starts\n",
this);
714 want_to_flush =
true;
715 bool done = flush_internal(msec_timeout)
716 && flush_outbuf(msec_timeout);
719 TRACE(
"flush stops (%d)\n", done);
726 return want_to_flush;
730 bool WvStream::flush_outbuf(time_t msec_timeout)
732 TRACE(
"%p flush_outbuf starts (isok=%d)\n",
this,
isok());
733 bool outbuf_was_used = outbuf.
used();
738 if (!outbuf_was_used && !autoclose_time && !outbuf_delayed_flush)
744 WvTime stoptime = msecadd(wvtime(), msec_timeout);
747 while (outbuf_was_used &&
isok())
753 size_t real =
uwrite(outbuf.
get(attempt), attempt);
758 if (
isok() && real < attempt)
760 TRACE(
"flush_outbuf: unget %d-%d\n", attempt, real);
761 assert(outbuf.
ungettable() >= attempt - real);
762 outbuf.
unget(attempt - real);
771 if (msec_timeout >= 0
772 && (stoptime < wvtime() || !
select(msec_timeout,
false,
true)))
775 outbuf_was_used = outbuf.
used();
779 if (autoclose_time &&
isok())
781 time_t now = time(NULL);
782 TRACE(
"Autoclose enabled for 0x%p - now-time=%ld, buf %d bytes\n",
783 this, now - autoclose_time, outbuf.
used());
784 if ((flush_internal(0) && !outbuf.
used()) || now > autoclose_time)
791 TRACE(
"flush_outbuf: after autoclose chunk\n");
792 if (outbuf_delayed_flush && !outbuf_was_used)
793 want_to_flush =
false;
795 TRACE(
"flush_outbuf: now isok=%d\n",
isok());
798 if (outbuf_was_used && !
isok())
802 TRACE(
"flush_outbuf stops\n");
804 return !outbuf_was_used;
808 bool WvStream::flush_internal(time_t msec_timeout)
815 int WvStream::getrfd()
const 821 int WvStream::getwfd()
const 829 time_t now = time(NULL);
830 autoclose_time = now + (msec_timeout + 999) / 1000;
832 TRACE(
"Autoclose SETUP for 0x%p - buf %d bytes, timeout %ld sec\n",
833 this, outbuf.
used(), autoclose_time - now);
850 if (!
isok() || (!si.inherit_request && alarmleft == 0))
856 if (!si.inherit_request)
858 si.wants.readable |=
static_cast<bool>(readcb);
859 si.wants.writable |=
static_cast<bool>(writecb);
860 si.wants.isexception |=
static_cast<bool>(exceptcb);
864 if (si.wants.readable && inbuf.
used() && inbuf.
used() >= queue_min)
870 && (alarmleft < si.msec_timeout || si.msec_timeout < 0))
871 si.msec_timeout = alarmleft + 10;
877 if (!si.inherit_request)
879 si.wants.readable |=
static_cast<bool>(readcb);
880 si.wants.writable |=
static_cast<bool>(writecb);
881 si.wants.isexception |=
static_cast<bool>(exceptcb);
894 if ((si.wants.readable || (!si.inherit_request && readcb))
895 && inbuf.
used() && inbuf.
used() >= queue_min)
901 void WvStream::_build_selectinfo(
SelectInfo &si, time_t msec_timeout,
902 bool readable,
bool writable,
bool isexcept,
bool forceable)
910 si.wants.readable =
static_cast<bool>(readcb);
911 si.wants.writable =
static_cast<bool>(writecb);
912 si.wants.isexception =
static_cast<bool>(exceptcb);
916 si.wants.readable = readable;
917 si.wants.writable = writable;
918 si.wants.isexception = isexcept;
922 si.msec_timeout = msec_timeout;
923 si.inherit_request = ! forceable;
924 si.global_sure =
false;
929 if (globalstream && forceable && (globalstream !=
this))
943 tv.tv_sec = si.msec_timeout / 1000;
944 tv.tv_usec = (si.msec_timeout % 1000) * 1000;
948 SOCKET fakefd = socket(PF_INET, SOCK_STREAM, 0);
949 FD_SET(fakefd, &si.except);
953 int sel =
::select(si.max_fd+1, &si.read, &si.write, &si.except,
954 si.msec_timeout >= 0 ? &tv : (timeval*)NULL);
962 && errno != EAGAIN && errno != EINTR
972 TRACE(
"select() returned %d\n", sel);
977 bool WvStream::_process_selectinfo(
SelectInfo &si,
bool forceable)
984 wvstime_sync_forward();
987 if (globalstream && forceable && (globalstream !=
this))
999 bool WvStream::_select(time_t msec_timeout,
bool readable,
bool writable,
1000 bool isexcept,
bool forceable)
1003 assert(wsid_map && (wsid_map->find(my_wsid) != wsid_map->end()));
1006 _build_selectinfo(si, msec_timeout, readable, writable, isexcept,
1010 int sel = _do_select(si);
1012 sure = _process_selectinfo(si, forceable);
1013 if (si.global_sure && globalstream && forceable && (globalstream !=
this))
1023 static_cast<bool>(exceptcb));
1030 readcb = wv::bind(&WvStream::legacy_callback,
this);
1032 writecb = wv::bind(&WvStream::legacy_callback,
this);
1034 exceptcb = wv::bind(&WvStream::legacy_callback,
this);
1051 if (msec_timeout >= 0)
1052 alarm_time = msecadd(wvstime(), msec_timeout);
1054 alarm_time = wvtime_zero;
1060 if (alarm_time.tv_sec)
1065 if (now < last_alarm_check)
1067 #if 0 // okay, I give up. Time just plain goes backwards on some systems. 1069 if (msecdiff(last_alarm_check, now) > 200)
1070 fprintf(stderr,
" ************* TIME WENT BACKWARDS! " 1071 "(%ld:%ld %ld:%ld)\n",
1072 last_alarm_check.tv_sec, last_alarm_check.tv_usec,
1073 now.tv_sec, now.tv_usec);
1075 alarm_time = tvdiff(alarm_time, tvdiff(last_alarm_check, now));
1078 last_alarm_check = now;
1080 time_t remaining = msecdiff(alarm_time, now);
1097 if (msec_timeout >= 0)
1098 alarm(msec_timeout);
1100 alarm(msec_timeout);
1110 TRACE(
"hello-%p\n",
this);
1112 static_cast<bool>(writecb), static_cast<bool>(exceptcb));
1131 callfunc = _callfunc;
1136 void WvStream::legacy_callback()
1146 IWvStreamCallback tmp = readcb;
1156 IWvStreamCallback tmp = writecb;
1166 IWvStreamCallback tmp = exceptcb;
1176 IWvStreamCallback tmp = closecb;
1193 tmp.
merge(unreadbuf, count);
1200 IWvStream *WvStream::find_by_wsid(WSID wsid)
1206 map<WSID, WvStream*>::iterator it = wsid_map->find(wsid);
1208 if (it != wsid_map->end())
1209 retval = it->second;
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
virtual int geterr() const
If isok() is false, return the system error number corresponding to the error, -1 for a special error...
The basic interface which is included by all other XPLC interfaces and objects.
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select() - ie.
virtual unsigned int release()=0
Indicate that you are finished using this object.
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass...
void _callback()
Actually call the registered callfunc and execute().
T * alloc(size_t count)
Allocates exactly the specified number of elements and returns a pointer to an UNINITIALIZED storage ...
virtual bool isok() const
By default, returns true if geterr() == 0.
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
T * mutablepeek(int offset, size_t count)
Returns a non-const pointer info the buffer at the specified offset to the specified number of elemen...
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off...
WvString popstr()
get the first string in the list, or an empty string if the list is empty.
virtual void nowrite()
Shuts down the writing side of the stream.
bool uses_continue_select
If this is set, enables the use of continue_select().
void unalloc(size_t count)
Unallocates exactly the specified number of elements by removing them from the buffer and releasing t...
Based on (and interchangeable with) struct timeval.
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
WvCont provides "continuations", which are apparently also known as semi-coroutines.
virtual bool post_select(SelectInfo &si)
post_select() is called after ::select(), and returns true if this object is now ready.
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
void noautoforward()
Stops autoforwarding.
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
size_t optgettable() const
Returns the optimal maximum number of elements in the buffer currently available for reading without ...
size_t ungettable() const
Returns the maximum number of elements that may be ungotten at this time.
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass...
A type-safe version of WvMonikerBase that lets you provide create functions for object types other th...
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling ::select().
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
WvStream()
Basic constructor for just a do-nothing WvStream.
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
#define UUID_MAP_ENTRY(iface)
Add an entry to an interface map.
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream...
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
void drain()
drain the input buffer (read and discard data until select(0) returns false)
#define UUID_MAP_BEGIN(component)
Start the interface map for "component".
the data structure used by pre_select()/post_select() and internally by select(). ...
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
virtual void callback()
if the stream has a callback function defined, call it now.
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
This is a WvList of WvStrings, and is a really handy way to parse strings.
Specialization of WvBufBase for unsigned char type buffers intended for use with raw memory buffers...
size_t used() const
Returns the number of elements in the buffer currently available for reading.
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
size_t strchr(int ch)
Returns the number of characters that would have to be read to find the first instance of the charact...
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
void zap()
Clears the buffer.
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams...
Various little string functions.
Base class for different address types, each of which will have the ability to convert itself to/from...
virtual bool isok() const
return true if the stream is actually usable right now
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
virtual unsigned int addRef()=0
Indicate you are using this object.
void merge(Buffer &inbuf, size_t count)
Efficiently moves count bytes from the specified buffer into this one.
#define UUID_MAP_END
Marks the end of an interface map.
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run...
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
WvString is an implementation of a simple and efficient printable-string class.
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
static void * yield(void *ret=0)
"return" from the current callback, giving value 'ret' to the person who called us.
size_t free() const
Returns the number of elements that the buffer can currently accept for writing.
void alarm(time_t msec_timeout)
set an alarm, ie.
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
const T * get(size_t count)
Reads exactly the specified number of elements and returns a pointer to a storage location owned by t...
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
virtual void noread()
Shuts down the reading side of the stream.
virtual bool isreadable()
Returns true if the stream is readable.
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
void unget(size_t count)
Ungets exactly the specified number of elements by returning them to the buffer for subsequent reads...