changeset 63:d46c8b12fc04

support multiple approaches to key combination, use local files to collect results
author Henry S. Thompson <ht@markup.co.uk>
date Wed, 03 Jun 2020 16:40:34 +0000
parents 892e1c0240e1
children b91e44355bbf
files master/src/wecu/run_sac.sh master/src/wecu/sac_reducer.py master/src/wecu/sac_schemes.py master/src/wecu/wecu.py
diffstat 4 files changed, 160 insertions(+), 39 deletions(-) [+]
line wrap: on
line diff
--- a/master/src/wecu/run_sac.sh	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/run_sac.sh	Wed Jun 03 16:40:34 2020 +0000
@@ -1,5 +1,5 @@
 #!/bin/bash
-# Usage: run_sac.sh numcores hostsFilename workDir mapper (-f filter) (-k numKeys) resType patType patterns
+# Usage: run_sac.sh numcores hostsFilename workDir mapper keyHandler (-f filter) (-k numKeys) resType patType patterns
 echo "$@" 1>cmd
 cores=$1
 hosts=$2
@@ -29,6 +29,7 @@
   set -e
   set -o pipefail
   mkdir -p logs
+  mkdir -p res
   f=$1
   shift
   j=$1
@@ -43,7 +44,7 @@
   echo $(date +%Y-%m-%d.%H:%M:%S) $me start $j $ff >>logs/${j}_log
   export PYTHONIOENCODING=utf-8
   { IFS=$'\n' ; stderr=( $( { curl -s -N https://commoncrawl.s3.amazonaws.com/$f | \
-   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>&4 ; ) ) ; unset IFS ; } 4>&1
+   unpigz -dp 1 -c | $filter ./$mapper "$@" ; } 2>&1 1>res/${j}.tsv ; ) ) ; unset IFS ; }
   { echo $(date +%Y-%m-%d.%H:%M:%S) $me finished $j $ff
     printf '%s\n' "${stderr[@]}" ; } | sed '2,$s/^/ /' >>logs/${j}_log # hack to try to
       # guarantee atomic entry in the log
@@ -60,6 +61,6 @@
     --workdir $wd \
     -a input_paths \
     --env worker \
