Mercurial > hg > cc > cirrus_work
changeset 18:046dbe557911
write to tmp file implemented
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Sun, 07 Aug 2022 13:58:33 +0100 |
parents | 75e0d0013da0 |
children | cec930a032ef |
files | bin/ix.py |
diffstat | 1 files changed, 274 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
#!/usr/bin/env python3
'''Extract request records from Common Crawl WARC-format files
given length, offset and filename triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/type/filename

Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''

import argparse
import gzip
import io
import os
import shlex
import shutil
import sys
import tempfile
import time
from subprocess import Popen, PIPE

try:
    import regex
except ImportError:
    # Only syntax shared by both engines is used below, so the standard
    # library module is a drop-in replacement when regex is not installed.
    import re as regex

try:
    from isal import igzip
    _GzipFile = igzip.IGzipFile
except ImportError:
    # Fall back to the standard library decompressor when isal is absent.
    _GzipFile = gzip.GzipFile
#import asyncio

# Usage fragment produced by argparse that HackFormat rewrites.
HACK_USAGE = regex.compile(r'\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
# Extracts length, offset and the four filename components from a cdx line.
CDX = regex.compile(r'length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"')  # no robotstxt yet...
BINOUT = sys.stdout.buffer
FPAT = "/%s/%s/orig/%s/%s"
# Canonical location of the crawl data; any other root is treated as a cache.
DEFAULT_ROOT = "/beegfs/common_crawl"

CMD_PROC = None
TMPFILENAME = None
TMPFILE = None


class HackFormat(argparse.RawDescriptionHelpFormatter):
    '''Help formatter that shows -x and the positional triple as alternatives.'''

    def format_help(self):
        '''Return the standard help text with the usage fragment rewritten.'''
        help_text = super().format_help()
        return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]',
                              help_text)


def process(options, buf, filename, offset, length, whole):
    '''Extract one record, setting up and winding down the output channel.

    options  -- parsed command line (reads .save, .cmd and whatever
                process1 reads)
    buf      -- reusable bytearray handed on to process1
    filename -- path of the WARC file relative to options.root
    offset   -- byte offset of the gzipped record in that file
    length   -- byte length of the gzipped record
    whole    -- true if the complete record is wanted

    With options.save the record goes to a fresh temporary file whose
    name is then written to the output.  With options.cmd a subprocess is
    started per record and the output goes to its stdin.
    '''
    global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE
    if options.save:
        (tf, TMPFILENAME) = tempfile.mkstemp()
        TMPFILE = open(tf, mode='wb')
    if options.cmd:
        CMD_PROC = Popen(shlex.split(options.cmd), stdin=PIPE, bufsize=0)
        BINOUT = CMD_PROC.stdin
    process1(options, buf, filename, offset, length, whole)
    if options.save:
        TMPFILE.close()
        if options.cmd:
            _output_subproc(bytes(TMPFILENAME, 'utf-8'))
            _output_subproc(b"\n")
        else:
            BINOUT.write(bytes(TMPFILENAME, 'utf-8'))
            BINOUT.write(b"\n")
    if options.cmd:
        # Wind up subproc
        BINOUT.close()
        if CMD_PROC.wait() != 0:  # could/should be async?
            print("subproc of %s:%s:%s failed with %s" % (length, offset, filename,
                                                         CMD_PROC.returncode),
                  file=sys.stderr)
        if options.save:
            # not if async?
            os.unlink(TMPFILENAME)
            TMPFILENAME = None
    elif options.save:
        print("%s will need to be deleted" % TMPFILENAME, file=sys.stderr)
        TMPFILENAME = None


def _output_tmpfile(buf):
    '''Write buf to the current temporary file.'''
    TMPFILE.write(buf)


def _output_stdout(buf):
    '''Write buf to the current binary output stream.'''
    BINOUT.write(buf)


def _output_subproc(buf):
    '''Write all of buf to the (unbuffered) output, retrying short writes.'''
    remaining = memoryview(buf)
    while len(remaining) > 0:
        # An unbuffered write may accept only part of the data: carry on
        # from where it stopped rather than resending from the start.
        written = BINOUT.write(remaining)
        remaining = remaining[written:]


# Default sink, so that process1 is usable before main() has chosen one.
_output = _output_stdout


def process1(options, buf, filename, offset, length, whole):
    '''Read one gzipped WARC record and output the wanted parts of it.

    options  -- reads .root, .zipped, .warc, .headers and .body
    buf      -- bytearray workspace: the first length bytes receive the
                compressed record, the remainder the decompressed text
    filename -- path of the WARC file relative to options.root
    offset   -- byte offset of the gzipped record in that file
    length   -- byte length of the gzipped record
    whole    -- true if the complete record is wanted

    Raises ValueError if fewer than length bytes could be read, or if a
    part is requested and the WARC header has no Content-Length.
    '''
    root = options.root
    rfn = root + filename
    if root != DEFAULT_ROOT and not os.path.exists(rfn):
        # Support using ramdisk or other local disk as a faster cache
        os.makedirs(os.path.dirname(rfn), exist_ok=True)
        with io.FileIO(DEFAULT_ROOT + filename, 'r') as infile, \
             io.FileIO(rfn, 'w') as outfile:
            while True:
                n = infile.readinto(buf)
                if n == 0:
                    break
                outfile.write(memoryview(buf)[:n])
    bv = memoryview(buf)[:length]
    with open(rfn, 'rb', 0) as file:
        file.seek(offset)
        nb = file.readinto(bv)
    if nb != length:
        raise ValueError("Chunk read losing: %s, got %s expected %s at %s" % (rfn,
                                                                             nb, length, offset))
    if whole and options.zipped:
        _output(bv)
        return
    uv = memoryview(buf)[length:]
    ll = 0
    with _GzipFile(fileobj=io.BytesIO(bv)) as gzip_fin:
        while ll < len(uv):
            # Append after what has already been read instead of
            # overwriting it.
            # NOTE(review): a record that does not fit in the rest of buf
            # is silently cut short here -- confirm whether to raise.
            n = gzip_fin.readinto(uv[ll:])
            if not n:
                break
            ll += n
    cb = uv[:ll]
    if whole:
        _output(cb)
        return
    # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
    state = 0
    tr = None  # Was this record truncated?
    bl = None  # for HTTP Content-Length for the length of the body?
    wl = None  # WARC Content-Length, counted down while reading the HTTP header
    with io.BytesIO(cb) as clear_text:
        for L in clear_text:
            if state == 0:
                # WARC header
                if L.startswith(b"Content-Length: "):
                    wl = int(L[16:].rstrip())
                elif L.startswith(b"WARC-Truncated: "):
                    tr = L[16:].rstrip()
                    tr = "EMPTY" if tr == b"" else tr
                elif L == b"" or L.startswith(b"\r"):  # for idempotency
                    # Blank line, WARC header is finished
                    if not (options.headers or options.body):
                        return
                    if wl is None:
                        raise ValueError("No Content-Length in WARC header: %s %s %s" %
                                         (length, offset, filename))
                    state = 1
                # Note we preserve the empty line
                if options.warc:
                    _output(L)
                continue
            if state == 1:
                # HTTP header
                wl -= len(L)
                if not (L == b"" or L.startswith(b"\r")):
                    # Non-blank, it's a header
                    if bl is None and L.startswith(b"Content-Length: "):
                        bl = int(L[16:].rstrip())
                    if options.headers:
                        _output(L)
                else:
                    # Blank line, HTTP header is finished
                    if not options.body:
                        return
                    if options.headers:
                        _output(L)
                    state = 2
                    # The above is just for sanity, because we do _not_
                    # continue with the outer loop,
                    # since we can now block-output the entire rest of the
                    # input buffer.
                    if bl is not None:
                        if bl != wl:
                            print("length mismatch: %s %s %s here: %s given: %s trunc: %s" %
                                  (length, offset, filename, wl, bl, tr), file=sys.stderr)
                    # HTTP body
                    balance = clear_text.tell()
                    # Output whatever is left
                    _output(cb[balance:balance + wl])
                    return


def main():
    '''Parse the command line and extract every requested record.'''
    global _output
    parser = argparse.ArgumentParser(
        description='''Extract records from warc files given length, offset and file triples.
 Input one triple on command line, or
 triples from stdin as tab-delimited lines
 or complete cdx index lines.
 In all cases by 'filename' is meant crawlid/segmentid/type/filename''',
        epilog='''Note that if no output flag(s) is/are given,
 the whole WARC record will be output, more efficiently than
 would be the case if all three flags were given.''',
        add_help=False,
        conflict_handler='resolve',
        formatter_class=HackFormat
    )
    fphelp = ('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"' % FPAT).replace('%s', '%%s')
    parser.add_argument('--help', help='Show help', action='help')
    parser.add_argument('-d', '--debug', help='Debug output', action='store_true')
    parser.add_argument('-w', '--warc', help='output WARC headers',
                        action='store_true')
    parser.add_argument('-h', '--headers', help='output HTTP headers',
                        action='store_true')
    parser.add_argument('-b', '--body', help='output HTTP body',
                        action='store_true')
    parser.add_argument('-c', '--cmd', help='pipes each result thru CMD')
    parser.add_argument('-m', '--module.function', help='module.function to call with a stream')
    parser.add_argument('-s', '--save', action='store_true',
                        help="write to a temporary file and output the name")
    parser.add_argument('-f', '--fpath',
                        help=fphelp,
                        default=FPAT)
    # const makes a bare -r mean the default root rather than None
    parser.add_argument('-r', '--root', nargs='?',
                        help='File path root, create a copy there if necessary',
                        const=DEFAULT_ROOT,
                        default=DEFAULT_ROOT)
    parser.add_argument('-z', '--zipped',
                        help="output raw gzipped record, ignored if any of -bhw supplied",
                        action='store_true')
    sg = parser.add_mutually_exclusive_group()
    sg.add_argument('-x', '--index',
                    help='take lines of triples from a cdx index file as input',
                    action='store_true')
    sg.add_argument('length', type=int,
                    help='length in bytes of gzipped record',
                    nargs='?')
    parser.add_argument('offset', type=int,
                        help='start position in bytes of gzipped record',
                        nargs='?')
    parser.add_argument('filename',
                        help='pathname of gzipped Common Crawl WARC-format file',
                        nargs='?')
    # Hack the order of optional and positional in the help output
    parser._action_groups.sort(key=lambda g: g.title)
    pa = parser.parse_args(sys.argv[1:])
    if pa.length is not None:
        # We have to enforce our own check..
        if pa.offset is None or pa.filename is None:
            parser.error("length, offset and filename must all be supplied together")

    buf = bytearray(128 * 1024 * 1024)

    whole = not (pa.warc or pa.headers or pa.body)
    if pa.save:
        _output = _output_tmpfile
    elif pa.cmd:
        _output = _output_subproc
    else:
        _output = _output_stdout
    if pa.index:
        for l in sys.stdin:
            m = CDX.search(l)
            if m is None:
                if l.find('/robotstxt/') > -1:
                    continue
                print("index line problem: \"%s\"" % l, file=sys.stderr, end='')
                sys.exit(2)
            f = pa.fpath % m.group(3, 4, 5, 6)
            try:
                process(pa, buf, f,
                        int(m[2]), int(m[1]), whole)
            except Exception as e:
                if pa.debug:
                    import traceback
                    traceback.print_exc(file=sys.stderr)
                else:
                    print("Process fail: %s, input line:\n %s" % (e, l),
                          file=sys.stderr, end='')
                sys.exit(3)
    elif pa.length is not None:
        print(pa.filename, file=sys.stderr)
        process(pa, buf, pa.fpath % tuple(pa.filename.split('/')),
                pa.offset, pa.length, whole)
    else:
        print("Reading length, offset, filename tab-delimited triples from stdin...",
              file=sys.stderr)
        for l in sys.stdin:
            try:
                (length, offset, filename) = l.rstrip().split('\t')
                length = int(length)
                offset = int(offset)
            except ValueError as e:
                parser.error('Invalid input line: %s\n "%s"' % (e, l))
            process(pa, buf, pa.fpath % tuple(filename.split('/')),
                    offset, length, whole)


if __name__ == "__main__":
    main()