changeset 18:046dbe557911

write to tmp file implemented
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Sun, 07 Aug 2022 13:58:33 +0100
parents 75e0d0013da0
children cec930a032ef
files bin/ix.py
diffstat 1 files changed, 274 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/ix.py	Sun Aug 07 13:58:33 2022 +0100
@@ -0,0 +1,274 @@
+#!/usr/bin/env python3
+'''Extract request records from Common Crawl WARC-format files
+given length, offset and filename triples.
+Input one triple on command line, or
+triples from stdin as tab-delimited lines
+or complete cdx index lines.
+In all cases by 'filename' is meant crawlid/segmentid/type/filename
+
+Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''
+
+import sys, argparse, regex, os, shutil, io, gzip, time, shlex
+from isal import igzip
+from subprocess import Popen, PIPE
+#import asyncio
+
+HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
+BINOUT=sys.stdout.buffer
+FPAT="/%s/%s/orig/%s/%s"
+
+CMD_PROC=None
+TMPFILENAME=None
+
+class HackFormat(argparse.RawDescriptionHelpFormatter):
+  def format_help(self):
+    FOO=argparse.RawDescriptionHelpFormatter.format_help(self)
+    return HACK_USAGE.sub('\n             [ ( -x | length offset filename ) ]',
+                          FOO)
+
+def process(options,buf,filename,offset,length,whole):
+  global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE
+  if options.save:
+    (tf,TMPFILENAME)=tempfile.mkstemp()
+    TMPFILE=open(tf,mode='wb')
+  if options.cmd:
+    CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0)
+    BINOUT=CMD_PROC.stdin
+  process1(options,buf,filename,offset,length,whole)
+  if options.save:
+    TMPFILE.close()
+    if options.cmd:
+      _output_subproc(bytes(TMPFILENAME,'utf-8'))
+      _output_subproc(b"\n")
+    else:
+      BINOUT.write(bytes(TMPFILENAME,'utf-8'))
+      BINOUT.write(b"\n")
+  if options.cmd:
+    # Wind up subproc
+    BINOUT.close()
+    if CMD_PROC.wait()!=0:    # could/should be async?
+      print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
+                                                  CMD_PROC.returncode),
+            file=sys.stderr)
+    if options.save:
+      # not if async?
+      os.unlink(TMPFILENAME)
+      TMPFILENAME=None
+  elif options.save:
+    print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr)
+    TMPFILENAME=None
+
+def _output_tmpfile(buf):
+  TMPFILE.write(buf)
+
+def _output_stdout(buf):
+  BINOUT.write(buf)
+
+def _output_subproc(buf):
+  toWrite=len(buf)
+  while toWrite>0:
+    toWrite -= BINOUT.write(buf)    
+
+def process1(options,buf,filename,offset,length,whole):
+  root=options.root
+  rfn=root+filename
+  if root!="/beegfs/common_crawl":
+    # Support using ramdisk or other local disk as a faster cached
+    if not os.path.exists(rfn):
+      if not os.path.exists(os.path.dirname(rfn)):
+        os.makedirs(os.path.dirname(rfn))
+      with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \
+              io.FileIO(rfn,'w') as outfile:
+        #shutil.copyfileobj(infile,outfile,128*1024*1024)
+        while True:
+          l=infile.readinto(buf)
+          if l==0:
+            break
+          outfile.write(memoryview(buf)[:l])
+  file=open(rfn,'rb',0)
+  file.seek(offset)
+  bv=memoryview(buf)[:length]
+  nb=file.readinto(bv)
+  file.close()
+  if nb!=length:
+    raise ValueError("Chunk read losing: %s, got %s expected %s at %s"%(file.name,
+                                                                  nb,length,offset))
+  if whole and options.zipped:
+    _output(bv)
+    return
+  gzip_chunk = io.BytesIO(bv)
+  uv=memoryview(buf)[length:]
+  with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
+    ll=0
+    while True:
+      l=gzip_fin.readinto(uv)
+      if not l:
+        break
+      ll+=l
+    cb=memoryview(uv)[:ll]
+    if whole:
+      _output(cb)
+      return
+  # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
+  state=0
+  tr=None # Was this record truncated?
+  bl=None # for HTTP Content-Length for the length of the body?
+  with io.BytesIO(cb) as clear_text:
+    for L in clear_text:
+      if state==0:
+        # WARC header
+        if L.startswith(b"Content-Length: "):
+          wl=int(L[16:].rstrip())
+        elif L.startswith(b"WARC-Truncated: "):
+          tr=L[16:].rstrip()
+          tr="EMPTY" if tr=="" else tr
+        elif L==b"" or L.startswith(b"\r"): # for idempotency
+          # Blank line, WARC header is finished
+          if not (options.headers or options.body):
+            return
+          state=1
+          # Note we preserve the empty line
+        if options.warc:
+          _output(L)
+        continue
+      if state==1:
+        # HTTP header
+        wl -= len(L)
+        if not (L==b"" or L.startswith(b"\r")):
+          # Non-blank, it's a header
+          if bl is None and L.startswith(b"Content-Length: "):
+            bl=int(L[16:].rstrip())
+          if options.headers:
+            _output(L)
+        else:
+          # Blank line, HTTP header is finished
+          if not options.body:
+            return
+          if options.headers:
+            _output(L)
+          state=2
+          # The above is just for sanity, because we do _not_
+          #  continue with the outer loop,
+          #  since we can now block-output the entire rest of the
+          #  input buffer.
+          if bl is not None:
+            if bl!=wl:
+              print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\
+                    (length,offset,filename,wl,bl,tr),file=sys.stderr)
+          # HTTP body
+          balance=clear_text.tell()
+          #print(balance,bl,wl,ll,ll-balance,file=sys.stderr)
+          # Output whatever is left
+          _output(cb[balance:balance+wl])
+          return
+
+def main():
+  global _output,TMPFILE,TMPFILENAME,tempfile
+  parser = argparse.ArgumentParser(
+    description='''Extract records from warc files given length, offset and file triples.
+  Input one triple on command line, or
+  triples from stdin as tab-delimited lines
+  or complete cdx index lines.
+  In all cases by 'filename' is meant crawlid/segmentid/type/filename''',
+    epilog='''Note that if no output flag(s) is/are given,
+  the whole WARC record will be output, more efficiently than
+  would be the case if all three flags were given.''',
+    add_help=False,
+    conflict_handler='resolve',
+    formatter_class=HackFormat
+    )
+  fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s')
+  parser.add_argument('--help',help='Show help',action='help')
+  parser.add_argument('-d','--debug',help='Debug output',action='store_true')
+  parser.add_argument('-w','--warc',help='output WARC headers',
+                      action='store_true')
+  parser.add_argument('-h','--headers',help='output HTTP headers',
+                      action='store_true')
+  parser.add_argument('-b','--body',help='output HTTP body',
+                      action='store_true')
+  parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
+  parser.add_argument('-m','--module.function',help='module.function to call with a stream'),
+  parser.add_argument('-s','--save',action='store_true',
+                      help="write to a temporary file and output the name")
+  parser.add_argument('-f','--fpath',
+                      help=fphelp,
+                      default=FPAT)
+  parser.add_argument('-r','--root',nargs='?',
+                  help='File path root, create a copy there if necessary',
+                  default='/beegfs/common_crawl'),
+  parser.add_argument('-z','--zipped',
+                      help="output raw gzipped record, ignored if any of -bhw supplied",
+                      action='store_true')
+  sg=parser.add_mutually_exclusive_group()
+  sg.add_argument('-x','--index',
+                      help='take lines of triples from a cdx index file as input',
+                      action='store_true')
+  sg.add_argument('length',type=int,
+                  help='length in bytes of gzipped record',
+                  nargs='?')
+  parser.add_argument('offset',type=int,
+                      help='start position in bytes of gzipped record',
+                      nargs='?')
+  parser.add_argument('filename',
+                      help='pathname of gzipped Common Crawl WARC-format file',
+                      nargs='?')
+  # Hack the order of optional and positional in the help output
+  parser._action_groups.sort(key=lambda g:g.title)
+  #parser.print_help()
+  pa=parser.parse_args(sys.argv[1:])
+  #print(pa,file=sys.stderr)
+  if pa.length is not None:
+    # We have to enforce our own check..
+    if pa.offset is None or pa.filename is None:
+      parser.error("length, offset and filename must all be supplied together")
+ 
+  buf=bytearray(128*1024*1024)
+
+  whole=not (pa.warc or pa.headers or pa.body)
+  if pa.save:
+    _output=_output_tmpfile
+    import tempfile
+  elif pa.cmd:
+    _output = _output_subproc
+  else:
+    _output = _output_stdout
+  if pa.index:
+    CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet...
+    for l in sys.stdin:
+      m=CDX.search(l)
+      if m is None:
+        if l.find('/robotstxt/')>-1:
+          continue
+        print("index line problem: \"%s\""%l,file=sys.stderr,end='')
+        exit(2)
+      f=pa.fpath%(m[3:7])
+      try:
+        process(pa,buf,f,
+                int(m[2]),int(m[1]),whole)
+      except Exception as e:
+        if pa.debug:
+          import traceback
+          traceback.print_exc(file=sys.stderr)
+        else:
+          print("Process fail: %s, input line:\n %s"%(e,l),
+                file=sys.stderr,end='')
+        exit(3)
+  elif pa.length is not None:
+    print(pa.filename,file=sys.stderr)
+    process(pa,buf,pa.fpath%tuple(pa.filename.split('/')),
+            pa.offset,pa.length,whole)
+  else:
+    print("Reading length, offset, filename tab-delimited triples from stdin...",
+          file=sys.stderr)
+    for l in sys.stdin:
+      try:
+        (length,offset,filename)=l.rstrip().split('\t')
+        length=int(length)
+        offset=int(offset)
+      except ValueError as e:
+        parser.error('Invalid input line: %s\n "%s"'%(e,l))
+      process(pa,buf,pa.fpath%tuple(filename.split('/')),
+              offset,length,whole)
+
+if __name__ == "__main__":
+    main()