changeset 144:ec79bb4ccd74

refactor to enable rerun with fixup, based on previous merge which failed in a few places
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Mon, 02 Oct 2023 18:56:50 +0100
parents f63a8477c9df
children 170844e51987
files lib/python/cc/lmh/merge_date.py
diffstat 1 files changed, 89 insertions(+), 32 deletions(-) [+]
line wrap: on
line diff
--- a/lib/python/cc/lmh/merge_date.py	Mon Oct 02 18:55:48 2023 +0100
+++ b/lib/python/cc/lmh/merge_date.py	Mon Oct 02 18:56:50 2023 +0100
@@ -2,10 +2,14 @@
 '''Add timestamps from Last-Modified-dated (ks.tsv) files into
    that year's index
 
-Usage: merge_date.py ksvstream cdx-dir outdir
+Usage: merge_date.py [-d...] [-m merged-already] ksvstream cdx-dir outdir
 
 ksvstream consists of tab-separated key, CC date, url and Unix timestamp
-''' # '
+
+If merged-already is provided, it is a merge log from a previous run that
+includes some Fail2 output at some point(s).  We try to skip the good results
+and only redo the files where problems occurred.
+'''
 
 import sys, io, os, os.path, time, re
 from isal import igzip
@@ -16,6 +20,11 @@
   sys.argv.pop(1)
   DEBUG += 1  
 
+MERGED=None
+if sys.argv[1] == '-m':
+  sys.argv.pop(1)
+  MERGED = sys.argv.pop(1)
+
 XPATH = "%s/cdx-00%%0.3d.gz"%sys.argv[2]
 NPATH = "%s/cdx-00%%0.3d"%sys.argv[3]
 
@@ -40,35 +49,84 @@
 
 os.makedirs(sys.argv[3], exist_ok=True)
 
-FN = 0
+FN = -1 # file id for original cdx files
+MN = -1 # file id of current old merge info
+
+XCNT = 0 # number of lines read from current original cdx file
+DCNT = 0 # number of lines read from merged date stream
+
+DL = None
+
+WCNT = 0 # number of lines written to current output cdx file
 
-XCNT = WCNT = 0
-DCNT = 0
+NF = open('/dev/null','w')
+XF = open('/dev/null','rb')
+if False:
+  MF = open(MERGED,'r')
+  PREV_DCNT = 0
+else:
+  MF = None
 
-XF = igzip.IGzipFile(filename=XPATH%0)
-NF = open(NN:=(NPATH%0),'wb')
+REDOING = False
 
 def nextLine():
   '''Move on to next index file if current has run out'''
-  global FN, NF, NPATH, NN, XF, XPATH, XCNT, DCNT, WCNT
+  global DF, FN, NF, NPATH, NN, MF, MN, ML, PREV_DCNT, XF
+  global XPATH, XCNT, DCNT, WCNT, REDOING
+  global DKEY, DDATE, DURL, DTIME
   while True:
     xl=XF.readline()
-    XCNT += 1
     if xl == b'':
       # need to move to next index file
+      if MF and REDOING:
+        oo = ML.split()
+        oo = [oo[0]]+[int(o) for o in oo[1:]]
+        if oo != (no:=[NN, XCNT, WCNT, DCNT]):
+          print(*('%s:\t%s<>%s'%vv for vv in
+                  zip(('NN', 'XCNT', 'WCNT', 'DCNT'),oo,no)),
+                sep='\n',file=sys.stderr)
+        REDOING=False
+      if FN != -1:
+        print(NN, flush=True) # so we can compress it
+        print(NN, XCNT, WCNT, DCNT,sep='\t',file=sys.stderr,flush=True)
+        time.sleep(0.1) # so they flush?
       FN += 1
-      XF.close()
-      NF.close()
-      print(NN, flush=True) # so we can compress it
-      print(NN, XCNT, WCNT, DCNT,sep='\t',file=sys.stderr,flush=True)
-      time.sleep(0.1) # so they flush?
+      if MF:
+        ML=MF.readline()
+        if ML:
+          if ML.startswith("Fail2:"):
+            while ML.startswith("Fail2:"):
+              for i in range(8):
+                ML = MF.readline()
+            REDOING=True
+            # fall through to close previous and open next
+          else:
+            # don't do anything with files
+            MN+=1
+            mo = ML.split()
+            NN = mo[0]
+            (XCNT, WCNT, DCNT) = [int(o) for o in mo[1:]]
+            # file col. 4 is 1 ahead of the game
+            for i in range((DCNT-1)-PREV_DCNT):
+              dl = DF.readline()
+            # hack because the first date of the next x file has
+            #   already been read and split
+            DKEY, DDATE, DURL, DTIME = dl.split(b'\t')
+            PREV_DCNT = DCNT
+            # We've skipped this one, go around again,
+            #  the existing XF will still be at EOF
+            continue
+      PREV_DCNT = DCNT
       XN=XPATH%FN
       if not os.path.exists(XN):
         return None
+      XF.close()
       XF = igzip.IGzipFile(filename=XN)
+      NF.close()
       NF = open((NN:=NPATH%FN), 'wb')
       xl = XF.readline()
-      WCNT = XCNT = 1
+      WCNT = XCNT = 0
+    XCNT += 1
     if WARC.search(xl):
       WCNT += 1
       return xl
@@ -78,9 +136,9 @@
         sys.stderr.write("out_rc\n")
 
 
-def nextDate(df,dn):
-  global DEBUG, DCNT, XCNT
-  dl = df.readline()
+def nextDate(dn):
+  global DEBUG, DF, DCNT, XCNT
+  dl = DF.readline()
   if dl == b'':
     # write out the last of the last index file, if any
     return "", "", "", 0
@@ -90,11 +148,10 @@
   DCNT += 1
   return dkey, ddate, durl, dtime
 
-with open(sys.argv[1], 'rb') as df:
+with open(sys.argv[1], 'rb') as DF:
   DCNT = 0
 
-  dkey, ddate, durl, dtime = nextDate(df,1)
-
+  DKEY, DDATE, DURL, DTIME = nextDate(1)
   while (xl := nextLine()) is not None:
     xkey, xdate, xprops = xl.split(b' ', maxsplit=2)
     m = URL.match(xprops)
@@ -105,38 +162,38 @@
     if DEBUG:
       sys.stderr.write("xl: %s\n"%(' '.join(xp.decode('ascii')
                                             for xp in (xkey, xdate, xurl))))
-    if dkey==xkey and ddate==xdate and durl==xurl:
+    if DKEY==xkey and DDATE==xdate and DURL==xurl:
       # Got it
       NF.write(xkey)
       NF.write(b' ')
       NF.write(xdate)
       NF.write(b' ')
       NF.write(xprops[:-2])
-      NF.write(b', "lastmod": "%d"}\n'%int(dtime[:-3]))
+      NF.write(b', "lastmod": "%d"}\n'%int(DTIME[:-3]))
       if DEBUG:
         sys.stderr.write("out_t: %s"%(' '.join(xp.decode('ascii')
                                              for xp in (xkey, xdate, xurl))))
-        sys.stderr.write(" %d\n"%int(dtime[:-3]))
+        sys.stderr.write(" %d\n"%int(DTIME[:-3]))
 
-      dkey, ddate, durl, dtime = nextDate(df,2)
+      DKEY, DDATE, DURL, DTIME = nextDate(2)
       continue
     else:
-      if dkey and xkey.decode('ascii')>(dkey.decode('ascii')):
+      if DKEY and xkey.decode('ascii')>(DKEY.decode('ascii')):
         # we've missed something, disaster looms
         print("Fail2:"
                "      xkey: %s\n"
-               "      dkey: %s\n"
+               "      DKEY: %s\n"
                "      xdate: %s\n"
-               "      ddate: %s\n"
+               "      DDATE: %s\n"
                "      xurl: %s\n"
-               "      durl: %s\n"
+               "      DURL: %s\n"
                "FN: %s XCNT: %s DCNT: %s\n"
-               "xl: %s"%(xkey, dkey, xdate, ddate,
-                         xurl, durl,
+               "xl: %s"%(xkey, DKEY, xdate, DDATE,
+                         xurl, DURL,
                          FN, XCNT, DCNT, xl),
               file=sys.stderr)
         # try to force recovery
-        dkey, ddate, durl, dtime = nextDate(df,3)
+        DKEY, DDATE, DURL, DTIME = nextDate(3)
         continue
       # else fall through to write
     NF.write(xl)