changeset 21:cbac7dfe2f24

interpolate process0, support permanent subproc
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Thu, 29 Sep 2022 16:33:42 +0100
parents a5dafc1364ed
children 38bab758e469
files bin/ix.py
diffstat 1 files changed, 70 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/bin/ix.py	Thu Sep 29 16:31:28 2022 +0100
+++ b/bin/ix.py	Thu Sep 29 16:33:42 2022 +0100
@@ -27,13 +27,24 @@
                           FOO)
 
 def process(options,buf,filename,offset,length,whole):
-  global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE
+    try:
+      process0(options,buf,filename,offset,length,whole)
+    except Exception as e:
+      if options.debug:
+        import traceback
+        traceback.print_exc(file=sys.stderr)
+      else:
+        print("Process fail: %s, input line:\n %s"%(e,l),
+              file=sys.stderr,end='')
+      exit(3)
+
+def process0(options,buf,filename,offset,length,whole):
+  global TMPFILENAME, TMPFILE
   if options.save:
     (tf,TMPFILENAME)=tempfile.mkstemp()
     TMPFILE=open(tf,mode='wb')
-  if options.cmd:
-    CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0)
-    BINOUT=CMD_PROC.stdin
+  if options.cmd and not options.process:
+    launch(options.cmd)
   process1(options,buf,filename,offset,length,whole)
   if options.save:
     TMPFILE.close()
@@ -44,20 +55,28 @@
       BINOUT.write(bytes(TMPFILENAME,'utf-8'))
       BINOUT.write(b"\n")
   if options.cmd:
-    # Wind up subproc
-    BINOUT.close()
-    if CMD_PROC.wait()!=0:    # could/should be async?
-      print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
-                                                  CMD_PROC.returncode),
-            file=sys.stderr)
+    if not options.process:
+      windup(filename,options,length)
     if options.save:
-      # not if async?
       os.unlink(TMPFILENAME)
       TMPFILENAME=None
   elif options.save:
     print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr)
     TMPFILENAME=None
 
+def launch(cmd):
+  global CMD_PROC, BINOUT
+  CMD_PROC=Popen(shlex.split(cmd),stdin=PIPE,bufsize=0)
+  BINOUT=CMD_PROC.stdin
+
+def windup(length,offset,filename):
+  # Wind up subproc
+  BINOUT.close()
+  if CMD_PROC.wait()!=0:    # could/should be async?
+    print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
+                                                CMD_PROC.returncode),
+          file=sys.stderr)
+  
 def _output_tmpfile(buf):
   TMPFILE.write(buf)
 
@@ -113,6 +132,20 @@
   state=0
   tr=None # Was this record truncated?
   bl=None # for HTTP Content-Length for the length of the body?
+  # Could we make this faster by working purely within the cb memoryview?
+  # It would be messy, but avoid copying huge amounts
+  # The outer loop would just be something like
+  #   clbv=memoryview(bytearray(b"Content-Length: "))
+  #   i=s=0
+  #   while i<ll:
+  #     if cb[i]==10: # need to handle \r\n
+  #       L=cb[s:i]
+  #       s=i=i+1
+  #       if L[:16]==clbv:
+  #         wl=int(L[16:])
+  #     else:
+  #       i+=1
+  #
   with io.BytesIO(cb) as clear_text:
     for L in clear_text:
       if state==0:
@@ -136,12 +169,19 @@
         wl -= len(L)
         if not (L==b"" or L.startswith(b"\r")):
           # Non-blank, it's a header
-          if bl is None and L.startswith(b"Content-Length: "):
-            bl=int(L[16:].rstrip())
+          (h,_,v)=L.partition(b": ")
+          if bl is None and (h==b"Content-Length"):
+            bl=int(v)
           if options.headers:
-            _output(L)
+            if isinstance(options.headers,dict):
+              if h in options.headers:
+                options.headers[h]=v
+            else:
+              _output(L)
         else:
           # Blank line, HTTP header is finished
+          if isinstance(options.headers,dict):
+            _output(bytes(str(options.headers),'utf-8'))
           if not options.body:
             return
           if options.headers:
@@ -182,11 +222,13 @@
   parser.add_argument('-d','--debug',help='Debug output',action='store_true')
   parser.add_argument('-w','--warc',help='output WARC headers',
                       action='store_true')
-  parser.add_argument('-h','--headers',help='output HTTP headers',
-                      action='store_true')
+  parser.add_argument('-h','--headers',help='process HTTP headers: collect into dict with named values (,-separated) if arg present, else output',
+                      nargs='?',default=None,const=True)
   parser.add_argument('-b','--body',help='output HTTP body',
                       action='store_true')
   parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
+  parser.add_argument('-p','--process',help='with -c, launches CMD only once',
+                      action='store_true')
   parser.add_argument('-m','--module.function',help='module.function to call with a stream'),
   parser.add_argument('-s','--save',action='store_true',
                       help="write to a temporary file and output the name")
@@ -221,6 +263,8 @@
     # We have to enforce our own check..
     if pa.offset is None or pa.filename is None:
       parser.error("length, offset and filename must all be supplied together")
+  if isinstance(pa.headers,str):
+    pa.headers=dict((bytes(k,'utf-8'),None) for k in pa.headers.split(','))
  
   buf=bytearray(128*1024*1024)
 
@@ -232,6 +276,9 @@
     _output = _output_subproc
   else:
     _output = _output_stdout
+  if pa.cmd and pa.process:
+      launch(pa.cmd)
+  # three different ways to process
   if pa.index:
     CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet...
     for l in sys.stdin:
@@ -241,18 +288,9 @@
           continue
         print("index line problem: \"%s\""%l,file=sys.stderr,end='')
         exit(2)
-      f=pa.fpath%(m[3:7])
-      try:
-        process(pa,buf,f,
-                int(m[2]),int(m[1]),whole)
-      except Exception as e:
-        if pa.debug:
-          import traceback
-          traceback.print_exc(file=sys.stderr)
-        else:
-          print("Process fail: %s, input line:\n %s"%(e,l),
-                file=sys.stderr,end='')
-        exit(3)
+      filename=pa.fpath%(m[3:7])
+      process(pa,buf,filename,
+              int(offset:=m[2]),int(length:=m[1]),whole)
   elif pa.length is not None:
     print(pa.filename,file=sys.stderr)
     process(pa,buf,pa.fpath%tuple(pa.filename.split('/')),
@@ -269,6 +307,9 @@
         parser.error('Invalid input line: %s\n "%s"'%(e,l))
       process(pa,buf,pa.fpath%tuple(filename.split('/')),
               offset,length,whole)
-
+  # processing done one way or another
+  if pa.cmd and pa.process:
+    windup(length,offset,filename)
+  # if pa.save and pa.process, deleting temp files is down to cmd
 if __name__ == "__main__":
     main()