diff src/event-msw.c @ 274:ca9a9ec9c1c1 r21-0b35

Import from CVS: tag r21-0b35
author cvs
date Mon, 13 Aug 2007 10:29:42 +0200
parents c5d627a313b1
children 6330739388db
line wrap: on
line diff
--- a/src/event-msw.c	Mon Aug 13 10:28:54 2007 +0200
+++ b/src/event-msw.c	Mon Aug 13 10:29:42 2007 +0200
@@ -46,6 +46,7 @@
 #include "device.h"
 #include "events.h"
 #include "frame.h"
+#include "lstream.h"
 #include "process.h"
 #include "redisplay.h"
 #include "sysproc.h"
@@ -54,6 +55,8 @@
 
 #include "events-mod.h"
 
+#include <errno.h>
+
 #ifdef BROKEN_CYGWIN
 int WINAPI      DdeCmpStringHandles (HSZ, HSZ);
 HDDEDATA WINAPI DdeCreateDataHandle (DWORD, LPBYTE, DWORD, DWORD, HSZ,
@@ -102,7 +105,10 @@
 #define MAX_WAITABLE (MAXIMUM_WAIT_OBJECTS - 1)
 
 /* List of mswindows waitable handles. */
-static HANDLE mswindows_waitable[MAX_WAITABLE];
+static HANDLE mswindows_waitable_handles[MAX_WAITABLE];
+
+/* Number of wait handles */
+static int mswindows_waitable_count=0;
 
 /* Count of quit chars currently in the queue */
 /* Incremented in WM_[SYS]KEYDOWN handler in the mswindows_wnd_proc()
@@ -116,9 +122,6 @@
 int mswindows_mouse_button_max_skew_y;
 int mswindows_mouse_button_tolerance;
 
-/* Number of wait handles */
-static int mswindows_waitable_count=0;
-
 /* This is the event signaled by the event pump.
    See mswindows_pump_outstanding_events for comments */
 static Lisp_Object mswindows_error_caught_in_modal_loop;
@@ -126,6 +129,441 @@
 
 /* Count of wound timers */
 static int mswindows_pending_timers_count;
+
+/************************************************************************/
+/*                Pipe instream - reads process output                  */
+/************************************************************************/
+
+#define PIPE_READ_DELAY 20
+
+#define HANDLE_TO_USID(h) ((USID)(h))
+
+#define NTPIPE_SLURP_STREAM_DATA(stream) \
+  LSTREAM_TYPE_DATA (stream, ntpipe_slurp)
+
+struct ntpipe_slurp_stream
+{
+  LPARAM user_data;	/* Any user data stored in the stream object	 */
+  HANDLE hev_thread;	/* Our thread blocks on this, signaled by caller */
+			/* This is a manual-reset object. 		 */
+  HANDLE hev_caller;	/* Caller blocks on this, and we signal it	 */
+			/* This is a manual-reset object. 		 */
+  HANDLE hev_unsleep;	/* Pipe read delay is canceled if this is set	 */
+			/* This is a manual-reset object. 		 */
+  HANDLE hpipe;		/* Pipe read end handle.			 */
+  HANDLE hthread;	/* Reader thread handle.			 */
+  BYTE   onebyte;	/* One byte buffer read by thread		 */
+  DWORD  die_p;		/* Thread must exit ASAP if non-zero		 */
+  BOOL   eof_p   : 1;	/* Set when thread saw EOF			 */
+  BOOL   error_p : 1;   /* Read error other than EOF/broken pipe	 */
+};
+
+DEFINE_LSTREAM_IMPLEMENTATION ("ntpipe-input", lstream_ntpipe_slurp,
+			       sizeof (struct ntpipe_slurp_stream));
+
+static DWORD WINAPI
+slurp_thread (LPVOID vparam)
+{
+  struct ntpipe_slurp_stream *s = (struct ntpipe_slurp_stream*)vparam;
+
+  for (;;)
+    {
+      /* Read one byte from the pipe */
+      DWORD actually_read;
+      if (!ReadFile (s->hpipe, &s->onebyte, 1, &actually_read, NULL))
+	{
+	  DWORD err = GetLastError ();
+	  if (err == ERROR_BROKEN_PIPE || err == ERROR_NO_DATA)
+	    s->eof_p = TRUE;
+	  else
+	    s->error_p = TRUE;
+	}
+      else if (actually_read == 0)
+	s->eof_p = TRUE;
+
+      /* We must terminate on an error or eof */
+      if (s->eof_p || s->error_p)
+	InterlockedIncrement (&s->die_p);
+
+      /* Before we notify caller, we unsignal our event. */
+      ResetEvent (s->hev_thread);
+
+      /* Now we got something to notify caller, either a byte or an
+	 error/eof indication. Before we do, allow internal pipe
+	 buffer to accumulate little bit more data. 
+	 Reader function pulses this event before waiting for
+	 a character, to avoid pipde delay, and to get the byte
+	 immediately. */
+      if (!s->die_p)
+	WaitForSingleObject (s->hev_unsleep, PIPE_READ_DELAY);
+
+      /* Either make event loop generate a process event, or
+	 inblock reader */
+      SetEvent (s->hev_caller);
+
+      /* Cleanup and exit if we're shot off */
+      if (s->die_p)
+	break;
+
+      /* Block until the client finishes with retireving the rest of
+	 pipe data */
+      WaitForSingleObject (s->hev_thread, INFINITE);
+    }
+
+  return 0;
+}
+
+static Lisp_Object
+make_ntpipe_input_stream (HANDLE hpipe, LPARAM param)
+{
+  Lisp_Object obj;
+  Lstream *lstr = Lstream_new (lstream_ntpipe_slurp, "r");
+  struct ntpipe_slurp_stream* s = NTPIPE_SLURP_STREAM_DATA (lstr);
+  DWORD thread_id_unused;
+
+  /* We deal only with pipes, for we're using PeekNamedPipe api */
+  assert (GetFileType (hpipe) == FILE_TYPE_PIPE);
+
+  s->die_p = 0;
+  s->eof_p = FALSE;
+  s->error_p = FALSE;
+  s->hpipe = hpipe;
+  s->user_data = param;
+
+  /* Create reader thread. This could fail, so do not 
+     create events until thread is created */
+  s->hthread = CreateThread (NULL, 0, slurp_thread, (LPVOID)s,
+			     CREATE_SUSPENDED, &thread_id_unused);
+  if (s->hthread == NULL)
+    {
+      Lstream_delete (lstr);
+      return Qnil;
+    }
+
+  /* hev_thread is a manual-reset event, initially signaled */
+  s->hev_thread = CreateEvent (NULL, TRUE, TRUE, NULL);
+  /* hev_caller is a manual-reset event, initially nonsignaled */
+  s->hev_caller = CreateEvent (NULL, TRUE, FALSE, NULL);
+  /* hev_unsleep is a manual-reset event, initially nonsignaled */
+  s->hev_unsleep = CreateEvent (NULL, TRUE, FALSE, NULL);
+
+  /* Now let it go */
+  ResumeThread (s->hthread);
+
+  lstr->flags |= LSTREAM_FL_CLOSE_AT_DISKSAVE;
+  XSETLSTREAM (obj, lstr);
+  return obj;
+}
+
+static LPARAM
+get_ntpipe_input_stream_param (Lstream *stream)
+{
+  struct ntpipe_slurp_stream* s = NTPIPE_SLURP_STREAM_DATA(stream);
+  return s->user_data;
+}
+
+static HANDLE
+get_ntpipe_input_stream_waitable (Lstream *stream)
+{
+  struct ntpipe_slurp_stream* s = NTPIPE_SLURP_STREAM_DATA(stream);
+  return s->hev_caller;
+}
+
+static int 
+ntpipe_slurp_reader (Lstream *stream, unsigned char *data, size_t size)
+{
+  /* This function must be called from the main thread only */
+  struct ntpipe_slurp_stream* s = NTPIPE_SLURP_STREAM_DATA(stream);
+
+  if (!s->die_p)
+    {
+      DWORD wait_result;
+      /* Disallow pipe read delay for the thread: we need a character ASAP */
+      SetEvent (s->hev_unsleep);
+  
+      /* Check if we have a character ready. Give it a short delay, for
+	 the thread to awake from pipe delay, just ion case*/
+      wait_result = WaitForSingleObject (s->hev_caller, 2);
+
+      /* Revert to the normal sleep behavior. */
+      ResetEvent (s->hev_unsleep);
+
+      /* If there's no byte buffered yet, give up */
+      if (wait_result == WAIT_TIMEOUT)
+	{
+	  errno = EAGAIN;
+	  return -1;
+	}
+    }
+
+  /* Reset caller unlock event now, as we've handled the pending
+     process output event */
+  ResetEvent (s->hev_caller);
+
+  /* It is now safe to do anything with contents of S, except for
+     changing s->die_p, which still should be interlocked */
+
+  if (s->eof_p)
+    return 0;
+  if (s->error_p || s->die_p)
+    return -1;
+
+  /* Ok, there were no error neither eof - we've got a byte from the pipe */
+  *(data++) = s->onebyte;
+  --size;
+
+  if (size > 0)
+    {
+      DWORD bytes_available, bytes_read;
+
+      /* If the api call fails, return at least one byte already read.
+	 ReadFile in thread will return error */
+      if (!PeekNamedPipe (s->hpipe, NULL, 0, NULL, &bytes_available, NULL))
+	return 1;
+
+      /* Fetch available bytes. The same consideration applies, so do
+         not check for errors. ReadFile in the thread will fail if the
+         next call fails. */
+      ReadFile (s->hpipe, data, min (bytes_available, size), &bytes_read, NULL);
+
+      /* Now we can unblock thread, so it attempts to read more */
+      SetEvent (s->hev_thread);
+      return bytes_read + 1;
+    }
+  else
+    {
+      SetEvent (s->hev_thread);
+      return 1;
+    }
+}
+
+static int 
+ntpipe_slurp_closer (Lstream *stream)
+{
+  /* This function must be called from the main thread only */
+  struct ntpipe_slurp_stream* s = NTPIPE_SLURP_STREAM_DATA(stream);
+
+  /* Force thread to stop */
+  InterlockedIncrement (&s->die_p);
+
+  /* Break the pipe, in case the thread still blocked on read */
+  CloseHandle (s->hpipe);
+
+  /* Set events which could possibly block slurper */
+  SetEvent (s->hev_unsleep);
+  SetEvent (s->hev_thread);
+
+  /* Wait while thread terminates */
+  WaitForSingleObject (s->hthread, INFINITE);
+  CloseHandle (s->hthread);
+
+  /* Destroy events */
+  CloseHandle (s->hev_thread);
+  CloseHandle (s->hev_caller);
+  CloseHandle (s->hev_unsleep);
+
+  return 0;
+}
+
+static void
+init_slurp_stream (void)
+{
+  LSTREAM_HAS_METHOD (ntpipe_slurp, reader);
+  LSTREAM_HAS_METHOD (ntpipe_slurp, closer);
+}
+
+/************************************************************************/
+/*                Pipe outstream - writes process input                 */
+/************************************************************************/
+
+#define MAX_FLUSH_TIME 500
+
+#define NTPIPE_SHOVE_STREAM_DATA(stream) \
+  LSTREAM_TYPE_DATA (stream, ntpipe_shove)
+
+struct ntpipe_shove_stream
+{
+  LPARAM user_data;	/* Any user data stored in the stream object	 */
+  HANDLE hev_thread;	/* Our thread blocks on this, signaled by caller */
+			/* This is an auto-reset object. 		 */
+  HANDLE hpipe;		/* Pipe write end handle.			 */
+  HANDLE hthread;	/* Reader thread handle.			 */
+  LPVOID buffer;	/* Buffer being written				 */
+  DWORD  size;		/* Number of bytes to write			 */
+  DWORD  die_p;		/* Thread must exit ASAP if non-zero		 */
+  DWORD  idle_p;	/* Non-zero if thread is waiting for job	 */
+  BOOL   error_p : 1;   /* Read error other than EOF/broken pipe	 */
+  BOOL   blocking_p : 1;/* Last write attempt would cause blocking	 */
+};
+
+DEFINE_LSTREAM_IMPLEMENTATION ("ntpipe-output", lstream_ntpipe_shove,
+			       sizeof (struct ntpipe_shove_stream));
+
+static DWORD WINAPI
+shove_thread (LPVOID vparam)
+{
+  struct ntpipe_shove_stream *s = (struct ntpipe_shove_stream*) vparam;
+
+  for (;;)
+    {
+      DWORD bytes_written; 
+
+      /* Block on event and wait for a job */
+      InterlockedIncrement (&s->idle_p);
+      WaitForSingleObject (s->hev_thread, INFINITE);
+
+      if (s->die_p)
+	break;
+
+      /* Write passed buffer */
+      if (!WriteFile (s->hpipe, s->buffer, s->size, &bytes_written, NULL)
+	  || bytes_written != s->size)
+	{
+	  s->error_p = TRUE;
+	  InterlockedIncrement (&s->die_p);
+	}
+
+      /* free it */
+      LocalFree ((HLOCAL)s->buffer);
+
+      if (s->die_p)
+	break;
+    }
+
+  return 0;
+}
+
+static Lisp_Object
+make_ntpipe_output_stream (HANDLE hpipe, LPARAM param)
+{
+  Lisp_Object obj;
+  Lstream *lstr = Lstream_new (lstream_ntpipe_shove, "w");
+  struct ntpipe_shove_stream* s = NTPIPE_SHOVE_STREAM_DATA (lstr);
+  DWORD thread_id_unused;
+
+  s->die_p = 0;
+  s->error_p = FALSE;
+  s->hpipe = hpipe;
+  s->user_data = param;
+
+  /* Create reader thread. This could fail, so do not 
+     create the event until thread is created */
+  s->hthread = CreateThread (NULL, 0, shove_thread, (LPVOID)s,
+			     CREATE_SUSPENDED, &thread_id_unused);
+  if (s->hthread == NULL)
+    {
+      Lstream_delete (lstr);
+      return Qnil;
+    }
+
+  /* hev_thread is an auto-reset event, initially nonsignaled */
+  s->hev_thread = CreateEvent (NULL, FALSE, FALSE, NULL);
+
+  /* Now let it go */
+  ResumeThread (s->hthread);
+
+  lstr->flags |= LSTREAM_FL_CLOSE_AT_DISKSAVE;
+  XSETLSTREAM (obj, lstr);
+  return obj;
+}
+
+static LPARAM
+get_ntpipe_output_stream_param (Lstream *stream)
+{
+  struct ntpipe_shove_stream* s = NTPIPE_SHOVE_STREAM_DATA(stream);
+  return s->user_data;
+}
+
+static int
+ntpipe_shove_writer (Lstream *stream, const unsigned char *data, size_t size)
+{
+  struct ntpipe_shove_stream* s = NTPIPE_SHOVE_STREAM_DATA(stream);
+
+  if (s->error_p)
+    return -1;
+
+  s->blocking_p = !s->idle_p;
+  if (s->blocking_p)
+    return 0;
+
+  /* Make a copy of data to be written. We intentionally avoid using
+     xalloc/xfree, because gnu malloc is not thread-safe */
+  s->buffer = (LPVOID) LocalAlloc (LMEM_FIXED, size);
+  if (s->buffer == NULL)
+    return -1;
+  memcpy (s->buffer, data, size);
+  s->size = size;
+
+  /* Start output */
+  InterlockedDecrement (&s->idle_p);
+  SetEvent (s->hev_thread);
+  return size;
+}
+
+static int
+ntpipe_shove_was_blocked_p (Lstream *stream)
+{
+  struct ntpipe_shove_stream* s = NTPIPE_SHOVE_STREAM_DATA(stream);
+  return s->blocking_p;
+}
+
+static int
+ntpipe_shove_flusher (Lstream *stream)
+{
+  struct ntpipe_shove_stream* s = NTPIPE_SHOVE_STREAM_DATA(stream);
+  int i;
+
+  if (s->error_p)
+    return -1;
+
+  /* We do not want to be blocked forever. Instead, we wait
+     about 0.5 second for output to finish. If this does
+     not help, we just return flush failure. */
+  for (i = 0; i < MAX_FLUSH_TIME / 50; ++i)
+    {
+      if (s->idle_p)
+	return 0;
+      Sleep (50);
+    }
+  return -1;
+}
+
+static int
+ntpipe_shove_closer (Lstream *stream)
+{
+  struct ntpipe_shove_stream* s = NTPIPE_SHOVE_STREAM_DATA(stream);
+
+  /* Force thread stop */
+  InterlockedIncrement (&s->die_p);
+
+  /* Close pipe handle, possibly breaking it */
+  CloseHandle (s->hpipe);
+
+  /* Thread will end upon unblocking */
+  SetEvent (s->hev_thread);
+
+  /* Wait while thread terminates */
+  WaitForSingleObject (s->hthread, INFINITE);
+  CloseHandle (s->hthread);
+
+  /* Destroy the event */
+  CloseHandle (s->hev_thread);
+
+  return 0;
+}
+
+static void
+init_shove_stream (void)
+{
+  LSTREAM_HAS_METHOD (ntpipe_shove, writer);
+  LSTREAM_HAS_METHOD (ntpipe_shove, flusher);
+  LSTREAM_HAS_METHOD (ntpipe_shove, was_blocked_p);
+  LSTREAM_HAS_METHOD (ntpipe_shove, closer);
+}
+
+/************************************************************************/
+/*                     Dispatch queue management                        */
+/************************************************************************/
 
 static int
 mswindows_user_event_p (struct Lisp_Event* sevt)
@@ -136,10 +574,6 @@
 	  || sevt->event_type == dnd_drop_event);
 }
 
