Mercurial > hg > cc > cirrus_work
comparison bin/ix.py @ 21:cbac7dfe2f24
interpolate process0, support permanent subproc
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Thu, 29 Sep 2022 16:33:42 +0100 |
parents | 046dbe557911 |
children | fa43c318749b |
comparison
equal
deleted
inserted
replaced
20:a5dafc1364ed | 21:cbac7dfe2f24 |
---|---|
25 FOO=argparse.RawDescriptionHelpFormatter.format_help(self) | 25 FOO=argparse.RawDescriptionHelpFormatter.format_help(self) |
26 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', | 26 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', |
27 FOO) | 27 FOO) |
28 | 28 |
29 def process(options,buf,filename,offset,length,whole): | 29 def process(options,buf,filename,offset,length,whole): |
30 global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE | 30 try: |
31 process0(options,buf,filename,offset,length,whole) | |
32 except Exception as e: | |
33 if options.debug: | |
34 import traceback | |
35 traceback.print_exc(file=sys.stderr) | |
36 else: | |
37 print("Process fail: %s, input line:\n %s"%(e,l), | |
38 file=sys.stderr,end='') | |
39 exit(3) | |
40 | |
41 def process0(options,buf,filename,offset,length,whole): | |
42 global TMPFILENAME, TMPFILE | |
31 if options.save: | 43 if options.save: |
32 (tf,TMPFILENAME)=tempfile.mkstemp() | 44 (tf,TMPFILENAME)=tempfile.mkstemp() |
33 TMPFILE=open(tf,mode='wb') | 45 TMPFILE=open(tf,mode='wb') |
34 if options.cmd: | 46 if options.cmd and not options.process: |
35 CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0) | 47 launch(options.cmd) |
36 BINOUT=CMD_PROC.stdin | |
37 process1(options,buf,filename,offset,length,whole) | 48 process1(options,buf,filename,offset,length,whole) |
38 if options.save: | 49 if options.save: |
39 TMPFILE.close() | 50 TMPFILE.close() |
40 if options.cmd: | 51 if options.cmd: |
41 _output_subproc(bytes(TMPFILENAME,'utf-8')) | 52 _output_subproc(bytes(TMPFILENAME,'utf-8')) |
42 _output_subproc(b"\n") | 53 _output_subproc(b"\n") |
43 else: | 54 else: |
44 BINOUT.write(bytes(TMPFILENAME,'utf-8')) | 55 BINOUT.write(bytes(TMPFILENAME,'utf-8')) |
45 BINOUT.write(b"\n") | 56 BINOUT.write(b"\n") |
46 if options.cmd: | 57 if options.cmd: |
47 # Wind up subproc | 58 if not options.process: |
48 BINOUT.close() | 59 windup(filename,options,length) |
49 if CMD_PROC.wait()!=0: # could/should be async? | |
50 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename, | |
51 CMD_PROC.returncode), | |
52 file=sys.stderr) | |
53 if options.save: | 60 if options.save: |
54 # not if async? | |
55 os.unlink(TMPFILENAME) | 61 os.unlink(TMPFILENAME) |
56 TMPFILENAME=None | 62 TMPFILENAME=None |
57 elif options.save: | 63 elif options.save: |
58 print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr) | 64 print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr) |
59 TMPFILENAME=None | 65 TMPFILENAME=None |
60 | 66 |
67 def launch(cmd): | |
68 global CMD_PROC, BINOUT | |
69 CMD_PROC=Popen(shlex.split(cmd),stdin=PIPE,bufsize=0) | |
70 BINOUT=CMD_PROC.stdin | |
71 | |
72 def windup(length,offset,filename): | |
73 # Wind up subproc | |
74 BINOUT.close() | |
75 if CMD_PROC.wait()!=0: # could/should be async? | |
76 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename, | |
77 CMD_PROC.returncode), | |
78 file=sys.stderr) | |
79 | |
61 def _output_tmpfile(buf): | 80 def _output_tmpfile(buf): |
62 TMPFILE.write(buf) | 81 TMPFILE.write(buf) |
63 | 82 |
64 def _output_stdout(buf): | 83 def _output_stdout(buf): |
65 BINOUT.write(buf) | 84 BINOUT.write(buf) |
111 return | 130 return |
112 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted | 131 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted |
113 state=0 | 132 state=0 |
114 tr=None # Was this record truncated? | 133 tr=None # Was this record truncated? |
115 bl=None # for HTTP Content-Length for the length of the body? | 134 bl=None # for HTTP Content-Length for the length of the body? |
135 # Could we make this faster by working purely within the cb memoryview? | |
136 # It would be messy, but avoid copying huge amounts | |
137 # The outer loop would just be something like | |
138 # clbv=memoryview(bytearray(b"Content-Length: ")) | |
139 # i=s=0 | |
140 # while i<ll: | |
141 # if cb[i]==10: # need to handle \r\n | |
142 # L=cb[s:i] | |
143 # s=i=i+1 | |
144 # if L[:16]==clbv: | |
145 # wl=int(L[16:]) | |
146 # else: | |
147 # i+=1 | |
148 # | |
116 with io.BytesIO(cb) as clear_text: | 149 with io.BytesIO(cb) as clear_text: |
117 for L in clear_text: | 150 for L in clear_text: |
118 if state==0: | 151 if state==0: |
119 # WARC header | 152 # WARC header |
120 if L.startswith(b"Content-Length: "): | 153 if L.startswith(b"Content-Length: "): |
134 if state==1: | 167 if state==1: |
135 # HTTP header | 168 # HTTP header |
136 wl -= len(L) | 169 wl -= len(L) |
137 if not (L==b"" or L.startswith(b"\r")): | 170 if not (L==b"" or L.startswith(b"\r")): |
138 # Non-blank, it's a header | 171 # Non-blank, it's a header |
139 if bl is None and L.startswith(b"Content-Length: "): | 172 (h,_,v)=L.partition(b": ") |
140 bl=int(L[16:].rstrip()) | 173 if bl is None and (h==b"Content-Length"): |
174 bl=int(v) | |
141 if options.headers: | 175 if options.headers: |
142 _output(L) | 176 if isinstance(options.headers,dict): |
177 if h in options.headers: | |
178 options.headers[h]=v | |
179 else: | |
180 _output(L) | |
143 else: | 181 else: |
144 # Blank line, HTTP header is finished | 182 # Blank line, HTTP header is finished |
183 if isinstance(options.headers,dict): | |
184 _output(bytes(str(options.headers),'utf-8')) | |
145 if not options.body: | 185 if not options.body: |
146 return | 186 return |
147 if options.headers: | 187 if options.headers: |
148 _output(L) | 188 _output(L) |
149 state=2 | 189 state=2 |
180 fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s') | 220 fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s') |
181 parser.add_argument('--help',help='Show help',action='help') | 221 parser.add_argument('--help',help='Show help',action='help') |
182 parser.add_argument('-d','--debug',help='Debug output',action='store_true') | 222 parser.add_argument('-d','--debug',help='Debug output',action='store_true') |
183 parser.add_argument('-w','--warc',help='output WARC headers', | 223 parser.add_argument('-w','--warc',help='output WARC headers', |
184 action='store_true') | 224 action='store_true') |
185 parser.add_argument('-h','--headers',help='output HTTP headers', | 225 parser.add_argument('-h','--headers',help='process HTTP headers: collect into dict with named values (,-separated) if arg present, else output', |
186 action='store_true') | 226 nargs='?',default=None,const=True) |
187 parser.add_argument('-b','--body',help='output HTTP body', | 227 parser.add_argument('-b','--body',help='output HTTP body', |
188 action='store_true') | 228 action='store_true') |
189 parser.add_argument('-c','--cmd',help='pipes each result thru CMD') | 229 parser.add_argument('-c','--cmd',help='pipes each result thru CMD') |
230 parser.add_argument('-p','--process',help='with -c, launches CMD only once', | |
231 action='store_true') | |
190 parser.add_argument('-m','--module.function',help='module.function to call with a stream'), | 232 parser.add_argument('-m','--module.function',help='module.function to call with a stream'), |
191 parser.add_argument('-s','--save',action='store_true', | 233 parser.add_argument('-s','--save',action='store_true', |
192 help="write to a temporary file and output the name") | 234 help="write to a temporary file and output the name") |
193 parser.add_argument('-f','--fpath', | 235 parser.add_argument('-f','--fpath', |
194 help=fphelp, | 236 help=fphelp, |
219 #print(pa,file=sys.stderr) | 261 #print(pa,file=sys.stderr) |
220 if pa.length is not None: | 262 if pa.length is not None: |
221 # We have to enforce our own check.. | 263 # We have to enforce our own check.. |
222 if pa.offset is None or pa.filename is None: | 264 if pa.offset is None or pa.filename is None: |
223 parser.error("length, offset and filename must all be supplied together") | 265 parser.error("length, offset and filename must all be supplied together") |
266 if isinstance(pa.headers,str): | |
267 pa.headers=dict((bytes(k,'utf-8'),None) for k in pa.headers.split(',')) | |
224 | 268 |
225 buf=bytearray(128*1024*1024) | 269 buf=bytearray(128*1024*1024) |
226 | 270 |
227 whole=not (pa.warc or pa.headers or pa.body) | 271 whole=not (pa.warc or pa.headers or pa.body) |
228 if pa.save: | 272 if pa.save: |
230 import tempfile | 274 import tempfile |
231 elif pa.cmd: | 275 elif pa.cmd: |
232 _output = _output_subproc | 276 _output = _output_subproc |
233 else: | 277 else: |
234 _output = _output_stdout | 278 _output = _output_stdout |
279 if pa.cmd and pa.process: | |
280 launch(pa.cmd) | |
281 # three different ways to process | |
235 if pa.index: | 282 if pa.index: |
236 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet... | 283 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet... |
237 for l in sys.stdin: | 284 for l in sys.stdin: |
238 m=CDX.search(l) | 285 m=CDX.search(l) |
239 if m is None: | 286 if m is None: |
240 if l.find('/robotstxt/')>-1: | 287 if l.find('/robotstxt/')>-1: |
241 continue | 288 continue |
242 print("index line problem: \"%s\""%l,file=sys.stderr,end='') | 289 print("index line problem: \"%s\""%l,file=sys.stderr,end='') |
243 exit(2) | 290 exit(2) |
244 f=pa.fpath%(m[3:7]) | 291 filename=pa.fpath%(m[3:7]) |
245 try: | 292 process(pa,buf,filename, |
246 process(pa,buf,f, | 293 int(offset:=m[2]),int(length:=m[1]),whole) |
247 int(m[2]),int(m[1]),whole) | |
248 except Exception as e: | |
249 if pa.debug: | |
250 import traceback | |
251 traceback.print_exc(file=sys.stderr) | |
252 else: | |
253 print("Process fail: %s, input line:\n %s"%(e,l), | |
254 file=sys.stderr,end='') | |
255 exit(3) | |
256 elif pa.length is not None: | 294 elif pa.length is not None: |
257 print(pa.filename,file=sys.stderr) | 295 print(pa.filename,file=sys.stderr) |
258 process(pa,buf,pa.fpath%tuple(pa.filename.split('/')), | 296 process(pa,buf,pa.fpath%tuple(pa.filename.split('/')), |
259 pa.offset,pa.length,whole) | 297 pa.offset,pa.length,whole) |
260 else: | 298 else: |
267 offset=int(offset) | 305 offset=int(offset) |
268 except ValueError as e: | 306 except ValueError as e: |
269 parser.error('Invalid input line: %s\n "%s"'%(e,l)) | 307 parser.error('Invalid input line: %s\n "%s"'%(e,l)) |
270 process(pa,buf,pa.fpath%tuple(filename.split('/')), | 308 process(pa,buf,pa.fpath%tuple(filename.split('/')), |
271 offset,length,whole) | 309 offset,length,whole) |
272 | 310 # processing done one way or another |
311 if pa.cmd and pa.process: | |
312 windup(length,offset,filename) | |
313 # if pa.save and pa.process, deleting temp files is down to cmd | |
273 if __name__ == "__main__": | 314 if __name__ == "__main__": |
274 main() | 315 main() |