Mercurial > hg > cc > azure
changeset 61:cfaf5223b071
trying to get my own mapper working
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Sun, 31 May 2020 12:06:44 +0000 |
parents | 5fdca5baa4e9 |
children | 892e1c0240e1 |
files | master/src/wecu/run_sac.sh master/src/wecu/sac_mapper.py master/src/wecu/sac_reducer.py master/src/wecu/sac_schemes.py master/src/wecu/wecu.py |
diffstat | 5 files changed, 104 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/master/src/wecu/run_sac.sh Thu May 28 12:55:03 2020 +0000 +++ b/master/src/wecu/run_sac.sh Sun May 31 12:06:44 2020 +0000 @@ -1,5 +1,5 @@ #!/bin/bash -# Usage: run_sac.sh numcores hostsFilename workDir map resType patType patterns +# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) resType patType patterns cores=$1 hosts=$2 wd=$3 @@ -8,24 +8,40 @@ shift shift shift +if [ "$1" = "-f" ] +then + shift + filter="$1" + shift +else + filter=\"\" +fi + rm -f allout # Get quoting right... worker () { + set -e + set -o pipefail f=$1 shift - mapper=$1 + j=$1 + shift + mapper="$1" + shift + filter="$1" shift shift # we don't need/want the resType either - hostname 1>&2 + echo $(date) $(hostname) start $f >>${j}_log export PYTHONIOENCODING=utf-8 - curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \ - unpigz -dp 1 -c | tee >(wc -l 1>&2) | ./$mapper "$@" 2>&1 + { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \ + unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>>${j}_log + echo $(date) $(hostname) finished $f >>${j}_log } export -f worker -parallel -v \ +parallel \ --sshloginfile $hosts \ --retries 3 \ --transferfile $(which $mapper|sed 's/\(^.*\/\)/\1.\//') \ @@ -34,5 +50,5 @@ --workdir $wd \ -a input_paths \ --env worker \ - worker '{}' "$mapper" "$@" | tee -a allout | grep -v 'Authorized uses only' | \ + worker '{}' '{#}' "$mapper" "$filter" "$@" 2>errs | grep -v 'Authorized uses only' | tee >(wc -l 1>&2) |\ sac_reducer.py "$@"
--- a/master/src/wecu/sac_mapper.py Thu May 28 12:55:03 2020 +0000 +++ b/master/src/wecu/sac_mapper.py Sun May 31 12:06:44 2020 +0000 @@ -3,8 +3,6 @@ import sys import re -print('args',sys.argv) - is_regex = sys.argv[1] == 'true' search_terms = sys.argv[2:] search_terms_counters = dict()
--- a/master/src/wecu/sac_reducer.py Thu May 28 12:55:03 2020 +0000 +++ b/master/src/wecu/sac_reducer.py Sun May 31 12:06:44 2020 +0000 @@ -1,8 +1,10 @@ -#!/usr/bin/python +#!/usr/bin/python3 import sys from collections import defaultdict +print('reducing',sys.argv,file=sys.stderr) + if sys.argv[1] == 'by-file': # Show results by file for line in sys.stdin: @@ -17,9 +19,11 @@ k = line[0] v = line[1] except: + print('bogus',line,file=sys.stderr) continue counters[k] += int(v) - for k in counters: - print("{}\t{}".format(k, counters[k])) + print('nc',len(counters),file=sys.stderr) + for k,v in counters.items(): + print("{}\t{}".format(k, v))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/master/src/wecu/sac_schemes.py Sun May 31 12:06:44 2020 +0000 @@ -0,0 +1,70 @@ +#!/usr/bin/python3 +'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary''' + +import sys, json, regex +from collections.abc import Iterable + +META_PATH=['Envelope', 'Payload-Metadata', 'HTTP-Response-Metadata'] + +PATHS={'hdr':['Headers'], + 'head':['HTML-Metadata','Head'], + 'body':['HTML-Metadata','Links']} + +SCHEME=regex.compile('(<?[a-zA-Z][a-zA-Z0-9+.-]*):') +URN=regex.compile('(<?urn:[a-z][a-z0-9+.-]*):',regex.I) + +def walk(o,f,path=""): + '''Apply f to every key+leaf of a json object''' + if isinstance(o,dict): + for k,v in o.items(): + if isinstance(v,dict): + walk(v,f,"%s.%s"%(path,k)) + elif isinstance(v,Iterable): + walked=False + for i in v: + if isinstance(i,dict): + if (not walked) and (i is not v[0]): + print('oops',path,k,i,file=sys.stderr) + walked=True + walk(i,f,"%s.%s"%(path,k)) + elif walked: + print('oops2',path,k,i,file=sys.stderr) + if not walked: + f(k,v,"%s.%s"%(path,k)) + else: + f(k,v,"%s.%s"%(path,k)) + elif isinstance(o,Iterable): + for i in o: + walk(i,f,path) + +def pp(k,v,p): + if isinstance(v,str): + m=SCHEME.match(v) + if m is not None: + try: + n=v.index('\n') + v=v[:n] + except ValueError: + pass + n=URN.match(v) + if n is not None: + m=n + print(p,m.group(1),sep='\t') + +n=0 +for l in sys.stdin: + n+=1 + if n%1000==0: + print(int(n/1000),file=sys.stderr) + if l[0]=='{' and '"WARC-Type":"response"' in l: + j=json.loads(l) + for s in META_PATH: + j=j[s] + for k,v in PATHS.items(): + p=j + try: + for s in v: + p=p[s] + except KeyError: + continue + walk(p,pp,k)
--- a/master/src/wecu/wecu.py Thu May 28 12:55:03 2020 +0000 +++ b/master/src/wecu/wecu.py Sun May 31 12:06:44 2020 +0000 @@ -105,12 +105,14 @@ cores_per_worker = num_cores(args) - os.system('run_sac.sh {} {} {} {} {} {} {}'.format( + os.system('run_sac.sh {} {} {} {} {} {} {} {}'.format( cores_per_worker, HOSTS_FILEPATH, WORK_DIR, ('sac_mapper.py' if args.mapper is None else args.mapper), + ('' if args.filter is None + else "-f '%s'"%args.filter), ('by-file' if args.by_file else 'aggregate'), regex_str, @@ -171,6 +173,7 @@ sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions") sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead") sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py") +sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper") sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.") sac_list.set_defaults(handler=sac_handler)