comparison bin/ix.py @ 18:046dbe557911

write to tmp file implemented
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Sun, 07 Aug 2022 13:58:33 +0100
parents
children cbac7dfe2f24
comparison
equal deleted inserted replaced
17:75e0d0013da0 18:046dbe557911
1 #!/usr/bin/env python3
2 '''Extract request records from Common Crawl WARC-format files
3 given length, offset and filename triples.
4 Input one triple on command line, or
5 triples from stdin as tab-delimited lines
6 or complete cdx index lines.
7 In all cases by 'filename' is meant crawlid/segmentid/type/filename
8
9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''
10
11 import sys, argparse, regex, os, shutil, io, gzip, time, shlex
12 from isal import igzip
13 from subprocess import Popen, PIPE
14 #import asyncio
15
16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
17 BINOUT=sys.stdout.buffer
18 FPAT="/%s/%s/orig/%s/%s"
19
20 CMD_PROC=None
21 TMPFILENAME=None
22
23 class HackFormat(argparse.RawDescriptionHelpFormatter):
24 def format_help(self):
25 FOO=argparse.RawDescriptionHelpFormatter.format_help(self)
26 return HACK_USAGE.sub('\n [ ( -x | length offset filename ) ]',
27 FOO)
28
29 def process(options,buf,filename,offset,length,whole):
30 global CMD_PROC, BINOUT, TMPFILENAME, TMPFILE
31 if options.save:
32 (tf,TMPFILENAME)=tempfile.mkstemp()
33 TMPFILE=open(tf,mode='wb')
34 if options.cmd:
35 CMD_PROC=Popen(shlex.split(options.cmd),stdin=PIPE,bufsize=0)
36 BINOUT=CMD_PROC.stdin
37 process1(options,buf,filename,offset,length,whole)
38 if options.save:
39 TMPFILE.close()
40 if options.cmd:
41 _output_subproc(bytes(TMPFILENAME,'utf-8'))
42 _output_subproc(b"\n")
43 else:
44 BINOUT.write(bytes(TMPFILENAME,'utf-8'))
45 BINOUT.write(b"\n")
46 if options.cmd:
47 # Wind up subproc
48 BINOUT.close()
49 if CMD_PROC.wait()!=0: # could/should be async?
50 print("subproc of %s:%s:%s failed with %s"%(length,offset,filename,
51 CMD_PROC.returncode),
52 file=sys.stderr)
53 if options.save:
54 # not if async?
55 os.unlink(TMPFILENAME)
56 TMPFILENAME=None
57 elif options.save:
58 print("%s will need to be deleted"%TMPFILENAME,file=sys.stderr)
59 TMPFILENAME=None
60
61 def _output_tmpfile(buf):
62 TMPFILE.write(buf)
63
64 def _output_stdout(buf):
65 BINOUT.write(buf)
66
67 def _output_subproc(buf):
68 toWrite=len(buf)
69 while toWrite>0:
70 toWrite -= BINOUT.write(buf)
71
72 def process1(options,buf,filename,offset,length,whole):
73 root=options.root
74 rfn=root+filename
75 if root!="/beegfs/common_crawl":
76 # Support using ramdisk or other local disk as a faster cached
77 if not os.path.exists(rfn):
78 if not os.path.exists(os.path.dirname(rfn)):
79 os.makedirs(os.path.dirname(rfn))
80 with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \
81 io.FileIO(rfn,'w') as outfile:
82 #shutil.copyfileobj(infile,outfile,128*1024*1024)
83 while True:
84 l=infile.readinto(buf)
85 if l==0:
86 break
87 outfile.write(memoryview(buf)[:l])
88 file=open(rfn,'rb',0)
89 file.seek(offset)
90 bv=memoryview(buf)[:length]
91 nb=file.readinto(bv)
92 file.close()
93 if nb!=length:
94 raise ValueError("Chunk read losing: %s, got %s expected %s at %s"%(file.name,
95 nb,length,offset))
96 if whole and options.zipped:
97 _output(bv)
98 return
99 gzip_chunk = io.BytesIO(bv)
100 uv=memoryview(buf)[length:]
101 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
102 ll=0
103 while True:
104 l=gzip_fin.readinto(uv)
105 if not l:
106 break
107 ll+=l
108 cb=memoryview(uv)[:ll]
109 if whole:
110 _output(cb)
111 return
112 # Only output parts (0 = WARC header, 1 = HTTP header, 2 = body) that are wanted
113 state=0
114 tr=None # Was this record truncated?
115 bl=None # for HTTP Content-Length for the length of the body?
116 with io.BytesIO(cb) as clear_text:
117 for L in clear_text:
118 if state==0:
119 # WARC header
120 if L.startswith(b"Content-Length: "):
121 wl=int(L[16:].rstrip())
122 elif L.startswith(b"WARC-Truncated: "):
123 tr=L[16:].rstrip()
124 tr="EMPTY" if tr=="" else tr
125 elif L==b"" or L.startswith(b"\r"): # for idempotency
126 # Blank line, WARC header is finished
127 if not (options.headers or options.body):
128 return
129 state=1
130 # Note we preserve the empty line
131 if options.warc:
132 _output(L)
133 continue
134 if state==1:
135 # HTTP header
136 wl -= len(L)
137 if not (L==b"" or L.startswith(b"\r")):
138 # Non-blank, it's a header
139 if bl is None and L.startswith(b"Content-Length: "):
140 bl=int(L[16:].rstrip())
141 if options.headers:
142 _output(L)
143 else:
144 # Blank line, HTTP header is finished
145 if not options.body:
146 return
147 if options.headers:
148 _output(L)
149 state=2
150 # The above is just for sanity, because we do _not_
151 # continue with the outer loop,
152 # since we can now block-output the entire rest of the
153 # input buffer.
154 if bl is not None:
155 if bl!=wl:
156 print("length mismatch: %s %s %s here: %s given: %s trunc: %s"%\
157 (length,offset,filename,wl,bl,tr),file=sys.stderr)
158 # HTTP body
159 balance=clear_text.tell()
160 #print(balance,bl,wl,ll,ll-balance,file=sys.stderr)
161 # Output whatever is left
162 _output(cb[balance:balance+wl])
163 return
164
165 def main():
166 global _output,TMPFILE,TMPFILENAME,tempfile
167 parser = argparse.ArgumentParser(
168 description='''Extract records from warc files given length, offset and file triples.
169 Input one triple on command line, or
170 triples from stdin as tab-delimited lines
171 or complete cdx index lines.
172 In all cases by 'filename' is meant crawlid/segmentid/type/filename''',
173 epilog='''Note that if no output flag(s) is/are given,
174 the whole WARC record will be output, more efficiently than
175 would be the case if all three flags were given.''',
176 add_help=False,
177 conflict_handler='resolve',
178 formatter_class=HackFormat
179 )
180 fphelp=('format string for turning 4 filename components into a path, must contain %%s exactly 4 times,\ndefault is "%s"'%FPAT).replace('%s','%%s')
181 parser.add_argument('--help',help='Show help',action='help')
182 parser.add_argument('-d','--debug',help='Debug output',action='store_true')
183 parser.add_argument('-w','--warc',help='output WARC headers',
184 action='store_true')
185 parser.add_argument('-h','--headers',help='output HTTP headers',
186 action='store_true')
187 parser.add_argument('-b','--body',help='output HTTP body',
188 action='store_true')
189 parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
190 parser.add_argument('-m','--module.function',help='module.function to call with a stream'),
191 parser.add_argument('-s','--save',action='store_true',
192 help="write to a temporary file and output the name")
193 parser.add_argument('-f','--fpath',
194 help=fphelp,
195 default=FPAT)
196 parser.add_argument('-r','--root',nargs='?',
197 help='File path root, create a copy there if necessary',
198 default='/beegfs/common_crawl'),
199 parser.add_argument('-z','--zipped',
200 help="output raw gzipped record, ignored if any of -bhw supplied",
201 action='store_true')
202 sg=parser.add_mutually_exclusive_group()
203 sg.add_argument('-x','--index',
204 help='take lines of triples from a cdx index file as input',
205 action='store_true')
206 sg.add_argument('length',type=int,
207 help='length in bytes of gzipped record',
208 nargs='?')
209 parser.add_argument('offset',type=int,
210 help='start position in bytes of gzipped record',
211 nargs='?')
212 parser.add_argument('filename',
213 help='pathname of gzipped Common Crawl WARC-format file',
214 nargs='?')
215 # Hack the order of optional and positional in the help output
216 parser._action_groups.sort(key=lambda g:g.title)
217 #parser.print_help()
218 pa=parser.parse_args(sys.argv[1:])
219 #print(pa,file=sys.stderr)
220 if pa.length is not None:
221 # We have to enforce our own check..
222 if pa.offset is None or pa.filename is None:
223 parser.error("length, offset and filename must all be supplied together")
224
225 buf=bytearray(128*1024*1024)
226
227 whole=not (pa.warc or pa.headers or pa.body)
228 if pa.save:
229 _output=_output_tmpfile
230 import tempfile
231 elif pa.cmd:
232 _output = _output_subproc
233 else:
234 _output = _output_stdout
235 if pa.index:
236 CDX=regex.compile('length": "([0-9]*)", "offset": "([0-9]*)", "filename": "crawl-data/([^/]*)/segments/([^/]*)/(warc|crawldiagnostics)/(.*\.gz)"') # no robotstxt yet...
237 for l in sys.stdin:
238 m=CDX.search(l)
239 if m is None:
240 if l.find('/robotstxt/')>-1:
241 continue
242 print("index line problem: \"%s\""%l,file=sys.stderr,end='')
243 exit(2)
244 f=pa.fpath%(m[3:7])
245 try:
246 process(pa,buf,f,
247 int(m[2]),int(m[1]),whole)
248 except Exception as e:
249 if pa.debug:
250 import traceback
251 traceback.print_exc(file=sys.stderr)
252 else:
253 print("Process fail: %s, input line:\n %s"%(e,l),
254 file=sys.stderr,end='')
255 exit(3)
256 elif pa.length is not None:
257 print(pa.filename,file=sys.stderr)
258 process(pa,buf,pa.fpath%tuple(pa.filename.split('/')),
259 pa.offset,pa.length,whole)
260 else:
261 print("Reading length, offset, filename tab-delimited triples from stdin...",
262 file=sys.stderr)
263 for l in sys.stdin:
264 try:
265 (length,offset,filename)=l.rstrip().split('\t')
266 length=int(length)
267 offset=int(offset)
268 except ValueError as e:
269 parser.error('Invalid input line: %s\n "%s"'%(e,l))
270 process(pa,buf,pa.fpath%tuple(filename.split('/')),
271 offset,length,whole)
272
273 if __name__ == "__main__":
274 main()