changeset 62:892e1c0240e1
added more robust (I hope) error handling,
got reducer working with support for choosing dict or tsv output
author    Henry S. Thompson <ht@markup.co.uk>
date      Tue, 02 Jun 2020 17:35:07 +0000
parents   cfaf5223b071
children  d46c8b12fc04
files     master/src/wecu/run_sac.sh master/src/wecu/sac_reducer.py master/src/wecu/sac_schemes.py master/src/wecu/wecu.py
diffstat  4 files changed, 128 insertions(+), 58 deletions(-)
--- a/master/src/wecu/run_sac.sh	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Tue Jun 02 17:35:07 2020 +0000
@@ -1,5 +1,6 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) (-k numKeys) resType patType patterns
+echo "$@" 1>cmd
 cores=$1
 hosts=$2
 wd=$3
@@ -16,13 +17,18 @@
 else
   filter=\"\"
 fi
-
-rm -f allout
+if [ "$1" = "-k" ]
+then
+  shift
+  numKeys="$1"
+  shift
+fi
 
 # Get quoting right...
 worker () {
   set -e
   set -o pipefail
+  mkdir -p logs
   f=$1
   shift
   j=$1
@@ -32,11 +38,15 @@
   filter="$1"
   shift
   shift # we don't need/want the resType either
-  echo $(date) $(hostname) start $f >>${j}_log
+  me=$(hostname | cut -c 15)
+  ff=$(echo $f | cut -f 4,6 -d / | sed 's/CC-MAIN-//;s/\.warc.*$//')
+  echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log
   export PYTHONIOENCODING=utf-8
-  { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-    unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>>${j}_log
-  echo $(date) $(hostname) finished $f >>${j}_log
+  { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
+    unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1
+  { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff
+    printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack to try to
+                                                                      # guarantee atomic entry in the log
 }
 
 export -f worker
@@ -50,5 +60,6 @@
   --workdir $wd \
   -a input_paths \
   --env worker \
-  worker '{}' '{#}' "$mapper" "$filter" "$@" 2>errs | grep -v 'Authorized uses only' | tee >(wc -l 1>&2) |\
-  sac_reducer.py "$@"
+  --return 'logs/{#}_log' --cleanup \
+  worker '{}' '{#}' "$mapper" "$filter" "$@" | tee >(wc -l 1>&2) |\
+  sac_reducer.py $1 $numKeys
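The new stderr handling in worker() is the densest part of this change: inside the command substitution, 2>&1 points the pipeline's stderr at the capture, while 1>&4 routes its real output out through fd 4, which the enclosing group maps back onto stdout. A minimal standalone sketch of the same idiom (the echo commands are placeholders, not from the changeset):

{ IFS=$'\n' ; errs=( $( { echo data ; echo oops 1>&2 ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1
# "data" passes straight through to stdout, for the reducer to consume;
# errs now holds one array element per captured stderr line ("oops" here),
# so the worker can append them to its log in a single write, which is what
# the sed '2,$s/^/ /' log-entry hack above relies on

This is also why the old 2>errs collector is gone: each job's diagnostics now land in logs/{#}_log, which parallel's --return 'logs/{#}_log' --cleanup copies back from the worker.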
--- a/master/src/wecu/sac_reducer.py	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Tue Jun 02 17:35:07 2020 +0000
@@ -1,29 +1,49 @@
 #!/usr/bin/python3
+'''merge results from multiple mappers
+
+# Usage: sac_reducer by-file|aggregate (numKeys)
+
+Input lines: tab-separated, numKeys keys (default 1) followed by count'''
 import sys
-from collections import defaultdict
+from pprint import pprint
 
 print('reducing',sys.argv,file=sys.stderr)
+sys.stderr.flush()
 
-if sys.argv[1] == 'by-file':
+rtype=sys.argv[1]
+numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1
+numDicts=numKeys-1
+
+if rtype == 'by-file':
     # Show results by file
     for line in sys.stdin:
-        print(line.strip())
+        sys.stdout.write(line)
 else:
     # Aggregate results
-    counters = defaultdict(int)
+    res={}
     for line in sys.stdin:
+        d=res
         try:
-            line = line.strip().split('\t')
-            k = line[0]
-            v = line[1]
-        except:
-            print('bogus',line,file=sys.stderr)
+            ll = line.split('\t',4)
+            for i in range(numDicts):
+                d=d.setdefault(ll[i],dict())
+            k=ll[numDicts].rstrip()
+            d[k]=d.get(k,0)+int(ll[numKeys])
+        except Exception:
+            print('bogus',line,ll,file=sys.stderr)
             continue
-        counters[k] += int(v)
-
-    print('nc',len(counters),file=sys.stderr)
-    for k,v in counters.items():
-        print("{}\t{}".format(k, v))
+    print('nc',len(res),
+          list(res.keys()),
+          list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '',
+          file=sys.stderr)
+    if rtype=='dict':
+        print('res=',end='')
+        pprint(res)
+    else:
+        for k1,v1 in res.items():
+            for k2,v2 in v1.items():
+                for k3,v3 in v2.items():
+                    print(k1,k2,k3,v3,sep='\t')
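Given mapper output of numKeys tab-separated keys followed by a count, the aggregate path folds duplicate key tuples together in a nested dict. A worked example with hypothetical input lines (numKeys=3; note the tsv branch open-codes exactly three key levels, so it assumes numKeys=3):

printf 'wat.Head\tLink\thttp\t2\nwat.Head\tLink\thttp\t3\n' | ./sac_reducer.py aggregate 3
# stdout, tab-separated, with the two counts merged:
#   wat.Head	Link	http	5
# passing 'dict' instead of 'aggregate' pretty-prints the same tree as res={...},
# suitable for reading back into Python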
--- a/master/src/wecu/sac_schemes.py	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/sac_schemes.py	Tue Jun 02 17:35:07 2020 +0000
@@ -13,58 +13,93 @@
 SCHEME=regex.compile('(<?[a-zA-Z][a-zA-Z0-9+.-]*):')
 URN=regex.compile('(<?urn:[a-z][a-z0-9+.-]*):',regex.I)
 
-def walk(o,f,path=""):
-    '''Apply f to every key+leaf of a json object'''
+def walk(o,f,r,path=None):
+    '''Apply f to every key+leaf of a json object in region r'''
     if isinstance(o,dict):
         for k,v in o.items():
             if isinstance(v,dict):
-                walk(v,f,"%s.%s"%(path,k))
+                walk(v,f,r,(path,k))
             elif isinstance(v,Iterable):
                 walked=False
                 for i in v:
                     if isinstance(i,dict):
                         if (not walked) and (i is not v[0]):
-                            print('oops',path,k,i,file=sys.stderr)
+                            print('oops',r,path,k,i,file=sys.stderr)
                         walked=True
-                        walk(i,f,"%s.%s"%(path,k))
+                        walk(i,f,r,(path,k))
                     elif walked:
-                        print('oops2',path,k,i,file=sys.stderr)
+                        print('oops2',r,path,k,i,file=sys.stderr)
                 if not walked:
-                    f(k,v,"%s.%s"%(path,k))
+                    f(v,k,path,r)
             else:
-                f(k,v,"%s.%s"%(path,k))
+                f(v,k,path,r)
     elif isinstance(o,Iterable):
         for i in o:
-            walk(i,f,path)
+            walk(i,f,r,path)
 
-def pp(k,v,p):
+def pp(v,k,p,r):
     if isinstance(v,str):
         m=SCHEME.match(v)
         if m is not None:
-            try:
-                n=v.index('\n')
-                v=v[:n]
-            except ValueError:
-                pass
             n=URN.match(v)
             if n is not None:
                 m=n
-            print(p,m.group(1),sep='\t')
+            s=m.group(1)
+            d=res[r].setdefault(p,dict())
+            d=d.setdefault(k,dict())
+            d[s]=d.get(s,0)+1
+
+def main():
+    global n,res # for debugging
+    n=0
+    res=dict((r,dict()) for r in PATHS.keys())
+    for l in sys.stdin:
+        if l[0]=='{' and '"WARC-Type":"response"' in l:
+            j=json.loads(l)
+            n+=1
+            for s in META_PATH:
+                j=j[s]
+            for k,v in PATHS.items():
+                p=j
+                try:
+                    for s in v:
+                        p=p[s]
+                except KeyError as e:
+                    continue
+                walk(p,pp,k)
+
+    print(n,file=sys.stderr)
 
-n=0
-for l in sys.stdin:
-    n+=1
-    if n%1000==0:
-        print(int(n/1000),file=sys.stderr)
-    if l[0]=='{' and '"WARC-Type":"response"' in l:
-        j=json.loads(l)
-        for s in META_PATH:
-            j=j[s]
-        for k,v in PATHS.items():
-            p=j
-            try:
-                for s in v:
-                    p=p[s]
-            except KeyError:
-                continue
-            walk(p,pp,k)
+    for r in res.keys():
+        rv=res[r]
+        for p in rv.keys():
+            pv=rv[p]
+            for k,v in pv.items():
+                for s,c in v.items():
+                    print(r,end='')
+                    # The following assumes paths are always either length 1 or length 2!!!
+                    # by open-coding rather than using qq(p)
+                    if p is None:
+                        print('',end='\t')
+                    else:
+                        assert p[0] is None
+                        print('.',p[1],sep='',end='\t')
+                    print(k,end='\t')
+                    print(s,c,sep='\t')
+
+def qq(p):
+    if p is None:
+        sys.stdout.write('\t')
+    else:
+        qq1(p[0])
+        print(p[1],end='\t')
+
+def qq1(p):
+    if p is None:
+        return
+    else:
+        qq1(p[0])
+        print(p[1],end='.')
+
+if __name__=="__main__":
+    main()
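sac_schemes.py is the bespoke mapper these changes feed through run_sac.sh: it reads WAT metadata records (one JSON object per line), tallies URI schemes per region, path and key, and now emits the three-key tsv that sac_reducer.py aggregates. A worker-shaped smoke test, assuming a WAT file path (hypothetical here) and the META_PATH and PATHS tables defined earlier in the file, outside this diff:

f=crawl-data/CC-MAIN-2020-24/segments/example/wat/example.warc.wat.gz  # hypothetical path
curl -s -N https://commoncrawl.s3.amazonaws.com/$f | unpigz -dp 1 -c | ./sac_schemes.py
# stdout: region[.path]<TAB>key<TAB>scheme<TAB>count, one line per leaf,
#   i.e. exactly the numKeys=3 input the reducer's aggregate path expects
# stderr: the count of response records processed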
--- a/master/src/wecu/wecu.py	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/wecu.py	Tue Jun 02 17:35:07 2020 +0000
@@ -105,7 +105,7 @@
 
     cores_per_worker = num_cores(args)
 
-    os.system('run_sac.sh {} {} {} {} {} {} {} {}'.format(
+    os.system('run_sac.sh {} {} {} {} {} {} {} {} {}'.format(
         cores_per_worker,
         HOSTS_FILEPATH,
         WORK_DIR,
@@ -113,8 +113,10 @@
              else args.mapper),
         ('' if args.filter is None
             else "-f '%s'"%args.filter),
+        ('' if args.numKeys is None
+            else "-k %s"%args.numKeys),
         ('by-file' if args.by_file
-         else 'aggregate'),
+         else 'dict' if args.dict else 'aggregate'),
         regex_str,
         patterns_str))
 
@@ -172,8 +174,10 @@
 sac_list.add_argument('pattern', type=str, nargs='+')
 sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
 sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
+sac_list.add_argument('--dict', action="store_true", help="Provide this flag to indicate that the output should be aggregated and displayed in the form 'res={dict}'")
 sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
 sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
+sac_list.add_argument('--numKeys', type=int, help="Depth of key list, default 1")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)
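End to end, the two new flags just thread through to run_sac.sh. With --dict and --numKeys 3, sac_handler builds a command of roughly this shape (core count, paths and mapper hypothetical; regex_str and patterns_str are assembled elsewhere in wecu.py):

run_sac.sh 8 /path/to/hosts /path/to/workdir sac_schemes.py -k 3 dict <regex_str> <patterns_str>
# 'dict' reaches run_sac.sh as the resType it hands to sac_reducer.py as $1,
# selecting the res={...} output path; -k 3 becomes the reducer's numKeys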