changeset 87:b6a5999d8e06

working with locking and copying
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Tue, 16 Mar 2021 16:20:02 +0000
parents b5fef78cbb26
children 464d2dfb99c9
files bin/cdx_segment.sh lib/python/cdx_segment.py lib/python/lock.py
diffstat 3 files changed, 67 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/bin/cdx_segment.sh	Mon Mar 15 14:26:42 2021 +0000
+++ b/bin/cdx_segment.sh	Tue Mar 16 16:20:02 2021 +0000
@@ -6,10 +6,11 @@
 node=$SLURMD_NODENAME
 local=$SLURM_LOCALID
 proc=$SLURM_PROCID
-echo $(date) $node:$proc $start
+echo $(date) $node:$proc start
 
 module load gnu-parallel
 
+PYTHONPATH=$PYTHONPATH:$HOME/lib/python
 parallel --will-cite -j $c lib/python/cdx_segment.py 2019-35 15 '{}' < cdx_segment/$proc.txt
 
 echo $(date) $proc end
--- a/lib/python/cdx_segment.py	Mon Mar 15 14:26:42 2021 +0000
+++ b/lib/python/cdx_segment.py	Tue Mar 16 16:20:02 2021 +0000
@@ -6,10 +6,11 @@
   [all segments, all and only those paths matching segment-prefix*.{0..99}]
  idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc
 '''
-import gzip
-from os import listdir, makedirs
+from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system
 from datetime import datetime
-import sys,re
+from random import sample
+from lock import AtomicOpen
+import sys,re,gzip
 
 archive="CC-MAIN-%s"%sys.argv[1]
 adir="/beegfs/common_crawl/%s"%archive
@@ -17,20 +18,22 @@
 pref=sys.argv[2]
 
 afn=sys.argv[3]
+ifn=afn.split('.')[0]
 
 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref)
 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref)
 
 segdirs=[d for d in listdir(adir) if SPAT.match(d)]
+rr=("warc","robotstxt","crawldiagnostics")
 ss={}
 n={}
-for r in ("warc","robotstxt","crawldiagnostics"):
+for r in rr:
   ss[r]=rd=dict()
   n[r]=0
   for s in segdirs:
-    rdir="%s/%s/orig/cdx/%s"%(adir,s,r)
-    makedirs(rdir,0o755,exist_ok=True)
-    rd[s]=open("%s/cdx"%rdir,'at')
+    rdir="%s/%s/orig/cdx/%s"%(ifn,s,r)
+    makedirs(rdir,0o755)
+    rd[s]=open("%s/cdx"%rdir,'w+')
 
 idir="%s/cdx/warc"%adir
 
@@ -50,11 +53,26 @@
       sys.stderr.write("bogus: ",afn,l)
       e+=1
 
-for gg in ss.values():
-  for g in gg.values():
-    g.close()
+mt=datetime.now()
+print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())),
+                                                              e,(mt-st).seconds),file=sys.stderr)
+# Randomise to try to avoid contention
+for s in sample(segdirs,100):
+  for r in rr:
+    of=ss[r][s]
+    of.flush()
+    o=of.fileno()
+    fsync(o)
+    with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df:
+      d=df.fileno()
+      while True:
+        data = read(o,131072)
+        if data == b'':  # end of file reached
+            break
+        write(d,data)
+    of.close()
+
+res=system("rm -r %s"%ifn)
 
 et=datetime.now()
-print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())),
-                                                               e,(et-st).seconds),file=sys.stderr)
-        
+print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/python/lock.py	Tue Mar 16 16:20:02 2021 +0000
@@ -0,0 +1,34 @@
+# Courtesy of https://stackoverflow.com/a/46407326
+import fcntl, os
+def lock_file(f):
+  if f.writable(): fcntl.lockf(f, fcntl.LOCK_EX)
+def unlock_file(f):
+  if f.writable(): fcntl.lockf(f, fcntl.LOCK_UN)
+
+# Class for ensuring that all file operations are atomic, treat
+# initialization like a standard call to 'open' that happens to be atomic.
+# This file opener *must* be used in a "with" block.
+class AtomicOpen:
+    # Open the file with arguments provided by user. Then acquire
+    # a lock on that file object (WARNING: Advisory locking).
+    def __init__(self, path, *args, **kwargs):
+        # Open the file and acquire a lock on the file before operating
+        self.file = open(path,*args, **kwargs)
+        # Lock the opened file
+        lock_file(self.file)
+
+    # Return the opened file object (knowing a lock has been obtained).
+    def __enter__(self, *args, **kwargs): return self.file
+
+    # Unlock the file and close the file object.
+    def __exit__(self, exc_type=None, exc_value=None, traceback=None):        
+        # Flush to make sure all buffered contents are written to file.
+        self.file.flush()
+        os.fsync(self.file.fileno())
+        # Release the lock on the file.
+        unlock_file(self.file)
+        self.file.close()
+        # Handle exceptions that may have come up during execution, by
+        # default any exceptions are raised to the user.
+        if (exc_type != None): return False
+        else:                  return True