Mercurial > hg > cc > cirrus_home
view lib/python/cdx_segment.py @ 172:72b0420167dc
generalised sbatch front-end to cdx2tsv.py
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Thu, 28 Jul 2022 17:24:29 +0100 |
parents | 464d2dfb99c9 |
children |
line wrap: on
line source
#!/usr/bin/python3
'''Split out an alphabetical cdx file by segment

Usage: cdx_segment.py archive segment-prefix idx_in

archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35
  has sub-directories for
    cdx/warc
    [all segments, all and only those paths matching
     segment-prefix*.{0..99}]
idx_in is an alphabetically ordered index fragment (one of
  cdx/warc/...gz), relative to archive/cdx/warc

Each index line is appended to
  <idx_in stem>/<segment>/orig/cdx/<record type>/cdx
relative to the current directory, where <record type> is one of
warc, robotstxt or crawldiagnostics.  Progress and per-type counts are
reported on stderr.
'''
from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system
from datetime import datetime
from random import sample
from lock import AtomicOpen
import sys,re,gzip

# Direct copying of the per-segment results into the archive tree is
# disabled: see the note on the copy branch below.  While this is False
# the local output tree has to be copied to /beegfs at shell level.
COPY_TO_BEEGFS = False

if len(sys.argv) < 4:
    # Show the usage text rather than an IndexError traceback
    sys.exit(__doc__)

archive = "CC-MAIN-%s" % sys.argv[1]
adir = "/beegfs/common_crawl/%s" % archive
apref = "crawl-data/%s" % archive
pref = sys.argv[2]
afn = sys.argv[3]
# Stem of the index fragment name: used as the root of the local output tree
ifn = afn.split('.')[0]

# Segment directory names: prefix, optional further digits, '.', 1-2 digits
SPAT = re.compile("%s[0-9]*\\.[0-9]{1,2}$" % pref)
# Group 1 is the segment id, group 2 the record type directory
IPAT = re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/' % apref)

segdirs = [d for d in listdir(adir) if SPAT.match(d)]
rr = ("warc", "robotstxt", "crawldiagnostics")
ss = {}   # record type -> {segment -> open output file}
n = {}    # record type -> number of lines written
for r in rr:
    ss[r] = rd = dict()
    n[r] = 0
    for s in segdirs:
        rdir = "%s/%s/orig/cdx/%s" % (ifn, s, r)
        # No exist_ok: raises if the directory is already there
        makedirs(rdir, 0o755)
        rd[s] = open("%s/cdx" % rdir, 'w+')

idir = "%s/cdx/warc" % adir
e = 0     # number of lines that did not match IPAT

st = datetime.now()
print(st, "starting", afn, file=sys.stderr)
with gzip.open("%s/%s" % (idir, afn), 'rt') as f:
    for l in f:
        m = IPAT.search(l)
        if m:
            r = m[2]
            ss[r][m[1]].write(l)
            n[r] += 1
        else:
            # write() takes exactly one string; l keeps its own newline
            sys.stderr.write("bogus: %s %s" % (afn, l))
            e += 1

if not COPY_TO_BEEGFS:
    # See note below, will have to copy entire result to /beegfs at shell level
    for by_segment in ss.values():
        for out in by_segment.values():
            out.close()
else:
    # The following fails, in that there are occasional small gaps in the result
    #  I've given up trying to figure out why...
    # Randomise to try to avoid contention
    mt = datetime.now()
    print(mt, "copying", ifn,
          "%s ok, %d bogus, %d seconds so far" % (':'.join(map(str, n.values())),
                                                  e, (mt - st).seconds),
          file=sys.stderr)
    # Visit every segment exactly once, in random order
    for s in sample(segdirs, len(segdirs)):
        for r in rr:
            of = ss[r][s]
            of.flush()
            o = of.fileno()
            fsync(o)
            # Rewind the local file so it can be read back from the start
            opos = lseek(o, 0, SEEK_SET)
            with AtomicOpen("%s/%s/orig/cdx/%s/cdx" % (adir, s, r), "rb+") as df:
                d = df.fileno()
                # Append: position the destination at its current end
                dpos = lseek(d, 0, SEEK_END)
                print(of.name, opos, df.name, dpos, file=sys.stderr)
                while True:
                    data = read(o, 131072)
                    if data == b'':  # end of file reached
                        break
                    write(d, data)
            of.close()

# Removal of the local output tree is currently disabled:
#system("rm -r %s"%ifn)
et = datetime.now()
print(et, "finished", ifn,
      "%s ok, %d bogus, %d seconds total" % (':'.join(map(str, n.values())),
                                             e, (et - st).seconds),
      file=sys.stderr)