changeset 60:5fdca5baa4e9
refactor a bit, add support for sac with bespoke mapper
author    Henry S. Thompson <ht@markup.co.uk>
date      Thu, 28 May 2020 12:55:03 +0000
parents   8332faef25e1
children  cfaf5223b071
files     master/src/wecu/run_sac.sh master/src/wecu/sac_mapper.py master/src/wecu/sac_reducer.py master/src/wecu/wecu master/src/wecu/wecu.py
diffstat  5 files changed, 25 insertions(+), 22 deletions(-)
--- a/master/src/wecu/run_sac.sh	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Thu May 28 12:55:03 2020 +0000
@@ -1,8 +1,10 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir map resType patType patterns
 cores=$1
 hosts=$2
 wd=$3
+mapper=$4
+shift
 shift
 shift
 shift
@@ -12,11 +14,13 @@
 worker () {
   f=$1
   shift
+  mapper=$1
+  shift
   shift # we don't need/want the resType either
   hostname 1>&2
   export PYTHONIOENCODING=utf-8
   curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-    unpigz -dp 1 -c | tee >(wc -l 1>&2) | ./sac_mapper.py "$@" 2>&1
+    unpigz -dp 1 -c | tee >(wc -l 1>&2) | ./$mapper "$@" 2>&1
 }
 
 export -f worker
@@ -24,11 +28,11 @@
 parallel -v \
   --sshloginfile $hosts \
   --retries 3 \
-  --transferfile $(which sac_mapper.py|sed 's/sac_/.\/sac_/') \
+  --transferfile $(which $mapper|sed 's/\(^.*\/\)/\1.\//') \
   --will-cite \
   --jobs $cores \
   --workdir $wd \
   -a input_paths \
   --env worker \
-  worker '{}' "$@" | tee -a allout | grep -v 'Authorized uses only' | \
+  worker '{}' "$mapper" "$@" | tee -a allout | grep -v 'Authorized uses only' | \
   sac_reducer.py "$@"
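With this change the mapper script is passed through as the fourth argument and forwarded to each worker. A hypothetical direct invocation under the new argument order (the hosts file, working directory and pattern below are made up; resType and patType take the values wecu.py supplies, 'aggregate'/'by-file' and 'true'/'false'):

  # numcores hostsFilename workDir mapper resType patType patterns
  # assumes ./input_paths lists the Common Crawl files to fetch, one path per line
  ./run_sac.sh 8 hosts.txt /var/cc/work sac_mapper.py aggregate false copyright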
--- a/master/src/wecu/sac_mapper.py	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/sac_mapper.py	Thu May 28 12:55:03 2020 +0000
@@ -3,6 +3,8 @@
 import sys
 import re
 
+print('args',sys.argv)
+
 is_regex = sys.argv[1] == 'true'
 search_terms = sys.argv[2:]
 search_terms_counters = dict()
--- a/master/src/wecu/sac_reducer.py	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Thu May 28 12:55:03 2020 +0000
@@ -3,7 +3,7 @@
 import sys
 from collections import defaultdict
 
-if sys.argv[0] == 'by-file':
+if sys.argv[1] == 'by-file':
     # Show results by file
     for line in sys.stdin:
         print(line.strip())
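The sac_reducer.py change fixes a wrong argv index in the mode test: sys.argv[0] is the script path, so the old comparison with 'by-file' could never succeed and output was always aggregated. A minimal sketch of the corrected indexing, with an invented input line and the argument vector run_sac.sh actually passes (resType patType patterns):

  # sys.argv[0] -> './sac_reducer.py' (script path); sys.argv[1] -> 'by-file' (resType)
  echo 'example mapper output line' | ./sac_reducer.py by-file false copyright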
--- a/master/src/wecu/wecu	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/wecu	Thu May 28 12:55:03 2020 +0000
@@ -1,5 +1,5 @@
 #!/bin/bash
-export PYTHONPATH=/home/cc/src/wecu
-export PATH=$PATH:/home/cc/src/wecu
-/home/cc/src/wecu/wecu.py "$@"
+export PYTHONPATH=/var/cc/master/src/wecu
+export PATH=$PATH:/var/cc/master/src/wecu
+/var/cc/master/src/wecu/wecu.py "$@"
--- a/master/src/wecu/wecu.py	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/wecu.py	Thu May 28 12:55:03 2020 +0000
@@ -105,20 +105,16 @@
 
     cores_per_worker = num_cores(args)
 
-    if args.by_file:
-        os.system('run_sac.sh {} {} {} by-file {} {}'.format(cores_per_worker,
-                                                             HOSTS_FILEPATH,
-                                                             WORK_DIR,
-                                                             regex_str,
-                                                             patterns_str))
-        return
-
-
-    os.system('run_sac.sh {} {} {} aggregate {} {}'.format(cores_per_worker,
-                                                           HOSTS_FILEPATH,
-                                                           WORK_DIR,
-                                                           regex_str,
-                                                           patterns_str))
+    os.system('run_sac.sh {} {} {} {} {} {} {}'.format(
+        cores_per_worker,
+        HOSTS_FILEPATH,
+        WORK_DIR,
+        ('sac_mapper.py' if args.mapper is None
+         else args.mapper),
+        ('by-file' if args.by_file
+         else 'aggregate'),
+        regex_str,
+        patterns_str))
 
 def generate_handler(args):
     import generate_file_list
@@ -174,6 +170,7 @@
 sac_list.add_argument('pattern', type=str, nargs='+')
 sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
 sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
+sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)
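Assuming the subparser registered via sac_list is invoked as `sac`, the new option could be exercised like this (my_mapper.py and the patterns are placeholders; omitting --mapper keeps the default sac_mapper.py):

  wecu sac --mapper my_mapper.py --regex 'creative.?commons'
  wecu sac copyright licence          # falls back to sac_mapper.py, aggregated output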