Mercurial > hg > cc > cirrus_home
comparison bin/ix.py @ 119:bc958b776fb8
implement --cmd
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Wed, 16 Jun 2021 16:12:46 +0000 |
parents | 63898fde9751 |
children | 5b0ec642ee9b |
comparison
legend: equal | deleted | inserted | replaced
118:551ff1de13d8 | 119:bc958b776fb8 |
---|---|
6 or complete cdx index lines. | 6 or complete cdx index lines. |
7 In all cases by 'filename' is meant crawlid/segmentid/filename | 7 In all cases by 'filename' is meant crawlid/segmentid/filename |
8 | 8 |
9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' | 9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' |
10 | 10 |
11 import sys, argparse, regex, os, shutil, io, gzip, time | 11 import sys, argparse, regex, os, shutil, io, gzip, time, shlex |
12 from isal import igzip | 12 from isal import igzip |
13 #from subprocess import Popen, PIPE | 13 from subprocess import Popen, PIPE |
14 #import asyncio | 14 #import asyncio |
15 | 15 |
16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') | 16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') |
17 BINOUT=sys.stdout.buffer | 17 BINOUT=sys.stdout.buffer |
18 FPAT="/%s/%s/orig/warc/%s" | 18 FPAT="/%s/%s/orig/warc/%s" |
22 global FOO | 22 global FOO |
23 FOO=argparse.RawDescriptionHelpFormatter.format_help(self) | 23 FOO=argparse.RawDescriptionHelpFormatter.format_help(self) |
24 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', | 24 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', |
25 FOO) | 25 FOO) |
26 | 26 |
27 def process(options,buf,root,filename,offset,length,whole): | 27 def process(options,buf,filename,offset,length,whole): |
28 global CMD_PROC, BINOUT | |
29 if options.cmd: | |
30 CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0) | |
31 BINOUT=CMD_PROC.stdin | |
32 process1(options,buf,filename,offset,length,whole) | |
33 if options.cmd: | |
34 # Wind up subproc | |
35 BINOUT.close() | |
36 if CMD_PROC.wait()!=0: # could/should be async? | |
37 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename, | |
38 CMD_PROC.returncode), | |
39 file=sys.stderr) | |
40 | |
41 def _output_stdout(buf): | |
42 BINOUT.write(buf) | |
43 | |
44 def _output_subproc(buf): | |
45 toWrite=len(buf) | |
46 while toWrite>0: | |
47 toWrite -= BINOUT.write(buf) | |
48 | |
49 def process1(options,buf,filename,offset,length,whole): | |
50 root=options.root | |
28 rfn=root+filename | 51 rfn=root+filename |
29 if root!="/beegfs/common_crawl": | 52 if root!="/beegfs/common_crawl": |
30 # Support using ramdisk or other local disk as a faster cached | 53 # Support using ramdisk or other local disk as a faster cached |
31 if not os.path.exists(rfn): | 54 if not os.path.exists(rfn): |
32 if not os.path.exists(os.path.dirname(rfn)): | 55 if not os.path.exists(os.path.dirname(rfn)): |
45 nb=file.readinto(bv) | 68 nb=file.readinto(bv) |
46 file.close() | 69 file.close() |
47 if nb!=length: | 70 if nb!=length: |
48 print("losing",file.name,length,nb,file=sys.stderr) | 71 print("losing",file.name,length,nb,file=sys.stderr) |
49 if whole and options.zipped: | 72 if whole and options.zipped: |
50 BINOUT.write(bv) | 73 _output(bv) |
51 return | 74 return |
52 gzip_chunk = io.BytesIO(bv) | 75 gzip_chunk = io.BytesIO(bv) |
53 uv=memoryview(buf)[length:] | 76 uv=memoryview(buf)[length:] |
54 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin: | 77 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin: |
55 ll=0 | 78 ll=0 |
58 if not l: | 81 if not l: |
59 break | 82 break |
60 ll+=l | 83 ll+=l |
61 cb=memoryview(uv)[:ll] | 84 cb=memoryview(uv)[:ll] |
62 if whole: | 85 if whole: |
63 BINOUT.write(cb) | 86 _output(cb) |
64 return | 87 return |
65 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted | 88 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted |
66 state=0 | 89 state=0 |
67 tr=None # Was this record truncated? | 90 tr=None # Was this record truncated? |
68 bl=None # for HTTP Content-Length for the length of the body? | 91 bl=None # for HTTP Content-Length for the length of the body? |
80 if not (options.headers or options.body): | 103 if not (options.headers or options.body): |
81 return | 104 return |
82 state=1 | 105 state=1 |
83 # Note we preserve the empty line | 106 # Note we preserve the empty line |
84 if options.warc: | 107 if options.warc: |
85 BINOUT.write(L) | 108 _output(L) |
86 continue | 109 continue |
87 if state==1: | 110 if state==1: |
88 # HTTP header | 111 # HTTP header |
89 wl -= len(L) | 112 wl -= len(L) |
90 if not (L==b"" or L.startswith(b"\r")): | 113 if not (L==b"" or L.startswith(b"\r")): |
91 # Non-blank, it's a header | 114 # Non-blank, it's a header |
92 if bl is None and L.startswith(b"Content-Length: "): | 115 if bl is None and L.startswith(b"Content-Length: "): |
93 bl=int(L[16:].rstrip()) | 116 bl=int(L[16:].rstrip()) |
94 if options.headers: | 117 if options.headers: |
95 BINOUT.write(L) | 118 _output(L) |
96 else: | 119 else: |
97 # Blank line, HTTP header is finished | 120 # Blank line, HTTP header is finished |
98 if not options.body: | 121 if not options.body: |
99 return | 122 return |
100 if options.headers: | 123 if options.headers: |
101 BINOUT.write(L) | 124 _output(L) |
102 state=2 | 125 state=2 |
103 # The above is just for sanity, because we do _not_ | 126 # The above is just for sanity, because we do _not_ |
104 # continue with the outer loop, | 127 # continue with the outer loop, |
105 # since we can now block-output the entire rest of the | 128 # since we can now block-output the entire rest of the |
106 # input buffer. | 129 # input buffer. |
110 (length,offset,filename,wl,bl,tr),file=sys.stderr) | 133 (length,offset,filename,wl,bl,tr),file=sys.stderr) |
111 # HTTP body | 134 # HTTP body |
112 balance=clear_text.tell() | 135 balance=clear_text.tell() |
113 #print(balance,bl,wl,ll,ll-balance,file=sys.stderr) | 136 #print(balance,bl,wl,ll,ll-balance,file=sys.stderr) |
114 # Output whatever is left | 137 # Output whatever is left |
115 BINOUT.write(cb[balance:balance+wl]) | 138 _output(cb[balance:balance+wl]) |
116 return | 139 return |
117 | 140 |
118 def main(): | 141 def main(): |
142 global _output | |
119 parser = argparse.ArgumentParser( | 143 parser = argparse.ArgumentParser( |
120 description='''Extract records from warc files given length, offset and file triples. | 144 description='''Extract records from warc files given length, offset and file triples. |
121 Input one triple on command line, or | 145 Input one triple on command line, or |
122 triples from stdin as tab-delimited lines | 146 triples from stdin as tab-delimited lines |
123 or complete cdx index lines. | 147 or complete cdx index lines. |
169 parser.error("length, offset and filename must all be supplied together") | 193 parser.error("length, offset and filename must all be supplied together") |
170 | 194 |
171 buf=bytearray(128*1024*1024) | 195 buf=bytearray(128*1024*1024) |
172 | 196 |
173 whole=not (pa.warc or pa.headers or pa.body) | 197 whole=not (pa.warc or pa.headers or pa.body) |
174 if pa.length is not None: | 198 if pa.cmd: |
175 process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')), | 199 _output = _output_subproc |
176 pa.offset,pa.length,whole) | 200 else: |
177 exit(0) | 201 _output = _output_stdout |
178 if pa.index: | 202 if pa.index: |
179 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"') | 203 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"') |
180 for l in sys.stdin: | 204 for l in sys.stdin: |
181 m=CDX.search(l) | 205 m=CDX.search(l) |
182 if m is None: | 206 if m is None: |
183 print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr) | 207 print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr) |
184 exit(2) | 208 exit(2) |
185 f=FPAT%(m[3:6]) | 209 f=FPAT%(m[3:6]) |
186 process(pa,buf,pa.root,f, | 210 process(pa,buf,f, |
187 int(m[2]),int(m[1]),whole) | 211 int(m[2]),int(m[1]),whole) |
188 exit(0) | 212 elif pa.length is not None: |
213 print(pa.filename,file=sys.stderr) | |
214 process(pa,buf,FPAT%tuple(pa.filename.split('/')), | |
215 pa.offset,pa.length,whole) | |
216 else: | |
217 print("Reading length, offset, filename tab-delimited triples from stdin...", | |
218 file=sys.stderr) | |
219 for l in sys.stdin: | |
220 try: | |
221 (length,offset,filename)=l.rstrip().split('\t') | |
222 length=int(length) | |
223 offset=int(offset) | |
224 except ValueError as e: | |
225 parser.error('Invalid input line: %s\n "%s"'%(e,l)) | |
226 process(pa,buf,FPAT%tuple(filename.split('/')), | |
227 offset,length,whole) | |
228 | |
189 if __name__ == "__main__": | 229 if __name__ == "__main__": |
190 main() | 230 main() |