changeset 289:f17aef7ba4a7

try simpler refill
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Wed, 09 Apr 2025 11:15:14 +0100
parents d3fc7b5c73d0
children 52c9d1875608
files lib/python/cc/warc.py
diffstat 1 files changed, 56 insertions(+), 108 deletions(-) [+]
line wrap: on
line diff
--- a/lib/python/cc/warc.py	Tue Apr 08 16:06:33 2025 +0100
+++ b/lib/python/cc/warc.py	Wed Apr 09 11:15:14 2025 +0100
@@ -5,127 +5,59 @@
 
 import sys, io
 from isal import igzip
-import cython, typing
+import cython, typing, gzip
 
 # see warc.pxd for function signatures
 
-RESP: bytes = b'response'
-REQ: bytes =  b'request'
-META: bytes =  b'metadata'
-INFO: bytes =  b'warcinfo'
+INFO: int =  0
+RESP: int = 1
+REQ: int =  2
+META: int = 3
 
-BUFSIZE = cython.declare(cython.long, 16 * 1024 * 1024)
+BUFSIZE: int = 16 * 1024 * 1024
 BUFMIN: int = 3 * 512 * 1024 # 1.5MiB, will need to be increased
                              # to 5.5MiB for CC-MAIN-2025-13 (Mar) and thereafter
 
 HDRMAX: int = 0  # will grow
-ITEMMAX int = 0  # will grow
-
-def refill(buf: typing.ByteString, bufView: typing.ByteString, stream: typing.BinaryIO,
-           start_1: int, bl: int, bp: int, eol: int,
-        length: int, needed: bool) -> tuple[int, int, int, bytes,
-                                            int, typing.ByteString, bool]:
-  global BUFSIZE, BUFMIN, HDRMAX, ITEMMAX
-  whole: int
-  xBuf: char[::1]
-  #if (stream.tell() > 5766470000): # 82535
-  #  breakpoint()
-  if needed:
-    # we need to keep from start_1 to bl
-    keepFrom: int = start_1
-    keepLen: int = bl-keepFrom
-    if (whole:=((bp - start_1)+length)) > bl:
-      newb: long = BUFSIZE + whole
-      while newb > BUFSIZE:
-        # Need a bigger buffer
-        print('Growing buffer %s > %s'%(whole,BUFSIZE),file=sys.stderr)
-        BUFSIZE=BUFSIZE+(64 * 1024)
-      newbuf: char[::1] = bytearray(BUFSIZE)
-      newbuf[0:keepLen]=bufView[keepFrom:bl]
-      bl = BUFSIZE
-      buf = newbuf
-      bufView = memoryview(buf)
-    else:
-      buf[0:keepLen]=bufView[keepFrom:bl]
-    eol=eol-start_1
-    start_1=0
-    bp=eol+2
-  else:
-    # we can skip the rest of this part
-    if (bp+length)<=bl:
-      # we have at least some bytes from the next part
-      keepLen=bl-(bp+length)
-      buf[0:keepLen]=bufView[bl-keepLen:bl]
-    else:
-      # we don't have all of the bytes from the current part
-      #  so can skip the rest of it
-      keepLen=0
-      stream.seek(stream.tell() + bp + length - bl)
-    bp=0
-  spaceToFill: int = BUFSIZE-keepLen
-  with memoryview(buf)[keepLen:BUFSIZE] as xBuf:
-    nb=stream.readinto(xBuf)
-  bl=keepLen+nb
-  #if (bp > bl):
-  #  breakpoint()
-  return start_1, bp, eol, buf, bl, bufView, nb<spaceToFill
-  
+RECORDMAX: int = 0  # longest whole record seen so far, in bytes; will grow
 
 def warc(filename: str,
          callback: typing.Callable[[bytes, typing.ByteString, int], typing.BinaryIO],
-         types: typing.List[bytes] = [b'response'], whole: bool = False, parts: int = 7,
+         types: typing.List[int] = [RESP], whole: bool = False, parts: int = 7,
          debug: bool = False):
   '''parts is a bit-mask:
      1 for warc header;
      2 for req/resp HTTP header, warcinfo/metadata features;
      4 for req/resp body'''
   # Not currently trying to depend on this, but I believe that
-  #   warcinfo: warc-headers+1bl+crawl-headers+2bl
-  #   request: warc-headers+1bl+HTTP-headers+3bl
-  #   response: warc-headers+1bl+HTTP-headers+[1bl or 2bl]+HTTP-body+1bl
-  #   metadata: warc-headers+1bl+metadata-headers+3bl
-q  global BUFSIZE, HDRMAX
+  #   warcinfo: record-headers+1bl+crawl-headers+2bl
+  #   request: record-headers+1bl+HTTP-headers+3bl
+  #   response: record-headers+1bl+HTTP-headers+[1bl or 2bl]+HTTP-body+1bl
+  #   metadata: record-headers+1bl+metadata-headers+3bl
+  global BUFSIZE, HDRMAX, BUFMIN, RECORDMAX
+  _out: typing.BinaryIO
   # should do some sanity checking wrt parts and types
-  if filename.endswith(".gz"):
-    stream: typing.BinaryIO = igzip.IGzipFile(filename=filename)
-  else:
-    stream: typing.BinaryIO = open(filename,'rb',0)
+  stream: typing.BinaryIO
+  with gzip.open(filename, 'r') as fh:
+    try:
+      fh.read(1)
+    except gzip.BadGzipFile:
+      stream = open(filename,'rb',0)
+    else:
+      stream = igzip.IGzipFile(filename=filename)
   buf: char[::1] = bytearray(BUFSIZE)
