# HG changeset patch # User Henry S. Thompson # Date 1615911602 0 # Node ID b6a5999d8e068e046a36ec35981fab98eae0fb79 # Parent b5fef78cbb26f39848eebd2b57b48bca111f2a62 working with locking and copying diff -r b5fef78cbb26 -r b6a5999d8e06 bin/cdx_segment.sh --- a/bin/cdx_segment.sh Mon Mar 15 14:26:42 2021 +0000 +++ b/bin/cdx_segment.sh Tue Mar 16 16:20:02 2021 +0000 @@ -6,10 +6,11 @@ node=$SLURMD_NODENAME local=$SLURM_LOCALID proc=$SLURM_PROCID -echo $(date) $node:$proc $start +echo $(date) $node:$proc start module load gnu-parallel +PYTHONPATH=$PYTHONPATH:$HOME/lib/python parallel --will-cite -j $c lib/python/cdx_segment.py 2019-35 15 '{}' < cdx_segment/$proc.txt echo $(date) $proc end diff -r b5fef78cbb26 -r b6a5999d8e06 lib/python/cdx_segment.py --- a/lib/python/cdx_segment.py Mon Mar 15 14:26:42 2021 +0000 +++ b/lib/python/cdx_segment.py Tue Mar 16 16:20:02 2021 +0000 @@ -6,10 +6,11 @@ [all segments, all and only those paths matching segment-prefix*.{0..99}] idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc ''' -import gzip -from os import listdir, makedirs +from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system from datetime import datetime -import sys,re +from random import sample +from lock import AtomicOpen +import sys,re,gzip archive="CC-MAIN-%s"%sys.argv[1] adir="/beegfs/common_crawl/%s"%archive @@ -17,20 +18,22 @@ pref=sys.argv[2] afn=sys.argv[3] +ifn=afn.split('.')[0] SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref) IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref) segdirs=[d for d in listdir(adir) if SPAT.match(d)] +rr=("warc","robotstxt","crawldiagnostics") ss={} n={} -for r in ("warc","robotstxt","crawldiagnostics"): +for r in rr: ss[r]=rd=dict() n[r]=0 for s in segdirs: - rdir="%s/%s/orig/cdx/%s"%(adir,s,r) - makedirs(rdir,0o755,exist_ok=True) - rd[s]=open("%s/cdx"%rdir,'at') + rdir="%s/%s/orig/cdx/%s"%(ifn,s,r) + makedirs(rdir,0o755) + 
rd[s]=open("%s/cdx"%rdir,'w+') idir="%s/cdx/warc"%adir @@ -50,11 +53,26 @@ sys.stderr.write("bogus: ",afn,l) e+=1 -for gg in ss.values(): - for g in gg.values(): - g.close() +mt=datetime.now() +print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())), + e,(mt-st).seconds),file=sys.stderr) +# Randomise to try to avoid contention +for s in sample(segdirs,100): + for r in rr: + of=ss[r][s] + of.flush() + o=of.fileno() + fsync(o) + with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df: + d=df.fileno() + while True: + data = read(o,131072) + if data == b'': # end of file reached + break + write(d,data) + of.close() + +res=system("rm -r %s"%ifn) et=datetime.now() -print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())), - e,(et-st).seconds),file=sys.stderr) - +print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr) diff -r b5fef78cbb26 -r b6a5999d8e06 lib/python/lock.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/python/lock.py Tue Mar 16 16:20:02 2021 +0000 @@ -0,0 +1,34 @@ +# Courtesy of https://stackoverflow.com/a/46407326 +import fcntl, os +def lock_file(f): + if f.writable(): fcntl.lockf(f, fcntl.LOCK_EX) +def unlock_file(f): + if f.writable(): fcntl.lockf(f, fcntl.LOCK_UN) + +# Class for ensuring that all file operations are atomic, treat +# initialization like a standard call to 'open' that happens to be atomic. +# This file opener *must* be used in a "with" block. +class AtomicOpen: + # Open the file with arguments provided by user. Then acquire + # a lock on that file object (WARNING: Advisory locking). + def __init__(self, path, *args, **kwargs): + # Open the file and acquire a lock on the file before operating + self.file = open(path,*args, **kwargs) + # Lock the opened file + lock_file(self.file) + + # Return the opened file object (knowing a lock has been obtained). 
# Context-manager protocol methods of AtomicOpen (the class header and
# __init__, which opens the file and calls lock_file on it, precede this).

# Return the opened file object. The advisory lock was requested in __init__
# (lock_file only locks files that are writable).
def __enter__(self, *args, **kwargs):
    return self.file


# Flush, unlock and close the file object when the "with" block ends.
#
# Returns True when the block finished without an exception and False
# otherwise, so an exception raised inside the "with" block is never
# suppressed and propagates to the caller.
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    try:
        # Flush Python's buffer and ask the OS to write the data out while
        # the lock is still held.
        self.file.flush()
        os.fsync(self.file.fileno())
    finally:
        # Always release the lock and close the file, even if the flush or
        # fsync above failed; otherwise the file object would stay open
        # (and locked) for as long as something references it.
        try:
            unlock_file(self.file)
        finally:
            self.file.close()
    # A false return value tells the "with" statement to re-raise any
    # exception that occurred inside the block.
    return exc_type is None