Mercurial > hg > cc > cirrus_home
annotate lib/python/cdx_segment.py @ 134:d3ef00af2064
add usage/help info
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Wed, 14 Jul 2021 16:49:54 +0000 |
parents | 464d2dfb99c9 |
children |
rev | line source |
---|---|
86 | 1 #!/usr/bin/python3 |
2 '''Split out an alphabetical cdx file by segment | |
3 Usage: cdx_segment.py archive segment-prefix idx_in | |
4 archive is e.g. 2019-35, assuming /beegfs/common_crawl/CC-MAIN-2019-35 has sub-directories for | |
5 cdx/warc | |
6 [all segments, all and only those paths matching segment-prefix*.{0..99}] | |
7 idx_in is an alphabetically ordered index fragment (one of cdx/warc/...gz), relative to archive/cdx/warc | |
8 ''' | |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
9 from os import listdir, makedirs, lseek, SEEK_END, SEEK_SET, read, write, fsync, system |
86 | 10 from datetime import datetime |
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
11 from random import sample |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
12 from lock import AtomicOpen |
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
13 import sys,re,gzip |
86 | 14 |
# Command-line arguments (see the usage text in the module docstring):
#   argv[1]  crawl id, e.g. 2019-35
#   argv[2]  prefix that segment directory names must start with
#   argv[3]  name of the gzipped index fragment to split
archive = "CC-MAIN-%s" % sys.argv[1]
pref = sys.argv[2]
afn = sys.argv[3]

# Paths derived from the crawl id.
adir = "/beegfs/common_crawl/%s" % archive
apref = "crawl-data/%s" % archive

# Everything before the first '.' of the fragment name; the output tree is
# created under this (relative) directory.
ifn = afn.partition('.')[0]

# SPAT selects the entries of adir that are treated as segment directories:
# the prefix, optional digits, a dot, then one or two digits.
SPAT = re.compile("%s[0-9]*\\.[0-9]{1,2}$" % pref)
# IPAT pulls two groups out of the "filename" field of an index line: the
# segment id and the path component that follows it (used as the key into ss).
IPAT = re.compile('"filename": "%s/segments/([0-9.]*)/([a-z]*)/' % apref)

# Match objects are always truthy, so filter() keeps exactly the names
# for which SPAT.match succeeds.
segdirs = list(filter(SPAT.match, listdir(adir)))
87
b6a5999d8e06
working with locking and copying
Henry S. Thompson <ht@inf.ed.ac.uk>
parents:
86
diff
changeset
|
# The three sub-index names handled by this script.
rr = ("warc", "robotstxt", "crawldiagnostics")

# n[r]     -> number of index lines written so far for sub-index r
# ss[r][s] -> open output file for sub-index r, segment directory s
n = dict.fromkeys(rr, 0)
ss = {r: {} for r in rr}

for r, rd in ss.items():
    for s in segdirs:
        # One output directory per (fragment, segment, sub-index) triple.
        # makedirs is called without exist_ok, so an already existing
        # directory raises FileExistsError.
        rdir = "%s/%s/orig/cdx/%s" % (ifn, s, r)
        makedirs(rdir, 0o755)
        # 'w+' so the file can later be read back through its descriptor.
        rd[s] = open("%s/cdx" % rdir, 'w+')
86 | 37 |
# Directory holding the gzipped, alphabetically ordered index fragments.
idir = "%s/cdx/warc" % adir

# Number of input lines that IPAT failed to match.
e = 0

st = datetime.now()
print(st, "starting", afn, file=sys.stderr)

# Stream the fragment line by line and append each line, unchanged, to the
# output file selected by the two groups IPAT captured from it.
with gzip.open("%s/%s" % (idir, afn), 'rt') as f:
    for l in f:
        m = IPAT.search(l)
        if m:
            r = m[2]
            # NOTE(review): ss only has entries for the names in rr and the
            # directories in segdirs, so a line naming anything else raises
            # KeyError here -- confirm that is the intended behaviour.
            ss[r][m[1]].write(l)
            n[r] += 1
        else:
            # write() accepts exactly one string; the original passed three
            # arguments, which raised TypeError on the first unmatched line
            # instead of reporting it.  l keeps its own line terminator.
            sys.stderr.write("bogus: %s %s" % (afn, l))
            e += 1
55 | |
# Finish off the per-segment output files.  The condition is hard-wired to
# True, so only the first branch ever runs; the else-branch is kept as a
# record of the direct-copy approach that was abandoned.
if True:
    # See note below, will have to copy entire result to /beegfs at shell level
    # NOTE: this loop rebinds the module-level name rr (until now the tuple of
    # sub-index names) to one of the inner dicts of ss.
    for rr in ss.values():
        for s in rr.values():
            s.close()
else:
    # The following fails, in that there are occasional small gaps in the result
    # I've given up trying to figure out why...
    # Randomise to try to avoid contention
    mt=datetime.now()
    print(mt,"copying",ifn,"%s ok, %d bogus, %d seconds so far"%(':'.join(map(str,n.values())),
                                                         e,(mt-st).seconds),file=sys.stderr)

    # random.sample raises ValueError unless segdirs has at least 100 entries;
    # with exactly 100 it yields every segment once, in random order.
    for s in sample(segdirs,100):
        for r in rr:
            of=ss[r][s]
            # Push buffered text out to the OS, then to disk, before reading
            # the file back through its raw descriptor.
            of.flush()
            o=of.fileno()
            fsync(o)
            # Rewind the local file so the copy starts at its first byte.
            opos=lseek(o,0,SEEK_SET)
            # AtomicOpen comes from the project's lock module; presumably it
            # holds a lock on the destination for the duration of the with
            # block -- TODO confirm.
            with AtomicOpen("%s/%s/orig/cdx/%s/cdx"%(adir,s,r),"rb+") as df:
                d=df.fileno()
                # Position at the end of the destination so data is appended.
                dpos=lseek(d,0,SEEK_END)
                print(of.name,opos,df.name,dpos,file=sys.stderr)
                # Copy in chunks of at most 128 KiB using unbuffered os.read/os.write.
                while True:
                    data = read(o,131072)
                    if data == b'': # end of file reached
                        break
                    write(d,data)
            of.close()
86 | |
# The command that would remove the output tree is left commented out, so
# res is simply 0 and nothing is deleted.
# NOTE(review): the original nesting of this statement is not recoverable
# from the annotated listing; it is placed at top level here -- confirm.
res = 0  # system("rm -r %s"%ifn)

et = datetime.now()
# timedelta.seconds is only the sub-day remainder (0..86399) and silently
# drops whole days; total_seconds() gives the full elapsed time.
elapsed = int((et - st).total_seconds())
print(et, "finished", ifn,
      "%s ok, %d bogus, %d seconds total" % (':'.join(map(str, n.values())),
                                             e, elapsed),
      file=sys.stderr)