Mercurial > hg > cc > cirrus_home
comparison bin/ix.py @ 108:9e5b117dc461
using Popen to run igzip (also not great)
author | Henry S. Thompson <ht@inf.ed.ac.uk> |
---|---|
date | Thu, 22 Apr 2021 19:06:55 +0000 |
parents | 007f35b9df9c |
children | 15abf4aab307 |
comparison
equal
deleted
inserted
replaced
107:007f35b9df9c | 108:9e5b117dc461 |
---|---|
6 or complete cdx index lines. | 6 or complete cdx index lines. |
7 In all cases by 'filename' is meant crawlid/segmentid/filename | 7 In all cases by 'filename' is meant crawlid/segmentid/filename |
8 | 8 |
9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' | 9 Note that if no output flag(s) is/are given, the whole WARC record will be output, more efficiently than would be the case if -whb is given.''' |
10 | 10 |
11 import sys, argparse, regex, os, shutil | 11 import sys, argparse, regex, os, shutil, io, gzip, time |
12 #from isal import igzip | |
13 from subprocess import Popen, PIPE | |
14 #import asyncio | |
12 | 15 |
13 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') | 16 HACK_USAGE=regex.compile('\[-x\]\n\s*\[length\] \[offset\] \[filename\]') |
14 BINOUT=sys.stdout.buffer | 17 BINOUT=sys.stdout.buffer |
15 FPAT="/%s/%s/orig/warc/%s" | 18 FPAT="/%s/%s/orig/warc/%s" |
16 | 19 |
25 rfn=root+filename | 28 rfn=root+filename |
26 if root!="/beegfs/common_crawl": | 29 if root!="/beegfs/common_crawl": |
27 if not os.path.exists(rfn): | 30 if not os.path.exists(rfn): |
28 if not os.path.exists(os.path.dirname(rfn)): | 31 if not os.path.exists(os.path.dirname(rfn)): |
29 os.makedirs(os.path.dirname(rfn)) | 32 os.makedirs(os.path.dirname(rfn)) |
30 with open('/beegfs/common_crawl'+filename,'rb',0) as infile, \ | 33 with io.FileIO('/beegfs/common_crawl'+filename,'r') as infile, \ |
31 open(rfn,'wb',0) as outfile: | 34 io.FileIO(rfn,'w') as outfile: |
32 shutil.copyfileobj(infile,outfile,2048*1024) | 35 #shutil.copyfileobj(infile,outfile,128*1024*1024) |
33 # while True: | 36 while True: |
34 # l=infile.readinto(buf) | 37 l=infile.readinto(buf) |
35 # if l is None: | 38 if l==0: |
36 # break | 39 break |
37 # print(l,file=sys.stderr) | 40 outfile.write(memoryview(buf)[:l]) |
38 # outfile.write(memoryview(buf)[:l]) | |
39 infile.close() | |
40 outfile.close() | |
41 file=open(rfn,'rb',0) | 41 file=open(rfn,'rb',0) |
42 if whole: | 42 if whole: |
43 # try external unzip using Popen | |
43 file.seek(offset) | 44 file.seek(offset) |
44 bv=memoryview(buf)[:length] | 45 bv=memoryview(buf)[:length] |
45 nb=file.readinto(bv) | 46 nb=file.readinto(bv) |
46 if nb!=length: | 47 if nb!=length: |
47 print("losing",file.name,length,nb,file=sys.stderr) | 48 print("losing",file.name,length,nb,file=sys.stderr) |
48 BINOUT.write(bv) | 49 if options.zipped: |
50 BINOUT.write(bv) | |
51 else: | |
52 #gzip_chunk = io.BytesIO(bv) | |
53 uv=memoryview(buf)[length:] | |
54 #clear_bytes=io.BytesIO(uv) | |
55 p = Popen(["/lustre/home/dc007/hst/gentoo/usr/bin/igzip", | |
56 "-dc"], | |
57 stdin=PIPE, | |
58 stdout=None) | |
59 p.stdin.write(bv) | |
60 p.stdin.close() | |
61 res=p.wait() | |
62 if res!=0: | |
63 print('pipe failed',res,p.stderr.decode()) | |
64 exit(2) | |
65 file.close() | |
66 return | |
67 with igzip.IGzipFile(fileobj=gzip_chunk) as gzip_fin: | |
68 while True: | |
69 l=gzip_fin.readinto(uv) | |
70 if not l: | |
71 break | |
72 BINOUT.write(memoryview(uv)[:l]) | |
49 file.close() | 73 file.close() |
50 | 74 |
51 def main(): | 75 def main(): |
52 parser = argparse.ArgumentParser( | 76 parser = argparse.ArgumentParser( |
53 description='''Extract records from warc files given length, offset and file triples. | 77 description='''Extract records from warc files given length, offset and file triples. |
73 action='store_true') | 97 action='store_true') |
74 parser.add_argument('-c','--cmd',help='pipes each result thru CMD') | 98 parser.add_argument('-c','--cmd',help='pipes each result thru CMD') |
75 parser.add_argument('-r','--root',nargs='?', | 99 parser.add_argument('-r','--root',nargs='?', |
76 help='File path root, create a copy there if necessary', | 100 help='File path root, create a copy there if necessary', |
77 default='/beegfs/common_crawl'), | 101 default='/beegfs/common_crawl'), |
102 parser.add_argument('-z','--zipped', | |
103 help="output raw gzipped record, ignored if any of -bhw supplied", | |
104 action='store_true') | |
78 sg=parser.add_mutually_exclusive_group() | 105 sg=parser.add_mutually_exclusive_group() |
79 sg.add_argument('-x','--index', | 106 sg.add_argument('-x','--index', |
80 help='take lines of triples from a cdx index file as input', | 107 help='take lines of triples from a cdx index file as input', |
81 action='store_true') | 108 action='store_true') |
82 sg.add_argument('length',type=int, | 109 sg.add_argument('length',type=int, |
96 if pa.length is not None: | 123 if pa.length is not None: |
97 # We have to enforce our own check.. | 124 # We have to enforce our own check.. |
98 if pa.offset is None or pa.filename is None: | 125 if pa.offset is None or pa.filename is None: |
99 parser.error("length, offset and filename must all be supplied together") | 126 parser.error("length, offset and filename must all be supplied together") |
100 | 127 |
101 buf=bytearray(2024*1024) | 128 buf=bytearray(128*1024*1024) |
102 | 129 |
103 whole=not (pa.warc or pa.headers or pa.body) | 130 whole=not (pa.warc or pa.headers or pa.body) |
104 if pa.length is not None: | 131 if pa.length is not None: |
105 process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')), | 132 process(pa,buf,pa.root,FPAT%list(pa.filename.split('/')), |
106 pa.offset,pa.length,whole) | 133 pa.offset,pa.length,whole) |