Mercurial > hg > cc > azure
comparison master/src/wecu/sac_reducer.py @ 62:892e1c0240e1
Added more robust (I hope) error handling;
got the reducer working with support for choosing dict or TSV output.
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Tue, 02 Jun 2020 17:35:07 +0000 |
parents | cfaf5223b071 |
children | d46c8b12fc04 |
comparison
legend (row states): equal, deleted, inserted, replaced
61:cfaf5223b071 | 62:892e1c0240e1 |
---|---|
1 #!/usr/bin/python3 | 1 #!/usr/bin/python3 |
2 '''merge results from multiple mappers | |
3 | |
4 # Usage: sac_reducer by-file|aggregate (numKeys) | |
5 | |
6 Input lines: tab-separated, numKeys keys (default 1) followed by count''' | |
2 | 7 |
3 import sys | 8 import sys |
4 from collections import defaultdict | 9 from pprint import pprint |
5 | 10 |
6 print('reducing',sys.argv,file=sys.stderr) | 11 print('reducing',sys.argv,file=sys.stderr) |
12 sys.stderr.flush() | |
7 | 13 |
8 if sys.argv[1] == 'by-file': | 14 rtype=sys.argv[1] |
15 numKeys=int(sys.argv[2]) if len(sys.argv)==3 else 1 | |
16 numDicts=numKeys-1 | |
17 | |
18 if rtype == 'by-file': | |
9 # Show results by file | 19 # Show results by file |
10 for line in sys.stdin: | 20 for line in sys.stdin: |
11 print(line.strip()) | 21 stdout.write(line) |
12 else: | 22 else: |
13 # Aggregate results | 23 # Aggregate results |
14 counters = defaultdict(int) | 24 res={} |
15 | 25 |
16 for line in sys.stdin: | 26 for line in sys.stdin: |
27 d=res | |
17 try: | 28 try: |
18 line = line.strip().split('\t') | 29 ll = line.split('\t',4) |
19 k = line[0] | 30 for i in range(numDicts): |
20 v = line[1] | 31 d=d.setdefault(ll[i],dict()) |
21 except: | 32 k=ll[numDicts].rstrip() |
22 print('bogus',line,file=sys.stderr) | 33 d[k]=d.get(k,0)+int(ll[numKeys]) |
34 except Exception: | |
35 print('bogus',line,ll,file=sys.stderr) | |
23 continue | 36 continue |
24 | 37 |
25 counters[k] += int(v) | 38 print('nc',len(res), |
26 | 39 list(res.keys()), |
27 print('nc',len(counters),file=sys.stderr) | 40 list(sum(len(res[i][j]) for j in res[i].keys()) for i in res.keys()) if numKeys>1 else '', |
28 for k,v in counters.items(): | 41 file=sys.stderr) |
29 print("{}\t{}".format(k, v)) | 42 if rtype=='dict': |
43 print('res=',end='') | |
44 pprint(res) | |
45 else: | |
46 for k1,v1 in res.items(): | |
47 for k2,v2 in v1.items(): | |
48 for k3,v3 in v2.items(): | |
49 print(k1,k2,k3,v3,sep='\t') |