comparison lib/python/cdx_segment.py @ 87:b6a5999d8e06

working with locking and copying
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Tue, 16 Mar 2021 16:20:02 +0000
parents b5fef78cbb26
children 464d2dfb99c9
comparison
equal deleted inserted replaced
86:b5fef78cbb26 87:b6a5999d8e06
4 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for 4 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for
5 cdx/warc 5 cdx/warc
6 [all segments, all and only those paths matching segment-prefix*.{0..99}] 6 [all segments, all and only those paths matching segment-prefix*.{0..99}]
7 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc 7 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc
8 ''' 8 '''
9 import gzip 9 from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system
10 from os import listdir, makedirs
11 from datetime import datetime 10 from datetime import datetime
12 import sys,re 11 from random import sample
12 from lock import AtomicOpen
13 import sys,re,gzip
13 14
14 archive="CC-MAIN-%s"%sys.argv[1] 15 archive="CC-MAIN-%s"%sys.argv[1]
15 adir="/beegfs/common_crawl/%s"%archive 16 adir="/beegfs/common_crawl/%s"%archive
16 apref="crawl-data/%s"%archive 17 apref="crawl-data/%s"%archive
17 pref=sys.argv[2] 18 pref=sys.argv[2]
18 19
19 afn=sys.argv[3] 20 afn=sys.argv[3]
21 ifn=afn.split('.')[0]
20 22
21 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref) 23 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref)
22 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref) 24 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref)
23 25
24 segdirs=[d for d in listdir(adir) if SPAT.match(d)] 26 segdirs=[d for d in listdir(adir) if SPAT.match(d)]
27 rr=("warc","robotstxt","crawldiagnostics")
25 ss={} 28 ss={}
26 n={} 29 n={}
27 for r in ("warc","robotstxt","crawldiagnostics"): 30 for r in rr:
28 ss[r]=rd=dict() 31 ss[r]=rd=dict()
29 n[r]=0 32 n[r]=0
30 for s in segdirs: 33 for s in segdirs:
31 rdir="%s/%s/orig/cdx/%s"%(adir,s,r) 34 rdir="%s/%s/orig/cdx/%s"%(ifn,s,r)
32 makedirs(rdir,0o755,exist_ok=True) 35 makedirs(rdir,0o755)
33 rd[s]=open("%s/cdx"%rdir,'at') 36 rd[s]=open("%s/cdx"%rdir,'w+')
34 37
35 idir="%s/cdx/warc"%adir 38 idir="%s/cdx/warc"%adir
36 39
37 e=0 40 e=0
38 41
48 n[r]+=1 51 n[r]+=1
49 else: 52 else:
50 sys.stderr.write("bogus: ",afn,l) 53 sys.stderr.write("bogus: ",afn,l)
51 e+=1 54 e+=1
52 55
53 for gg in ss.values(): 56 mt=datetime.now()
54 for g in gg.values(): 57 print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())),
55 g.close() 58 e,(mt-st).seconds),file=sys.stderr)
59 # Randomise to try to avoid contention
60 for s in sample(segdirs,100):
61 for r in rr:
62 of=ss[r][s]
63 of.flush()
64 o=of.fileno()
65 fsync(o)
66 with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df:
67 d=df.fileno()
68 while True:
69 data = read(o,131072)
70 if data == b'': # end of file reached
71 break
72 write(d,data)
73 of.close()
74
75 res=system("rm -r %s"%ifn)
56 76
57 et=datetime.now() 77 et=datetime.now()
58 print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())), 78 print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr)
59 e,(et-st).seconds),file=sys.stderr)
60