changeset 287:fe78af4ea7c5

in the midst of trying to rethink the refill logic
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Mon, 07 Apr 2025 16:34:31 +0100
parents 147f648e4e5e
children d3fc7b5c73d0
files lib/python/cc/warc.py
diffstat 1 files changed, 69 insertions(+), 80 deletions(-) [+]
line wrap: on
line diff
--- a/lib/python/cc/warc.py	Mon Mar 24 14:30:32 2025 +0000
+++ b/lib/python/cc/warc.py	Mon Apr 07 16:34:31 2025 +0100
@@ -7,29 +7,33 @@
 from isal import igzip
 import cython, typing
 
+# see warc.pxd for function signatures
 
-RESP: cython.bytes = b'response'
-REQ: cython.bytes = b'request'
-META: cython.bytes = b'metadata'
-INFO: cython.bytes = b'warcinfo'
+RESP: bytes = b'response'
+REQ: bytes =  b'request'
+META: bytes =  b'metadata'
+INFO: bytes =  b'warcinfo'
 
-BUFSIZE: int = 2*1024*1024
+BUFSIZE = cython.declare(cython.long, 2*1024*1024)
+
 HDRMAX: int = 32*1024  # Not really max, there are some enormous ones, see below
 
-def refill(buf: char[::1], bufView: char[::1], stream: typing.BinaryIO,
+def refill(buf: typing.ByteString, bufView: typing.ByteString, stream: typing.BinaryIO,
            start_1: int, bl: int, bp: int, eol: int,
-        length: int, needed: bool) -> (int, int, int, cython.bytes, int, char[::1], bool):
+        length: int, needed: bool) -> tuple[int, int, int, bytes,
+                                            int, typing.ByteString, bool]:
   global BUFSIZE
   whole: int
   xBuf: char[::1]
-  #if (stream.tell() > 2381000000):
+  #if (stream.tell() > 5766470000): # 82535
   #  breakpoint()
   if needed:
     # we need to keep from start_1 to bl
     keepFrom: int = start_1
     keepLen: int = bl-keepFrom
-    if (whole:=((bp-start_1)+length)) > BUFSIZE:
-      while whole > BUFSIZE:
+    if (whole:=((bp - start_1)+length)) > bl:
+      newb: long = BUFSIZE + whole
+      while newb > BUFSIZE:
         # Need a bigger buffer
         print('Growing buffer %s > %s'%(whole,BUFSIZE),file=sys.stderr)
         BUFSIZE=BUFSIZE+(64 * 1024)
@@ -59,68 +63,82 @@
   with memoryview(buf)[keepLen:BUFSIZE] as xBuf:
     nb=stream.readinto(xBuf)
   bl=keepLen+nb
+  #if (bp > bl):
+  #  breakpoint()
   return start_1, bp, eol, buf, bl, bufView, nb<spaceToFill
   
 
-def warc(filename,callback,types=['response'],whole=False,parts=7,debug=False):
+def warc(filename: str,
+         callback: typing.Callable[[bytes, typing.ByteString, int], typing.BinaryIO],
+         types: typing.List[bytes] = [b'response'], whole: bool = False, parts: int = 7,
+         debug: bool = False):
   '''parts is a bit-mask:
      1 for warc header;
      2 for req/resp HTTP header, warcinfo/metadata features;
      4 for req/resp body'''
   global BUFSIZE, HDRMAX
   # should do some sanity checking wrt parts and types
-  types=[(t if isinstance(t,bytes) else bytes(t,'utf8')) for t in types]
-  nb=0
   if filename.endswith(".gz"):
-    stream=igzip.IGzipFile(filename=filename)
+    stream: typing.BinaryIO = igzip.IGzipFile(filename=filename)
   else:
-    stream=open(filename,'rb',0)
-  buf=bytearray(BUFSIZE)
-  bufView=memoryview(buf)
-  fpos=bl=stream.readinto(buf)
-  bp=0
-  done=False
+    stream: typing.BinaryIO = open(filename,'rb',0)
+  buf: char[::1] = bytearray(BUFSIZE)
+  bufView: char[::1] =memoryview(buf)
+  fpos: long = 0
+  bp: long = 0
+  done: bool = False
+  bl: long = stream.readinto(buf)
   while True:
-    while buf.startswith(b'\r\n',bp,bl): # will Fail if buffer (nearly) empty
-      bp+=2
-    start_1=bp
+    start_1: long = bp
+    try:
+      # Check that we can at least see to the Content-Length Warc-header
+      clh_begin: long = buf.index(b'\r\nContent-Length: ', bp)
+      if clh_begin > bl:
+        raise ValueError
+      clh_end = buf.index(b'\r\n', clh_begin+2)
+      if clh_end > bl:
+        raise ValueError
+      length = wl = int(bufView[clh_begin+18:clh_end])
+      # There are some enormous TargetURIs which overflow HDRMAX
+      # so check whether we can see to the _end_ of the Warc-header
+      eowh = buf.index(b'\r\n\r\n', clh_end)
+      if eowh > bl:
+        raise ValueError
+    except ValueError:
+      # No! So we do an emergency buffer shift, forcing the restart
+      #  because skipping won't work as we're not at the end of the WARC
+      #  header yet
+      start_1, bp, _, buf, bl, bufView, done = refill(buf, bufView, stream,
+                                                      start_1, bl, bp, eol,
+                                                      length, True)
+      bp -= 2 # situation is slightly different from the other call to refill
+    if (bp > bl):
+      breakpoint()
     if not buf.startswith(b'WARC/1.0\r\n',bp):
       if done and bl-bp==0:
         # really done
         return
+      breakpoint()
       raise ValueError("Not a WARC file? In %s at %s of %s (%s): %s[%s]"%(filename,
                                                                    bp,bl,fpos,
          (buf[bp:min(bl,bp+20)] if bp<bl else buf[bl-20:bl]).decode('latin-1'),
                                                                      bl-bp))
     bp+=10
-    wtype=None
-    length=None
-    state=1
-    tr=None # Was this record truncated?
+    wtype: cython.bytes = b''
+    length: int = 0
+    state: int = 1
+    tr: cython.bytes = b'' # Was this record truncated?
+    if (bp > bl):
+      breakpoint()
     while not buf.startswith(b'\r\n',bp):
       # there should always be enough in the buffer to complete this loop,
-      #  because of the buffer update logic below
-      try:
-        eol = buf.index(b'\r\n',bp)+2
-      except ValueError:
-        # there are some enormous TargetURIs which overflow HDRMAX
-        # so we do an emergency buffer shift, forcing the restart
-        #  because skipping won't work as we're not at the end of the WARC
-        #  header yet
-        if not buf.startswith(b'WARC-Target-URI: ',bp):
-          raise
-        start_1, bp, _, buf, bl, bufView, done = refill(buf, bufView, stream,
-                                                        start_1, bl, bp, eol,
-                                                        length, True)
-        bp -= 2 # situation is slightly different from the other call to refill
-        eol = buf.index(b'\r\n',bp)+2
-      if buf.startswith(b"Content-Length: ",bp):
-        length=wl=int(bufView[bp+16:eol-2])
-      elif buf.startswith(b"WARC-Truncated: ",bp):
+      #  because of the buffer update logic above
+      eol = buf.index(b'\r\n', bp)
+      if buf.startswith(b"WARC-Truncated: ",bp):
         if bp+16==eol-2:
-          tr=b"EMPTY"
+          tr = b"EMPTY"
         else:
-          tr=bytes(bufView[bp+16:eol-2])
+          tr = bytes(bufView[bp+16:eol-2])
       elif buf.startswith(b'WARC-Type: ',bp):
         if buf.startswith(b's',bp+13):
           wtype = RESP
@@ -136,6 +154,9 @@
                              fpos-(bl-bp)))
       bp=eol
     bp=eol+2
+    OUT: typing.BinaryIO
+    if (bp > bl):
+      breakpoint()
     if done:
       if (bp+length)>bl:
         raise ValueError("Done but need more! %s + %s > %s in %s"%(bp,
@@ -172,38 +193,6 @@
             eo2=buf.index(b'\r\n\r\n',start_2)
             OUT=callback(wtype,bufView[start_2:eo2+2],2)
         if parts & 4:
-          # stale below here???
-          rec_text = []
-          for L in rec_text:
-            if state==2:
-              # HTTP header
-              wl -= len(L)
-              if not (L==b"" or L.startswith(b"\r")):
-                # Non-empty, it's (a continuation of) a header
-                if bl is None and L.startswith(b"Content-Length: "):
-                  bl=int(L[16:].rstrip())
-              else:
-                # Blank line, HTTP header is finished
-                if parts & 2:
-                  callback(wtype,bufView[start_2:start_2+L_start],2)
-                state=4
-                # The above is just for sanity, because we do _not_
-                #  continue with the outer loop,
-                #  since we can now block-output the entire rest of the
-                #  input buffer.
-                if bl is not None:
-                  if bl!=wl:
-                    print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\
-                          (length,#offset,
-                           filename,wl,bl,tr),file=sys.stderr)
-                # HTTP body
-                balance=start_2+rec_text.tell()
-                #print(balance,bl,wl,ll,ll-balance,file=sys.stderr)
-                # Output whatever is left
-                if parts & 4:
-                  callback(wtype,bufView[balance:balance+wl],4)
-                state=1
-
-              L_start=rec_text.tell()
+          raise ValueError("Not implemented: body part (4): %s"%parts)
     bp+=length
     #print('end of loop',wtype,start_1,bp,eol,length,bl,file=sys.stderr)