changeset 61:cfaf5223b071

trying to get my own mapper working
author Henry S. Thompson <ht@markup.co.uk>
date Sun, 31 May 2020 12:06:44 +0000
parents 5fdca5baa4e9
children 892e1c0240e1
files master/src/wecu/run_sac.sh master/src/wecu/sac_mapper.py master/src/wecu/sac_reducer.py master/src/wecu/sac_schemes.py master/src/wecu/wecu.py
diffstat 5 files changed, 104 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/master/src/wecu/run_sac.sh	Thu May 28 12:55:03 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Sun May 31 12:06:44 2020 +0000
@@ -1,5 +1,5 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir map resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) resType patType patterns
 cores=$1
 hosts=$2
 wd=$3
@@ -8,24 +8,40 @@
 shift
 shift
 shift
+if [ "$1" = "-f" ]
+then
+ shift
+ filter="$1"
+ shift
+else
+ filter=\"\"
+fi
+
 rm -f allout
 
 # Get quoting right...
 worker () {
+  set -e
+  set -o pipefail
   f=$1
   shift
-  mapper=$1
+  j=$1
+  shift
+  mapper="$1"
+  shift
+  filter="$1"
   shift
   shift # we don't need/want the resType either
-  hostname 1>&2
+  echo $(date) $(hostname) start $f >>${j}_log
   export PYTHONIOENCODING=utf-8
-  curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-   unpigz -dp 1 -c | tee >(wc -l 1>&2) | ./$mapper "$@" 2>&1
+  { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
+   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>>${j}_log
+  echo $(date) $(hostname) finished $f >>${j}_log
 }
 
 export -f worker
 
-parallel -v \
+parallel \
     --sshloginfile $hosts \
     --retries 3 \
     --transferfile $(which $mapper|sed 's/\(^.*\/\)/\1.\//') \
@@ -34,5 +50,5 @@
     --workdir $wd \
     -a input_paths \
     --env worker \
-    worker '{}' "$mapper" "$@" | tee -a allout | grep -v 'Authorized uses only' | \
+    worker '{}' '{#}' "$mapper" "$filter" "$@" 2>errs | grep -v 'Authorized uses only' | tee >(wc -l 1>&2) |\
     sac_reducer.py "$@"
--- a/master/src/wecu/sac_mapper.py	Thu May 28 12:55:03 2020 +0000
+++ b/master/src/wecu/sac_mapper.py	Sun May 31 12:06:44 2020 +0000
@@ -3,8 +3,6 @@
 import sys
 import re
 
-print('args',sys.argv)
-
 is_regex = sys.argv[1] == 'true'
 search_terms = sys.argv[2:]
 search_terms_counters = dict()
--- a/master/src/wecu/sac_reducer.py	Thu May 28 12:55:03 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Sun May 31 12:06:44 2020 +0000
@@ -1,8 +1,10 @@
-#!/usr/bin/python
+#!/usr/bin/python3
 
 import sys
 from collections import defaultdict
 
+print('reducing',sys.argv,file=sys.stderr)
+
 if sys.argv[1] == 'by-file':
     # Show results by file
     for line in sys.stdin:
@@ -17,9 +19,11 @@
             k = line[0] 
             v = line[1]
         except:
+            print('bogus',line,file=sys.stderr)
             continue
 
         counters[k] += int(v)
 
-    for k in counters:
-        print("{}\t{}".format(k, counters[k]))
+    print('nc',len(counters),file=sys.stderr)
+    for k,v in counters.items():
+        print("{}\t{}".format(k, v))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/master/src/wecu/sac_schemes.py	Sun May 31 12:06:44 2020 +0000
@@ -0,0 +1,70 @@
+#!/usr/bin/python3
+'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary'''
+
+import sys, json, regex
+from collections.abc import Iterable
+
+META_PATH=['Envelope', 'Payload-Metadata', 'HTTP-Response-Metadata']
+
+PATHS={'hdr':['Headers'],
+       'head':['HTML-Metadata','Head'],
+       'body':['HTML-Metadata','Links']}
+
+SCHEME=regex.compile('(<?[a-zA-Z][a-zA-Z0-9+.-]*):')
+URN=regex.compile('(<?urn:[a-z][a-z0-9+.-]*):',regex.I)
+
+def walk(o,f,path=""):
+  '''Apply f to every key+leaf of a json object'''
+  if isinstance(o,dict):
+    for k,v in o.items():
+      if isinstance(v,dict):
+        walk(v,f,"%s.%s"%(path,k))
+      elif isinstance(v,Iterable):
+        walked=False
+        for i in v:
+          if isinstance(i,dict):
+            if (not walked) and (i is not v[0]):
+              print('oops',path,k,i,file=sys.stderr)
+            walked=True
+            walk(i,f,"%s.%s"%(path,k))
+          elif walked:
+            print('oops2',path,k,i,file=sys.stderr)
+        if not walked:
+          f(k,v,"%s.%s"%(path,k))
+      else:
+        f(k,v,"%s.%s"%(path,k))
+  elif isinstance(o,Iterable):
+    for i in o:
+      walk(i,f,path)
+
+def pp(k,v,p):
+  if isinstance(v,str):
+    m=SCHEME.match(v)
+    if m is not None:
+      try:
+        n=v.index('\n')
+        v=v[:n]
+      except ValueError:
+        pass
+      n=URN.match(v)
+      if n is not None:
+        m=n
+      print(p,m.group(1),sep='\t')
+
+n=0
+for l in sys.stdin:
+  n+=1
+  if n%1000==0:
+    print(int(n/1000),file=sys.stderr)
+  if l[0]=='{' and '"WARC-Type":"response"' in l:
+    j=json.loads(l)
+    for s in META_PATH:
+      j=j[s]
+    for k,v in PATHS.items():
+      p=j
+      try:
+        for s in v:
+          p=p[s]
+      except KeyError:
+        continue
+      walk(p,pp,k)
--- a/master/src/wecu/wecu.py	Thu May 28 12:55:03 2020 +0000
+++ b/master/src/wecu/wecu.py	Sun May 31 12:06:44 2020 +0000
@@ -105,12 +105,14 @@
 
     cores_per_worker = num_cores(args)
 
-    os.system('run_sac.sh {} {} {} {} {} {} {}'.format(
+    os.system('run_sac.sh {} {} {} {} {} {} {} {}'.format(
         cores_per_worker,
         HOSTS_FILEPATH,
         WORK_DIR,
         ('sac_mapper.py' if args.mapper is None
          else args.mapper),
+        ('' if args.filter is None
+         else "-f '%s'"%args.filter),
         ('by-file' if args.by_file
          else 'aggregate'),
         regex_str,
@@ -171,6 +173,7 @@
 sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
 sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
 sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
+sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)