Mercurial > hg > cc > cirrus_work
comparison bin/ix.py @ 18:046dbe557911
write to tmp file implemented
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Sun, 07 Aug 2022 13:58:33 +0100 |
parents | |
children | cbac7dfe2f24 |
comparison
equal
deleted
inserted
replaced
17:75e0d0013da0 | 18:046dbe557911 |
---|---|
1 #!/usr/bin/env python3 | |
2 '''Extract request records from Common Crawl WARC-format files | |
3 given length, offset and filename triples. | |
4 Input one triple on command line, or | |
5 triples from stdin as tab-delimited lines | |
6 or complete cdx index lines. | |
7 In all cases by 'filename' is meant crawlid/segmentid/type/filename | |
8 | |
9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' | |
10 | |
11 import sys, argparse, regex, os, shutil, io, gzip, time, shlex | |
12 from isal import igzip | |
13 from subprocess import Popen, PIPE | |
14 #import asyncio | |
15 | |
# Matches the usage fragment argparse generates for -x followed by the three
# positionals; HackFormat substitutes a clearer either/or rendering for it.
# Raw string, so the regex escapes are not parsed as (invalid) string escapes.
HACK_USAGE=regex.compile(r'\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
# Binary output sink; process() rebinds this to the subprocess's stdin
# when a command is given.
BINOUT=sys.stdout.buffer
# Default format for building a path from the 4 filename components
FPAT="/%s/%s/orig/%s/%s"

# Subprocess started by process() when a command is given
CMD_PROC=None
# Name of the temporary file created by process() when saving is requested
TMPFILENAME=None
22 | |
class HackFormat(argparse.RawDescriptionHelpFormatter):
    """Raw-description help formatter that patches the usage text.

    The portion of the generated help matched by HACK_USAGE is replaced
    by a single bracketed group showing -x and the positional triple as
    alternatives.
    """

    def format_help(self):
        """Return the inherited help text with the HACK_USAGE match rewritten."""
        standard = super().format_help()
        return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', standard)
28 | |
def process(options,buf,filename,offset,length,whole):
    """Extract one record, routing its output as requested, then clean up.

    options  -- parsed arguments; this function reads options.save and
                options.cmd, the rest is passed on to process1
    buf, filename, offset, length, whole -- passed unchanged to process1

    With options.save a temporary file is opened as TMPFILE before
    process1 runs, and its name is written to the output once process1
    has finished.  With options.cmd a subprocess is started and BINOUT is
    rebound to its stdin; afterwards the pipe is closed and the subprocess
    is waited for.  If both are given the temporary file is deleted once
    the subprocess has finished, otherwise a note that it needs deleting
    is printed to stderr.

    If starting the subprocess or process1 raises, the temporary file is
    closed and removed (its name was never reported to anyone) and the
    subprocess is wound up before the exception is re-raised.
    """
    global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE
    # Imported locally so that this function does not depend on main()
    # having put the module into the globals first.
    import tempfile
    proc=None
    if options.save:
        (tf,TMPFILENAME)=tempfile.mkstemp()
        TMPFILE=open(tf,mode='wb')
    try:
        if options.cmd:
            proc=CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0)
            BINOUT=CMD_PROC.stdin
        process1(options,buf,filename,offset,length,whole)
    except BaseException:
        # Don't leak the file handle, the orphaned temp file or the child
        if options.save:
            TMPFILE.close()
            os.unlink(TMPFILENAME)
            TMPFILENAME=None
        if proc is not None:
            proc.stdin.close()
            proc.wait()
        raise
    if options.save:
        TMPFILE.close()
        if options.cmd:
            _output_subproc(bytes(TMPFILENAME,'utf-8'))
            _output_subproc(b"\n")
        else:
            BINOUT.write(bytes(TMPFILENAME,'utf-8'))
            BINOUT.write(b"\n")
    if options.cmd:
        # Wind up subproc
        BINOUT.close()
        if CMD_PROC.wait()!=0:  # could/should be async?
            print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
                                                        CMD_PROC.returncode),
                  file=sys.stderr)
        if options.save:
            # not if async?
            os.unlink(TMPFILENAME)
            TMPFILENAME=None
    elif options.save:
        print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr)
        TMPFILENAME=None
60 | |
61 def _output_tmpfile(buf): | |
62 TMPFILE.write(buf) | |
63 | |
64 def _output_stdout(buf): | |
65 BINOUT.write(buf) | |
66 | |
67 def _output_subproc(buf): | |
68 toWrite=len(buf) | |
69 while toWrite>0: | |
70 toWrite -= BINOUT.write(buf) | |
71 | |
def process1(options,buf,filename,offset,length,whole):
    """Read one gzipped WARC record and emit the wanted parts via _output.

    options  -- parsed arguments; reads root, zipped, warc, headers, body
    buf      -- reusable bytearray: its first `length` bytes receive the
                compressed record, the remainder the decompressed text
    filename -- path of the WARC file, appended to options.root
    offset   -- byte position of the record within that file
    length   -- length in bytes of the compressed record
    whole    -- true if the entire record is wanted

    If options.root is not /beegfs/common_crawl and the file is not yet
    present under it, the file is first copied there from
    /beegfs/common_crawl.

    Raises ValueError if fewer than `length` bytes could be read, or if
    the HTTP part has to be parsed but the WARC header gave no
    Content-Length.
    """
    root=options.root
    rfn=root+filename
    if root!="/beegfs/common_crawl":
        # Support using ramdisk or other local disk as a faster cache
        if not os.path.exists(rfn):
            os.makedirs(os.path.dirname(rfn),exist_ok=True)
            with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \
                 io.FileIO(rfn,'w') as outfile:
                while True:
                    l=infile.readinto(buf)
                    if l==0:
                        break
                    outfile.write(memoryview(buf)[:l])
    bv=memoryview(buf)[:length]
    # Context manager so the file is closed even if seek/readinto raises
    with open(rfn,'rb',0) as file:
        file.seek(offset)
        nb=file.readinto(bv)
    if nb!=length:
        raise ValueError("Chunk read losing: %s, got %s expected %s at %s"%(file.name,
                                                                            nb,length,offset))
    if whole and options.zipped:
        # Raw compressed record wanted, no need to decompress
        _output(bv)
        return
    gzip_chunk = io.BytesIO(bv)
    uv=memoryview(buf)[length:]
    with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
        ll=0
        while True:
            # Append after what has already been read, so a second chunk
            # cannot overwrite the first
            l=gzip_fin.readinto(uv[ll:])
            if not l:
                break
            ll+=l
    cb=uv[:ll]
    if whole:
        _output(cb)
        return
    # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
    state=0
    wl=None # WARC Content-Length, reduced by HTTP header lines as they are read
    tr=None # Was this record truncated?
    bl=None # for HTTP Content-Length for the length of the body?
    with io.BytesIO(cb) as clear_text:
        for L in clear_text:
            if state==0:
                # WARC header
                if L.startswith(b"Content-Length: "):
                    wl=int(L[16:].rstrip())
                elif L.startswith(b"WARC-Truncated: "):
                    tr=L[16:].rstrip()
                    # tr is bytes, so compare with an empty bytes literal
                    tr="EMPTY" if tr==b"" else tr
                elif L==b"" or L.startswith(b"\r"): # for idempotency
                    # Blank line, WARC header is finished
                    if not (options.headers or options.body):
                        return
                    state=1
                    # Note we preserve the empty line
                if options.warc:
                    _output(L)
                continue
            if state==1:
                # HTTP header
                if wl is None:
                    raise ValueError("No WARC Content-Length found: %s %s %s"%(length,
                                                                               offset,
                                                                               filename))
                wl -= len(L)
                if not (L==b"" or L.startswith(b"\r")):
                    # Non-blank, it's a header
                    if bl is None and L.startswith(b"Content-Length: "):
                        bl=int(L[16:].rstrip())
                    if options.headers:
                        _output(L)
                else:
                    # Blank line, HTTP header is finished
                    if not options.body:
                        return
                    if options.headers:
                        _output(L)
                    state=2
                    # The above is just for sanity, because we do _not_
                    #  continue with the outer loop,
                    #  since we can now block-output the entire rest of the
                    #  input buffer.
                    if bl is not None:
                        if bl!=wl:
                            print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\
                                  (length,offset,filename,wl,bl,tr),file=sys.stderr)
                    # HTTP body
                    balance=clear_text.tell()
                    # Output whatever is left
                    _output(cb[balance:balance+wl])
                    return
164 | |
def main():
    """Command-line entry point: parse arguments, then extract records.

    Selects the module-level _output handler from the save/cmd options,
    allocates one 128MiB buffer which is reused for every record, and
    calls process() once per (length, offset, filename) triple.  Triples
    come from one of:
      * lines of a cdx index on stdin, if -x is given (lines mentioning
        /robotstxt/ are skipped; any other unparseable line exits with
        status 2, a failure in process() exits with status 3);
      * the three positional arguments, if given;
      * otherwise tab-delimited lines on stdin.
    """
    # tempfile is declared global so that the conditional import below
    # makes the module visible to the rest of the file.
    global _output,TMPFILE,TMPFILENAME,tempfile
    parser = argparse.ArgumentParser(
        description='''Extract records from warc files given length, offset and file triples.