-/************************************************************************/
-/*                     Dispatch queue management                        */
-/************************************************************************/
-
 /* 
  * Add an emacs event to the proper dispatch queue
  */
@@ -173,6 +607,21 @@
 }
 
 static void
+mswindows_enqueue_process_event (struct Lisp_Process* p)
+{
+  Lisp_Object emacs_event = Fmake_event (Qnil, Qnil);
+  struct Lisp_Event* event = XEVENT (emacs_event);
+  Lisp_Object process;
+  XSETPROCESS (process, p);
+
+  event->event_type = process_event;
+  event->timestamp  = GetTickCount ();
+  event->event.process.process = process;
+
+  mswindows_enqueue_dispatch_event (emacs_event);
+}
+
+static void
 mswindows_enqueue_mouse_button_event (HWND hwnd, UINT message, POINTS where, DWORD when)
 {
 
@@ -307,8 +756,44 @@
     }
   return Qnil;
 }
+
+/************************************************************************/
+/*                     Waitable handles manipulation                    */
+/************************************************************************/
+static int
+find_waitable_handle (HANDLE h)
+{
+  int i;
+  for (i = 0; i < mswindows_waitable_count; ++i)
+    if (mswindows_waitable_handles[i] == h)
+      return i;
 
+  return -1;
+}
 
+static BOOL
+add_waitable_handle (HANDLE h)
+{
+  assert (find_waitable_handle (h) < 0);
+  if (mswindows_waitable_count == MAX_WAITABLE)
+    return FALSE;
+
+  mswindows_waitable_handles [mswindows_waitable_count++] = h;
+  return TRUE;
+}
+
+static void
+remove_waitable_handle (HANDLE h)
+{
+  int ix = find_waitable_handle (h);
+  if (ix < 0)
+    return;
+
+  mswindows_waitable_handles [ix] = 
+    mswindows_waitable_handles [--mswindows_waitable_count];
+}
+
+
 /************************************************************************/
 /*                             Event pump                               */
 /************************************************************************/
@@ -455,11 +940,9 @@
  * fetching WM_TIMER messages. Instead of trying to fetch a WM_TIMER
  * which will never come when there are no pending timers, which leads
  * to deadlock, we simply signal an error.
- *
- * The implementation does not honor user_p by design.
  */
 static void
-mswindows_need_event_in_modal_loop (int user_p, int badly_p)
+mswindows_need_event_in_modal_loop (int badly_p)
 {
   MSG msg;
 
@@ -489,26 +972,17 @@
  * This drains the event queue and fills up two internal queues until
  * an event of a type specified by USER_P is retrieved.
  *
- * If user_p, then the function drains until the first user event, or
- * the first non-user event if there no user events. Otherwise, If
- * not user_p, it does not give preference to user events.
- *
- * If badly_p, then the function does not return until an event is
- * available.
- *
- * The code does not rely on MsgWaitForMultipleObjects preference for
- * messages over waitable handles.
  *
  * Used by emacs_mswindows_event_pending_p and emacs_mswindows_next_event
  */
 static void
-mswindows_need_event (int user_p, int badly_p)
+mswindows_need_event (int badly_p)
 {
   int active;
 
   if (mswindows_in_modal_loop)
     {
-      mswindows_need_event_in_modal_loop (user_p, badly_p);
+      mswindows_need_event_in_modal_loop (badly_p);
       return;
     }
 
@@ -516,17 +990,12 @@
      quit char when called from quit_p */
   mswindows_drain_windows_queue ();
 
-  while (NILP (mswindows_u_dispatch_event_queue) &&
-	 (user_p || NILP (mswindows_s_dispatch_event_queue)))
+  while (NILP (mswindows_u_dispatch_event_queue)
+	 && NILP (mswindows_s_dispatch_event_queue))
   {
-    /* If we already have an event, we've got someting to return - no wait! */
-    if (!NILP (mswindows_u_dispatch_event_queue)
-	|| !NILP (mswindows_s_dispatch_event_queue))
-      badly_p = 0;
-    
-    /* Now try getting a message */
+    /* Now try getting a message or process event */
     active = MsgWaitForMultipleObjects (mswindows_waitable_count,
-					mswindows_waitable,
+					mswindows_waitable_handles,
 					FALSE, badly_p ? INFINITE : 0,
 					QS_ALLINPUT);
 
@@ -548,13 +1017,27 @@
       }
     else
       {
-	/* XXX FIXME: We should do some kind of round-robin scheme to ensure fairness */
-	int waitable = active - WAIT_OBJECT_0;
-	assert(0);	/* #### */
+	int ix = active - WAIT_OBJECT_0;
+	/* First, try to find which process' ouptut has signaled */
+	struct Lisp_Process *p = 
+	  get_process_from_usid (HANDLE_TO_USID (mswindows_waitable_handles[ix]));
+	if (p != NULL)
+	  {
+	    /* Found a signaled process input handle */
+	    mswindows_enqueue_process_event (p);
+	  }
+	else
+	  {
+	    /* None. This means that the process handle itself has signaled.
+	       Remove the handle from the wait vector, and make status_ntoify
+	       note the exited process */
+	    CloseHandle (mswindows_waitable_handles[ix]);
+	    mswindows_waitable_handles [ix] =
+	      mswindows_waitable_handles [--mswindows_waitable_count];
+	    kick_status_notify ();
+	  }
       }
   } /* while */
-
-  return;
 }
 
 /************************************************************************/
@@ -1008,8 +1491,8 @@
 	  FRAME_PIXHEIGHT(frame) = rect.bottom;
 
 	  pixel_to_real_char_size (frame, rect.right, rect.bottom,
-				   &MSWINDOWS_FRAME_CHARWIDTH (frame),
-				   &MSWINDOWS_FRAME_CHARHEIGHT (frame));
+				   &FRAME_MSWINDOWS_CHARWIDTH (frame),
+				   &FRAME_MSWINDOWS_CHARHEIGHT (frame));
 
 	  pixel_to_char_size (frame, rect.right, rect.bottom, &columns, &rows);
 	  change_frame_size (frame, rows, columns, 1);
@@ -1429,16 +1912,8 @@
 Lisp_Object
 mswindows_find_console (HWND hwnd)
 {
-  Lisp_Object concons;
-
-  CONSOLE_LOOP (concons)
-    {
-      Lisp_Object console = XCAR (concons);
-      /* We only support one console so this must be it */
-      return console;
-    }
-
-  return Qnil;
+  /* We only support one console */
+  return XCAR (Vconsole_list);
 }
 
 /*
@@ -1512,8 +1987,7 @@
 static int
 emacs_mswindows_event_pending_p (int user_p)
 {
-  mswindows_need_event (user_p, 0);
-
+  mswindows_need_event (0);
   return (!NILP (mswindows_u_dispatch_event_queue)
 	  || (!user_p && !NILP (mswindows_s_dispatch_event_queue)));
 }
@@ -1526,10 +2000,8 @@
 {
   Lisp_Object event, event2;
 
-  /* Give strong preference to user events */
-  mswindows_need_event (1, 1);
+  mswindows_need_event (1);
 
