Mercurial > hg > cc > cirrus_home
annotate lib/python/cdx_segment.py @ 87:b6a5999d8e06
working with locking and copying
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Tue, 16 Mar 2021 16:20:02 +0000 |
parents | b5fef78cbb26 |
children | 464d2dfb99c9 |
rev | line source |
---|---|
86 | 1 #!/usr/bin/python3 |
2 '''Split out an alphabetical cdx file by segment | |
3 Usage: cdx_segment.py archive segment-prefix idx_in | |
4 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for | |
5 cdx/warc | |
6 [all segments, all and only those paths matching segment-prefix*.{0..99}] | |
7 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc | |
8 ''' | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
9 from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system |
86 | 10 from datetime import datetime |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
11 from random import sample |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
12 from lock import AtomicOpen |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
13 import sys,re,gzip |
86 | 14 |
15 archive="CC-MAIN-%s"%sys.argv[1] | |
16 adir="/beegfs/common_crawl/%s"%archive | |
17 apref="crawl-data/%s"%archive | |
18 pref=sys.argv[2] | |
19 | |
20 afn=sys.argv[3] | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
21 ifn=afn.split('.')[0] |
86 | 22 |
23 SPAT=re.compile("%s[0-9]*\\.[0-9]{1,2}$"%pref) | |
24 IPAT=re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/'%apref) | |
25 | |
26 segdirs=[d for d in listdir(adir) if SPAT.match(d)] | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
27 rr=("warc","robotstxt","crawldiagnostics") |
86 | 28 ss={} |
29 n={} | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
30 for r in rr: |
86 | 31 ss[r]=rd=dict() |
32 n[r]=0 | |
33 for s in segdirs: | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
34 rdir="%s/%s/orig/cdx/%s"%(ifn,s,r) |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
35 makedirs(rdir,0o755) |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
36 rd[s]=open("%s/cdx"%rdir,'w+') |
86 | 37 |
38 idir="%s/cdx/warc"%adir | |
39 | |
40 e=0 | |
41 | |
42 st=datetime.now() | |
43 print(st,"starting",afn,file=sys.stderr) | |
44 | |
45 with gzip.open("%s/%s"%(idir,afn),'rt') as f: | |
46 for l in f: | |
47 m=IPAT.search(l) | |
48 if m: | |
49 r=m[2] | |
50 ss[r][m[1]].write(l) | |
51 n[r]+=1 | |
52 else: | |
53 sys.stderr.write("bogus: ",afn,l) | |
54 e+=1 | |
55 | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
56 mt=datetime.now() |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
57 print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())), |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
58 e,(mt-st).seconds),file=sys.stderr) |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
59 # Randomise to try to avoid contention |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
60 for s in sample(segdirs,100): |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
61 for r in rr: |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
62 of=ss[r][s] |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
63 of.flush() |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
64 o=of.fileno() |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
65 fsync(o) |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
66 with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df: |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
67 d=df.fileno() |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
68 while True: |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
69 data = read(o,131072) |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
70 if data == b'': # end of file reached |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
71 break |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
72 write(d,data) |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
73 of.close() |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
74 |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
75 res=system("rm -r %s"%ifn) |
86 | 76 |
77 et=datetime.now() | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
78 print(et,"finished",ifn,res,"%d seconds total"%((et-st).seconds),file=sys.stderr) |