comparison bin/ix.py @ 108:9e5b117dc461

using Popen to run igzip (also not great)
author Henry S. Thompson <ht@inf.ed.ac.uk>
date Thu, 22 Apr 2021 19:06:55 +0000
parents 007f35b9df9c
children 15abf4aab307
comparison
equal deleted inserted replaced
107:007f35b9df9c 108:9e5b117dc461
6 or complete cdx index lines. 6 or complete cdx index lines.
7 In all cases by 'filename' is meant crawlid/segmentid/filename 7 In all cases by 'filename' is meant crawlid/segmentid/filename
8 8
9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' 9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.'''
10 10
11 import sys, argparse, regex, os, shutil 11 import sys, argparse, regex, os, shutil, io, gzip, time
12 #from isal import igzip
13 from subprocess import Popen, PIPE
14 #import asyncio
12 15
13 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') 16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]')
14 BINOUT=sys.stdout.buffer 17 BINOUT=sys.stdout.buffer
15 FPAT="/%s/%s/orig/warc/%s" 18 FPAT="/%s/%s/orig/warc/%s"
16 19
25 rfn=root+filename 28 rfn=root+filename
26 if root!="/beegfs/common_crawl": 29 if root!="/beegfs/common_crawl":
27 if not os.path.exists(rfn): 30 if not os.path.exists(rfn):
28 if not os.path.exists(os.path.dirname(rfn)): 31 if not os.path.exists(os.path.dirname(rfn)):
29 os.makedirs(os.path.dirname(rfn)) 32 os.makedirs(os.path.dirname(rfn))
30 with open('/beegfs/common_crawl'+filename,'rb',0) as infile, \ 33 with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \
31 open(rfn,'wb',0) as outfile: 34 io.FileIO(rfn,'w') as outfile:
32 shutil.copyfileobj(infile,outfile,2048*1024) 35 #shutil.copyfileobj(infile,outfile,128*1024*1024)
33 # while True: 36 while True:
34 # l=infile.readinto(buf) 37 l=infile.readinto(buf)
35 # if l is None: 38 if l==0:
36 # break 39 break
37 # print(l,file=sys.stderr) 40 outfile.write(memoryview(buf)[:l])
38 # outfile.write(memoryview(buf)[:l])
39 infile.close()
40 outfile.close()
41 file=open(rfn,'rb',0) 41 file=open(rfn,'rb',0)
42 if whole: 42 if whole:
43 # try external unzip using Popen
43 file.seek(offset) 44 file.seek(offset)
44 bv=memoryview(buf)[:length] 45 bv=memoryview(buf)[:length]
45 nb=file.readinto(bv) 46 nb=file.readinto(bv)
46 if nb!=length: 47 if nb!=length:
47 print("losing",file.name,length,nb,file=sys.stderr) 48 print("losing",file.name,length,nb,file=sys.stderr)
48 BINOUT.write(bv) 49 if options.zipped:
50 BINOUT.write(bv)
51 else:
52 #gzip_chunk = io.BytesIO(bv)
53 uv=memoryview(buf)[length:]
54 #clear_bytes=io.BytesIO(uv)
55 p = Popen(["/lustre/home/dc007/hst/gentoo/usr/bin/igzip",
56 "-dc"],
57 stdin=PIPE,
58 stdout=None)
59 p.stdin.write(bv)
60 p.stdin.close()
61 res=p.wait()
62 if res!=0:
63 print('pipe failed',res,p.stderr.decode())
64 exit(2)
65 file.close()
66 return
67 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin:
68 while True:
69 l=gzip_fin.readinto(uv)
70 if not l:
71 break
72 BINOUT.write(memoryview(uv)[:l])
49 file.close() 73 file.close()
50 74
51 def main(): 75 def main():
52 parser = argparse.ArgumentParser( 76 parser = argparse.ArgumentParser(
53 description='''Extract records from warc files given length, offset and file triples. 77 description='''Extract records from warc files given length, offset and file triples.
73 action='store_true') 97 action='store_true')
74 parser.add_argument('-c','--cmd',help='pipes each result thru CMD') 98 parser.add_argument('-c','--cmd',help='pipes each result thru CMD')
75 parser.add_argument('-r','--root',nargs='?', 99 parser.add_argument('-r','--root',nargs='?',
76 help='File path root, create a copy there if necessary', 100 help='File path root, create a copy there if necessary',
77 default='/beegfs/common_crawl'), 101 default='/beegfs/common_crawl'),
102 parser.add_argument('-z','--zipped',
103 help="output raw gzipped record, ignored if any of -bhw supplied",
104 action='store_true')
78 sg=parser.add_mutually_exclusive_group() 105 sg=parser.add_mutually_exclusive_group()
79 sg.add_argument('-x','--index', 106 sg.add_argument('-x','--index',
80 help='take lines of triples from a cdx index file as input', 107 help='take lines of triples from a cdx index file as input',
81 action='store_true') 108 action='store_true')
82 sg.add_argument('length',type=int, 109 sg.add_argument('length',type=int,
96 if pa.length is not None: 123 if pa.length is not None:
97 # We have to enforce our own check.. 124 # We have to enforce our own check..
98 if pa.offset is None or pa.filename is None: 125 if pa.offset is None or pa.filename is None:
99 parser.error("length, offset and filename must all be supplied together") 126 parser.error("length, offset and filename must all be supplied together")
100 127
101 buf=bytearray(2024*1024) 128 buf=bytearray(128*1024*1024)
102 129
103 whole=not (pa.warc or pa.headers or pa.body) 130 whole=not (pa.warc or pa.headers or pa.body)
104 if pa.length is not None: 131 if pa.length is not None:
105 process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')), 132 process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')),
106 pa.offset,pa.length,whole) 133 pa.offset,pa.length,whole)