-  bufView: char[::1] =memoryview(buf)
+  bufView: char[::1] = memoryview(buf)
   fpos: long = 0
   bp: long = 0
-  done: bool = False
   bl: long = stream.readinto(buf)
-  while True:
+  done: bool = bl < BUFSIZE 
+  while not (done and bl == bp):
+    while buf.startswith(b'\r\n',bp):
+      bp+=2
     start_1: long = bp
-    try:
-      # Check that we can at least see to the Content-Length Warc-header
-      clh_begin: long = buf.index(b'\r\nContent-Length: ', bp)
-      if clh_begin > bl:
-        raise ValueError
-      clh_end = buf.index(b'\r\n', clh_begin+2)
-      if clh_end > bl:
-        raise ValueError
-      length = wl = int(bufView[clh_begin+18:clh_end])
-      # Check whether we can see to the _end_ of the Warc-header
-      eowh = buf.index(b'\r\n\r\n', clh_end)
-      if eowh > bl:
-        raise ValueError
-    except ValueError:
-      # We can't see to the end of this item
-      #  So we do a buffer shift, forcing the restart
-      #  because skipping won't work as we're not at the end of the WARC
-      #  header yet
-      start_1, bp, _, buf, bl, bufView, done = refill(buf, bufView, stream,
-                                                      start_1, bl, bp, eol,
-                                                      length, True)
-      bp -= 2 # situation is slightly different from the other call to refill
     if (bp > bl):
       breakpoint()
     if not buf.startswith(b'WARC/1.0\r\n',bp):
-      if done and bl-bp==0:
-        # really done
-        return
       breakpoint()
       raise ValueError("Not a WARC file? In %s at %s of %s (%s): %s[%s]"%(filename,
                                                                    bp,bl,fpos,
@@ -134,14 +66,15 @@
     bp+=10
     wtype: bytes = b''
     length: int = 0
-    state: int = 1
     tr: bytes = b'' # Was this record truncated?
     if (bp > bl):
       breakpoint()
     while not buf.startswith(b'\r\n',bp):
       # there should always be enough in the buffer to complete this loop,
-      #  because of the buffer update logic above
+      #  because of the buffer update logic at the end
       eol = buf.index(b'\r\n', bp)
+      if buf.startswith(b"Content-Length: ",bp):
+        length=wl=int(bufView[bp+16:eol-2])
       if buf.startswith(b"WARC-Truncated: ",bp):
         if bp+16==eol-2:
           tr = b"EMPTY"
@@ -161,24 +94,24 @@
                              bytes(bufView[bp+11:eol-2]),filename,
                              fpos-(bl-bp)))
       bp=eol
+    # record header done
     bp=eol+2
     if (bp > bl):
       breakpoint()
     if (hl:=(bp - start_1)) > HDRMAX:
       HDRMAX = hl
-    OUT: typing.BinaryIO
-    if done:
-      if (bp+length)>bl:
-        raise ValueError("Done but need more! %s + %s > %s in %s"%(bp,
-                         length,bl,filename))
+    #if done:
+    #  if (bp+length)>bl:
+    #    raise ValueError("Done but need more! %s + %s > %s in %s"%(bp,
+    #                     length,bl,filename))
     if (wtype in types):
       # Output whole or part 1 as required
       if whole:
         bp+=length
-        OUT=callback(wtype,bufView[start_1:bp],7)
+        _out=callback(wtype,bufView[start_1:bp],7)
         continue
       elif (parts & 1):
-        OUT=callback(wtype,bufView[start_1:eol],1)
+        _out=callback(wtype,bufView[start_1:eol],1)
       if parts!=1:
         while buf.startswith(b'\r\n',bp):
           bp+=2
@@ -186,16 +119,31 @@
         eob=bp+length
         while buf.startswith(b'\r\n',eob-2):
           eob-=2
+        
         # Only output parts (2 = HTTP header, 4 = body) that are wanted
         if parts & 2:
-          if wtype is META or wtype is INFO:
+          if wtype == META or wtype == INFO:
             # rest of the part
-            OUT=callback(wtype,bufView[start_2:eob],2)
+            _out=callback(wtype,bufView[start_2:eob],2)
           else:
             # request and response have http headers
             eo2=buf.index(b'\r\n\r\n',start_2)
-            OUT=callback(wtype,bufView[start_2:eo2+2],2)
+            _out=callback(wtype,bufView[start_2:eo2+2],2)
         if parts & 4:
           raise ValueError("Not implemented: body part (4): %s"%parts)
-    bp+=length
+    bp += length
+    rl: int
+    if (rl := (bp - start_1)) > RECORDMAX:
+      RECORDMAX = rl
+    keepLen: int
+    if (not done) and (keepLen := bl - bp) < BUFMIN:
+      # we need to shift and read more
+      buf[0:keepLen]=bufView[bp:bl]
+      with memoryview(buf)[keepLen:BUFSIZE] as xBuf:
+        nb=stream.readinto(xBuf)
+      bl = keepLen+nb
+      done = bl < BUFSIZE 
+      bp = 0
+      
     #print('end of loop',wtype,start_1,bp,eol,length,bl,file=sys.stderr)
+    print('Max record: %d, Max header: %d