Mercurial > hg > cc > azure
view master/src/wecu/sac_reducer.py @ 63:d46c8b12fc04
support multiple approaches to key combination, use local files to collect results
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Wed, 03 Jun 2020 16:40:34 +0000 |
parents | 892e1c0240e1 |
children | b91e44355bbf |
line wrap: on
line source
#!/usr/bin/python3
'''Merge results from multiple mappers

Usage: sac_reducer by-file|aggregate|dict (numKeys)

Input lines: tab-separated, numKeys keys (default 1) followed by count

  by-file   copy the input lines to stdout unchanged
  dict      sum the counts per key tuple and pretty-print the nested dict
  (other)   e.g. aggregate: sum the counts per key tuple and print one
            tab-separated line (keys, then total) per key tuple
'''

import sys
from io import BufferedRandom, BytesIO
from pprint import pprint


def rec_print(d, buf, pos=0):
    '''Write the nested dict d of counts to stdout as tab-separated lines.

    d maps str keys either to further dicts of the same kind or to
    integer counts.  One line is written to sys.stdout.buffer for every
    count: the keys on the path to it, then the count, tab-separated.

    buf is a seekable, readable and writable binary buffer used as
    scratch space; the keys already written at outer levels occupy its
    first pos bytes.  A non-zero pos means that d is a nested level, so
    a tab is written first to separate it from the enclosing key.
    '''
    _rec_print(d, buf, pos, pos != 0)


def _rec_print(d, buf, pos, nested):
    '''Do the work of rec_print, with the nesting made explicit.

    Nesting cannot be deduced from pos during the recursion: an empty
    top-level key leaves pos at 0, which would drop the separating tab
    and with it the (empty) first column of the output line.
    '''
    if nested:
        pos += buf.write(b'\t')
    for k, v in d.items():
        npos = pos + buf.write(k.encode())
        if isinstance(v, dict):
            _rec_print(v, buf, npos, True)
        else:
            buf.write(b'\t')
            buf.write(b'%d' % v)
            buf.write(b'\n')
            # Drop whatever a previous, longer line left behind the
            # current position, then emit the whole buffer as one line
            buf.truncate()
            buf.seek(0)
            sys.stdout.buffer.write(buf.read(-1))
        # Rewind to just after the enclosing keys, so that the next key
        # at this level overwrites the current one
        buf.seek(pos)


def _aggregate(lines, numKeys):
    '''Sum the counts in lines into a nested dict, one level per key.

    Each line holds numKeys tab-separated keys followed by a count.
    Lines with too few fields or a non-integer count are reported on
    stderr and otherwise skipped.
    '''
    numDicts = numKeys - 1
    res = {}
    for line in lines:
        ll = line.split('\t', numKeys + 1)
        d = res
        try:
            for i in range(numDicts):
                d = d.setdefault(ll[i], {})
            k = ll[numDicts].rstrip()
            d[k] = d.get(k, 0) + int(ll[numKeys])
        except (IndexError, ValueError):
            print('bogus', line, ll, file=sys.stderr)
    return res


def main(argv):
    '''Run the reducer: argv[1] is the mode, argv[2] (optional) numKeys.

    Reads sys.stdin and writes sys.stdout; see the module docstring for
    the modes.  Exits with the usage message if the mode is missing.
    '''
    if len(argv) < 2:
        sys.exit(__doc__)
    rtype = argv[1]
    numKeys = int(argv[2]) if len(argv) == 3 else 1
    if rtype == 'by-file':
        # Show results by file
        for line in sys.stdin:
            sys.stdout.write(line)
        return
    # Aggregate results
    res = _aggregate(sys.stdin, numKeys)
    if rtype == 'dict':
        print('res=', end='')
        pprint(res)
    else:
        rec_print(res, BufferedRandom(BytesIO(), 10000))


if __name__ == '__main__':
    main(sys.argv)