changeset 60:5fdca5baa4e9

refactor a bit, add support for sac with bespoke mapper
author Henry S. Thompson <ht@markup.co.uk>
date Thu, 28 May 2020 12:55:03 +0000
parents 8332faef25e1
children cfaf5223b071
files master/src/wecu/run_sac.sh master/src/wecu/sac_mapper.py master/src/wecu/sac_reducer.py master/src/wecu/wecu master/src/wecu/wecu.py
diffstat 5 files changed, 25 insertions(+), 22 deletions(-) [+]
line wrap: on
line diff
--- a/master/src/wecu/run_sac.sh	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Thu May 28 12:55:03 2020 +0000
@@ -1,8 +1,10 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir map resType patType patterns
 cores=$1
 hosts=$2
 wd=$3
+mapper=$4
+shift
 shift
 shift
 shift
@@ -12,11 +14,13 @@
 worker () {
   f=$1
   shift
+  mapper=$1
+  shift
   shift # we don't need/want the resType either
   hostname 1>&2
   export PYTHONIOENCODING=utf-8
   curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-   unpigz -dp 1 -c | tee >(wc -l 1>&2) | ./sac_mapper.py "$@" 2>&1
+   unpigz -dp 1 -c | tee >(wc -l 1>&2) | ./$mapper "$@" 2>&1
 }
 
 export -f worker
@@ -24,11 +28,11 @@
 parallel -v \
     --sshloginfile $hosts \
     --retries 3 \
-    --transferfile $(which sac_mapper.py|sed 's/sac_/.\/sac_/') \
+    --transferfile $(which $mapper|sed 's/\(^.*\/\)/\1.\//') \
     --will-cite \
     --jobs $cores \
     --workdir $wd \
     -a input_paths \
     --env worker \
-    worker '{}' "$@" | tee -a allout | grep -v 'Authorized uses only' | \
+    worker '{}' "$mapper" "$@" | tee -a allout | grep -v 'Authorized uses only' | \
     sac_reducer.py "$@"
--- a/master/src/wecu/sac_mapper.py	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/sac_mapper.py	Thu May 28 12:55:03 2020 +0000
@@ -3,6 +3,8 @@
 import sys
 import re
 
+print('args',sys.argv)
+
 is_regex = sys.argv[1] == 'true'
 search_terms = sys.argv[2:]
 search_terms_counters = dict()
--- a/master/src/wecu/sac_reducer.py	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Thu May 28 12:55:03 2020 +0000
@@ -3,7 +3,7 @@
 import sys
 from collections import defaultdict
 
-if sys.argv[0] == 'by-file':
+if sys.argv[1] == 'by-file':
     # Show results by file
     for line in sys.stdin:
         print(line.strip())
--- a/master/src/wecu/wecu	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/wecu	Thu May 28 12:55:03 2020 +0000
@@ -1,5 +1,5 @@
 #!/bin/bash
-export PYTHONPATH=/home/cc/src/wecu
-export PATH=$PATH:/home/cc/src/wecu
-/home/cc/src/wecu/wecu.py "$@"
+export PYTHONPATH=/var/cc/master/src/wecu
+export PATH=$PATH:/var/cc/master/src/wecu
+/var/cc/master/src/wecu/wecu.py "$@"
 
--- a/master/src/wecu/wecu.py	Thu May 28 09:58:38 2020 +0000
+++ b/master/src/wecu/wecu.py	Thu May 28 12:55:03 2020 +0000
@@ -105,20 +105,16 @@
 
     cores_per_worker = num_cores(args)
 
-    if args.by_file:
-        os.system('run_sac.sh {} {} {} by-file {} {}'.format(cores_per_worker,
-                                                             HOSTS_FILEPATH,
-                                                             WORK_DIR,
-                                                             regex_str,
-                                                             patterns_str))
-        return
-
-
-    os.system('run_sac.sh {} {} {} aggregate {} {}'.format(cores_per_worker,
-                                                           HOSTS_FILEPATH,
-                                                           WORK_DIR,
-                                                           regex_str,
-                                                           patterns_str))
+    os.system('run_sac.sh {} {} {} {} {} {} {}'.format(
+        cores_per_worker,
+        HOSTS_FILEPATH,
+        WORK_DIR,
+        ('sac_mapper.py' if args.mapper is None
+         else args.mapper),
+        ('by-file' if args.by_file
+         else 'aggregate'),
+        regex_str,
+        patterns_str))
 
 def generate_handler(args):
     import generate_file_list
@@ -174,6 +170,7 @@
 sac_list.add_argument('pattern', type=str, nargs='+')
 sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
 sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
+sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)