Mercurial > hg > cc > azure
view master/src/wecu/sac_reducer.py @ 62:892e1c0240e1
added more robust (I hope) error handling,
got reducer working with support for choosing dict or tsv output
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Tue, 02 Jun 2020 17:35:07 +0000 |
parents | cfaf5223b071 |
children | d46c8b12fc04 |
line wrap: on
line source
#!/usr/bin/python3
'''Merge results from multiple mappers.

Usage: sac_reducer by-file|dict|aggregate [numKeys]

  by-file   copy the input lines to stdout unchanged
  dict      aggregate, then pretty-print the nested dict as "res=..."
  (other)   aggregate, then write one tab-separated row per key tuple

Input lines: tab-separated, numKeys keys (default 1) followed by count.
Lines that cannot be parsed are reported on stderr and skipped.
'''

import sys
from pprint import pprint

_USAGE = 'Usage: sac_reducer by-file|dict|aggregate [numKeys]'


def _aggregate(lines, numKeys, err):
    '''Sum the counts of all input lines into nested dicts, one level per key.

    lines   -- iterable of tab-separated text lines: numKeys keys, then a count
    numKeys -- number of key columns (>= 1)
    err     -- text stream that receives a "bogus" report for each bad line

    Returns a dict nested numKeys deep whose leaves are the summed counts.
    '''
    numDicts = numKeys - 1
    res = {}
    for line in lines:
        ll = line.split('\t')
        # Validate the count before touching res, so a malformed line
        # cannot leave empty intermediate dicts behind.
        try:
            count = int(ll[numKeys])
        except (IndexError, ValueError):
            print('bogus', line, ll, file=err)
            continue
        d = res
        for key in ll[:numDicts]:
            d = d.setdefault(key, {})
        k = ll[numDicts].rstrip()
        d[k] = d.get(k, 0) + count
    return res


def _leafCount(node):
    '''Return the number of counts stored at or below node.'''
    if isinstance(node, dict):
        return sum(_leafCount(v) for v in node.values())
    return 1


def _writeRows(node, prefix, out):
    '''Write one tab-separated "keys..., count" row per leaf below node.'''
    for k, v in node.items():
        if isinstance(v, dict):
            _writeRows(v, prefix + (k,), out)
        else:
            print(*prefix, k, v, sep='\t', file=out)


def main(argv=None, stdin=None, stdout=None, stderr=None):
    '''Run the reducer; return the process exit status.

    argv   -- argument list including the program name (default sys.argv)
    stdin, stdout, stderr -- text streams (default: the sys ones)

    Returns 0 on success, 2 on a command-line error.
    '''
    argv = sys.argv if argv is None else argv
    stdin = sys.stdin if stdin is None else stdin
    stdout = sys.stdout if stdout is None else stdout
    stderr = sys.stderr if stderr is None else stderr

    print('reducing', argv, file=stderr)
    stderr.flush()

    if len(argv) < 2:
        print(_USAGE, file=stderr)
        return 2
    rtype = argv[1]
    try:
        # numKeys is only honoured when it is the sole extra argument.
        numKeys = int(argv[2]) if len(argv) == 3 else 1
    except ValueError:
        print(_USAGE, file=stderr)
        return 2
    if numKeys < 1:
        print(_USAGE, file=stderr)
        return 2

    if rtype == 'by-file':
        # Show results by file: pass the mapper output straight through.
        for line in stdin:
            stdout.write(line)
        return 0

    # Aggregate results
    res = _aggregate(stdin, numKeys, stderr)

    # Diagnostic: number of top-level keys, the keys themselves and, when
    # there is more than one key level, the leaf count under each of them.
    print('nc', len(res),
          list(res.keys()),
          [_leafCount(v) for v in res.values()] if numKeys > 1 else '',
          file=stderr)

    if rtype == 'dict':
        print('res=', end='', file=stdout)
        pprint(res, stream=stdout)
    else:
        _writeRows(res, (), stdout)
    return 0


if __name__ == '__main__':
    sys.exit(main())