changeset 62:892e1c0240e1

added more robust (I hope) error handling, got reducer working with support for choosing dict or tsv output
author Henry S. Thompson <ht@markup.co.uk>
date Tue, 02 Jun 2020 17:35:07 +0000
parents cfaf5223b071
children d46c8b12fc04
files master/src/wecu/run_sac.sh master/src/wecu/sac_reducer.py master/src/wecu/sac_schemes.py master/src/wecu/wecu.py
diffstat 4 files changed, 128 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/master/src/wecu/run_sac.sh	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Tue Jun 02 17:35:07 2020 +0000
@@ -1,5 +1,6 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) (-k numKeys) resType patType patterns
+echo "$@" 1>cmd
 cores=$1
 hosts=$2
 wd=$3
@@ -16,13 +17,18 @@
 else
  filter=\"\"
 fi
-
-rm -f allout
+if [ "$1" = "-k" ]
+then
+ shift
+ numKeys="$1"
+ shift
+fi
 
 # Get quoting right...
 worker () {
   set -e
   set -o pipefail
+  mkdir -p logs
   f=$1
   shift
   j=$1
@@ -32,11 +38,15 @@
   filter="$1"
   shift
   shift # we don't need/want the resType either
-  echo $(date) $(hostname) start $f >>${j}_log
+  me=$(hostname | cut -c 15)
+  ff=$(echo $f | cut -f 4,6 -d / | sed 's/CC-MAIN-//;s/\.warc.*$//')
+  echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log
   export PYTHONIOENCODING=utf-8
-  { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>>${j}_log
-  echo $(date) $(hostname) finished $f >>${j}_log
+  { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
+   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1
+  { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff
+    printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack to try to
+      # guarantee atomic entry in the log
 }
 
 export -f worker
@@ -50,5 +60,6 @@
     --workdir $wd \
     -a input_paths \
     --env worker \
-    worker '{}' '{#}' "$mapper" "$filter" "$@" 2>errs | grep -v 'Authorized uses only' | tee >(wc -l 1>&2) |\
-    sac_reducer.py "$@"
+    --return 'logs/{#}_log' --cleanup \
+    worker '{}' '{#}' "$mapper" "$filter" "$@" | tee >(wc -l 1>&2) |\
+    sac_reducer.py $1 $numKeys
--- a/master/src/wecu/sac_reducer.py	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Tue Jun 02 17:35:07 2020 +0000
@@ -1,29 +1,49 @@
 #!/usr/bin/python3
+'''merge results from multiple mappers
+
+# Usage: sac_reducer by-file|aggregate (numKeys)
+
+Input lines: tab-separated, numKeys keys (default 1) followed by count'''
 
 import sys
-from collections import defaultdict
+from pprint import pprint
 
 print('reducing',sys.argv,file=sys.stderr)
+sys.stderr.flush()
 
-if sys.argv[1] == 'by-file':
+rtype=sys.argv[1]
+numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1
+numDicts=numKeys-1
+
+if rtype == 'by-file':
     # Show results by file
     for line in sys.stdin:
-        print(line.strip())
+        stdout.write(line)
 else:
     # Aggregate results
-    counters = defaultdict(int)
+    res={}
 
     for line in sys.stdin:
+        d=res
         try:
-            line = line.strip().split('\t')
-            k = line[0] 
-            v = line[1]
-        except:
-            print('bogus',line,file=sys.stderr)
+            ll = line.split('\t',4)
+            for i in range(numDicts):
+                d=d.setdefault(ll[i],dict())
+            k=ll[numDicts].rstrip()
+            d[k]=d.get(k,0)+int(ll[numKeys])
+        except Exception:
+            print('bogus',line,ll,file=sys.stderr)
             continue
 
-        counters[k] += int(v)
-
-    print('nc',len(counters),file=sys.stderr)
-    for k,v in counters.items():
-        print("{}\t{}".format(k, v))
+    print('nc',len(res),
+          list(res.keys()),
+          list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '',
+          file=sys.stderr)
+    if rtype=='dict':
+        print('res=',end='')
+        pprint(res)
+    else:
+        for k1,v1 in res.items():
+            for k2,v2 in v1.items():
+                for k3,v3 in v2.items():
+                    print(k1,k2,k3,v3,sep='\t')
--- a/master/src/wecu/sac_schemes.py	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/sac_schemes.py	Tue Jun 02 17:35:07 2020 +0000
@@ -13,58 +13,93 @@
 SCHEME=regex.compile('(<?[a-zA-Z][a-zA-Z0-9+.-]*):')
 URN=regex.compile('(<?urn:[a-z][a-z0-9+.-]*):',regex.I)
 
-def walk(o,f,path=""):
-  '''Apply f to every key+leaf of a json object'''
+def walk(o,f,r,path=None):
+  '''Apply f to every key+leaf of a json object in region r'''
   if isinstance(o,dict):
     for k,v in o.items():
       if isinstance(v,dict):
-        walk(v,f,"%s.%s"%(path,k))
+        walk(v,f,r,(path,k))
       elif isinstance(v,Iterable):
         walked=False
         for i in v:
           if isinstance(i,dict):
             if (not walked) and (i is not v[0]):
-              print('oops',path,k,i,file=sys.stderr)
+              print('oops',key,path,k,i,file=sys.stderr)
             walked=True
-            walk(i,f,"%s.%s"%(path,k))
+            walk(i,f,r,(path,k))
           elif walked:
-            print('oops2',path,k,i,file=sys.stderr)
+            print('oops2',key,path,k,i,file=sys.stderr)
         if not walked:
-          f(k,v,"%s.%s"%(path,k))
+          f(v,k,path,r)
       else:
-        f(k,v,"%s.%s"%(path,k))
+        f(v,k,path,r)
   elif isinstance(o,Iterable):
     for i in o:
-      walk(i,f,path)
+      walk(i,f,r,path)
 
-def pp(k,v,p):
+def pp(v,k,p,r):
   if isinstance(v,str):
     m=SCHEME.match(v)
     if m is not None:
-      try:
-        n=v.index('\n')
-        v=v[:n]
-      except ValueError:
-        pass
       n=URN.match(v)
       if n is not None:
         m=n
-      print(p,m.group(1),sep='\t')
+      s=m.group(1)
+      d=res[r].setdefault(p,dict())
+      d=d.setdefault(k,dict())
+      d[s]=d.get(s,0)+1
+
+def main():
+  global n,res # for debugging
+  n=0
+  res=dict((r,dict()) for r in PATHS.keys())
+  for l in sys.stdin:
+    if l[0]=='{' and '"WARC-Type":"response"' in l:
+      j=json.loads(l)
+      n+=1
+      for s in META_PATH:
+        j=j[s]
+      for k,v in PATHS.items():
+        p=j
+        try:
+          for s in v:
+            p=p[s]
+        except KeyError as e:
+          continue
+        walk(p,pp,k)
+
+  print(n,file=sys.stderr)
 
-n=0
-for l in sys.stdin:
-  n+=1
-  if n%1000==0:
-    print(int(n/1000),file=sys.stderr)
-  if l[0]=='{' and '"WARC-Type":"response"' in l:
-    j=json.loads(l)
-    for s in META_PATH:
-      j=j[s]
-    for k,v in PATHS.items():
-      p=j
-      try:
-        for s in v:
-          p=p[s]
-      except KeyError:
-        continue
-      walk(p,pp,k)
+  for r in res.keys():
+    rv=res[r]
+    for p in rv.keys():
+      pv=rv[p]
+      for k,v in pv.items():
+        for s,c in v.items():
+          print(r,end='')
+          # The following assumes paths are always either length 1 or length 2!!!
+          #  by open-coding rather than using qq(p)
+          if p is None:
+            print('',end='\t')
+          else:
+            assert p[0] is None
+            print('.',p[1],sep='',end='\t')
+          print(k,end='\t')
+          print(s,c,sep='\t')
+
+def qq(p):
+  if p is None:
+    sys.stdout.write('\t')
+  else:
+    qq1(p[0])
+    print(p[1],end='\t')
+
+def qq1(p):
+  if p is None:
+    return
+  else:
+    qq1(p[0])
+    print(p[1],end='.')
+
+if __name__=="__main__":
+  main()
--- a/master/src/wecu/wecu.py	Sun May 31 12:06:44 2020 +0000
+++ b/master/src/wecu/wecu.py	Tue Jun 02 17:35:07 2020 +0000
@@ -105,7 +105,7 @@
 
     cores_per_worker = num_cores(args)
 
-    os.system('run_sac.sh {} {} {} {} {} {} {} {}'.format(
+    os.system('run_sac.sh {} {} {} {} {} {} {} {} {}'.format(
         cores_per_worker,
         HOSTS_FILEPATH,
         WORK_DIR,
@@ -113,8 +113,10 @@
          else args.mapper),
         ('' if args.filter is None
          else "-f '%s'"%args.filter),
+        ('' if args.numKeys is None
+         else "-k %s"%args.numKeys),
         ('by-file' if args.by_file
-         else 'aggregate'),
+         else 'dict' if args.dict else 'aggregate'),
         regex_str,
         patterns_str))
 
@@ -172,8 +174,10 @@
 sac_list.add_argument('pattern', type=str, nargs='+')
 sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
 sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
+sac_list.add_argument('--dict', action="store_true", help="Provide this flag to indicate that the output should aggregated and displayed in the form 'res={dict}'")
 sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
 sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
+sac_list.add_argument('--numKeys', type=int, help="Depth of key list, default 1")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)