Mercurial > hg > cc > cirrus_home
changeset 119:bc958b776fb8
implement --cmd
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Wed, 16 Jun 2021 16:12:46 +0000 |
parents | 551ff1de13d8 |
children | d0b544e53dda |
files | bin/ix.py |
diffstat | 1 files changed, 55 insertions(+), 15 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/ix.py Wed Jun 16 16:12:16 2021 +0000 +++ b/bin/ix.py Wed Jun 16 16:12:46 2021 +0000 @@ -8,9 +8,9 @@ Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' -import sys, argparse, regex, os, shutil, io, gzip, time +import sys, argparse, regex, os, shutil, io, gzip, time, shlex from isal import igzip -#from subprocess import Popen, PIPE +from subprocess import Popen, PIPE #import asyncio HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') @@ -24,7 +24,30 @@ return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', FOO) -def process(options,buf,root,filename,offset,length,whole): +def process(options,buf,filename,offset,length,whole): + global CMD_PROC, BINOUT + if options.cmd: + CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0) + BINOUT=CMD_PROC.stdin + process1(options,buf,filename,offset,length,whole) + if options.cmd: + # Wind up subproc + BINOUT.close() + if CMD_PROC.wait()!=0: # could/should be async? + print("subproc of %s:%s:%s failed with %s"%(length,offset,filename, + CMD_PROC.returncode), + file=sys.stderr) + +def _output_stdout(buf): + BINOUT.write(buf) + +def _output_subproc(buf): + toWrite=len(buf) + while toWrite>0: + toWrite -= BINOUT.write(buf) + +def process1(options,buf,filename,offset,length,whole): + root=options.root rfn=root+filename if root!="/beegfs/common_crawl": # Support using ramdisk or other local disk as a faster cached @@ -47,7 +70,7 @@ if nb!=length: print("losing",file.name,length,nb,file=sys.stderr) if whole and options.zipped: - BINOUT.write(bv) + _output(bv) return gzip_chunk = io.BytesIO(bv) uv=memoryview(buf)[length:] @@ -60,7 +83,7 @@ ll+=l cb=memoryview(uv)[:ll] if whole: - BINOUT.write(cb) + _output(cb) return # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted state=0 @@ -82,7 +105,7 @@ state=1 # Note we preserve the empty line if options.warc: - BINOUT.write(L) + _output(L) continue if state==1: # HTTP header @@ -92,13 +115,13 @@ if bl is None and L.startswith(b"Content-Length: "): bl=int(L[16:].rstrip()) if options.headers: - BINOUT.write(L) + _output(L) else: # Blank line, HTTP header is finished if not options.body: return if options.headers: - BINOUT.write(L) + _output(L) state=2 # The above is just for sanity, because we do _not_ # continue with the outer loop, @@ -112,10 +135,11 @@ balance=clear_text.tell() #print(balance,bl,wl,ll,ll-balance,file=sys.stderr) # Output whatever is left - BINOUT.write(cb[balance:balance+wl]) + _output(cb[balance:balance+wl]) return def main(): + global _output parser = argparse.ArgumentParser( description='''Extract records from warc files given length, offset and file triples. Input one triple on command line, or @@ -171,10 +195,10 @@ buf=bytearray(128*1024*1024) whole=not (pa.warc or pa.headers or pa.body) - if pa.length is not None: - process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')), - pa.offset,pa.length,whole) - exit(0) + if pa.cmd: + _output = _output_subproc + else: + _output = _output_stdout if pa.index: CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"') for l in sys.stdin: @@ -183,8 +207,24 @@ print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr) exit(2) f=FPAT%(m[3:6]) - process(pa,buf,pa.root,f, + process(pa,buf,f, int(m[2]),int(m[1]),whole) - exit(0) + elif pa.length is not None: + print(pa.filename,file=sys.stderr) + process(pa,buf,FPAT%tuple(pa.filename.split('/')), + pa.offset,pa.length,whole) + else: + print("Reading length, offset, filename tab-delimited triples from stdin...", + file=sys.stderr) + for l in sys.stdin: + try: + (length,offset,filename)=l.rstrip().split('\t') + length=int(length) + offset=int(offset) + except ValueError as e: + parser.error('Invalid input line: %s\n "%s"'%(e,l)) + process(pa,buf,FPAT%tuple(filename.split('/')), + offset,length,whole) + if __name__ == "__main__": main()