changeset 63:d46c8b12fc04
support multiple approaches to key combination, use local files to collect results
author       Henry S. Thompson <ht@markup.co.uk>
date         Wed, 03 Jun 2020 16:40:34 +0000
parents      892e1c0240e1
children     b91e44355bbf
files        master/src/wecu/run_sac.sh master/src/wecu/sac_reducer.py master/src/wecu/sac_schemes.py master/src/wecu/wecu.py
diffstat     4 files changed, 160 insertions(+), 39 deletions(-)
--- a/master/src/wecu/run_sac.sh	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Wed Jun 03 16:40:34 2020 +0000
@@ -1,5 +1,5 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) (-k numKeys) resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir mapper keyHandler (-f filter) (-k numKeys) resType patType patterns
 echo "$@" 1>cmd
 cores=$1
 hosts=$2
@@ -29,6 +29,7 @@
   set -e
   set -o pipefail
   mkdir -p logs
+  mkdir -p res
   f=$1
   shift
   j=$1
@@ -43,7 +44,7 @@
   echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log
   export PYTHONIOENCODING=utf-8
   { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1
+   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>res/${j}.tsv ; ) ) ; unset IFS ; }
   { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff
     printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/  /' >>logs/${j}_log # hack to try to
                                                                         # guarantee atomic entry in the log
@@ -60,6 +61,6 @@
   --workdir $wd \
   -a input_paths \
   --env worker \
-  --return 'logs/{#}_log' --cleanup \
-  worker '{}' '{#}' "$mapper" "$filter" "$@" | tee >(wc -l 1>&2) |\
-  sac_reducer.py $1 $numKeys
+  --return 'logs/{#}_log' --return 'res/{#}.tsv' --cleanup \
+  worker '{}' '{#}' "$mapper" "$filter" "$@"
+cat res/*.tsv | sac_reducer.py $1 $numKeys
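The change above replaces the stdout pipe into sac_reducer.py with per-job result files: each worker writes its mapper output to res/{#}.tsv, parallel's --return copies those files back to the master, and the reducer is then fed by cat res/*.tsv. A minimal Python sketch of that final aggregation step, assuming the tab-separated keys-then-count layout the reducer expects (file names and key depth here are illustrative):

#!/usr/bin/python3
# Minimal sketch: sum counts across the per-job TSV files, as
# "cat res/*.tsv | sac_reducer.py ..." now does.  Assumed layout:
# tab-separated keys followed by a count, one record per line.
import glob

totals = {}
for fn in glob.glob('res/*.tsv'):            # one file per parallel job
    with open(fn, encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            key, count = line.rstrip('\n').rsplit('\t', 1)
            totals[key] = totals.get(key, 0) + int(count)

for key in sorted(totals):
    print(key, totals[key], sep='\t')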
--- a/master/src/wecu/sac_reducer.py	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Wed Jun 03 16:40:34 2020 +0000
@@ -6,15 +6,32 @@
 Input lines: tab-separated, numKeys keys (default 1) followed by count'''
 
 import sys
-from pprint import pprint
 
-print('reducing',sys.argv,file=sys.stderr)
-sys.stderr.flush()
+#print('reducing',sys.argv,file=sys.stderr)
+#sys.stderr.flush()
 
 rtype=sys.argv[1]
 numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1
 numDicts=numKeys-1
 
+def rec_print(d,buf,pos=0):
+    if pos!=0:
+        pos+=buf.write(b'\t')
+    for k,v in d.items():
+        npos=pos+buf.write(k.encode())
+        #print(pos,buf.tell(),npos,file=sys.stderr)
+        if isinstance(v,dict):
+            rec_print(v,buf,npos)
+        else:
+            buf.write(b'\t')
+            buf.write(b'%d'%v)
+            buf.write(b'\n')
+            buf.truncate()
+            buf.seek(0)
+            sys.stdout.buffer.write(buf.read(-1))
+            buf.seek(pos)
+
+
 if rtype == 'by-file':
     # Show results by file
     for line in sys.stdin:
@@ -26,7 +43,7 @@
     for line in sys.stdin:
         d=res
         try:
-            ll = line.split('\t',4)
+            ll = line.split('\t',numKeys+1)
             for i in range(numDicts):
                 d=d.setdefault(ll[i],dict())
             k=ll[numDicts].rstrip()
@@ -35,15 +52,15 @@
             print('bogus',line,ll,file=sys.stderr)
             continue
 
-    print('nc',len(res),
-          list(res.keys()),
-          list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '',
-          file=sys.stderr)
+#    print('nc',len(res),file=sys.stderr)
+#    if numKeys>1:
+#        print('  ',list(res.keys()),"\n  ",
+#              list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()), file=sys.stderr)
 
     if rtype=='dict':
         print('res=',end='')
+        from pprint import pprint
         pprint(res)
     else:
-        for k1,v1 in res.items():
-            for k2,v2 in v1.items():
-                for k3,v3 in v2.items():
-                    print(k1,k2,k3,v3,sep='\t')
+        from io import BufferedRandom, BytesIO
+        rec_print(res,BufferedRandom(BytesIO(),10000))
+
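rec_print streams the nested count dictionary as TSV while reusing the tab-separated key prefix: the prefix lives in a small seekable buffer, each finished line is flushed to stdout, and the position is rewound so the next key overwrites its predecessor in place. A standalone sketch of the idea (the input dict is invented; note the extra buf.seek(pos) after the recursive call, which is needed so sibling subtrees also restart at the parent's offset, not just sibling leaves):

#!/usr/bin/python3
# Sketch of the prefix-reuse trick: the buffer holds the current key
# path, leaves flush a full line, and seeks rewind to the shared prefix.
import sys
from io import BufferedRandom, BytesIO

def rec_print(d, buf, pos=0):
    if pos != 0:
        pos += buf.write(b'\t')          # separator after the parent key
    for k, v in d.items():
        npos = pos + buf.write(k.encode())
        if isinstance(v, dict):
            rec_print(v, buf, npos)
            buf.seek(pos)                # rewind for the next sibling subtree
        else:
            buf.write(b'\t%d\n' % v)     # leaf: count terminates the line
            buf.truncate()
            buf.seek(0)
            sys.stdout.buffer.write(buf.read(-1))
            buf.seek(pos)                # rewind for the next sibling leaf

rec_print({'hdr': {'Link': {'https': 2, 'http': 1}},
           'body': {'a': {'mailto': 3}}},
          BufferedRandom(BytesIO(), 10000))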
--- a/master/src/wecu/sac_schemes.py	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/sac_schemes.py	Wed Jun 03 16:40:34 2020 +0000
@@ -1,9 +1,22 @@
 #!/usr/bin/python3
-'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary'''
+'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary
+
+Usage: uz ...wat.gz | sac_schemes.py [-d] [altStorageScheme]
+
+where altStorageScheme if present selects an alternative approach to storing triple counts:
+ [absent]: three nested dictionaries
+ 1: one dictionary indexed by 4-tuple
+ 2: one dictionary indexed by ".".join(keys)'''
 
 import sys, json, regex
 from collections.abc import Iterable
 
+if len(sys.argv)>1 and sys.argv[1]=='-d':
+  sys.argv.pop(1)
+  dictRes=True
+else:
+  dictRes=False
+
 META_PATH=['Envelope', 'Payload-Metadata', 'HTTP-Response-Metadata']
 
 PATHS={'hdr':['Headers'],
@@ -13,6 +26,8 @@
 SCHEME=regex.compile('(<?[a-zA-Z][a-zA-Z0-9+.-]*):')
 URN=regex.compile('(<?urn:[a-z][a-z0-9+.-]*):',regex.I)
 
+EMPTY=''
+
 def walk(o,f,r,path=None):
   '''Apply f to every key+leaf of a json object in region r'''
   if isinstance(o,dict):
@@ -38,6 +53,45 @@
       walk(i,f,r,path)
 
 def pp(v,k,p,r):
+  '''Uses nested dictionaries'''
+  if isinstance(v,str):
+    m=SCHEME.match(v)
+    if m is not None:
+      n=URN.match(v)
+      if n is not None:
+        m=n
+      s=m.group(1)
+      # The following assumes paths are always either length 1 or length 2!!!
+      #  by open-coding rather than using qq(p)
+      if p is not None:
+        assert p[0] is None
+        p=p[1]
+      d=res[r].setdefault(p,dict())
+      d=d.setdefault(k,dict())
+      d[s]=d.get(s,0)+1
+
+def pp_tuple(v,k,p,r):
+  '''Uses one dict and 4-tuple'''
+  if isinstance(v,str):
+    m=SCHEME.match(v)
+    if m is not None:
+      n=URN.match(v)
+      if n is not None:
+        m=n
+      s=m.group(1)
+      # The following assumes paths are always either length 1 or length 2!!!
+      #  by open-coding rather than using qq(p)
+      if p is not None:
+        assert p[0] is None
+        p=p[1]
+      k=(r,p,k,s)
+      res[k]=res.get(k,0)+1
+
+SEP='\x00'
+DOT='.'
+
+def pp_concat(v,k,p,r):
+  '''Uses one dict and one string'''
   if isinstance(v,str):
     m=SCHEME.match(v)
     if m is not None:
@@ -45,14 +99,70 @@
       if n is not None:
         m=n
       s=m.group(1)
-      d=res[r].setdefault(p,dict())
-      d=d.setdefault(k,dict())
-      d[s]=d.get(s,0)+1
+      # The following assumes paths are always either length 1 or length 2!!!
+      #  by open-coding rather than using qq(p)
+      if p is None:
+        p=EMPTY
+      else:
+        assert p[0] is None
+        p=p[1]
+      k=SEP.join((r,p,k,s))
+      res[k]=res.get(k,0)+1
+
+def dump(res):
+  for r in res.keys():
+    rv=res[r]
+    for p in rv.keys():
+      pv=rv[p]
+      for k,v in pv.items():
+        for s,c in v.items():
+          print(r,end=EMPTY)
+          if p is None:
+            print(EMPTY,end='\t')
+          else:
+            print('.',p,sep=EMPTY,end='\t')
+          print(k,end='\t')
+          print(s,c,sep='\t')
+
+def dump_tuple(res):
+  for (r,p,k,s),c in res.items():
+    print(r,end=EMPTY)
+    # The following assumes paths are always either length 1 or length 2!!!
+    #  by open-coding rather than using qq(p)
+    if p is None:
+      print(EMPTY,end='\t')
+    else:
+      print(DOT,p,sep=EMPTY,end='\t')
+    print(k,end='\t')
+    print(s,c,sep='\t')
+
+def dump_concat(res):
+  for ks,c in res.items():
+    (r,p,k,s)=ks.split(SEP)
+    print(r,end=EMPTY)
+    # The following assumes paths are always either length 1 or length 2!!!
+    #  by open-coding rather than using qq(p)
+    if p==EMPTY:
+      print(EMPTY,end='\t')
+    else:
+      print('.',p,sep=EMPTY,end='\t')
+    print(k,end='\t')
+    print(s,c,sep='\t')
+
+if len(sys.argv)==2:
+  res=dict()
+  if sys.argv[1]=='1':
+    pp=pp_tuple
+    dump=dump_tuple
+  else:
+    pp=pp_concat
+    dump=dump_concat
+else:
+  res=dict((r,dict()) for r in PATHS.keys())
 
 def main():
-  global n,res # for debugging
+  global n # for debugging
   n=0
-  res=dict((r,dict()) for r in PATHS.keys())
   for l in sys.stdin:
     if l[0]=='{' and '"WARC-Type":"response"' in l:
       j=json.loads(l)
@@ -70,22 +180,12 @@
 
   print(n,file=sys.stderr)
 
-  for r in res.keys():
-    rv=res[r]
-    for p in rv.keys():
-      pv=rv[p]
-      for k,v in pv.items():
-        for s,c in v.items():
-          print(r,end='')
-          # The following assumes paths are always either length 1 or length 2!!!
-          #  by open-coding rather than using qq(p)
-          if p is None:
-            print('',end='\t')
-          else:
-            assert p[0] is None
-            print('.',p[1],sep='',end='\t')
-          print(k,end='\t')
-          print(s,c,sep='\t')
+  if dictRes:
+    print('res=',end=EMPTY)
+    from pprint import pprint
+    pprint(res)
+  else:
+    dump(res)
 
 def qq(p):
   if p is None:
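The three storage schemes now selectable via altStorageScheme all count the same (region, path, key, scheme) quadruples; only the container differs. A toy comparison on invented data (the quadruples below are illustrative, not from a real WAT record):

#!/usr/bin/python3
# Toy comparison of the three counting layouts selected by
# altStorageScheme; observations are invented for illustration.
SEP = '\x00'
obs = [('hdr', 'Link', 'url', 'https'),
       ('hdr', 'Link', 'url', 'https'),
       ('body', 'a', 'href', 'mailto')]

nested = {'hdr': {}, 'body': {}}   # [absent]: nested dictionaries
tupled = {}                        # 1: one dict indexed by 4-tuple
concat = {}                        # 2: one dict indexed by joined string
for r, p, k, s in obs:
    d = nested[r].setdefault(p, dict()).setdefault(k, dict())
    d[s] = d.get(s, 0) + 1
    tupled[(r, p, k, s)] = tupled.get((r, p, k, s), 0) + 1
    ck = SEP.join((r, p, k, s))
    concat[ck] = concat.get(ck, 0) + 1

print(nested['hdr']['Link']['url']['https'],            # 2
      tupled[('hdr', 'Link', 'url', 'https')],          # 2
      concat[SEP.join(('hdr', 'Link', 'url', 'https'))])  # 2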
--- a/master/src/wecu/wecu.py	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/wecu.py	Wed Jun 03 16:40:34 2020 +0000
@@ -111,6 +111,8 @@
                WORK_DIR,
                ('sac_mapper.py' if args.mapper is None
                 else args.mapper),
+               ('' if args.keyHandler is None
+                else "-h %s"%args.keyHandler)
                ('' if args.filter is None
                 else "-f '%s'"%args.filter),
                ('' if args.numKeys is None
@@ -177,6 +179,7 @@
 sac_list.add_argument('--dict', action="store_true", help="Provide this flag to indicate that the output should aggregated and displayed in the form 'res={dict}'")
 sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
 sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
+sac_list.add_argument('--keyHandler', type=int, help="Key handler, default for nested dicts, 1 for 4-tuple, 2 for concat")
 sac_list.add_argument('--numKeys', type=int, help="Depth of key list, default 1")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)
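The new --keyHandler option is forwarded to run_sac.sh as a "-h N" argument. A minimal sketch of that round trip, assuming the same argparse setup as wecu.py (the parser and sample command line here are illustrative only):

#!/usr/bin/python3
# Sketch of how --keyHandler becomes "-h N" on the run_sac.sh command
# line; parser and sample arguments are illustrative only.
import argparse

p = argparse.ArgumentParser()
p.add_argument('--keyHandler', type=int,
               help="Key handler, default for nested dicts, 1 for 4-tuple, 2 for concat")
args = p.parse_args(['--keyHandler', '2'])
print('' if args.keyHandler is None else "-h %s" % args.keyHandler)  # -> -h 2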