# HG changeset patch # User Henry S. Thompson # Date 1591119307 0 # Node ID 892e1c0240e17fc1b5e90c8e3b218170da197a77 # Parent cfaf5223b071abb3c4216e44c97d59d1ea0be506 added more robust (I hope) error handling, got reducer working with support for choosing dict or tsv output diff -r cfaf5223b071 -r 892e1c0240e1 master/src/wecu/run_sac.sh --- a/master/src/wecu/run_sac.sh Sun May 31 12:06:44 2020 +0000 +++ b/master/src/wecu/run_sac.sh Tue Jun 02 17:35:07 2020 +0000 @@ -1,5 +1,6 @@ #!/bin/bash -# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) resType patType patterns +# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) (-k numKeys) resType patType patterns +echo "$@" 1>cmd cores=$1 hosts=$2 wd=$3 @@ -16,13 +17,18 @@ else filter=\"\" fi - -rm -f allout +if [ "$1" = "-k" ] +then + shift + numKeys="$1" + shift +fi # Get quoting right... worker () { set -e set -o pipefail + mkdir -p logs f=$1 shift j=$1 @@ -32,11 +38,15 @@ filter="$1" shift shift # we don't need/want the resType either - echo $(date) $(hostname) start $f >>${j}_log + me=$(hostname | cut -c 15) + ff=$(echo $f | cut -f 4,6 -d / | sed 's/CC-MAIN-//;s/\.warc.*$//') + echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log export PYTHONIOENCODING=utf-8 - { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \ - unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>>${j}_log - echo $(date) $(hostname) finished $f >>${j}_log + { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \ + unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1 + { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff + printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack to try to + # guarantee atomic entry in the log } export -f worker @@ -50,5 +60,6 @@ --workdir $wd \ -a input_paths \ --env worker \ - worker '{}' '{#}' "$mapper" "$filter" "$@" 2>errs | grep -v 'Authorized uses only' | tee >(wc -l 1>&2) |\ - sac_reducer.py "$@" + --return 'logs/{#}_log' --cleanup \ + worker '{}' '{#}' "$mapper" "$filter" "$@" | tee >(wc -l 1>&2) |\ + sac_reducer.py $1 $numKeys diff -r cfaf5223b071 -r 892e1c0240e1 master/src/wecu/sac_reducer.py --- a/master/src/wecu/sac_reducer.py Sun May 31 12:06:44 2020 +0000 +++ b/master/src/wecu/sac_reducer.py Tue Jun 02 17:35:07 2020 +0000 @@ -1,29 +1,49 @@ #!/usr/bin/python3 +'''merge results from multiple mappers + +# Usage: sac_reducer by-file|aggregate (numKeys) + +Input lines: tab-separated, numKeys keys (default 1) followed by count''' import sys -from collections import defaultdict +from pprint import pprint print('reducing',sys.argv,file=sys.stderr) +sys.stderr.flush() -if sys.argv[1] == 'by-file': +rtype=sys.argv[1] +numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1 +numDicts=numKeys-1 + +if rtype == 'by-file': # Show results by file for line in sys.stdin: - print(line.strip()) + stdout.write(line) else: # Aggregate results - counters = defaultdict(int) + res={} for line in sys.stdin: + d=res try: - line = line.strip().split('\t') - k = line[0] - v = line[1] - except: - print('bogus',line,file=sys.stderr) + ll = line.split('\t',4) + for i in range(numDicts): + d=d.setdefault(ll[i],dict()) + k=ll[numDicts].rstrip() + d[k]=d.get(k,0)+int(ll[numKeys]) + except Exception: + print('bogus',line,ll,file=sys.stderr) continue - counters[k] += int(v) - - print('nc',len(counters),file=sys.stderr) - for k,v in counters.items(): - print("{}\t{}".format(k, v)) + print('nc',len(res), + list(res.keys()), + list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '', + file=sys.stderr) + if rtype=='dict': + print('res=',end='') + pprint(res) + else: + for k1,v1 in res.items(): + for k2,v2 in v1.items(): + for k3,v3 in v2.items(): + print(k1,k2,k3,v3,sep='\t') diff -r cfaf5223b071 -r 892e1c0240e1 master/src/wecu/sac_schemes.py --- a/master/src/wecu/sac_schemes.py Sun May 31 12:06:44 2020 +0000 +++ b/master/src/wecu/sac_schemes.py Tue Jun 02 17:35:07 2020 +0000 @@ -13,58 +13,93 @@ SCHEME=regex.compile('(