-  /* XXX Copied from event-Xt.c */
   event = mswindows_dequeue_dispatch_event (!NILP(mswindows_u_dispatch_event_queue));
   XSETEVENT (event2, emacs_event);
   Fcopy_event (event, event2);
@@ -1593,14 +2065,70 @@
   }
 }
 
+static HANDLE
+get_process_input_waitable (struct Lisp_Process *process)
+{
+  Lisp_Object instr, outstr;
+  get_process_streams (process, &instr, &outstr);
+  assert (!NILP (instr));
+  return get_ntpipe_input_stream_waitable (XLSTREAM (instr));
+}
+
+static HANDLE
+get_process_handle (struct Lisp_Process *p)
+{
+  /* #### The guess is that cygwin uses the same algorithm for
+     computing pids: negate if less than zero, '95 case */
+  Lisp_Object process, pid;
+  XSETPROCESS (process, p);
+  pid = Fprocess_id (process);
+  if (INTP (pid))
+    {
+      HANDLE hproc;
+      int ipid = XINT (pid);
+      if (ipid < 0)
+	ipid = -ipid;
+      hproc = OpenProcess (SYNCHRONIZE, FALSE, ipid);
+      /* #### This is WRONG! The process migh have ended before we got here. */
+      /* assert  (hproc != NULL); */
+      /* Instead, we fake "a signaled hadle", which will trigger
+	 immediately upon entering the message loop */
+      if (hproc = NULL)
+	hproc = CreateEvent (NULL, TRUE, TRUE, NULL);
+      return hproc;
+    }
+  else
+    return NULL;
+}
+
 static void
 emacs_mswindows_select_process (struct Lisp_Process *process)
 {
+  HANDLE hev = get_process_input_waitable (process);
+  HANDLE hprocess;
+
+  if (!add_waitable_handle (hev))
+    error ("Too many active processes");
+
+  hprocess = get_process_handle (process);
+  if (hprocess)
+    {
+      if (!add_waitable_handle (hprocess))
+	{
+	  remove_waitable_handle (hev);
+	  CloseHandle (hprocess);
+	  error ("Too many active processes");
+	}
+    }
 }
 
 static void
 emacs_mswindows_unselect_process (struct Lisp_Process *process)
 {
+  /* Process handle is removed in the event loop as soon
+     as it is signaled, so don't bother here about it */
+  HANDLE hev = get_process_input_waitable (process);
+  remove_waitable_handle (hev);
 }
 
 static void
