Mercurial > hg > cc > cirrus_work
changeset 287:fe78af4ea7c5
in the midst of trying to rethink the refill logic
| author | Henry S. Thompson <ht@inf.ed.ac.uk> |
|---|---|
| date | Mon, 07 Apr 2025 16:34:31 +0100 |
| parents | 147f648e4e5e |
| children | d3fc7b5c73d0 |
| files | lib/python/cc/warc.py |
| diffstat | 1 files changed, 69 insertions(+), 80 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/python/cc/warc.py Mon Mar 24 14:30:32 2025 +0000 +++ b/lib/python/cc/warc.py Mon Apr 07 16:34:31 2025 +0100 @@ -7,29 +7,33 @@ from isal import igzip import cython, typing +# see warc.pxd for function signatures -RESP: cython.bytes = b'response' -REQ: cython.bytes = b'request' -META: cython.bytes = b'metadata' -INFO: cython.bytes = b'warcinfo' +RESP: bytes = b'response' +REQ: bytes = b'request' +META: bytes = b'metadata' +INFO: bytes = b'warcinfo' -BUFSIZE: int = 2*1024*1024 +BUFSIZE = cython.declare(cython.long, 2*1024*1024) + HDRMAX: int = 32*1024 # Not really max, there are some enormous ones, see below -def refill(buf: char[::1], bufView: char[::1], stream: typing.BinaryIO, +def refill(buf: typing.ByteString, bufView: typing.ByteString, stream: typing.BinaryIO, start_1: int, bl: int, bp: int, eol: int, - length: int, needed: bool) -> (int, int, int, cython.bytes, int, char[::1], bool): + length: int, needed: bool) -> tuple[int, int, int, bytes, + int, typing.ByteString, bool]: global BUFSIZE whole: int xBuf: char[::1] - #if (stream.tell() > 2381000000): + #if (stream.tell() > 5766470000): # 82535 # breakpoint() if needed: # we need to keep from start_1 to bl keepFrom: int = start_1 keepLen: int = bl-keepFrom - if (whole:=((bp-start_1)+length)) > BUFSIZE: - while whole > BUFSIZE: + if (whole:=((bp - start_1)+length)) > bl: + newb: long = BUFSIZE + whole + while newb > BUFSIZE: # Need a bigger buffer print('Growing buffer %s > %s'%(whole,BUFSIZE),file=sys.stderr) BUFSIZE=BUFSIZE+(64 * 1024) @@ -59,68 +63,82 @@ with memoryview(buf)[keepLen:BUFSIZE] as xBuf: nb=stream.readinto(xBuf) bl=keepLen+nb + #if (bp > bl): + # breakpoint() return start_1, bp, eol, buf, bl, bufView, nb<spaceToFill -def warc(filename,callback,types=['response'],whole=False,parts=7,debug=False): +def warc(filename: str, + callback: typing.Callable[[bytes, typing.ByteString, int], typing.BinaryIO], + types: typing.List[bytes] = [b'response'], whole: bool = False, parts: int = 7, + debug: bool = False): '''parts is a bit-mask: 1 for warc header; 2 for req/resp HTTP header, warcinfo/metadata features; 4 for req/resp body''' global BUFSIZE, HDRMAX # should do some sanity checking wrt parts and types - types=[(t if isinstance(t,bytes) else bytes(t,'utf8')) for t in types] - nb=0 if filename.endswith(".gz"): - stream=igzip.IGzipFile(filename=filename) + stream: typing.BinaryIO = igzip.IGzipFile(filename=filename) else: - stream=open(filename,'rb',0) - buf=bytearray(BUFSIZE) - bufView=memoryview(buf) - fpos=bl=stream.readinto(buf) - bp=0 - done=False + stream: typing.BinaryIO = open(filename,'rb',0) + buf: char[::1] = bytearray(BUFSIZE) + bufView: char[::1] =memoryview(buf) + fpos: long = 0 + bp: long = 0 + done: bool = False + bl: long = stream.readinto(buf) while True: - while buf.startswith(b'\r\n',bp,bl): # will Fail if buffer (nearly) empty - bp+=2 - start_1=bp + start_1: long = bp + try: + # Check that we can at least see to the Content-Length Warc-header + clh_begin: long = buf.index(b'\r\nContent-Length: ', bp) + if clh_begin > bl: + raise ValueError + clh_end = buf.index(b'\r\n', clh_begin+2) + if clh_end > bl: + raise ValueError + length = wl = int(bufView[clh_begin+18:clh_end]) + # There are some enormous TargetURIs which overflow HDRMAX + # so check whether we can see to the _end_ of the Warc-header + eowh = buf.index(b'\r\n\r\n', clh_end) + if eowh > bl: + raise ValueError + except ValueError: + # No! So we do an emergency buffer shift, forcing the restart + # because skipping won't work as we're not at the end of the WARC + # header yet + start_1, bp, _, buf, bl, bufView, done = refill(buf, bufView, stream, + start_1, bl, bp, eol, + length, True) + bp -= 2 # situation is slightly different from the other call to refill + if (bp > bl): + breakpoint() if not buf.startswith(b'WARC/1.0\r\n',bp): if done and bl-bp==0: # really done return + breakpoint() raise ValueError("Not a WARC file? In %s at %s of %s (%s): %s[%s]"%(filename, bp,bl,fpos, (buf[bp:min(bl,bp+20)] if bp<bl else buf[bl-20:bl]).decode('latin-1'), bl-bp)) bp+=10 - wtype=None - length=None - state=1 - tr=None # Was this record truncated? + wtype: cython.bytes = b'' + length: int = 0 + state: int = 1 + tr: cython.bytes = b'' # Was this record truncated? + if (bp > bl): + breakpoint() while not buf.startswith(b'\r\n',bp): # there should always be enough in the buffer to complete this loop, - # because of the buffer update logic below - try: - eol = buf.index(b'\r\n',bp)+2 - except ValueError: - # there are some enormous TargetURIs which overflow HDRMAX - # so we do an emergency buffer shift, forcing the restart - # because skipping won't work as we're not at the end of the WARC - # header yet - if not buf.startswith(b'WARC-Target-URI: ',bp): - raise - start_1, bp, _, buf, bl, bufView, done = refill(buf, bufView, stream, - start_1, bl, bp, eol, - length, True) - bp -= 2 # situation is slightly different from the other call to refill - eol = buf.index(b'\r\n',bp)+2 - if buf.startswith(b"Content-Length: ",bp): - length=wl=int(bufView[bp+16:eol-2]) - elif buf.startswith(b"WARC-Truncated: ",bp): + # because of the buffer update logic above + eol = buf.index(b'\r\n', bp) + if buf.startswith(b"WARC-Truncated: ",bp): if bp+16==eol-2: - tr=b"EMPTY" + tr = b"EMPTY" else: - tr=bytes(bufView[bp+16:eol-2]) + tr = bytes(bufView[bp+16:eol-2]) elif buf.startswith(b'WARC-Type: ',bp): if buf.startswith(b's',bp+13): wtype = RESP @@ -136,6 +154,9 @@ fpos-(bl-bp))) bp=eol bp=eol+2 + OUT: typing.BinaryIO + if (bp > bl): + breakpoint() if done: if (bp+length)>bl: raise ValueError("Done but need more! %s + %s > %s in %s"%(bp, @@ -172,38 +193,6 @@ eo2=buf.index(b'\r\n\r\n',start_2) OUT=callback(wtype,bufView[start_2:eo2+2],2) if parts & 4: - # stale below here??? - rec_text = [] - for L in rec_text: - if state==2: - # HTTP header - wl -= len(L) - if not (L==b"" or L.startswith(b"\r")): - # Non-empty, it's (a continuation of) a header - if bl is None and L.startswith(b"Content-Length: "): - bl=int(L[16:].rstrip()) - else: - # Blank line, HTTP header is finished - if parts & 2: - callback(wtype,bufView[start_2:start_2+L_start],2) - state=4 - # The above is just for sanity, because we do _not_ - # continue with the outer loop, - # since we can now block-output the entire rest of the - # input buffer. - if bl is not None: - if bl!=wl: - print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\ - (length,#offset, - filename,wl,bl,tr),file=sys.stderr) - # HTTP body - balance=start_2+rec_text.tell() - #print(balance,bl,wl,ll,ll-balance,file=sys.stderr) - # Output whatever is left - if parts & 4: - callback(wtype,bufView[balance:balance+wl],4) - state=1 - - L_start=rec_text.tell() + raise ValueError("Not implemented: body part (4): %s"%parts) bp+=length #print('end of loop',wtype,start_1,bp,eol,length,bl,file=sys.stderr)
