Mercurial > hg > cc > cirrus_work
changeset 21:cbac7dfe2f24
interpolate process0, support permanent subproc
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Thu, 29 Sep 2022 16:33:42 +0100 |
parents | a5dafc1364ed |
children | 38bab758e469 |
files | bin/ix.py |
diffstat | 1 files changed, 70 insertions(+), 29 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/ix.py Thu Sep 29 16:31:28 2022 +0100 +++ b/bin/ix.py Thu Sep 29 16:33:42 2022 +0100 @@ -27,13 +27,24 @@ FOO) def process(options,buf,filename,offset,length,whole): - global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE + try: + process0(options,buf,filename,offset,length,whole) + except Exception as e: + if options.debug: + import traceback + traceback.print_exc(file=sys.stderr) + else: + print("Process fail: %s, input line:\n %s"%(e,l), + file=sys.stderr,end='') + exit(3) + +def process0(options,buf,filename,offset,length,whole): + global TMPFILENAME, TMPFILE if options.save: (tf,TMPFILENAME)=tempfile.mkstemp() TMPFILE=open(tf,mode='wb') - if options.cmd: - CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0) - BINOUT=CMD_PROC.stdin + if options.cmd and not options.process: + launch(options.cmd) process1(options,buf,filename,offset,length,whole) if options.save: TMPFILE.close() @@ -44,20 +55,28 @@ BINOUT.write(bytes(TMPFILENAME,'utf-8')) BINOUT.write(b"\n") if options.cmd: - # Wind up subproc - BINOUT.close() - if CMD_PROC.wait()!=0: # could/should be async? - print("subproc of %s:%s:%s failed with %s"%(length,offset,filename, - CMD_PROC.returncode), - file=sys.stderr) + if not options.process: + windup(filename,options,length) if options.save: - # not if async? os.unlink(TMPFILENAME) TMPFILENAME=None elif options.save: print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr) TMPFILENAME=None +def launch(cmd): + global CMD_PROC, BINOUT + CMD_PROC=Popen(shlex.split(cmd),stdin=PIPE,bufsize=0) + BINOUT=CMD_PROC.stdin + +def windup(length,offset,filename): + # Wind up subproc + BINOUT.close() + if CMD_PROC.wait()!=0: # could/should be async? + print("subproc of %s:%s:%s failed with %s"%(length,offset,filename, + CMD_PROC.returncode), + file=sys.stderr) + def _output_tmpfile(buf): TMPFILE.write(buf) @@ -113,6 +132,20 @@ state=0 tr=None # Was this record truncated? bl=None # for HTTP Content-Length for the length of the body? + # Could we make this faster by working purely within the cb memoryview? + # It would be messy, but avoid copying huge amounts + # The outer loop would just be something like + # clbv=memoryview(bytearray(b"Content-Length: ")) + # i=s=0 + # while i<ll: + # if cb[i]==10: # need to handle \r\n + # L=cb[s:i] + # s=i=i+1 + # if L[:16]==clbv: + # wl=int(L[16:]) + # else: + # i+=1 + # with io.BytesIO(cb) as clear_text: for L in clear_text: if state==0: @@ -136,12 +169,19 @@ wl -= len(L) if not (L==b"" or L.startswith(b"\r")): # Non-blank, it's a header - if bl is None and L.startswith(b"Content-Length: "): - bl=int(L[16:].rstrip()) + (h,_,v)=L.partition(b": ") + if bl is None and (h==b"Content-Length"): + bl=int(v) if options.headers: - _output(L) + if isinstance(options.headers,dict): + if h in options.headers: + options.headers[h]=v + else: + _output(L) else: # Blank line, HTTP header is finished + if isinstance(options.headers,dict): + _output(bytes(str(options.headers),'utf-8')) if not options.body: return if options.headers: @@ -182,11 +222,13 @@ parser.add_argument('-d','--debug',help='Debug output',action='store_true') parser.add_argument('-w','--warc',help='output WARC headers', action='store_true') - parser.add_argument('-h','--headers',help='output HTTP headers', - action='store_true') + parser.add_argument('-h','--headers',help='process HTTP headers: collect into dict with named values (,-separated) if arg present, else output', + nargs='?',default=None,const=True) parser.add_argument('-b','--body',help='output HTTP body', action='store_true') parser.add_argument('-c','--cmd',help='pipes each result thru CMD') + parser.add_argument('-p','--process',help='with -c, launches CMD only once', + action='store_true') parser.add_argument('-m','--module.function',help='module.function to call with a stream'), parser.add_argument('-s','--save',action='store_true', help="write to a temporary file and output the name") @@ -221,6 +263,8 @@ # We have to enforce our own check.. if pa.offset is None or pa.filename is None: parser.error("length, offset and filename must all be supplied together") + if isinstance(pa.headers,str): + pa.headers=dict((bytes(k,'utf-8'),None) for k in pa.headers.split(',')) buf=bytearray(128*1024*1024) @@ -232,6 +276,9 @@ _output = _output_subproc else: _output = _output_stdout + if pa.cmd and pa.process: + launch(pa.cmd) + # three different ways to process if pa.index: CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet... for l in sys.stdin: @@ -241,18 +288,9 @@ continue print("index line problem: \"%s\""%l,file=sys.stderr,end='') exit(2) - f=pa.fpath%(m[3:7]) - try: - process(pa,buf,f, - int(m[2]),int(m[1]),whole) - except Exception as e: - if pa.debug: - import traceback - traceback.print_exc(file=sys.stderr) - else: - print("Process fail: %s, input line:\n %s"%(e,l), - file=sys.stderr,end='') - exit(3) + filename=pa.fpath%(m[3:7]) + process(pa,buf,filename, + int(offset:=m[2]),int(length:=m[1]),whole) elif pa.length is not None: print(pa.filename,file=sys.stderr) process(pa,buf,pa.fpath%tuple(pa.filename.split('/')), @@ -269,6 +307,9 @@ parser.error('Invalid input line: %s\n "%s"'%(e,l)) process(pa,buf,pa.fpath%tuple(filename.split('/')), offset,length,whole) - + # processing done one way or another + if pa.cmd and pa.process: + windup(length,offset,filename) + # if pa.save and pa.process, deleting temp files is down to cmd if __name__ == "__main__": main()