comparison bin/ix.py @ 119:bc958b776fb8

implement --cmd
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Wed, 16 Jun 2021 16:12:46 +0000
parents 63898fde9751
children 5b0ec642ee9b
comparison
equal deleted inserted replaced
118:551ff1de13d8 119:bc958b776fb8
6 or complete cdx index lines. 6 or complete cdx index lines.
7 In all cases by 'filename' is meant crawlid/segmentid/filename 7 In all cases by 'filename' is meant crawlid/segmentid/filename
8 8
9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' 9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''
10 10
11 import sys, argparse, regex, os, shutil, io, gzip, time 11 import sys, argparse, regex, os, shutil, io, gzip, time, shlex
12 from isal import igzip 12 from isal import igzip
13 #from subprocess import Popen, PIPE 13 from subprocess import Popen, PIPE
14 #import asyncio 14 #import asyncio
15 15
16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') 16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
17 BINOUT=sys.stdout.buffer 17 BINOUT=sys.stdout.buffer
18 FPAT="/%s/%s/orig/warc/%s" 18 FPAT="/%s/%s/orig/warc/%s"
22 global FOO 22 global FOO
23 FOO=argparse.RawDescriptionHelpFormatter.format_help(self) 23 FOO=argparse.RawDescriptionHelpFormatter.format_help(self)
24 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]', 24 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]',
25 FOO) 25 FOO)
26 26
27 def process(options,buf,root,filename,offset,length,whole): 27 def process(options,buf,filename,offset,length,whole):
28 global CMD_PROC, BINOUT
29 if options.cmd:
30 CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0)
31 BINOUT=CMD_PROC.stdin
32 process1(options,buf,filename,offset,length,whole)
33 if options.cmd:
34 # Wind up subproc
35 BINOUT.close()
36 if CMD_PROC.wait()!=0: # could/should be async?
37 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
38 CMD_PROC.returncode),
39 file=sys.stderr)
40
41 def _output_stdout(buf):
42 BINOUT.write(buf)
43
44 def _output_subproc(buf):
45 toWrite=len(buf)
46 while toWrite>0:
47 toWrite -= BINOUT.write(buf)
48
49 def process1(options,buf,filename,offset,length,whole):
50 root=options.root
28 rfn=root+filename 51 rfn=root+filename
29 if root!="/beegfs/common_crawl": 52 if root!="/beegfs/common_crawl":
30 # Support using ramdisk or other local disk as a faster cached 53 # Support using ramdisk or other local disk as a faster cached
31 if not os.path.exists(rfn): 54 if not os.path.exists(rfn):
32 if not os.path.exists(os.path.dirname(rfn)): 55 if not os.path.exists(os.path.dirname(rfn)):
45 nb=file.readinto(bv) 68 nb=file.readinto(bv)
46 file.close() 69 file.close()
47 if nb!=length: 70 if nb!=length:
48 print("losing",file.name,length,nb,file=sys.stderr) 71 print("losing",file.name,length,nb,file=sys.stderr)
49 if whole and options.zipped: 72 if whole and options.zipped:
50 BINOUT.write(bv) 73 _output(bv)
51 return 74 return
52 gzip_chunk = io.BytesIO(bv) 75 gzip_chunk = io.BytesIO(bv)
53 uv=memoryview(buf)[length:] 76 uv=memoryview(buf)[length:]
54 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin: 77 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
55 ll=0 78 ll=0
58 if not l: 81 if not l:
59 break 82 break
60 ll+=l 83 ll+=l
61 cb=memoryview(uv)[:ll] 84 cb=memoryview(uv)[:ll]
62 if whole: 85 if whole:
63 BINOUT.write(cb) 86 _output(cb)
64 return 87 return
65 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted 88 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
66 state=0 89 state=0
67 tr=None # Was this record truncated? 90 tr=None # Was this record truncated?
68 bl=None # for HTTP Content-Length for the length of the body? 91 bl=None # for HTTP Content-Length for the length of the body?
80 if not (options.headers or options.body): 103 if not (options.headers or options.body):
81 return 104 return
82 state=1 105 state=1
83 # Note we preserve the empty line 106 # Note we preserve the empty line
84 if options.warc: 107 if options.warc:
85 BINOUT.write(L) 108 _output(L)
86 continue 109 continue
87 if state==1: 110 if state==1:
88 # HTTP header 111 # HTTP header
89 wl -= len(L) 112 wl -= len(L)
90 if not (L==b"" or L.startswith(b"\r")): 113 if not (L==b"" or L.startswith(b"\r")):
91 # Non-blank, it's a header 114 # Non-blank, it's a header
92 if bl is None and L.startswith(b"Content-Length: "): 115 if bl is None and L.startswith(b"Content-Length: "):
93 bl=int(L[16:].rstrip()) 116 bl=int(L[16:].rstrip())
94 if options.headers: 117 if options.headers:
95 BINOUT.write(L) 118 _output(L)
96 else: 119 else:
97 # Blank line, HTTP header is finished 120 # Blank line, HTTP header is finished
98 if not options.body: 121 if not options.body:
99 return 122 return
100 if options.headers: 123 if options.headers:
101 BINOUT.write(L) 124 _output(L)
102 state=2 125 state=2
103 # The above is just for sanity, because we do _not_ 126 # The above is just for sanity, because we do _not_
104 # continue with the outer loop, 127 # continue with the outer loop,
105 # since we can now block-output the entire rest of the 128 # since we can now block-output the entire rest of the
106 # input buffer. 129 # input buffer.
110 (length,offset,filename,wl,bl,tr),file=sys.stderr) 133 (length,offset,filename,wl,bl,tr),file=sys.stderr)
111 # HTTP body 134 # HTTP body
112 balance=clear_text.tell() 135 balance=clear_text.tell()
113 #print(balance,bl,wl,ll,ll-balance,file=sys.stderr) 136 #print(balance,bl,wl,ll,ll-balance,file=sys.stderr)
114 # Output whatever is left 137 # Output whatever is left
115 BINOUT.write(cb[balance:balance+wl]) 138 _output(cb[balance:balance+wl])
116 return 139 return
117 140
118 def main(): 141 def main():
142 global _output
119 parser = argparse.ArgumentParser( 143 parser = argparse.ArgumentParser(
120 description='''Extract records from warc files given length, offset and file triples. 144 description='''Extract records from warc files given length, offset and file triples.
121 Input one triple on command line, or 145 Input one triple on command line, or
122 triples from stdin as tab-delimited lines 146 triples from stdin as tab-delimited lines
123 or complete cdx index lines. 147 or complete cdx index lines.
169 parser.error("length, offset and filename must all be supplied together") 193 parser.error("length, offset and filename must all be supplied together")
170 194
171 buf=bytearray(128*1024*1024) 195 buf=bytearray(128*1024*1024)
172 196
173 whole=not (pa.warc or pa.headers or pa.body) 197 whole=not (pa.warc or pa.headers or pa.body)
174 if pa.length is not None: 198 if pa.cmd:
175 process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')), 199 _output = _output_subproc
176 pa.offset,pa.length,whole) 200 else:
177 exit(0) 201 _output = _output_stdout
178 if pa.index: 202 if pa.index:
179 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"') 203 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/warc/(.*\.gz)"')
180 for l in sys.stdin: 204 for l in sys.stdin:
181 m=CDX.search(l) 205 m=CDX.search(l)
182 if m is None: 206 if m is None:
183 print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr) 207 print("index line problem: \"%s\""%l.lstrip(),file=sys.stderr)
184 exit(2) 208 exit(2)
185 f=FPAT%(m[3:6]) 209 f=FPAT%(m[3:6])
186 process(pa,buf,pa.root,f, 210 process(pa,buf,f,
187 int(m[2]),int(m[1]),whole) 211 int(m[2]),int(m[1]),whole)
188 exit(0) 212 elif pa.length is not None:
213 print(pa.filename,file=sys.stderr)
214 process(pa,buf,FPAT%tuple(pa.filename.split('/')),
215 pa.offset,pa.length,whole)
216 else:
217 print("Reading length, offset, filename tab-delimited triples from stdin...",
218 file=sys.stderr)
219 for l in sys.stdin:
220 try:
221 (length,offset,filename)=l.rstrip().split('\t')
222 length=int(length)
223 offset=int(offset)
224 except ValueError as e:
225 parser.error('Invalid input line: %s\n "%s"'%(e,l))
226 process(pa,buf,FPAT%tuple(filename.split('/')),
227 offset,length,whole)
228
189 if __name__ == "__main__": 229 if __name__ == "__main__":
190 main() 230 main()