@@ -1616,7 +2144,9 @@
 static void
 emacs_mswindows_quit_p (void)
 {
-  mswindows_need_event (1, 0);
+  /* Drain windows queue. This sets up number of quit
+     characters in in the queue */
+  mswindows_drain_windows_queue ();
 
   if (mswindows_quit_chars_count > 0)
     {
@@ -1638,6 +2168,69 @@
     }
 }
 
+USID
+emacs_mswindows_create_stream_pair (void* inhandle, void* outhandle,
+				    Lisp_Object* instream,
+				    Lisp_Object* outstream,
+				    int flags)
+{
+  /* Handles for streams */
+  HANDLE hin, hout;
+  /* fds. These just stored along with the streams, and are closed in
+     delete stream pair method, because we need to handle fake unices
+     here. */
+  int fdi, fdo;
+
+  /* Decode inhandle and outhandle. Their meaning depends on
+     the process implementation being used. */
+#if defined (HAVE_WIN32_PROCESSES)
+  /* We're passed in Windows handles. That's what we like most... */
+  hin = (HANDLE) inhandle;
+  hout = (HANDLE) outhandle;
+  fdi = fdo = -1;
+#elif defined (HAVE_UNIX_PROCESSES)
+  /* We are passed UNIX fds. This must be Cygwin.
+     Fetch os handles */
+  hin = inhandle >= 0 ? get_osfhandle ((int)inhandle) : INVALID_HANDLE_VALUE;
+  hout = outhandle >= 0 ? get_sfhandle ((int)outhandle) : INVALID_HANDLE_VALUE;
+#else
+#error "So, WHICH kind of processes do you want?"
+#endif
+
+  *instream = (hin != INVALID_HANDLE_VALUE
+	       ? make_ntpipe_input_stream (hin, fdi)
+	       : Qnil);
+
+  *outstream = (hout != INVALID_HANDLE_VALUE
+		? make_ntpipe_output_stream (hout, fdo)
+		: Qnil);
+
+  return (NILP (*instream) ? USID_ERROR
+	  : HANDLE_TO_USID (get_ntpipe_input_stream_waitable (XLSTREAM (*instream))));
+}
+
+USID
+emacs_mswindows_delete_stream_pair (Lisp_Object instream,
+					 Lisp_Object outstream)
+{
+  /* Oh nothing special here for Win32 at all */
+#if defined (HAVE_UNIX_PROCESSES)
+  int in = (NILP(instream) ? -1
+	    : get_ntpipe_input_stream_param (XLSTREAM (instream)));
+  int out = (NILP(outstream) ? -1
+	     : get_ntpipe_output_stream_param (XLSTREAM (outstream)));
+
+  if (in >= 0)
+    close (in);
+  if (out != in && out >= 0)
+    close (out);
+#endif
+
+  return (NILP (instream) ? USID_DONTHASH
+	  : HANDLE_TO_USID (get_ntpipe_input_stream_waitable (XLSTREAM (instream))));
+}
+
+
 #ifndef HAVE_X_WINDOWS
 /* This is called from GC when a process object is about to be freed.
    If we've still got pointers to it in this file, we're gonna lose hard.
@@ -1690,6 +2283,8 @@
   mswindows_event_stream->select_process_cb 	= emacs_mswindows_select_process;
   mswindows_event_stream->unselect_process_cb	= emacs_mswindows_unselect_process;
   mswindows_event_stream->quit_p_cb		= emacs_mswindows_quit_p;
+  mswindows_event_stream->create_stream_pair_cb = emacs_mswindows_create_stream_pair;
+  mswindows_event_stream->delete_stream_pair_cb = emacs_mswindows_delete_stream_pair;
 
   DEFVAR_BOOL ("mswindows-dynamic-frame-resize", &mswindows_dynamic_frame_resize /*
 *Controls redrawing frame contents during mouse-drag or keyboard resize
@@ -1743,6 +2338,13 @@
 }
 
 void
+lstream_type_create_mswindows_selectable (void)
+{
+  init_slurp_stream ();
+  init_shove_stream ();
+}
+
+void
 init_event_mswindows_late (void)
 {
   event_stream = mswindows_event_stream;