Mercurial > hg > cc > cirrus_home
comparison lib/python/cdx_segment.py @ 87:b6a5999d8e06
working with locking and copying
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Tue, 16 Mar 2021 16:20:02 +0000 |
parents | b5fef78cbb26 |
children | 464d2dfb99c9 |
comparison
equal
deleted
inserted
replaced
86:b5fef78cbb26 | 87:b6a5999d8e06 |
---|---|
4 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for | 4 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for |
5 cdx/warc | 5 cdx/warc |
6 [all segments, all and only those paths matching segment-prefix*.{0..99}] | 6 [all segments, all and only those paths matching segment-prefix*.{0..99}] |
7 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc | 7 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc |
8 ''' | 8 ''' |
9 import gzip | 9 from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system |
10 from os import listdir, makedirs | |
11 from datetime import datetime | 10 from datetime import datetime |
12 import sys,re | 11 from random import sample |
12 from lock import AtomicOpen | |
13 import sys,re,gzip | |
13 | 14 |
14 archive="CC-MAIN-%s"%sys.argv[1] | 15 archive="CC-MAIN-%s"%sys.argv[1] |
15 adir="/beegfs/common_crawl/%s"%archive | 16 adir="/beegfs/common_crawl/%s"%archive |
16 apref="crawl-data/%s"%archive | 17 apref="crawl-data/%s"%archive |
17 pref=sys.argv[2] | 18 pref=sys.argv[2] |
18 | 19 |
19 afn=sys.argv[3] | 20 afn=sys.argv[3] |
21 ifn=afn.split('.')[0] | |
20 | 22 |
21 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref) | 23 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref) |
22 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref) | 24 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref) |
23 | 25 |
24 segdirs=[d for d in listdir(adir) if SPAT.match(d)] | 26 segdirs=[d for d in listdir(adir) if SPAT.match(d)] |
27 rr=("warc","robotstxt","crawldiagnostics") | |
25 ss={} | 28 ss={} |
26 n={} | 29 n={} |
27 for r in ("warc","robotstxt","crawldiagnostics"): | 30 for r in rr: |
28 ss[r]=rd=dict() | 31 ss[r]=rd=dict() |
29 n[r]=0 | 32 n[r]=0 |
30 for s in segdirs: | 33 for s in segdirs: |
31 rdir="%s/%s/orig/cdx/%s"%(adir,s,r) | 34 rdir="%s/%s/orig/cdx/%s"%(ifn,s,r) |
32 makedirs(rdir,0o755,exist_ok=True) | 35 makedirs(rdir,0o755) |
33 rd[s]=open("%s/cdx"%rdir,'at') | 36 rd[s]=open("%s/cdx"%rdir,'w+') |
34 | 37 |
35 idir="%s/cdx/warc"%adir | 38 idir="%s/cdx/warc"%adir |
36 | 39 |
37 e=0 | 40 e=0 |
38 | 41 |
48 n[r]+=1 | 51 n[r]+=1 |
49 else: | 52 else: |
50 sys.stderr.write("bogus: ",afn,l) | 53 sys.stderr.write("bogus: ",afn,l) |
51 e+=1 | 54 e+=1 |
52 | 55 |
53 for gg in ss.values(): | 56 mt=datetime.now() |
54 for g in gg.values(): | 57 print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())), |
55 g.close() | 58 e,(mt-st).seconds),file=sys.stderr) |
59 # Randomise to try to avoid contention | |
60 for s in sample(segdirs,100): | |
61 for r in rr: | |
62 of=ss[r][s] | |
63 of.flush() | |
64 o=of.fileno() | |
65 fsync(o) | |
66 with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df: | |
67 d=df.fileno() | |
68 while True: | |
69 data = read(o,131072) | |
70 if data == b'': # end of file reached | |
71 break | |
72 write(d,data) | |
73 of.close() | |
74 | |
75 res=system("rm -r %s"%ifn) | |
56 | 76 |
57 et=datetime.now() | 77 et=datetime.now() |
58 print(et,"finished",afn,"%s ok, %d bogus, %d seconds elapsed"%(':'.join(map(str,n.values())), | 78 print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr) |
59 e,(et-st).seconds),file=sys.stderr) | |
60 |