view bin/ix.py @ 111:3119bca71181

warc and headers parts working
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Mon, 26 Apr 2021 15:28:23 +0000
parents f148c2366faa
children 6467024cd072
line wrap: on
line source

#!/usr/bin/env python3
'''Extract request records from Common Crawl WARC-format files
given length, offset and filename triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/filename

Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''

import sys, argparse, regex, os, shutil, io, gzip, time
from isal import igzip
#from subprocess import Popen, PIPE
#import asyncio

# Matches the usage fragment "[-x]<newline><indent>[length] [offset] [filename]"
#  so that HackFormat can substitute a more accurate one.
# Raw string: \[ and \s are regex escapes, not string escapes
HACK_USAGE=regex.compile(r'\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
# Binary layer of standard output, for writing record bytes unchanged
BINOUT=sys.stdout.buffer
# Path template, filled in with three strings (crawlid, segmentid, filename
#  in main) and appended to the root directory
FPAT="/%s/%s/orig/warc/%s"

class HackFormat(argparse.RawDescriptionHelpFormatter):
  '''Help formatter which patches the usage text.

  Behaves as argparse.RawDescriptionHelpFormatter, except that the
  fragment of the help text matched by HACK_USAGE is replaced by one
  showing that -x and the length/offset/filename triple are alternatives.'''

  def format_help(self):
    '''Return the inherited help text with the usage fragment rewritten.'''
    # Keep the intermediate text local: nothing else needs it
    text=super().format_help()
    return HACK_USAGE.sub('\n             [ ( -x | length offset filename ) ]',
                          text)

def process(options,buf,root,filename,offset,length,whole):
  '''Output one WARC record, or selected parts of it, on standard output.

  options -- parsed command line; its zipped, warc, headers and body
             attributes are consulted
  buf -- writable scratch buffer (main passes a bytearray): its first
         length bytes receive the gzipped record and the remainder the
         uncompressed record; it also serves as the file copy buffer
  root -- prefix prepended to filename to give the path of the WARC file.
          If it is not /beegfs/common_crawl and the file is not there yet,
          the file is first copied there from /beegfs/common_crawl
  filename -- path of the WARC file relative to root, starting with '/'
  offset -- start position in bytes of the gzipped record in the file
  length -- length in bytes of the gzipped record
  whole -- true if the complete record is wanted, as opposed to parts

  If whole is true the record is written as bytes, still gzipped if
  options.zipped is set, otherwise uncompressed.  If not, the WARC
  header lines (options.warc) and HTTP header lines (options.headers)
  are printed as text, without their carriage returns, and the HTTP
  body (options.body) is written as bytes.

  A short read and a disagreement between the WARC and HTTP
  Content-Length values are reported on stderr, without stopping.'''
  rfn=root+filename
  if root!="/beegfs/common_crawl" and not os.path.exists(rfn):
    # Make a copy of the whole WARC file under root
    os.makedirs(os.path.dirname(rfn),exist_ok=True)
    with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \
            io.FileIO(rfn,'w') as outfile:
      while True:
        l=infile.readinto(buf)
        if not l:
          break
        outfile.write(memoryview(buf)[:l])
  # Read the gzipped record into the front of buf
  bv=memoryview(buf)[:length]
  with open(rfn,'rb',0) as file:
    file.seek(offset)
    nb=file.readinto(bv)
  if nb!=length:
    print("losing",rfn,length,nb,file=sys.stderr)
  if whole and options.zipped:
    BINOUT.write(bv)
    return
  # Uncompress into the rest of buf
  gzip_chunk = io.BytesIO(bv)
  uv=memoryview(buf)[length:]
  ll=0
  with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
    while True:
      # Append after what we already have, in case the data
      #  arrives in more than one piece
      l=gzip_fin.readinto(uv[ll:])
      if not l:
        break
      ll+=l
  cb=uv[:ll]
  if whole:
    BINOUT.write(cb)
    return
  # only parts wanted
  # Note that _unlike the above_ this strips the ^M from the output lines
  #  so we are _not_ idempotent
  state=0    # 0: in WARC header, 1: in HTTP header
  tr=None    # value of WARC-Truncated, if any
  wl=None    # WARC Content-Length, less the HTTP header lines seen so far
  bl=None    # HTTP Content-Length, if any
  pos=0      # number of bytes of cb consumed so far
  with io.TextIOWrapper(io.BytesIO(cb),encoding='iso-8859-1',
                        newline='\r\n') as clear_text:
    for L in clear_text:
      # iso-8859-1 is one byte per character and newline='\r\n' leaves
      #  the lines untranslated, so len(L) is a byte count.
      # We count for ourselves because tell() raises OSError
      #  once iteration over the lines has begun
      pos+=len(L)
      if state==0:
        # WARC header
        if L.startswith("Content-Length: "):
          wl=int(L[16:].rstrip())
        elif L.startswith("WARC-Truncated: "):
          tr=L[16:].rstrip()
          tr="EMPTY" if tr=="" else tr
        elif L.startswith("\r"):
          # empty line: end of WARC header
          if not (options.headers or options.body):
            return
          state=1
          if options.warc:
            # preserve the empty line
            print()
          continue
        if options.warc:
          print(L.rstrip())
        continue
      # HTTP header
      if wl is not None:
        wl-=len(L)
      if L.startswith("Content-Length: "):
        bl=int(L[16:].rstrip())
      elif L.startswith("\r"):
        # empty line: end of HTTP header
        if not options.body:
          return
        if options.headers:
          # preserve the empty line
          print()
        # wl is now what the WARC header implies for the body length
        if bl is not None and wl is not None and bl!=wl:
          print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\
                (length,offset,filename,wl,bl,tr),file=sys.stderr)
        # HTTP body: everything after the headers, as bytes.
        # Flush first so the printed text comes out before the body
        sys.stdout.flush()
        BINOUT.write(cb[pos:])
        return
      if options.headers:
        print(L.rstrip())

def main():
  '''Parse the command line and output the requested record(s).

  With length, offset and filename arguments one record is processed.
  With -x, one record is processed per line of standard input, each
  line being searched for the length, offset and filename fields of a
  cdx index line.  In both cases the process then exits with status 0.
  An index line which does not match is reported on stderr and the
  exit status is 2.

  If none of -w, -h or -b is given the whole record is output.'''
  parser = argparse.ArgumentParser(
    description='''Extract records from warc files given length, offset and file triples.
  Input one triple on command line, or
  triples from stdin as tab-delimited lines
  or complete cdx index lines.
  In all cases by 'filename' is meant crawlid/segmentid/filename''',
    epilog='''Note that if no output flag(s) is/are given,
  the whole WARC record will be output, more efficiently than
  would be the case if all three flags were given.''',
    add_help=False,
    conflict_handler='resolve',
    formatter_class=HackFormat
    )

  # -h is wanted for --headers, so help is available only as --help
  parser.add_argument('--help',help='Show help',action='help')
  parser.add_argument('-d','--debug',help='Debug output',action='store_true')
  parser.add_argument('-w','--warc',help='output WARC headers',
                      action='store_true')
  parser.add_argument('-h','--headers',help='output HTTP headers',
                      action='store_true')
  parser.add_argument('-b','--body',help='output HTTP body',
                      action='store_true')
  parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
  parser.add_argument('-r','--root',nargs='?',
                      help='File path root, create a copy there if necessary',
                      default='/beegfs/common_crawl')
  parser.add_argument('-z','--zipped',
                      help="output raw gzipped record, ignored if any of -bhw supplied",
                      action='store_true')
  # -x and the first positional exclude one another
  sg=parser.add_mutually_exclusive_group()
  sg.add_argument('-x','--index',
                      help='take lines of triples from a cdx index file as input',
                      action='store_true')
  sg.add_argument('length',type=int,
                  help='length in bytes of gzipped record',
                  nargs='?')
  parser.add_argument('offset',type=int,
                      help='start position in bytes of gzipped record',
                      nargs='?')
  parser.add_argument('filename',
                      help='pathname of gzipped Common Crawl WARC-format file',
                      nargs='?')
  # Hack the order of optional and positional in the help output
  parser._action_groups.sort(key=lambda g:g.title)
  pa=parser.parse_args(sys.argv[1:])
  parts=None
  if pa.length is not None:
    # We have to enforce our own check..
    if pa.offset is None or pa.filename is None:
      parser.error("length, offset and filename must all be supplied together")
    parts=tuple(pa.filename.split('/'))
    if len(parts)!=3:
      # FPAT needs exactly three components
      parser.error("filename must be of the form crawlid/segmentid/filename")

  # One scratch buffer, shared by all the calls to process
  buf=bytearray(128*1024*1024)

  whole=not (pa.warc or pa.headers or pa.body)
  if parts is not None:
    # % needs a tuple here: a list would count as a single argument
    process(pa,buf,pa.root,FPAT%parts,
            pa.offset,pa.length,whole)
    sys.exit(0)
  if pa.index:
    CDX=regex.compile(r'length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"')
    for l in sys.stdin:
      m=CDX.search(l)
      if m is None:
        print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr)
        sys.exit(2)
      # groups 3-5 are crawlid, segmentid and filename
      f=FPAT%m.group(3,4,5)
      process(pa,buf,pa.root,f,
              int(m.group(2)),int(m.group(1)),whole)
    sys.exit(0)
  # NOTE(review): the description mentions tab-delimited triples on stdin,
  #  but without -x or a triple on the command line nothing is done here
# Only when run as a script, not when imported
if __name__ == "__main__":
    main()