# HG changeset patch # User Henry S. Thompson # Date 1591202434 0 # Node ID d46c8b12fc04b3656ed72830f8f64bab4f98dbac # Parent 892e1c0240e17fc1b5e90c8e3b218170da197a77 support multiple approaches to key combination, use local files to collect results diff -r 892e1c0240e1 -r d46c8b12fc04 master/src/wecu/run_sac.sh --- a/master/src/wecu/run_sac.sh Tue Jun 02 17:35:07 2020 +0000 +++ b/master/src/wecu/run_sac.sh Wed Jun 03 16:40:34 2020 +0000 @@ -1,5 +1,5 @@ #!/bin/bash -# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) (-k numKeys) resType patType patterns +# Usage: run_sac.sh numcores hostsFilename workDir mapper keyHandler (-f filter) (-k numKeys) resType patType patterns echo "$@" 1>cmd cores=$1 hosts=$2 @@ -29,6 +29,7 @@ set -e set -o pipefail mkdir -p logs + mkdir -p res f=$1 shift j=$1 @@ -43,7 +44,7 @@ echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log export PYTHONIOENCODING=utf-8 { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \ - unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1 + unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>res/${j}.tsv ; ) ) ; unset IFS ; } { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack to try to # guarantee atomic entry in the log @@ -60,6 +61,6 @@ --workdir $wd \ -a input_paths \ --env worker \ - --return 'logs/{#}_log' --cleanup \ - worker '{}' '{#}' "$mapper" "$filter" "$@" | tee >(wc -l 1>&2) |\ - sac_reducer.py $1 $numKeys + --return 'logs/{#}_log' --return 'res/{#}.tsv' --cleanup \ + worker '{}' '{#}' "$mapper" "$filter" "$@" +cat res/*.tsv | sac_reducer.py $1 $numKeys diff -r 892e1c0240e1 -r d46c8b12fc04 master/src/wecu/sac_reducer.py --- a/master/src/wecu/sac_reducer.py Tue Jun 02 17:35:07 2020 +0000 +++ b/master/src/wecu/sac_reducer.py Wed Jun 03 16:40:34 2020 +0000 @@ -6,15 +6,32 @@ Input lines: tab-separated, numKeys keys (default 1) followed by count''' import sys -from pprint import pprint -print('reducing',sys.argv,file=sys.stderr) -sys.stderr.flush() +#print('reducing',sys.argv,file=sys.stderr) +#sys.stderr.flush() rtype=sys.argv[1] numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1 numDicts=numKeys-1 +def rec_print(d,buf,pos=0): + if pos!=0: + pos+=buf.write(b'\t') + for k,v in d.items(): + npos=pos+buf.write(k.encode()) + #print(pos,buf.tell(),npos,file=sys.stderr) + if isinstance(v,dict): + rec_print(v,buf,npos) + else: + buf.write(b'\t') + buf.write(b'%d'%v) + buf.write(b'\n') + buf.truncate() + buf.seek(0) + sys.stdout.buffer.write(buf.read(-1)) + buf.seek(pos) + + if rtype == 'by-file': # Show results by file for line in sys.stdin: @@ -26,7 +43,7 @@ for line in sys.stdin: d=res try: - ll = line.split('\t',4) + ll = line.split('\t',numKeys+1) for i in range(numDicts): d=d.setdefault(ll[i],dict()) k=ll[numDicts].rstrip() @@ -35,15 +52,15 @@ print('bogus',line,ll,file=sys.stderr) continue - print('nc',len(res), - list(res.keys()), - list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '', - file=sys.stderr) +# print('nc',len(res),file=sys.stderr) +# if numKeys>1: +# print(' ',list(res.keys()),"\n ", +# list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()), file=sys.stderr) if rtype=='dict': print('res=',end='') + from pprint import pprint pprint(res) else: - for k1,v1 in res.items(): - for k2,v2 in v1.items(): - for k3,v3 in v2.items(): - print(k1,k2,k3,v3,sep='\t') + from io import BufferedRandom, BytesIO + rec_print(res,BufferedRandom(BytesIO(),10000)) + diff -r 892e1c0240e1 -r d46c8b12fc04 master/src/wecu/sac_schemes.py --- a/master/src/wecu/sac_schemes.py Tue Jun 02 17:35:07 2020 +0000 +++ b/master/src/wecu/sac_schemes.py Wed Jun 03 16:40:34 2020 +0000 @@ -1,9 +1,22 @@ #!/usr/bin/python3 -'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary''' +'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary + +Usage: uz ...wat.gz | sac_schemes.py [-d] [altStorageScheme] + +where altStorageScheme if present selects an alternative approach to storing triple counts: + [absent]: three nested dictionaries + 1: one dictionary indexed by 4-tuple + 2: one dictionary indexed by ".".join(keys)''' import sys, json, regex from collections.abc import Iterable +if len(sys.argv)>1 and sys.argv[1]=='-d': + sys.argv.pop(1) + dictRes=True +else: + dictRes=False + META_PATH=['Envelope', 'Payload-Metadata', 'HTTP-Response-Metadata'] PATHS={'hdr':['Headers'], @@ -13,6 +26,8 @@ SCHEME=regex.compile('(