Input one triple on command line, or
triples from stdin as tab-delimited lines
or complete cdx index lines.
In all cases by 'filename' is meant crawlid/segmentid/type/filename''',
        epilog='''Note that if no output flag(s) is/are given,
the whole WARC record will be output, more efficiently than
would be the case if all three flags were given.''',
        add_help=False,
        conflict_handler='resolve',
        formatter_class=HackFormat
        )
    # The help text itself goes through %-formatting in argparse, so every
    # %s that should be shown literally has to be doubled.
    fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s')
    parser.add_argument('--help',help='Show help',action='help')
    parser.add_argument('-d','--debug',help='Debug output',action='store_true')
    parser.add_argument('-w','--warc',help='output WARC headers',
                        action='store_true')
    parser.add_argument('-h','--headers',help='output HTTP headers',
                        action='store_true')
    parser.add_argument('-b','--body',help='output HTTP body',
                        action='store_true')
    parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
    parser.add_argument('-m','--module.function',help='module.function to call with a stream')
    parser.add_argument('-s','--save',action='store_true',
                        help="write to a temporary file and output the name")
    parser.add_argument('-f','--fpath',
                        help=fphelp,
                        default=FPAT)
    parser.add_argument('-r','--root',nargs='?',
                        help='File path root, create a copy there if necessary',
                        default='/beegfs/common_crawl')
    parser.add_argument('-z','--zipped',
                        help="output raw gzipped record, ignored if any of -bhw supplied",
                        action='store_true')
    sg=parser.add_mutually_exclusive_group()
    sg.add_argument('-x','--index',
                    help='take lines of triples from a cdx index file as input',
                    action='store_true')
    sg.add_argument('length',type=int,
                    help='length in bytes of gzipped record',
                    nargs='?')
    parser.add_argument('offset',type=int,
                        help='start position in bytes of gzipped record',
                        nargs='?')
    parser.add_argument('filename',
                        help='pathname of gzipped Common Crawl WARC-format file',
                        nargs='?')
    # Hack the order of optional and positional in the help output
    parser._action_groups.sort(key=lambda g:g.title)
    pa=parser.parse_args(sys.argv[1:])
    if pa.length is not None:
        # We have to enforce our own check..
        if pa.offset is None or pa.filename is None:
            parser.error("length, offset and filename must all be supplied together")

    buf=bytearray(128*1024*1024)

    # With no part selected, the whole record is output
    whole=not (pa.warc or pa.headers or pa.body)
    if pa.save:
        _output=_output_tmpfile
        import tempfile
    elif pa.cmd:
        _output = _output_subproc
    else:
        _output = _output_stdout
    if pa.index:
        # Raw string, so \. reaches the regex engine rather than being
        # parsed as an (invalid) string escape
        CDX=regex.compile(r'length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet...
        for l in sys.stdin:
            m=CDX.search(l)
            if m is None:
                if '/robotstxt/' in l:
                    continue
                print("index line problem: \"%s\""%l,file=sys.stderr,end='')
                sys.exit(2)
            # Groups 3-6 are the four filename components
            f=pa.fpath%(m[3:7])
            try:
                process(pa,buf,f,
                        int(m[2]),int(m[1]),whole)
            except Exception as e:
                if pa.debug:
                    import traceback
                    traceback.print_exc(file=sys.stderr)
                else:
                    print("Process fail: %s, input line:\n %s"%(e,l),
                          file=sys.stderr,end='')
                sys.exit(3)
    elif pa.length is not None:
        print(pa.filename,file=sys.stderr)
        process(pa,buf,pa.fpath%tuple(pa.filename.split('/')),
                pa.offset,pa.length,whole)
    else:
        print("Reading length, offset, filename tab-delimited triples from stdin...",
              file=sys.stderr)
        for l in sys.stdin:
            try:
                (length,offset,filename)=l.rstrip().split('\t')
                length=int(length)
                offset=int(offset)
            except ValueError as e:
                parser.error('Invalid input line: %s\n "%s"'%(e,l))
            process(pa,buf,pa.fpath%tuple(filename.split('/')),
                    offset,length,whole)
272 | |
# Run the command-line entry point only when executed as a script
if __name__ == "__main__":
    main()