-    --return 'logs/{#}_log' --cleanup \
-    worker '{}' '{#}' "$mapper" "$filter" "$@" | tee >(wc -l 1>&2) |\
-    sac_reducer.py $1 $numKeys
+    --return 'logs/{#}_log' --return 'res/{#}.tsv' --cleanup \
+    worker '{}' '{#}' "$mapper" "$filter" "$@"
+cat res/*.tsv | sac_reducer.py $1 $numKeys
--- a/master/src/wecu/sac_reducer.py	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/sac_reducer.py	Wed Jun 03 16:40:34 2020 +0000
@@ -6,15 +6,32 @@
 Input lines: tab-separated, numKeys keys (default 1) followed by count'''
 
 import sys
-from pprint import pprint
 
-print('reducing',sys.argv,file=sys.stderr)
-sys.stderr.flush()
+#print('reducing',sys.argv,file=sys.stderr)
+#sys.stderr.flush()
 
 rtype=sys.argv[1]
 numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1
 numDicts=numKeys-1
 
+def rec_print(d,buf,pos=0):
+    if pos!=0:
+        pos+=buf.write(b'\t')
+    for k,v in d.items():
+        npos=pos+buf.write(k.encode())
+        #print(pos,buf.tell(),npos,file=sys.stderr)
+        if isinstance(v,dict):
+            rec_print(v,buf,npos)
+        else:
+            buf.write(b'\t')
+            buf.write(b'%d'%v)
+            buf.write(b'\n')
+            buf.truncate()
+            buf.seek(0)
+            sys.stdout.buffer.write(buf.read(-1))
+        buf.seek(pos)
+
+
 if rtype == 'by-file':
     # Show results by file
     for line in sys.stdin:
@@ -26,7 +43,7 @@
     for line in sys.stdin:
         d=res
         try:
-            ll = line.split('\t',4)
+            ll = line.split('\t',numKeys+1)
             for i in range(numDicts):
                 d=d.setdefault(ll[i],dict())
             k=ll[numDicts].rstrip()
@@ -35,15 +52,15 @@
             print('bogus',line,ll,file=sys.stderr)
             continue
 
-    print('nc',len(res),
-          list(res.keys()),
-          list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '',
-          file=sys.stderr)
+#    print('nc',len(res),file=sys.stderr)
+#    if numKeys>1:
+#        print(' ',list(res.keys()),"\n ",
+#              list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()), file=sys.stderr)
     if rtype=='dict':
         print('res=',end='')
+        from pprint import pprint
         pprint(res)
     else:
-        for k1,v1 in res.items():
-            for k2,v2 in v1.items():
-                for k3,v3 in v2.items():
-                    print(k1,k2,k3,v3,sep='\t')
+        from io import BufferedRandom, BytesIO
+        rec_print(res,BufferedRandom(BytesIO(),10000))
+
--- a/master/src/wecu/sac_schemes.py	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/sac_schemes.py	Wed Jun 03 16:40:34 2020 +0000
@@ -1,9 +1,22 @@
 #!/usr/bin/python3
-'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary'''
+'''Assumes export PYTHONIOENCODING=utf-8 has been done if necessary
+
+Usage: uz ...wat.gz | sac_schemes.py [-d] [altStorageScheme]
+
+where altStorageScheme if present selects an alternative approach to storing triple counts:
+  [absent]: three nested dictionaries
+         1: one dictionary indexed by 4-tuple
+         2: one dictionary indexed by ".".join(keys)'''
 
 import sys, json, regex
 from collections.abc import Iterable
 
+if len(sys.argv)>1 and sys.argv[1]=='-d':
+  sys.argv.pop(1)
+  dictRes=True
+else:
+  dictRes=False
+
 META_PATH=['Envelope', 'Payload-Metadata', 'HTTP-Response-Metadata']
 
 PATHS={'hdr':['Headers'],
@@ -13,6 +26,8 @@
 SCHEME=regex.compile('(<?[a-zA-Z][a-zA-Z0-9+.-]*):')
 URN=regex.compile('(<?urn:[a-z][a-z0-9+.-]*):',regex.I)
 
+EMPTY=''
+
 def walk(o,f,r,path=None):
   '''Apply f to every key+leaf of a json object in region r'''
   if isinstance(o,dict):
@@ -38,6 +53,45 @@
       walk(i,f,r,path)
 
 def pp(v,k,p,r):
+  '''Uses nested dictionaries'''
+  if isinstance(v,str):
+    m=SCHEME.match(v)
+    if m is not None:
+      n=URN.match(v)
+      if n is not None:
+        m=n
+      s=m.group(1)
+      # The following assumes paths are always either length 1 or length 2!!!
+      #  by open-coding rather than using qq(p)
+      if p is not None:
+        assert p[0] is None
+        p=p[1]
+      d=res[r].setdefault(p,dict())
+      d=d.setdefault(k,dict())
+      d[s]=d.get(s,0)+1
+
+def pp_tuple(v,k,p,r):
+  '''Uses one dict and 4-tuple'''
+  if isinstance(v,str):
+    m=SCHEME.match(v)
+    if m is not None:
+      n=URN.match(v)
+      if n is not None:
+        m=n
+      s=m.group(1)
+      # The following assumes paths are always either length 1 or length 2!!!
+      #  by open-coding rather than using qq(p)
+      if p is not None:
+        assert p[0] is None
+        p=p[1]
+      k=(r,p,k,s)
+      res[k]=res.get(k,0)+1
+
+SEP='\x00'
+DOT='.'
+
+def pp_concat(v,k,p,r):
+  '''Uses one dict and one string'''
   if isinstance(v,str):
     m=SCHEME.match(v)
     if m is not None:
@@ -45,14 +99,70 @@
       if n is not None:
         m=n
       s=m.group(1)
-      d=res[r].setdefault(p,dict())
-      d=d.setdefault(k,dict())
-      d[s]=d.get(s,0)+1
+      # The following assumes paths are always either length 1 or length 2!!!
+      #  by open-coding rather than using qq(p)
+      if p is None:
+        p=EMPTY
+      else:
+        assert p[0] is None
+        p=p[1]
+      k=SEP.join((r,p,k,s))
+      res[k]=res.get(k,0)+1
+
+def dump(res):
+  for r in res.keys():
+    rv=res[r]
+    for p in rv.keys():
+      pv=rv[p]
+      for k,v in pv.items():
+        for s,c in v.items():
+          print(r,end=EMPTY)
+          if p is None:
+            print(EMPTY,end='\t')
+          else:
+            print('.',p,sep=EMPTY,end='\t')
+          print(k,end='\t')
+          print(s,c,sep='\t')
+
+def dump_tuple(res):
+  for (r,p,k,s),c in res.items():
+    print(r,end=EMPTY)
+    # The following assumes paths are always either length 1 or length 2!!!
+    #  by open-coding rather than using qq(p)
+    if p is None:
+      print(EMPTY,end='\t')
+    else:
+      print(DOT,p,sep=EMPTY,end='\t')
+    print(k,end='\t')
+    print(s,c,sep='\t')
+
+def dump_concat(res):
+  for ks,c in res.items():
+    (r,p,k,s)=ks.split(SEP)
+    print(r,end=EMPTY)
+    # The following assumes paths are always either length 1 or length 2!!!
+    #  by open-coding rather than using qq(p)
+    if p==EMPTY:
+      print(EMPTY,end='\t')
+    else:
+      print('.',p,sep=EMPTY,end='\t')
+    print(k,end='\t')
+    print(s,c,sep='\t')
+
+if len(sys.argv)==2:
+  res=dict()
+  if sys.argv[1]=='1':
+    pp=pp_tuple 
+    dump=dump_tuple
+  else:
+    pp=pp_concat
+    dump=dump_concat
+else:
+  res=dict((r,dict()) for r in PATHS.keys())
 
 def main():
-  global n,res # for debugging
+  global n # for debugging
   n=0
-  res=dict((r,dict()) for r in PATHS.keys())
   for l in sys.stdin:
     if l[0]=='{' and '"WARC-Type":"response"' in l:
       j=json.loads(l)
@@ -70,22 +180,12 @@
 
   print(n,file=sys.stderr)
 
-  for r in res.keys():
-    rv=res[r]
-    for p in rv.keys():
-      pv=rv[p]
-      for k,v in pv.items():
-        for s,c in v.items():
-          print(r,end='')
-          # The following assumes paths are always either length 1 or length 2!!!
-          #  by open-coding rather than using qq(p)
-          if p is None:
-            print('',end='\t')
-          else:
-            assert p[0] is None
-            print('.',p[1],sep='',end='\t')
-          print(k,end='\t')
-          print(s,c,sep='\t')
+  if dictRes:
+    print('res=',end=EMPTY)
+    from pprint import pprint
+    pprint(res)
+  else:
+    dump(res)
 
 def qq(p):
   if p is None:
--- a/master/src/wecu/wecu.py	Tue Jun 02 17:35:07 2020 +0000
+++ b/master/src/wecu/wecu.py	Wed Jun 03 16:40:34 2020 +0000
@@ -111,6 +111,8 @@
         WORK_DIR,
         ('sac_mapper.py' if args.mapper is None
          else args.mapper),
+        ('' if args.keyHandler is None
+         else "-h %s"%args.keyHandler)
         ('' if args.filter is None
          else "-f '%s'"%args.filter),
         ('' if args.numKeys is None
@@ -177,6 +179,7 @@
 sac_list.add_argument('--dict', action="store_true", help="Provide this flag to indicate that the output should aggregated and displayed in the form 'res={dict}'")
 sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
 sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
+sac_list.add_argument('--keyHandler', type=int, help="Key handler, default for nested dicts, 1 for 4-tuple, 2 for concat")
 sac_list.add_argument('--numKeys', type=int, help="Depth of key list, default 1")
 sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
 sac_list.set_defaults(handler=sac_handler)