view master/src/wecu/sac_reducer.py @ 63:d46c8b12fc04

support multiple approaches to key combination, use local files to collect results
author Henry S. Thompson <ht@markup.co.uk>
date Wed, 03 Jun 2020 16:40:34 +0000
parents 892e1c0240e1
children b91e44355bbf
line wrap: on
line source

#!/usr/bin/python3
'''merge results from multiple mappers

# Usage: sac_reducer by-file|aggregate|dict (numKeys)

Input lines: tab-separated, numKeys keys (default 1) followed by count'''

import sys

#print('reducing',sys.argv,file=sys.stderr)
#sys.stderr.flush()

# Reduction mode: 'by-file' copies input through, 'dict' pretty-prints the
# aggregated tree, anything else prints aggregated tab-separated lines
rtype=sys.argv[1]
# Number of leading key columns on each input line; taken from the command
# line only when exactly two arguments were given, otherwise 1
if len(sys.argv)==3:
    numKeys=int(sys.argv[2])
else:
    numKeys=1
# Every key but the last selects a nested dict; the last one indexes the count
numDicts=numKeys-1

def rec_print(d,buf,pos=0):
    '''Write a nested dict of counts to stdout as tab-separated lines.

    One line is written to sys.stdout.buffer for every non-dict value
    reachable in d: the keys on the path to it, joined by tabs, then a tab
    and the value formatted as a decimal integer.

    d   -- dict whose keys are str and whose values are either dicts of the
           same shape or integer counts
    buf -- seekable, truncatable binary read/write stream used as scratch
           space for building each line; expected to be positioned at pos
    pos -- number of bytes of line prefix already present in buf.  When
           non-zero, a tab is written after that prefix before any key.
    '''
    if pos!=0:
        # An existing prefix must be separated from the first of our keys
        pos+=buf.write(b'\t')
    _rec_print_from(d,buf,pos)


def _rec_print_from(d,buf,pos):
    '''Emit every leaf of d, treating buf[:pos] as the finished line prefix'''
    out=sys.stdout.buffer
    for k,v in d.items():
        npos=pos+buf.write(k.encode())
        if isinstance(v,dict):
            # Write the separator here, unconditionally, rather than letting
            # the callee infer nesting from npos!=0: that test fails when the
            # keys so far are all empty and silently drops a column
            npos+=buf.write(b'\t')
            _rec_print_from(v,buf,npos)
        else:
            buf.write(b'\t%d\n'%v)
            # Discard leftovers from any longer line written previously,
            # then copy the completed line out
            buf.truncate()
            buf.seek(0)
            out.write(buf.read(-1))
        # Rewind to the end of the shared prefix ready for the next key
        buf.seek(pos)


if rtype == 'by-file':
    # Show results by file: input lines are copied to stdout unchanged
    for line in sys.stdin:
        sys.stdout.write(line)
else:
    # Aggregate results: res is a tree of dicts, numDicts levels deep, keyed
    # by the leading key columns; the innermost values are summed counts
    res={}

    for line in sys.stdin:
        ll=line.split('\t',numKeys+1)
        # Check the count before touching res, so that a malformed line
        # cannot leave empty intermediate dicts behind
        try:
            count=int(ll[numKeys])
        except (IndexError,ValueError):
            # Too few columns, or a count that is not an integer:
            # report the line and carry on with the rest
            print('bogus',line,ll,file=sys.stderr)
            continue
        d=res
        for i in range(numDicts):
            d=d.setdefault(ll[i],{})
        k=ll[numDicts].rstrip()
        d[k]=d.get(k,0)+count

    if rtype=='dict':
        # Emit the whole tree as a pretty-printed 'res=' assignment
        print('res=',end='')
        from pprint import pprint
        pprint(res)
    else:
        # Emit one tab-separated line per leaf count
        from io import BufferedRandom, BytesIO
        rec_print(res,BufferedRandom(BytesIO(),10000))