Mercurial > hg > cc > cirrus_work
view bin/ix.py @ 82:7bbb14f6e394
merge
author | Henry Thompson <ht@markup.co.uk> |
---|---|
date | Sat, 19 Aug 2023 16:02:29 -0400 |
parents | fa43c318749b |
children |
line wrap: on
line source
#!/usr/bin/env python3
'''Extract response records from Common Crawl WARC-format files
given length, offset and filename triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/type/filename

Note that if no output flag(s) is/are given, the whole WARC record will be
output, more efficiently than would be the case if -whb is given.'''

import sys
import argparse
import os
import shutil
import io
import gzip
import time
import shlex
import tempfile
from subprocess import Popen, PIPE
#import asyncio

try:
    import regex
except ImportError:
    # Only plain patterns are used in this file, so the stdlib engine is an
    # adequate stand-in when the third-party 'regex' package is missing.
    import re as regex

try:
    from isal import igzip
except ImportError:
    # Fall back to the (slower) stdlib gzip reader, see _open_gzip.
    igzip = None

# Matches the usage fragment argparse generates for the -x / positional
# alternatives, so that HackFormat can replace it with something readable.
HACK_USAGE = regex.compile(r'\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
# Binary sink for results: stdout by default, rebound by launch() to the
# stdin pipe of the subprocess when -c is in use.
BINOUT = sys.stdout.buffer
FPAT = "/%s/%s/orig/%s/%s"
# Where the crawl lives; any other --root is treated as a local cache of it.
SHARED_ROOT = "/beegfs/common_crawl"

CMD_PROC = None      # Popen object of the current -c subprocess, if any
TMPFILE = None       # open binary temp file while a record is saved (-s)
TMPFILENAME = None   # name of that temp file


class HackFormat(argparse.RawDescriptionHelpFormatter):
    '''Help formatter which tidies the usage line for the
    "-x or length offset filename" alternatives.'''

    def format_help(self):
        '''Return the standard help text with the usage fragment matched by
        HACK_USAGE replaced by a hand-written one.'''
        text = super().format_help()
        return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', text)


def process(options, buf, filename, offset, length, whole):
    '''Process one record, turning any failure into a message on stderr
    (a full traceback with -d) followed by exit status 3.

    options -- parsed command-line namespace
    buf -- reusable bytearray work buffer
    filename -- path of the WARC file relative to options.root
    offset, length -- position and size in bytes of the gzipped record
    whole -- true if no part selection (-w/-h/-b) was requested
    '''
    try:
        process0(options, buf, filename, offset, length, whole)
    except Exception as e:
        if options.debug:
            import traceback
            traceback.print_exc(file=sys.stderr)
        else:
            # Report the triple itself: the input line it came from is not
            # visible at this level.
            print("Process fail: %s, input record:\n %s:%s:%s"
                  % (e, length, offset, filename), file=sys.stderr)
        sys.exit(3)


def process0(options, buf, filename, offset, length, whole):
    '''Set up the per-record temp file (-s) and subprocess (-c without -p),
    extract the record with process1, then wind them up again.

    Parameters are as for process.
    '''
    global TMPFILENAME, TMPFILE
    per_record_cmd = options.cmd and not options.process
    if options.save:
        (tf, TMPFILENAME) = tempfile.mkstemp()
        TMPFILE = open(tf, mode='wb')
    if per_record_cmd:
        launch(options.cmd)
    try:
        process1(options, buf, filename, offset, length, whole)
    finally:
        # Do not leak the handle if extraction fails
        if options.save:
            TMPFILE.close()
    if options.save:
        # The consumer gets the name of the file rather than the data
        name_line = bytes(TMPFILENAME, 'utf-8') + b"\n"
        if options.cmd:
            _output_subproc(name_line)
        else:
            BINOUT.write(name_line)
    if options.cmd:
        if per_record_cmd:
            windup(length, offset, filename)
            if options.save:
                # The subprocess has exited, so the file is finished with.
                # With -p the long-running subprocess may not have read it
                # yet, so deleting it is down to that command.
                os.unlink(TMPFILENAME)
        TMPFILENAME = None
    elif options.save:
        print("%s will need to be deleted" % TMPFILENAME, file=sys.stderr)
        TMPFILENAME = None


def launch(cmd):
    '''Start cmd (split shell-style, but run without a shell) and make its
    unbuffered stdin the destination for subsequent output.'''
    global CMD_PROC, BINOUT
    CMD_PROC = Popen(shlex.split(cmd), stdin=PIPE, bufsize=0)
    BINOUT = CMD_PROC.stdin


def windup(length, offset, filename):
    '''Close the subprocess's stdin and wait for it, reporting a non-zero
    exit status on stderr against the given triple.'''
    # Wind up subproc
    BINOUT.close()
    if CMD_PROC.wait() != 0:  # could/should be async?
        print("subproc of %s:%s:%s failed with %s"
              % (length, offset, filename, CMD_PROC.returncode),
              file=sys.stderr)


def _write_all(stream, data):
    '''Write all of data to stream, resuming after short writes.'''
    view = memoryview(data)
    while len(view):
        written = stream.write(view)
        # Continue with the unwritten remainder only (a None result, as
        # from a non-blocking stream, leaves the view unchanged)
        view = view[written:]


def _output_tmpfile(buf):
    '''Output handler for -s: append buf to the current temp file.'''
    TMPFILE.write(buf)


def _output_stdout(buf):
    '''Default output handler: write buf to BINOUT.'''
    BINOUT.write(buf)


def _output_subproc(buf):
    '''Output handler for -c: write buf to the subprocess's stdin.

    The pipe is unbuffered, so a single write may accept only part of the
    data; keep going until it has all gone.
    '''
    _write_all(BINOUT, buf)


# Current output handler; main() rebinds this according to -s / -c
_output = _output_stdout


def _ensure_cached(local, source, buf):
    '''Make sure local exists, copying it from source via buf if not.

    The copy goes to a temporary name first and is then renamed, so an
    interrupted copy never leaves a truncated file under the real name.
    '''
    if os.path.exists(local):
        return
    os.makedirs(os.path.dirname(local), exist_ok=True)
    partial = "%s.%d.part" % (local, os.getpid())
    try:
        with io.FileIO(source, 'r') as infile, \
             io.FileIO(partial, 'w') as outfile:
            #shutil.copyfileobj(infile,outfile,128*1024*1024)
            while True:
                n = infile.readinto(buf)
                if not n:
                    break
                _write_all(outfile, memoryview(buf)[:n])
        os.replace(partial, local)
    except BaseException:
        try:
            os.unlink(partial)
        except OSError:
            pass
        raise


def _open_gzip(fileobj):
    '''Return a gzip reader over fileobj, using isal if it is available.'''
    if igzip is not None:
        return igzip.IGzipFile(fileobj=fileobj)
    return gzip.GzipFile(fileobj=fileobj)


def _inflate(compressed, target):
    '''Decompress the gzip member in compressed into the memoryview target.

    Returns the filled prefix of target.  Raises ValueError if target is
    too small to hold the whole of the decompressed record.
    '''
    total = 0
    with _open_gzip(io.BytesIO(compressed)) as gzip_fin:
        while total < len(target):
            # Each read must land after what has already been read
            got = gzip_fin.readinto(target[total:])
            if not got:
                break
            total += got
        else:
            # Buffer is full: fine only if there is nothing more to come
            if gzip_fin.read(1):
                raise ValueError(
                    "Record too big: more than %s bytes uncompressed"
                    % len(target))
    return target[:total]


def _emit_parts(options, cb, filename, offset, length):
    '''Output the wanted parts (WARC header, HTTP header, body) of the
    uncompressed record cb.

    filename, offset and length are used only in the length-mismatch
    diagnostic.
    '''
    # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body)
    #  that are wanted
    state = 0
    wl = None  # WARC Content-Length, counted down to give the body length
    tr = None  # Was this record truncated?
    bl = None  # for HTTP Content-Length for the length of the body?
    # With -h name,... values are collected afresh for every record, so
    #  that nothing is carried over from the previous one
    collected = (dict.fromkeys(options.headers)
                 if isinstance(options.headers, dict) else None)
    # Could we make this faster by working purely within the cb memoryview?
    # It would be messy, but avoid copying huge amounts
    with io.BytesIO(cb) as clear_text:
        for L in clear_text:
            blank = L.startswith(b"\r") or L == b"\n"
            if state == 0:
                # WARC header
                if L.startswith(b"Content-Length: "):
                    wl = int(L[16:].rstrip())
                elif L.startswith(b"WARC-Truncated: "):
                    tr = L[16:].rstrip().decode('utf-8', 'replace') or "EMPTY"
                elif blank:
                    # Blank line, WARC header is finished
                    if not (options.headers or options.body):
                        return
                    if wl is None:
                        raise ValueError("No Content-Length in WARC header")
                    state = 1
                    # Note we preserve the empty line
                if options.warc:
                    _output(L)
                continue
            # state 1: HTTP header
            wl -= len(L)
            if not blank:
                # Non-blank, it's a header
                (h, _, v) = L.partition(b": ")
                if bl is None and h == b"Content-Length":
                    bl = int(v)
                if options.headers:
                    if collected is None:
                        _output(L)
                    elif h in collected:
                        collected[h] = v
                continue
            # Blank line, HTTP header is finished
            if collected is not None:
                _output(bytes(str(collected), 'utf-8'))
            if not options.body:
                return
            if options.headers:
                _output(L)
            # We do _not_ continue with the loop, since we can now
            #  block-output the entire rest of the input buffer.
            if bl is not None and bl != wl:
                print("length mismatch: %s %s %s here: %s given: %s trunc: %s"
                      % (length, offset, filename, wl, bl, tr),
                      file=sys.stderr)
            # HTTP body: output whatever is left
            balance = clear_text.tell()
            _output(cb[balance:balance + wl])
            return


def process1(options, buf, filename, offset, length, whole):
    '''Read one gzipped record and pass what was asked for to _output.

    The length bytes at offset in options.root+filename are read into the
    front of buf; unless the raw gzipped record is wanted (-z with no part
    selection) they are decompressed into the rest of buf.  If options.root
    is not the shared crawl directory, the file is first copied there from
    the shared directory if not already present.

    Raises ValueError if the chunk cannot be read in full or the record
    does not fit in buf.
    '''
    root = options.root
    rfn = root + filename
    if root != SHARED_ROOT:
        # Support using ramdisk or other local disk as a faster cache
        _ensure_cached(rfn, SHARED_ROOT + filename, buf)
    bv = memoryview(buf)[:length]
    with open(rfn, 'rb', 0) as file:
        file.seek(offset)
        nb = file.readinto(bv)
    if nb != length:
        raise ValueError("Chunk read losing: %s, got %s expected %s at %s"
                         % (rfn, nb, length, offset))
    if whole and options.zipped:
        _output(bv)
        return
    cb = _inflate(bv, memoryview(buf)[length:])
    if whole:
        _output(cb)
        return
    _emit_parts(options, cb, filename, offset, length)


def _expand_path(fpath, filename):
    '''Apply the 4-slot format fpath to the /-separated components of
    filename, raising ValueError if they do not fit.'''
    try:
        return fpath % tuple(filename.split('/'))
    except TypeError as e:
        raise ValueError("filename %r does not fit format %r (%s)"
                         % (filename, fpath, e)) from None


def main():
    '''Parse the command line and process the single triple given there,
    or else the triples or cdx index lines arriving on stdin.'''
    global _output
    parser = argparse.ArgumentParser(
        description='''Extract records from warc files given length, offset and file triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/type/filename''',
        epilog='''Note that if no output flag(s) is/are given,
the whole WARC record will be output, more efficiently than
would be the case if all three flags were given.''',
        add_help=False,
        conflict_handler='resolve',
        formatter_class=HackFormat
    )
    fphelp = ('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"' % FPAT).replace('%s', '%%s')
    parser.add_argument('--help', help='Show help', action='help')
    parser.add_argument('-d', '--debug', help='Debug output',
                        action='store_true')
    parser.add_argument('-w', '--warc', help='output WARC headers',
                        action='store_true')
    parser.add_argument('-h', '--headers',
                        help='process HTTP headers: collect into dict with named values (,-separated) if arg present, else output',
                        nargs='?', default=None, const=True)
    parser.add_argument('-b', '--body', help='output HTTP body',
                        action='store_true')
    parser.add_argument('-c', '--cmd', help='pipes each result thru CMD')
    parser.add_argument('-p', '--process',
                        help='with -c, launches CMD only once',
                        action='store_true')
    parser.add_argument('-m', '--module.function',
                        help='module.function to call with a stream')
    parser.add_argument('-s', '--save', action='store_true',
                        help="write to a temporary file and output the name")
    parser.add_argument('-f', '--fpath', help=fphelp, default=FPAT)
    parser.add_argument('-r', '--root', nargs='?',
                        help='File path root, create a copy there if necessary',
                        default='/beegfs/common_crawl')
    parser.add_argument('-z', '--zipped',
                        help="output raw gzipped record, ignored if any of -bhw supplied",
                        action='store_true')
    sg = parser.add_mutually_exclusive_group()
    sg.add_argument('-x', '--index',
                    help='take lines of triples from a cdx index file as input',
                    action='store_true')
    sg.add_argument('length', type=int,
                    help='length in bytes of gzipped record',
                    nargs='?')
    parser.add_argument('offset', type=int,
                        help='start position in bytes of gzipped record',
                        nargs='?')
    parser.add_argument('filename',
                        help='pathname of gzipped Common Crawl WARC-format file',
                        nargs='?')
    # Hack the order of optional and positional in the help output
    parser._action_groups.sort(key=lambda g: g.title)
    #parser.print_help()
    pa = parser.parse_args(sys.argv[1:])
    #print(pa,file=sys.stderr)
    if pa.length is not None:
        # We have to enforce our own check..
        if pa.offset is None or pa.filename is None:
            parser.error("length, offset and filename must all be supplied together")
    if isinstance(pa.headers, str):
        pa.headers = {bytes(k, 'utf-8'): None for k in pa.headers.split(',')}

    buf = bytearray(128 * 1024 * 1024)

    whole = not (pa.warc or pa.headers or pa.body)
    if pa.save:
        _output = _output_tmpfile
    elif pa.cmd:
        _output = _output_subproc
    else:
        _output = _output_stdout
    if pa.cmd and pa.process:
        launch(pa.cmd)
    # Most recent triple, for the benefit of the final windup: stays
    #  empty if stdin turns out to have nothing for us
    length = offset = filename = None
    # three different ways to process
    if pa.index:
        CDX = regex.compile(r'length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"')  # no robotstxt yet...
        for l in sys.stdin:
            m = CDX.search(l)
            if m is None:
                if l.find('/robotstxt/') > -1:
                    continue
                print("index line problem: \"%s\"" % l, file=sys.stderr, end='')
                sys.exit(2)
            filename = pa.fpath % m.group(3, 4, 5, 6)
            length = int(m[1])
            offset = int(m[2])
            process(pa, buf, filename, offset, length, whole)
    elif pa.length is not None:
        print(pa.filename, file=sys.stderr)
        (length, offset, filename) = (pa.length, pa.offset, pa.filename)
        try:
            path = _expand_path(pa.fpath, filename)
        except ValueError as e:
            parser.error(str(e))
        process(pa, buf, path, offset, length, whole)
    else:
        print("Reading length, offset, filename tab-delimited triples from stdin...",
              file=sys.stderr)
        for l in sys.stdin:
            try:
                (length, offset, filename) = l.rstrip().split('\t')
                length = int(length)
                offset = int(offset)
                path = _expand_path(pa.fpath, filename)
            except ValueError as e:
                parser.error('Invalid input line: %s\n "%s"' % (e, l))
            process(pa, buf, path, offset, length, whole)
    # processing done one way or another
    if pa.cmd and pa.process:
        windup(length, offset, filename)
    # if pa.save and pa.process, deleting temp files is down to cmd


if __name__ == "__main__":
    main()