comparison bin/ix.py @ 21:cbac7dfe2f24

interpolate process0, support permanent subproc
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Thu, 29 Sep 2022 16:33:42 +0100
parents 046dbe557911
children fa43c318749b
comparison
equal deleted inserted replaced
20:a5dafc1364ed 21:cbac7dfe2f24
25 FOO=argparse.RawDescriptionHelpFormatter.format_help(self) 25 FOO=argparse.RawDescriptionHelpFormatter.format_help(self)
26 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', 26 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]',
27 FOO) 27 FOO)
28 28
29 def process(options,buf,filename,offset,length,whole): 29 def process(options,buf,filename,offset,length,whole):
30 global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE 30 try:
31 process0(options,buf,filename,offset,length,whole)
32 except Exception as e:
33 if options.debug:
34 import traceback
35 traceback.print_exc(file=sys.stderr)
36 else:
37 print("Process fail: %s, input line:\n %s"%(e,l),
38 file=sys.stderr,end='')
39 exit(3)
40
41 def process0(options,buf,filename,offset,length,whole):
42 global TMPFILENAME, TMPFILE
31 if options.save: 43 if options.save:
32 (tf,TMPFILENAME)=tempfile.mkstemp() 44 (tf,TMPFILENAME)=tempfile.mkstemp()
33 TMPFILE=open(tf,mode='wb') 45 TMPFILE=open(tf,mode='wb')
34 if options.cmd: 46 if options.cmd and not options.process:
35 CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0) 47 launch(options.cmd)
36 BINOUT=CMD_PROC.stdin
37 process1(options,buf,filename,offset,length,whole) 48 process1(options,buf,filename,offset,length,whole)
38 if options.save: 49 if options.save:
39 TMPFILE.close() 50 TMPFILE.close()
40 if options.cmd: 51 if options.cmd:
41 _output_subproc(bytes(TMPFILENAME,'utf-8')) 52 _output_subproc(bytes(TMPFILENAME,'utf-8'))
42 _output_subproc(b"\n") 53 _output_subproc(b"\n")
43 else: 54 else:
44 BINOUT.write(bytes(TMPFILENAME,'utf-8')) 55 BINOUT.write(bytes(TMPFILENAME,'utf-8'))
45 BINOUT.write(b"\n") 56 BINOUT.write(b"\n")
46 if options.cmd: 57 if options.cmd:
47 # Wind up subproc 58 if not options.process:
48 BINOUT.close() 59 windup(filename,options,length)
49 if CMD_PROC.wait()!=0: # could/should be async?
50 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
51 CMD_PROC.returncode),
52 file=sys.stderr)
53 if options.save: 60 if options.save:
54 # not if async?
55 os.unlink(TMPFILENAME) 61 os.unlink(TMPFILENAME)
56 TMPFILENAME=None 62 TMPFILENAME=None
57 elif options.save: 63 elif options.save:
58 print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr) 64 print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr)
59 TMPFILENAME=None 65 TMPFILENAME=None
60 66
67 def launch(cmd):
68 global CMD_PROC, BINOUT
69 CMD_PROC=Popen(shlex.split(cmd),stdin=PIPE,bufsize=0)
70 BINOUT=CMD_PROC.stdin
71
72 def windup(length,offset,filename):
73 # Wind up subproc
74 BINOUT.close()
75 if CMD_PROC.wait()!=0: # could/should be async?
76 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
77 CMD_PROC.returncode),
78 file=sys.stderr)
79
61 def _output_tmpfile(buf): 80 def _output_tmpfile(buf):
62 TMPFILE.write(buf) 81 TMPFILE.write(buf)
63 82
64 def _output_stdout(buf): 83 def _output_stdout(buf):
65 BINOUT.write(buf) 84 BINOUT.write(buf)
111 return 130 return
112 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted 131 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
113 state=0 132 state=0
114 tr=None # Was this record truncated? 133 tr=None # Was this record truncated?
115 bl=None # for HTTP Content-Length for the length of the body? 134 bl=None # for HTTP Content-Length for the length of the body?
135 # Could we make this faster by working purely within the cb memoryview?
136 # It would be messy, but avoid copying huge amounts
137 # The outer loop would just be something like
138 # clbv=memoryview(bytearray(b"Content-Length: "))
139 # i=s=0
140 # while i<ll:
141 # if cb[i]==10: # need to handle \r\n
142 # L=cb[s:i]
143 # s=i=i+1
144 # if L[:16]==clbv:
145 # wl=int(L[16:])
146 # else:
147 # i+=1
148 #
116 with io.BytesIO(cb) as clear_text: 149 with io.BytesIO(cb) as clear_text:
117 for L in clear_text: 150 for L in clear_text:
118 if state==0: 151 if state==0:
119 # WARC header 152 # WARC header
120 if L.startswith(b"Content-Length: "): 153 if L.startswith(b"Content-Length: "):
134 if state==1: 167 if state==1:
135 # HTTP header 168 # HTTP header
136 wl -= len(L) 169 wl -= len(L)
137 if not (L==b"" or L.startswith(b"\r")): 170 if not (L==b"" or L.startswith(b"\r")):
138 # Non-blank, it's a header 171 # Non-blank, it's a header
139 if bl is None and L.startswith(b"Content-Length: "): 172 (h,_,v)=L.partition(b": ")
140 bl=int(L[16:].rstrip()) 173 if bl is None and (h==b"Content-Length"):
174 bl=int(v)
141 if options.headers: 175 if options.headers:
142 _output(L) 176 if isinstance(options.headers,dict):
177 if h in options.headers:
178 options.headers[h]=v
179 else:
180 _output(L)
143 else: 181 else:
144 # Blank line, HTTP header is finished 182 # Blank line, HTTP header is finished
183 if isinstance(options.headers,dict):
184 _output(bytes(str(options.headers),'utf-8'))
145 if not options.body: 185 if not options.body:
146 return 186 return
147 if options.headers: 187 if options.headers:
148 _output(L) 188 _output(L)
149 state=2 189 state=2
180 fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s') 220 fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s')
181 parser.add_argument('--help',help='Show help',action='help') 221 parser.add_argument('--help',help='Show help',action='help')
182 parser.add_argument('-d','--debug',help='Debug output',action='store_true') 222 parser.add_argument('-d','--debug',help='Debug output',action='store_true')
183 parser.add_argument('-w','--warc',help='output WARC headers', 223 parser.add_argument('-w','--warc',help='output WARC headers',
184 action='store_true') 224 action='store_true')
185 parser.add_argument('-h','--headers',help='output HTTP headers', 225 parser.add_argument('-h','--headers',help='process HTTP headers: collect into dict with named values (,-separated) if arg present, else output',
186 action='store_true') 226 nargs='?',default=None,const=True)
187 parser.add_argument('-b','--body',help='output HTTP body', 227 parser.add_argument('-b','--body',help='output HTTP body',
188 action='store_true') 228 action='store_true')
189 parser.add_argument('-c','--cmd',help='pipes each result thru CMD') 229 parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
230 parser.add_argument('-p','--process',help='with -c, launches CMD only once',
231 action='store_true')
190 parser.add_argument('-m','--module.function',help='module.function to call with a stream'), 232 parser.add_argument('-m','--module.function',help='module.function to call with a stream'),
191 parser.add_argument('-s','--save',action='store_true', 233 parser.add_argument('-s','--save',action='store_true',
192 help="write to a temporary file and output the name") 234 help="write to a temporary file and output the name")
193 parser.add_argument('-f','--fpath', 235 parser.add_argument('-f','--fpath',
194 help=fphelp, 236 help=fphelp,
219 #print(pa,file=sys.stderr) 261 #print(pa,file=sys.stderr)
220 if pa.length is not None: 262 if pa.length is not None:
221 # We have to enforce our own check.. 263 # We have to enforce our own check..
222 if pa.offset is None or pa.filename is None: 264 if pa.offset is None or pa.filename is None:
223 parser.error("length, offset and filename must all be supplied together") 265 parser.error("length, offset and filename must all be supplied together")
266 if isinstance(pa.headers,str):
267 pa.headers=dict((bytes(k,'utf-8'),None) for k in pa.headers.split(','))
224 268
225 buf=bytearray(128*1024*1024) 269 buf=bytearray(128*1024*1024)
226 270
227 whole=not (pa.warc or pa.headers or pa.body) 271 whole=not (pa.warc or pa.headers or pa.body)
228 if pa.save: 272 if pa.save:
230 import tempfile 274 import tempfile
231 elif pa.cmd: 275 elif pa.cmd:
232 _output = _output_subproc 276 _output = _output_subproc
233 else: 277 else:
234 _output = _output_stdout 278 _output = _output_stdout
279 if pa.cmd and pa.process:
280 launch(pa.cmd)
281 # three different ways to process
235 if pa.index: 282 if pa.index:
236 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet... 283 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet...
237 for l in sys.stdin: 284 for l in sys.stdin:
238 m=CDX.search(l) 285 m=CDX.search(l)
239 if m is None: 286 if m is None:
240 if l.find('/robotstxt/')>-1: 287 if l.find('/robotstxt/')>-1:
241 continue 288 continue
242 print("index line problem: \"%s\""%l,file=sys.stderr,end='') 289 print("index line problem: \"%s\""%l,file=sys.stderr,end='')
243 exit(2) 290 exit(2)
244 f=pa.fpath%(m[3:7]) 291 filename=pa.fpath%(m[3:7])
245 try: 292 process(pa,buf,filename,
246 process(pa,buf,f, 293 int(offset:=m[2]),int(length:=m[1]),whole)
247 int(m[2]),int(m[1]),whole)
248 except Exception as e:
249 if pa.debug:
250 import traceback
251 traceback.print_exc(file=sys.stderr)
252 else:
253 print("Process fail: %s, input line:\n %s"%(e,l),
254 file=sys.stderr,end='')
255 exit(3)
256 elif pa.length is not None: 294 elif pa.length is not None:
257 print(pa.filename,file=sys.stderr) 295 print(pa.filename,file=sys.stderr)
258 process(pa,buf,pa.fpath%tuple(pa.filename.split('/')), 296 process(pa,buf,pa.fpath%tuple(pa.filename.split('/')),
259 pa.offset,pa.length,whole) 297 pa.offset,pa.length,whole)
260 else: 298 else:
267 offset=int(offset) 305 offset=int(offset)
268 except ValueError as e: 306 except ValueError as e:
269 parser.error('Invalid input line: %s\n "%s"'%(e,l)) 307 parser.error('Invalid input line: %s\n "%s"'%(e,l))
270 process(pa,buf,pa.fpath%tuple(filename.split('/')), 308 process(pa,buf,pa.fpath%tuple(filename.split('/')),
271 offset,length,whole) 309 offset,length,whole)
272 310 # processing done one way or another
311 if pa.cmd and pa.process:
312 windup(length,offset,filename)
313 # if pa.save and pa.process, deleting temp files is down to cmd
273 if __name__ == "__main__": 314 if __name__ == "__main__":
274 main() 315 main()