Mercurial > hg > cc > cirrus_home
view bin/ix.py @ 111:3119bca71181
warc and headers parts working
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Mon, 26 Apr 2021 15:28:23 +0000 |
parents | f148c2366faa |
children | 6467024cd072 |
line wrap: on
line source
#!/usr/bin/env python3
'''Extract request records from Common Crawl WARC-format files
given length, offset and filename triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/filename

Note that if no output flag(s) is/are given, the whole WARC record will be
output, more efficiently than would be the case if -whb is given.'''

import sys
import argparse
import regex
import os
import shutil
import io
import gzip
import time
from isal import igzip

# Matches the "[-x] / [length] [offset] [filename]" fragment of the usage
# text produced by argparse, so that HackFormat can rewrite it.
HACK_USAGE = regex.compile(r'\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
# Binary view of stdout, used whenever raw record bytes are emitted.
BINOUT = sys.stdout.buffer
# Template turning (crawlid, segmentid, filename) into a path below the root.
FPAT = "/%s/%s/orig/warc/%s"
# Where the WARC files live; also the source for copies made under --root.
DEFAULT_ROOT = "/beegfs/common_crawl"


class HackFormat(argparse.RawDescriptionHelpFormatter):
    '''Help formatter that rewrites the usage line to show that -x and the
    length/offset/filename triple are alternatives.'''

    def format_help(self):
        '''Return the standard help text with the usage fragment patched.'''
        text = super().format_help()
        return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', text)


def _ensure_local_copy(buf, root, filename):
    '''Return root+filename, first copying the file there from DEFAULT_ROOT
    if root is not DEFAULT_ROOT and the file does not exist yet.

    buf is used as the transfer buffer for the copy.'''
    rfn = root + filename
    if root != DEFAULT_ROOT and not os.path.exists(rfn):
        os.makedirs(os.path.dirname(rfn), exist_ok=True)
        # The destination is opened buffered so that every write() call
        # stores the complete chunk (a raw FileIO write may be partial).
        with io.FileIO(DEFAULT_ROOT + filename, 'r') as infile, \
             open(rfn, 'wb') as outfile:
            while True:
                n = infile.readinto(buf)
                if not n:
                    break
                outfile.write(memoryview(buf)[:n])
    return rfn


def _emit_parts(options, cb, filename, offset, length):
    '''Write the requested parts (options.warc, options.headers,
    options.body) of the inflated record cb to stdout.

    Header lines are printed with their trailing CR LF stripped, so the
    output is not byte-identical to the input; the body is written as raw
    bytes.  filename, offset and length are only used in the diagnostic
    printed to stderr when the HTTP Content-Length disagrees with the
    length derived from the WARC Content-Length.'''
    in_http = False   # False: reading WARC headers, True: reading HTTP headers
    pos = 0           # offset in cb just past the lines consumed so far
    wl = None         # WARC Content-Length, reduced by each HTTP header line
    bl = None         # HTTP Content-Length, if the response declares one
    tr = None         # value of WARC-Truncated, if present
    # iso-8859-1 maps every byte to exactly one character and newline='\r\n'
    # leaves line endings untranslated, so len(L) is the number of bytes
    # consumed and pos is an exact byte offset into cb.  (tell() cannot be
    # used for this: it raises OSError while the wrapper is being iterated.)
    with io.TextIOWrapper(io.BytesIO(cb), encoding='iso-8859-1',
                          newline='\r\n') as clear_text:
        for L in clear_text:
            pos += len(L)
            if not in_http:
                # WARC header
                if L.startswith("Content-Length: "):
                    wl = int(L[16:].rstrip())
                elif L.startswith("WARC-Truncated: "):
                    tr = L[16:].rstrip() or "EMPTY"
                elif L.startswith("\r"):
                    # Blank line: end of the WARC headers
                    if not (options.headers or options.body):
                        return
                    in_http = True
                    if options.warc:
                        # preserve the empty line as a separator
                        print()
                    continue
                if options.warc:
                    print(L.rstrip())
                continue
            # HTTP header
            if wl is not None:
                wl -= len(L)
            if L.startswith("Content-Length: "):
                bl = int(L[16:].rstrip())
            elif L.startswith("\r"):
                # Blank line: end of the HTTP headers
                if not options.body:
                    return
                if options.headers:
                    # preserve the empty line as a separator
                    print()
                if bl is not None and wl is not None and bl != wl:
                    print("length mismatch: %s %s %s here: %s given: %s trunc: %s" %
                          (length, offset, filename, wl, bl, tr),
                          file=sys.stderr)
                # HTTP body: everything after the HTTP headers.  Flush the
                # text layer first so the header lines precede the raw bytes.
                sys.stdout.flush()
                BINOUT.write(cb[pos:])
                return
            if options.headers:
                print(L.rstrip())


def process(options, buf, root, filename, offset, length, whole):
    '''Output one WARC record, or selected parts of it.

    options  -- parsed command line; zipped, warc, headers and body are read
    buf      -- reusable bytearray: the first length bytes receive the
                gzipped record, the remainder receives the inflated record
    root     -- path prefix under which filename is looked up (the file is
                copied there from DEFAULT_ROOT first if necessary)
    filename -- path of the WARC file relative to root, starting with '/'
    offset   -- byte position of the gzipped record within the file
    length   -- length in bytes of the gzipped record
    whole    -- true if the entire record is wanted rather than parts'''
    rfn = _ensure_local_copy(buf, root, filename)
    bv = memoryview(buf)[:length]
    with open(rfn, 'rb', 0) as infile:
        infile.seek(offset)
        nb = infile.readinto(bv)
    if nb != length:
        print("losing", rfn, length, nb, file=sys.stderr)
    if whole and options.zipped:
        BINOUT.write(bv)
        return
    uv = memoryview(buf)[length:]
    ll = 0
    with igzip.IGzipFile(fileobj=io.BytesIO(bv)) as gzip_fin:
        while True:
            # Continue after what has been read so far, so that a record
            # delivered in several reads is accumulated, not overwritten.
            n = gzip_fin.readinto(uv[ll:])
            if not n:
                break
            ll += n
    cb = uv[:ll]
    if whole:
        BINOUT.write(cb)
        return
    # only parts wanted
    _emit_parts(options, cb, filename, offset, length)


def main():
    '''Parse the command line and output the requested record(s).

    Exits with status 2 on a malformed argument or cdx index line.'''
    parser = argparse.ArgumentParser(
        description='''Extract records from warc files given length, offset and file triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/filename''',
        epilog='''Note that if no output flag(s) is/are given,
the whole WARC record will be output, more efficiently than
would be the case if all three flags were given.''',
        add_help=False,
        conflict_handler='resolve',
        formatter_class=HackFormat,
    )
    parser.add_argument('--help', help='Show help', action='help')
    parser.add_argument('-d', '--debug', help='Debug output',
                        action='store_true')
    parser.add_argument('-w', '--warc', help='output WARC headers',
                        action='store_true')
    parser.add_argument('-h', '--headers', help='output HTTP headers',
                        action='store_true')
    parser.add_argument('-b', '--body', help='output HTTP body',
                        action='store_true')
    parser.add_argument('-c', '--cmd', help='pipes each result thru CMD')
    # const makes a bare -r fall back to the default instead of yielding None
    parser.add_argument('-r', '--root', nargs='?',
                        help='File path root, create a copy there if necessary',
                        default=DEFAULT_ROOT, const=DEFAULT_ROOT)
    parser.add_argument('-z', '--zipped',
                        help="output raw gzipped record, ignored if any of -bhw supplied",
                        action='store_true')
    sg = parser.add_mutually_exclusive_group()
    sg.add_argument('-x', '--index',
                    help='take lines of triples from a cdx index file as input',
                    action='store_true')
    sg.add_argument('length', type=int,
                    help='length in bytes of gzipped record',
                    nargs='?')
    parser.add_argument('offset', type=int,
                        help='start position in bytes of gzipped record',
                        nargs='?')
    parser.add_argument('filename',
                        help='pathname of gzipped Common Crawl WARC-format file',
                        nargs='?')
    # Hack the order of optional and positional in the help output
    parser._action_groups.sort(key=lambda g: g.title)
    pa = parser.parse_args(sys.argv[1:])

    parts = None
    if pa.length is not None:
        # We have to enforce our own check..
        if pa.offset is None or pa.filename is None:
            parser.error("length, offset and filename must all be supplied together")
        # FPAT has three slots and % needs a tuple (not a list) to fill them
        parts = tuple(pa.filename.split('/'))
        if len(parts) != 3:
            parser.error("filename must have the form crawlid/segmentid/filename")

    buf = bytearray(128 * 1024 * 1024)

    whole = not (pa.warc or pa.headers or pa.body)
    if parts is not None:
        process(pa, buf, pa.root, FPAT % parts,
                pa.offset, pa.length, whole)
        sys.exit(0)
    if pa.index:
        CDX = regex.compile(r'length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"')
        for line in sys.stdin:
            m = CDX.search(line)
            if m is None:
                print("index line problem: \"%s\"" % line.lstrip(),
                      file=sys.stderr)
                sys.exit(2)
            process(pa, buf, pa.root, FPAT % m.group(3, 4, 5),
                    int(m[2]), int(m[1]), whole)
        sys.exit(0)


if __name__ == "__main__":
    main()