Mercurial > hg > cc > cirrus_work
changeset 144:ec79bb4ccd74
refactor to enable rerun with fixup,
based on previous merge which failed in a few places
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Mon, 02 Oct 2023 18:56:50 +0100 |
parents | f63a8477c9df |
children | 170844e51987 |
files | lib/python/cc/lmh/merge_date.py |
diffstat | 1 files changed, 89 insertions(+), 32 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/python/cc/lmh/merge_date.py Mon Oct 02 18:55:48 2023 +0100 +++ b/lib/python/cc/lmh/merge_date.py Mon Oct 02 18:56:50 2023 +0100 @@ -2,10 +2,14 @@ '''Add timestamps from Last-Modified-dated (ks.tsv) files into that year's index -Usage: merge_date.py ksvstream cdx-dir outdir +Usage: merge_date.py [-d...] [-m merged-already] ksvstream cdx-dir outdir ksvstream consists of tab-separated key, CC date, url and Unix timestamp -''' # ' + +If merged-already is provided, that's a merge log from a previous run +that includes some Fail2 output at some point(s). We try to skip the good results +and only redo the files where problems occurred +''' import sys, io, os, os.path, time, re from isal import igzip @@ -16,6 +20,11 @@ sys.argv.pop(1) DEBUG += 1 +MERGED=None +if sys.argv[1] == '-m': + sys.argv.pop(1) + MERGED = sys.argv.pop(1) + XPATH = "%s/cdx-00%%0.3d.gz"%sys.argv[2] NPATH = "%s/cdx-00%%0.3d"%sys.argv[3] @@ -40,35 +49,84 @@ os.makedirs(sys.argv[3], exist_ok=True) -FN = 0 +FN = -1 # file id for original cdx files +MN = -1 # file id of current old merge info + +XCNT = 0 # number of lines read from current original cdx file +DCNT = 0 # number of lines read from merged date stream + +DL = None + +WCNT = 0 # number of lines written to current output cdx file -XCNT = WCNT = 0 -DCNT = 0 +NF = open('/dev/null','w') +XF = open('/dev/null','rb') +if False: + MF = open(MERGED,'r') + PREV_DCNT = 0 +else: + MF = None -XF = igzip.IGzipFile(filename=XPATH%0) -NF = open(NN:=(NPATH%0),'wb') +REDOING = False def nextLine(): '''Move on to next index file if current has run out''' - global FN, NF, NPATH, NN, XF, XPATH, XCNT, DCNT, WCNT + global DF, FN, NF, NPATH, NN, MF, MN, ML, PREV_DCNT, XF + global XPATH, XCNT, DCNT, WCNT, REDOING + global DKEY, DDATE, DURL, DTIME while True: xl=XF.readline() - XCNT += 1 if xl == b'': # need to move to next index file + if MF and REDOING: + oo = ML.split() + oo = [oo[0]]+[int(o) for o in oo[1:]] + if oo != (no:=[NN, XCNT, WCNT, DCNT]): + 
print(*('%s:\t%s<>%s'%vv for vv in + zip(('NN', 'XCNT', 'WCNT', 'DCNT'),oo,no)), + sep='\n',file=sys.stderr) + REDOING=False + if FN != -1: + print(NN, flush=True) # so we can compress it + print(NN, XCNT, WCNT, DCNT,sep='\t',file=sys.stderr,flush=True) + time.sleep(0.1) # so they flush? FN += 1 - XF.close() - NF.close() - print(NN, flush=True) # so we can compress it - print(NN, XCNT, WCNT, DCNT,sep='\t',file=sys.stderr,flush=True) - time.sleep(0.1) # so they flush? + if MF: + ML=MF.readline() + if ML: + if ML.startswith("Fail2:"): + while ML.startswith("Fail2:"): + for i in range(8): + ML = MF.readline() + REDOING=True + # fall through to close previous and open next + else: + # don't do anything with files + MN+=1 + mo = ML.split() + NN = mo[0] + (XCNT, WCNT, DCNT) = [int(o) for o in mo[1:]] + # file col. 4 is 1 ahead of the game + for i in range((DCNT-1)-PREV_DCNT): + dl = DF.readline() + # hack because the first date of the next x file has + # already been read and split + DKEY, DDATE, DURL, DTIME = dl.split(b'\t') + PREV_DCNT = DCNT + # We've skipped this one, go around again, + # the existing XF will still be at EOF + continue + PREV_DCNT = DCNT XN=XPATH%FN if not os.path.exists(XN): return None + XF.close() XF = igzip.IGzipFile(filename=XN) + NF.close() NF = open((NN:=NPATH%FN), 'wb') xl = XF.readline() - WCNT = XCNT = 1 + WCNT = XCNT = 0 + XCNT += 1 if WARC.search(xl): WCNT += 1 return xl @@ -78,9 +136,9 @@ sys.stderr.write("out_rc\n") -def nextDate(df,dn): - global DEBUG, DCNT, XCNT - dl = df.readline() +def nextDate(dn): + global DEBUG, DF, DCNT, XCNT + dl = DF.readline() if dl == b'': # write out the last of the last index file, if any return "", "", "", 0 @@ -90,11 +148,10 @@ DCNT += 1 return dkey, ddate, durl, dtime -with open(sys.argv[1], 'rb') as df: +with open(sys.argv[1], 'rb') as DF: DCNT = 0 - dkey, ddate, durl, dtime = nextDate(df,1) - + DKEY, DDATE, DURL, DTIME = nextDate(1) while (xl := nextLine()) is not None: xkey, xdate, xprops = 
xl.split(b' ', maxsplit=2) m = URL.match(xprops) @@ -105,38 +162,38 @@ if DEBUG: sys.stderr.write("xl: %s\n"%(' '.join(xp.decode('ascii') for xp in (xkey, xdate, xurl)))) - if dkey==xkey and ddate==xdate and durl==xurl: + if DKEY==xkey and DDATE==xdate and DURL==xurl: # Got it NF.write(xkey) NF.write(b' ') NF.write(xdate) NF.write(b' ') NF.write(xprops[:-2]) - NF.write(b', "lastmod": "%d"}\n'%int(dtime[:-3])) + NF.write(b', "lastmod": "%d"}\n'%int(DTIME[:-3])) if DEBUG: sys.stderr.write("out_t: %s"%(' '.join(xp.decode('ascii') for xp in (xkey, xdate, xurl)))) - sys.stderr.write(" %d\n"%int(dtime[:-3])) + sys.stderr.write(" %d\n"%int(DTIME[:-3])) - dkey, ddate, durl, dtime = nextDate(df,2) + DKEY, DDATE, DURL, DTIME = nextDate(2) continue else: - if dkey and xkey.decode('ascii')>(dkey.decode('ascii')): + if DKEY and xkey.decode('ascii')>(DKEY.decode('ascii')): # we've missed something, disaster looms print("Fail2:" " xkey: %s\n" - " dkey: %s\n" + " DKEY: %s\n" " xdate: %s\n" - " ddate: %s\n" + " DDATE: %s\n" " xurl: %s\n" - " durl: %s\n" + " DURL: %s\n" "FN: %s XCNT: %s DCNT: %s\n" - "xl: %s"%(xkey, dkey, xdate, ddate, - xurl, durl, + "xl: %s"%(xkey, DKEY, xdate, DDATE, + xurl, DURL, FN, XCNT, DCNT, xl), file=sys.stderr) # try to force recovery - dkey, ddate, durl, dtime = nextDate(df,3) + DKEY, DDATE, DURL, DTIME = nextDate(3) continue # else fall through to write NF.write(xl)