diff lib/python/cdx_segment.py @ 87:b6a5999d8e06

working with locking and copying
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Tue, 16 Mar 2021 16:20:02 +0000
parents b5fef78cbb26
children 464d2dfb99c9
line wrap: on
line diff
--- a/lib/python/cdx_segment.py	Mon Mar 15 14:26:42 2021 +0000
+++ b/lib/python/cdx_segment.py	Tue Mar 16 16:20:02 2021 +0000
@@ -6,10 +6,11 @@
   [all segments, all and only those paths matching segment-prefix*.{0..99}]
  idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc
 '''
-import gzip
-from os import listdir, makedirs
+from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system
 from datetime import datetime
-import sys,re
+from random import sample
+from lock import AtomicOpen
+import sys,re,gzip
 
 archive="CC-MAIN-%s"%sys.argv[1]
 adir="/beegfs/common_crawl/%s"%archive
@@ -17,20 +18,22 @@
 pref=sys.argv[2]
 
 afn=sys.argv[3]
+ifn=afn.split('.')[0]
 
 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref)
 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref)
 
 segdirs=[d for d in listdir(adir) if SPAT.match(d)]
+rr=("warc","robotstxt","crawldiagnostics")
 ss={}
 n={}
-for r in ("warc","robotstxt","crawldiagnostics"):
+for r in rr:
   ss[r]=rd=dict()
   n[r]=0
   for s in segdirs:
-    rdir="%s/%s/orig/cdx/%s"%(adir,s,r)
-    makedirs(rdir,0o755,exist_ok=True)
-    rd[s]=open("%s/cdx"%rdir,'at')
+    rdir="%s/%s/orig/cdx/%s"%(ifn,s,r)
+    makedirs(rdir,0o755)
+    rd[s]=open("%s/cdx"%rdir,'w+')
 
 idir="%s/cdx/warc"%adir
 
@@ -50,11 +53,26 @@
       sys.stderr.write("bogus: ",afn,l)
       e+=1
 
-for gg in ss.values():
-  for g in gg.values():
-    g.close()
+mt=datetime.now()
+print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())),
+                                                              e,(mt-st).seconds),file=sys.stderr)
+# Randomise to try to avoid contention
+for s in sample(segdirs,100):
+  for r in rr:
+    of=ss[r][s]
+    of.flush()
+    o=of.fileno()
+    fsync(o)
+    with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df:
+      d=df.fileno()
+      while True:
+        data = read(o,131072)
+        if data == b'':  # end of file reached
+            break
+        write(d,data)
+    of.close()
+
+res=system("rm -r %s"%ifn)
 
 et=datetime.now()
-print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())),
-                                                               e,(et-st).seconds),file=sys.stderr)
-        
+print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr)