view bin/ix.py @ 82:7bbb14f6e394

merge
author Henry Thompson <ht@markup.co.uk>
date Sat, 19 Aug 2023 16:02:29 -0400
parents fa43c318749b
children
line wrap: on
line source

#!/usr/bin/env python3
'''Extract response records from Common Crawl WARC-format files
given length, offset and filename triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/type/filename

Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''

import sys, argparse, regex, os, shutil, io, gzip, time, shlex
from isal import igzip
from subprocess import Popen, PIPE
#import asyncio

HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
BINOUT=sys.stdout.buffer
FPAT="/%s/%s/orig/%s/%s"

CMD_PROC=None
TMPFILENAME=None

class HackFormat(argparse.RawDescriptionHelpFormatter):
  def format_help(self):
    FOO=argparse.RawDescriptionHelpFormatter.format_help(self)
    return HACK_USAGE.sub('\n             [ ( -x | length offset filename ) ]',
                          FOO)

def process(options,buf,filename,offset,length,whole):
    try:
      process0(options,buf,filename,offset,length,whole)
    except Exception as e:
      if options.debug:
        import traceback
        traceback.print_exc(file=sys.stderr)
      else:
        print("Process fail: %s, input line:\n %s"%(e,l),
              file=sys.stderr,end='')
      exit(3)

def process0(options,buf,filename,offset,length,whole):
  global TMPFILENAME, TMPFILE
  if options.save:
    (tf,TMPFILENAME)=tempfile.mkstemp()
    TMPFILE=open(tf,mode='wb')
  if options.cmd and not options.process:
    launch(options.cmd)
  process1(options,buf,filename,offset,length,whole)
  if options.save:
    TMPFILE.close()
    if options.cmd:
      _output_subproc(bytes(TMPFILENAME,'utf-8'))
      _output_subproc(b"\n")
    else:
      BINOUT.write(bytes(TMPFILENAME,'utf-8'))
      BINOUT.write(b"\n")
  if options.cmd:
    if not options.process:
      windup(filename,options,length)
    if options.save:
      os.unlink(TMPFILENAME)
      TMPFILENAME=None
  elif options.save:
    print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr)
    TMPFILENAME=None

def launch(cmd):
  global CMD_PROC, BINOUT
  CMD_PROC=Popen(shlex.split(cmd),stdin=PIPE,bufsize=0)
  BINOUT=CMD_PROC.stdin

def windup(length,offset,filename):
  # Wind up subproc
  BINOUT.close()
  if CMD_PROC.wait()!=0:    # could/should be async?
    print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
                                                CMD_PROC.returncode),
          file=sys.stderr)
  
def _output_tmpfile(buf):
  TMPFILE.write(buf)

def _output_stdout(buf):
  BINOUT.write(buf)

def _output_subproc(buf):
  toWrite=len(buf)
  while toWrite>0:
    toWrite -= BINOUT.write(buf)    

def process1(options,buf,filename,offset,length,whole):
  root=options.root
  rfn=root+filename
  if root!="/beegfs/common_crawl":
    # Support using ramdisk or other local disk as a faster cached
    if not os.path.exists(rfn):
      if not os.path.exists(os.path.dirname(rfn)):
        os.makedirs(os.path.dirname(rfn))
      with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \
              io.FileIO(rfn,'w') as outfile:
        #shutil.copyfileobj(infile,outfile,128*1024*1024)
        while True:
          l=infile.readinto(buf)
          if l==0:
            break
          outfile.write(memoryview(buf)[:l])
  file=open(rfn,'rb',0)
  file.seek(offset)
  bv=memoryview(buf)[:length]
  nb=file.readinto(bv)
  file.close()
  if nb!=length:
    raise ValueError("Chunk read losing: %s, got %s expected %s at %s"%(file.name,
                                                                  nb,length,offset))
  if whole and options.zipped:
    _output(bv)
    return
  gzip_chunk = io.BytesIO(bv)
  uv=memoryview(buf)[length:]
  with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
    ll=0
    while True:
      l=gzip_fin.readinto(uv)
      if not l:
        break
      ll+=l
    cb=memoryview(uv)[:ll]
    if whole:
      _output(cb)
      return
  # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
  state=0
  tr=None # Was this record truncated?
  bl=None # for HTTP Content-Length for the length of the body?
  # Could we make this faster by working purely within the cb memoryview?
  # It would be messy, but avoid copying huge amounts
  # The outer loop would just be something like
  #   clbv=memoryview(bytearray(b"Content-Length: "))
  #   i=s=0
  #   while i<ll:
  #     if cb[i]==10: # need to handle \r\n
  #       L=cb[s:i]
  #       s=i=i+1
  #       if L[:16]==clbv:
  #         wl=int(L[16:])
  #     else:
  #       i+=1
  #
  with io.BytesIO(cb) as clear_text:
    for L in clear_text:
      if state==0:
        # WARC header
        if L.startswith(b"Content-Length: "):
          wl=int(L[16:].rstrip())
        elif L.startswith(b"WARC-Truncated: "):
          tr=L[16:].rstrip()
          tr="EMPTY" if tr=="" else tr
        elif L==b"" or L.startswith(b"\r"): # for idempotency
          # Blank line, WARC header is finished
          if not (options.headers or options.body):
            return
          state=1
          # Note we preserve the empty line
        if options.warc:
          _output(L)
        continue
      if state==1:
        # HTTP header
        wl -= len(L)
        if not (L==b"" or L.startswith(b"\r")):
          # Non-blank, it's a header
          (h,_,v)=L.partition(b": ")
          if bl is None and (h==b"Content-Length"):
            bl=int(v)
          if options.headers:
            if isinstance(options.headers,dict):
              if h in options.headers:
                options.headers[h]=v
            else:
              _output(L)
        else:
          # Blank line, HTTP header is finished
          if isinstance(options.headers,dict):
            _output(bytes(str(options.headers),'utf-8'))
          if not options.body:
            return
          if options.headers:
            _output(L)
          state=2
          # The above is just for sanity, because we do _not_
          #  continue with the outer loop,
          #  since we can now block-output the entire rest of the
          #  input buffer.
          if bl is not None:
            if bl!=wl:
              print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\
                    (length,offset,filename,wl,bl,tr),file=sys.stderr)
          # HTTP body
          balance=clear_text.tell()
          #print(balance,bl,wl,ll,ll-balance,file=sys.stderr)
          # Output whatever is left
          _output(cb[balance:balance+wl])
          return

def main():
  global _output,TMPFILE,TMPFILENAME,tempfile
  parser = argparse.ArgumentParser(
    description='''Extract records from warc files given length, offset and file triples.
  Input one triple on command line, or
  triples from stdin as tab-delimited lines
  or complete cdx index lines.
  In all cases by 'filename' is meant crawlid/segmentid/type/filename''',
    epilog='''Note that if no output flag(s) is/are given,
  the whole WARC record will be output, more efficiently than
  would be the case if all three flags were given.''',
    add_help=False,
    conflict_handler='resolve',
    formatter_class=HackFormat
    )
  fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s')
  parser.add_argument('--help',help='Show help',action='help')
  parser.add_argument('-d','--debug',help='Debug output',action='store_true')
  parser.add_argument('-w','--warc',help='output WARC headers',
                      action='store_true')
  parser.add_argument('-h','--headers',help='process HTTP headers: collect into dict with named values (,-separated) if arg present, else output',
                      nargs='?',default=None,const=True)
  parser.add_argument('-b','--body',help='output HTTP body',
                      action='store_true')
  parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
  parser.add_argument('-p','--process',help='with -c, launches CMD only once',
                      action='store_true')
  parser.add_argument('-m','--module.function',help='module.function to call with a stream'),
  parser.add_argument('-s','--save',action='store_true',
                      help="write to a temporary file and output the name")
  parser.add_argument('-f','--fpath',
                      help=fphelp,
                      default=FPAT)
  parser.add_argument('-r','--root',nargs='?',
                  help='File path root, create a copy there if necessary',
                  default='/beegfs/common_crawl'),
  parser.add_argument('-z','--zipped',
                      help="output raw gzipped record, ignored if any of -bhw supplied",
                      action='store_true')
  sg=parser.add_mutually_exclusive_group()
  sg.add_argument('-x','--index',
                      help='take lines of triples from a cdx index file as input',
                      action='store_true')
  sg.add_argument('length',type=int,
                  help='length in bytes of gzipped record',
                  nargs='?')
  parser.add_argument('offset',type=int,
                      help='start position in bytes of gzipped record',
                      nargs='?')
  parser.add_argument('filename',
                      help='pathname of gzipped Common Crawl WARC-format file',
                      nargs='?')
  # Hack the order of optional and positional in the help output
  parser._action_groups.sort(key=lambda g:g.title)
  #parser.print_help()
  pa=parser.parse_args(sys.argv[1:])
  #print(pa,file=sys.stderr)
  if pa.length is not None:
    # We have to enforce our own check..
    if pa.offset is None or pa.filename is None:
      parser.error("length, offset and filename must all be supplied together")
  if isinstance(pa.headers,str):
    pa.headers=dict((bytes(k,'utf-8'),None) for k in pa.headers.split(','))
 
  buf=bytearray(128*1024*1024)

  whole=not (pa.warc or pa.headers or pa.body)
  if pa.save:
    _output=_output_tmpfile
    import tempfile
  elif pa.cmd:
    _output = _output_subproc
  else:
    _output = _output_stdout
  if pa.cmd and pa.process:
      launch(pa.cmd)
  # three different ways to process
  if pa.index:
    CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet...
    for l in sys.stdin:
      m=CDX.search(l)
      if m is None:
        if l.find('/robotstxt/')>-1:
          continue
        print("index line problem: \"%s\""%l,file=sys.stderr,end='')
        exit(2)
      filename=pa.fpath%(m[3:7])
      process(pa,buf,filename,
              int(offset:=m[2]),int(length:=m[1]),whole)
  elif pa.length is not None:
    print(pa.filename,file=sys.stderr)
    process(pa,buf,pa.fpath%tuple(pa.filename.split('/')),
            pa.offset,pa.length,whole)
  else:
    print("Reading length, offset, filename tab-delimited triples from stdin...",
          file=sys.stderr)
    for l in sys.stdin:
      try:
        (length,offset,filename)=l.rstrip().split('\t')
        length=int(length)
        offset=int(offset)
      except ValueError as e:
        parser.error('Invalid input line: %s\n "%s"'%(e,l))
      process(pa,buf,pa.fpath%tuple(filename.split('/')),
              offset,length,whole)
  # processing done one way or another
  if pa.cmd and pa.process:
    windup(length,offset,filename)
  # if pa.save and pa.process, deleting temp files is down to cmd
if __name__ == "__main__":
    main()