changeset 119:bc958b776fb8

implement --cmd
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Wed, 16 Jun 2021 16:12:46 +0000
parents 551ff1de13d8
children d0b544e53dda
files bin/ix.py
diffstat 1 files changed, 55 insertions(+), 15 deletions(-) [+]
line wrap: on
line diff
--- a/bin/ix.py	Wed Jun 16 16:12:16 2021 +0000
+++ b/bin/ix.py	Wed Jun 16 16:12:46 2021 +0000
@@ -8,9 +8,9 @@
 
 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''
 
-import sys, argparse, regex, os, shutil, io, gzip, time
+import sys, argparse, regex, os, shutil, io, gzip, time, shlex
 from isal import igzip
-#from subprocess import Popen, PIPE
+from subprocess import Popen, PIPE
 #import asyncio
 
 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
@@ -24,7 +24,30 @@
     return HACK_USAGE.sub('\n             [ ( -x | length offset filename ) ]',
                           FOO)
 
-def process(options,buf,root,filename,offset,length,whole):
+def process(options,buf,filename,offset,length,whole):
+  global CMD_PROC, BINOUT
+  if options.cmd:
+    CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0)
+    BINOUT=CMD_PROC.stdin
+  process1(options,buf,filename,offset,length,whole)
+  if options.cmd:
+    # Wind up subproc
+    BINOUT.close()
+    if CMD_PROC.wait()!=0:    # could/should be async?
+      print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
+                                                  CMD_PROC.returncode),
+            file=sys.stderr)
+
+def _output_stdout(buf):
+  BINOUT.write(buf)
+
+def _output_subproc(buf):
+  toWrite=len(buf)
+  while toWrite>0:
+    toWrite -= BINOUT.write(buf)    
+
+def process1(options,buf,filename,offset,length,whole):
+  root=options.root
   rfn=root+filename
   if root!="/beegfs/common_crawl":
     # Support using ramdisk or other local disk as a faster cached
@@ -47,7 +70,7 @@
   if nb!=length:
     print("losing",file.name,length,nb,file=sys.stderr)
   if whole and options.zipped:
-    BINOUT.write(bv)
+    _output(bv)
     return
   gzip_chunk = io.BytesIO(bv)
   uv=memoryview(buf)[length:]
@@ -60,7 +83,7 @@
       ll+=l
     cb=memoryview(uv)[:ll]
     if whole:
-      BINOUT.write(cb)
+      _output(cb)
       return
   # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
   state=0
@@ -82,7 +105,7 @@
           state=1
           # Note we preserve the empty line
         if options.warc:
-          BINOUT.write(L)
+          _output(L)
         continue
       if state==1:
         # HTTP header
@@ -92,13 +115,13 @@
           if bl is None and L.startswith(b"Content-Length: "):
             bl=int(L[16:].rstrip())
           if options.headers:
-            BINOUT.write(L)
+            _output(L)
         else:
           # Blank line, HTTP header is finished
           if not options.body:
             return
           if options.headers:
-            BINOUT.write(L)
+            _output(L)
           state=2
           # The above is just for sanity, because we do _not_
           #  continue with the outer loop,
@@ -112,10 +135,11 @@
           balance=clear_text.tell()
           #print(balance,bl,wl,ll,ll-balance,file=sys.stderr)
           # Output whatever is left
-          BINOUT.write(cb[balance:balance+wl])
+          _output(cb[balance:balance+wl])
           return
 
 def main():
+  global _output
   parser = argparse.ArgumentParser(
     description='''Extract records from warc files given length, offset and file triples.
   Input one triple on command line, or
@@ -171,10 +195,10 @@
   buf=bytearray(128*1024*1024)
 
   whole=not (pa.warc or pa.headers or pa.body)
-  if pa.length is not None:
-    process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')),
-            pa.offset,pa.length,whole)
-    exit(0)
+  if pa.cmd:
+    _output = _output_subproc
+  else:
+    _output = _output_stdout
   if pa.index:
     CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"')
     for l in sys.stdin:
@@ -183,8 +207,24 @@
         print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr)
         exit(2)
       f=FPAT%(m[3:6])
-      process(pa,buf,pa.root,f,
+      process(pa,buf,f,
               int(m[2]),int(m[1]),whole)
-    exit(0)
+  elif pa.length is not None:
+    print(pa.filename,file=sys.stderr)
+    process(pa,buf,FPAT%tuple(pa.filename.split('/')),
+            pa.offset,pa.length,whole)
+  else:
+    print("Reading length, offset, filename tab-delimited triples from stdin...",
+          file=sys.stderr)
+    for l in sys.stdin:
+      try:
+        (length,offset,filename)=l.rstrip().split('\t')
+        length=int(length)
+        offset=int(offset)
+      except ValueError as e:
+        parser.error('Invalid input line: %s\n "%s"'%(e,l))
+      process(pa,buf,FPAT%tuple(filename.split('/')),
+              offset,length,whole)
+
 if __name__ == "__main__":
     main()