Mercurial > hg > cc > cirrus_home
diff lib/python/cdx_segment.py @ 87:b6a5999d8e06
working with locking and copying
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Tue, 16 Mar 2021 16:20:02 +0000 |
parents | b5fef78cbb26 |
children | 464d2dfb99c9 |
line wrap: on
line diff
--- a/lib/python/cdx_segment.py Mon Mar 15 14:26:42 2021 +0000 +++ b/lib/python/cdx_segment.py Tue Mar 16 16:20:02 2021 +0000 @@ -6,10 +6,11 @@ [all segments, all and only those paths matching segment-prefix*.{0..99}] idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc ''' -import gzip -from os import listdir, makedirs +from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system from datetime import datetime -import sys,re +from random import sample +from lock import AtomicOpen +import sys,re,gzip archive="CC-MAIN-%s"%sys.argv[1] adir="/beegfs/common_crawl/%s"%archive @@ -17,20 +18,22 @@ pref=sys.argv[2] afn=sys.argv[3] +ifn=afn.split('.')[0] SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref) IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref) segdirs=[d for d in listdir(adir) if SPAT.match(d)] +rr=("warc","robotstxt","crawldiagnostics") ss={} n={} -for r in ("warc","robotstxt","crawldiagnostics"): +for r in rr: ss[r]=rd=dict() n[r]=0 for s in segdirs: - rdir="%s/%s/orig/cdx/%s"%(adir,s,r) - makedirs(rdir,0o755,exist_ok=True) - rd[s]=open("%s/cdx"%rdir,'at') + rdir="%s/%s/orig/cdx/%s"%(ifn,s,r) + makedirs(rdir,0o755) + rd[s]=open("%s/cdx"%rdir,'w+') idir="%s/cdx/warc"%adir @@ -50,11 +53,26 @@ sys.stderr.write("bogus: ",afn,l) e+=1 -for gg in ss.values(): - for g in gg.values(): - g.close() +mt=datetime.now() +print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())), + e,(mt-st).seconds),file=sys.stderr) +# Randomise to try to avoid contention +for s in sample(segdirs,100): + for r in rr: + of=ss[r][s] + of.flush() + o=of.fileno() + fsync(o) + with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df: + d=df.fileno() + while True: + data = read(o,131072) + if data == b'': # end of file reached + break + write(d,data) + of.close() + +res=system("rm -r %s"%ifn) et=datetime.now() -print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())), - e,(et-st).seconds),file=sys.stderr) - +print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr)