Mercurial > hg > cc > cirrus_work
changeset 289:f17aef7ba4a7
try simpler refill
| author | Henry S. Thompson <ht@inf.ed.ac.uk> |
|---|---|
| date | Wed, 09 Apr 2025 11:15:14 +0100 |
| parents | d3fc7b5c73d0 |
| children | 52c9d1875608 |
| files | lib/python/cc/warc.py |
| diffstat | 1 files changed, 56 insertions(+), 108 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/python/cc/warc.py Tue Apr 08 16:06:33 2025 +0100 +++ b/lib/python/cc/warc.py Wed Apr 09 11:15:14 2025 +0100 @@ -5,127 +5,59 @@ import sys, io from isal import igzip -import cython, typing +import cython, typing, gzip # see warc.pxd for function signatures -RESP: bytes = b'response' -REQ: bytes = b'request' -META: bytes = b'metadata' -INFO: bytes = b'warcinfo' +INFO: int = 0 +RESP: int = 1 +REQ: int = 2 +META: int = 3 -BUFSIZE = cython.declare(cython.long, 16 * 1024 * 1024) +BUFSIZE: int = 16 * 1024 * 1024 BUFMIN: int = 3 * 512 * 1024 # 1.5MiB, will need to be increased # to 5.5MiB for CC-MAIN-2025-13 (Mar) and thereafter HDRMAX: int = 0 # will grow -ITEMMAX int = 0 # will grow - -def refill(buf: typing.ByteString, bufView: typing.ByteString, stream: typing.BinaryIO, - start_1: int, bl: int, bp: int, eol: int, - length: int, needed: bool) -> tuple[int, int, int, bytes, - int, typing.ByteString, bool]: - global BUFSIZE, BUFMIN, HDRMAX, ITEMMAX - whole: int - xBuf: char[::1] - #if (stream.tell() > 5766470000): # 82535 - # breakpoint() - if needed: - # we need to keep from start_1 to bl - keepFrom: int = start_1 - keepLen: int = bl-keepFrom - if (whole:=((bp - start_1)+length)) > bl: - newb: long = BUFSIZE + whole - while newb > BUFSIZE: - # Need a bigger buffer - print('Growing buffer %s > %s'%(whole,BUFSIZE),file=sys.stderr) - BUFSIZE=BUFSIZE+(64 * 1024) - newbuf: char[::1] = bytearray(BUFSIZE) - newbuf[0:keepLen]=bufView[keepFrom:bl] - bl = BUFSIZE - buf = newbuf - bufView = memoryview(buf) - else: - buf[0:keepLen]=bufView[keepFrom:bl] - eol=eol-start_1 - start_1=0 - bp=eol+2 - else: - # we can skip the rest of this part - if (bp+length)<=bl: - # we have at least some bytes from the next part - keepLen=bl-(bp+length) - buf[0:keepLen]=bufView[bl-keepLen:bl] - else: - # we don't have all of the bytes from the current part - # so can skip the rest of it - keepLen=0 - stream.seek(stream.tell() + bp + length - bl) - bp=0 - spaceToFill: int = BUFSIZE-keepLen - with memoryview(buf)[keepLen:BUFSIZE] as xBuf: - nb=stream.readinto(xBuf) - bl=keepLen+nb - #if (bp > bl): - # breakpoint() - return start_1, bp, eol, buf, bl, bufView, nb<spaceToFill - +RECORDMAX int = 0 # will grow def warc(filename: str, callback: typing.Callable[[bytes, typing.ByteString, int], typing.BinaryIO], - types: typing.List[bytes] = [b'response'], whole: bool = False, parts: int = 7, + types: typing.List[int] = [RESP], whole: bool = False, parts: int = 7, debug: bool = False): '''parts is a bit-mask: 1 for warc header; 2 for req/resp HTTP header, warcinfo/metadata features; 4 for req/resp body''' # Not currently trying to depend on this, but I believe that - # warcinfo: warc-headers+1bl+crawl-headers+2bl - # request: warc-headers+1bl+HTTP-headers+3bl - # response: warc-headers+1bl+HTTP-headers+[1bl or 2bl]+HTTP-body+1bl - # metadata: warc-headers+1bl+metadata-headers+3bl -q global BUFSIZE, HDRMAX + # warcinfo: record-headers+1bl+crawl-headers+2bl + # request: record-headers+1bl+HTTP-headers+3bl + # response: record-headers+1bl+HTTP-headers+[1bl or 2bl]+HTTP-body+1bl + # metadata: record-headers+1bl+metadata-headers+3bl + global BUFSIZE, HDRMAX, BUFMIN, RECORDMAX + _out: typing.BinaryIO # should do some sanity checking wrt parts and types - if filename.endswith(".gz"): - stream: typing.BinaryIO = igzip.IGzipFile(filename=filename) - else: - stream: typing.BinaryIO = open(filename,'rb',0) + stream: typing.BinaryIO + with gzip.open(filename, 'r') as fh: + try: + fh.read(1) + except gzip.BadGzipFile: + stream = open(filename,'rb',0) + else: + stream = igzip.IGzipFile(filename=filename) buf: char[::1] = bytearray(BUFSIZE) - bufView: char[::1] =memoryview(buf) + bufView: char[::1] = memoryview(buf) fpos: long = 0 bp: long = 0 - done: bool = False bl: long = stream.readinto(buf) - while True: + done: bool = bl < BUFSIZE + while not (done and bl == bp): + while buf.startswith(b'\r\n',bp): + bp+=2 start_1: long = bp - try: - # Check that we can at least see to the Content-Length Warc-header - clh_begin: long = buf.index(b'\r\nContent-Length: ', bp) - if clh_begin > bl: - raise ValueError - clh_end = buf.index(b'\r\n', clh_begin+2) - if clh_end > bl: - raise ValueError - length = wl = int(bufView[clh_begin+18:clh_end]) - # Check whether we can see to the _end_ of the Warc-header - eowh = buf.index(b'\r\n\r\n', clh_end) - if eowh > bl: - raise ValueError - except ValueError: - # We can't see to the end of this item - # So we do a buffer shift, forcing the restart - # because skipping won't work as we're not at the end of the WARC - # header yet - start_1, bp, _, buf, bl, bufView, done = refill(buf, bufView, stream, - start_1, bl, bp, eol, - length, True) - bp -= 2 # situation is slightly different from the other call to refill if (bp > bl): breakpoint() if not buf.startswith(b'WARC/1.0\r\n',bp): - if done and bl-bp==0: - # really done - return breakpoint() raise ValueError("Not a WARC file? In %s at %s of %s (%s): %s[%s]"%(filename, bp,bl,fpos, @@ -134,14 +66,15 @@ bp+=10 wtype: bytes = b'' length: int = 0 - state: int = 1 tr: bytes = b'' # Was this record truncated? if (bp > bl): breakpoint() while not buf.startswith(b'\r\n',bp): # there should always be enough in the buffer to complete this loop, - # because of the buffer update logic above + # because of the buffer update logic at the end eol = buf.index(b'\r\n', bp) + if buf.startswith(b"Content-Length: ",bp): + length=wl=int(bufView[bp+16:eol-2]) if buf.startswith(b"WARC-Truncated: ",bp): if bp+16==eol-2: tr = b"EMPTY" @@ -161,24 +94,24 @@ bytes(bufView[bp+11:eol-2]),filename, fpos-(bl-bp))) bp=eol + # record header done bp=eol+2 if (bp > bl): breakpoint() if (hl:=(bp - start_1)) > HDRMAX: HDRMAX = hl - OUT: typing.BinaryIO - if done: - if (bp+length)>bl: - raise ValueError("Done but need more! %s + %s > %s in %s"%(bp, - length,bl,filename)) + #if done: + # if (bp+length)>bl: + # raise ValueError("Done but need more! %s + %s > %s in %s"%(bp, + # length,bl,filename)) if (wtype in types): # Output whole or part 1 as required if whole: bp+=length - OUT=callback(wtype,bufView[start_1:bp],7) + _out=callback(wtype,bufView[start_1:bp],7) continue elif (parts & 1): - OUT=callback(wtype,bufView[start_1:eol],1) + _out=callback(wtype,bufView[start_1:eol],1) if parts!=1: while buf.startswith(b'\r\n',bp): bp+=2 @@ -186,16 +119,31 @@ eob=bp+length while buf.startswith(b'\r\n',eob-2): eob-=2 + # Only output parts (2 = HTTP header, 4 = body) that are wanted if parts & 2: - if wtype is META or wtype is INFO: + if wtype == META or wtype == INFO: # rest of the part - OUT=callback(wtype,bufView[start_2:eob],2) + _out=callback(wtype,bufView[start_2:eob],2) else: # request and response have http headers eo2=buf.index(b'\r\n\r\n',start_2) - OUT=callback(wtype,bufView[start_2:eo2+2],2) + _out=callback(wtype,bufView[start_2:eo2+2],2) if parts & 4: raise ValueError("Not implemented: body part (4): %s"%parts) - bp+=length + bp += length + rl: int + if (rl := (bp - start_1)) > RECORDMAX: + RECORDMAX = rl + keepLen: int + if (not done) and (keepLen := bl - bp) < BUFMIN: + # we need to shift and read more + buf[0:keepLen]=bufView[bp:bl] + with memoryview(buf)[keepLen:BUFSIZE] as xBuf: + nb=stream.readinto(xBuf) + bl = keepLen+nb + done = bl < BUFSIZE + bp = 0 + #print('end of loop',wtype,start_1,bp,eol,length,bl,file=sys.stderr) + print('Max record: %d, Max header: %d
