Mercurial > hg > cc > azure
comparison master/src/wecu/wecu.py @ 58:a3edba8dab11
move to right place in tree
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Thu, 28 May 2020 09:56:42 +0000 |
parents | master/wecu/wecu.py@ac1a20e627a9 |
children | 5fdca5baa4e9 |
comparison
equal
deleted
inserted
replaced
57:ac1a20e627a9 | 58:a3edba8dab11 |
---|---|
1 #!/usr/bin/python3 | |
2 import os | |
3 import sys | |
4 import argparse | |
5 | |
HOME = os.getenv('HOME')
DEFAULT_HOSTS = 'hosts'
DEFAULT_CORES = 'cores.txt'
DEFAULT_WD = os.getcwd()

# Optional per-user configuration file ~/.wecu. Every line that is neither
# blank nor a '#' comment is executed as a Python statement in this module's
# namespace, so it can e.g. reassign DEFAULT_HOSTS / DEFAULT_CORES / DEFAULT_WD
# before the environment-variable lookups below.
# NOTE(review): exec runs arbitrary code from the user's own home directory.
# That is acceptable for a personal CLI tool, but never feed it untrusted input.
try:
    with open('%s/.wecu' % HOME) as conf:
        for line in conf:
            # Strip both ends: exec rejects leading indentation. Skip blank
            # lines, which previously reached eval('') and aborted with a
            # SyntaxError.
            line = line.strip()
            if line and line[0] != '#':
                # exec, not eval: eval accepts only expressions, so an
                # assignment such as DEFAULT_HOSTS='...' was a SyntaxError.
                exec(line)
except FileNotFoundError:
    # The config file is optional; fall back to the defaults above. Other I/O
    # errors (e.g. permission denied) still propagate.
    pass

# Environment variables take precedence over the (possibly overridden) defaults.
HOSTS_FILEPATH = os.getenv('WECU_HOSTS', DEFAULT_HOSTS)
CORES_FILEPATH = os.getenv('WECU_CORES', DEFAULT_CORES)
WORK_DIR = os.getenv('WECU_WD', DEFAULT_WD)
24 | |
25 | |
def setup_handler(args):
    """Handle ``wecu setup``.

    * ``--check_files``: report whether both the hosts file and the cores file
      exist, then return.
    * ``--check_nodes_up``: for every host listed in the hosts file, use GNU
      parallel to run ``nc -z <host> 22`` and print whether the node is OK or
      down, then return.
    * otherwise: run ``setup.sh`` with the cluster password.

    :param args: parsed ``argparse`` namespace for the ``setup`` sub-command.
    """
    import shlex

    if args.check_files:
        if os.path.exists(HOSTS_FILEPATH) and os.path.exists(CORES_FILEPATH):
            print("Config ok!")
        else:
            print("WECU is not configured! Run `wecu setup`")
        return

    if args.check_nodes_up:
        command = 'cat %s | ' % shlex.quote(HOSTS_FILEPATH)
        command += 'parallel --will-cite '
        command += '"nc -z {} 22 2> /dev/null; '
        command += 'if [ $? -eq 0 ]; '
        command += 'then '
        command += ' echo Node {} is OK; '
        command += 'else'
        command += ' echo Node {} is down; '
        command += 'fi"'

        os.system(command)
        return

    # Perform config. Quote the password so shell metacharacters in it
    # (spaces, $, quotes, ;) reach setup.sh literally instead of being
    # interpreted, or executed, by /bin/sh.
    os.system('setup.sh {}'.format(shlex.quote(args.password)))
51 | |
def list_handler(args):
    """Handle ``wecu list``.

    * ``machines``: print the contents of the hosts file.
    * ``input_files`` with ``--all``: print every line of ``input_paths``.
    * ``input_files`` without ``--all``: print the contents of
      ``crawl_name.txt`` and the line count of ``input_paths``.

    :param args: parsed ``argparse`` namespace for the ``list`` sub-command.
    """
    import shlex

    if args.object_to_list == 'machines':
        os.system('cat %s' % shlex.quote(HOSTS_FILEPATH))
        print('')
    elif args.object_to_list == 'input_files':
        if args.all:
            os.system('cat input_paths')
            return

        # Flush before each os.system call. The child process writes straight
        # to file descriptor 1, so text still in Python's stdout buffer (e.g.
        # when output is piped) would otherwise come out after the child's.
        print('Crawl name:', flush=True)
        os.system('cat crawl_name.txt')
        # Blank separator line. This was a bare `print`, a Python 2 leftover
        # that is a no-op in Python 3.
        print()
        print('Number of input files:', flush=True)
        os.system('wc -l < input_paths')
66 | |
def execute_handler(args):
    """Handle ``wecu execute``: run ``args.command`` on all workers via GNU parallel.

    Runs ``parallel --onall`` over the hosts in the hosts file, with 3 retries
    and ``WORK_DIR`` as the remote working directory. Each file named in
    ``args.transfer_file`` is sent with ``--transferfile`` and made executable
    (``chmod +x``) before the command runs. Output lines containing
    "Authorized uses only" (presumably an SSH login banner) are filtered out.

    :param args: parsed ``argparse`` namespace for the ``execute`` sub-command.
    """
    import shlex

    command = "time parallel --onall "
    command += "--sshloginfile %s " % shlex.quote(HOSTS_FILEPATH)
    command += "--retries 3 "

    to_run_on_remote = ""

    if args.transfer_file:
        for filename in args.transfer_file:
            quoted_name = shlex.quote(filename)
            command += "--transferfile {} ".format(quoted_name)
            to_run_on_remote += "chmod +x {}; ".format(quoted_name)

    to_run_on_remote += args.command

    command += "--will-cite "
    command += "--workdir %s " % shlex.quote(WORK_DIR)
    # The remote command is one quoted argument, so $, quotes, etc. in it are
    # left for the remote `eval` rather than expanded by the local shell.
    command += "eval ::: {} 2>&1 | grep -v 'Authorized uses only'".format(
        shlex.quote(to_run_on_remote))

    # `time` is a bash keyword, so the pipeline runs under bash. Pass it as a
    # single quoted argument. The previous '"%s"' wrapping was terminated early
    # by the double quotes around the grep pattern, and it let /bin/sh
    # interpret any $ or " in the user's command.
    os.system('bash -c %s' % shlex.quote(command))
87 | |
def num_cores(args):
    """Return how many concurrent tasks each worker should run.

    An explicit ``--jobs-per-worker`` value is returned unchanged. Without it,
    the first line of the cores file is returned as a string with trailing
    whitespace stripped.
    """
    explicit_jobs = args.jobs_per_worker
    if explicit_jobs is not None:
        return explicit_jobs

    with open(CORES_FILEPATH) as cores_file:
        first_line = cores_file.readline()
    return first_line.rstrip()
94 | |
def mapred_handler(args):
    """Handle ``wecu mapred``: run the given mapper/reducer with the configured per-worker parallelism."""
    jobs = num_cores(args)
    run_mapred(mapper=args.mapper, reducer=args.reducer, cores=jobs)
98 | |
def sac_handler(args):
    """Handle ``wecu sac``: launch a scan-and-count job through ``run_sac.sh``.

    ``run_sac.sh`` receives, in order: jobs per worker, hosts file, working
    directory, output mode (``by-file`` or ``aggregate``), ``true``/``false``
    for regex matching, and then each pattern as its own argument.

    :param args: parsed ``argparse`` namespace for the ``sac`` sub-command.
    """
    import shlex

    regex_str = 'true' if args.regex else 'false'
    mode = 'by-file' if args.by_file else 'aggregate'

    # Quote each pattern so it reaches run_sac.sh verbatim. The old '"{}"'
    # wrapping still let the shell expand $, backticks and backslashes, and a
    # pattern containing a double quote broke the command line.
    patterns_str = ' '.join(shlex.quote(pattern) for pattern in args.pattern)

    cores_per_worker = num_cores(args)

    os.system('run_sac.sh {} {} {} {} {} {}'.format(
        shlex.quote(str(cores_per_worker)),
        shlex.quote(HOSTS_FILEPATH),
        shlex.quote(WORK_DIR),
        mode,
        regex_str,
        patterns_str))
122 | |
def generate_handler(args):
    """Handle ``wecu generate-sample`` by delegating to ``generate_file_list.main()``.

    The helper module is imported here, on demand, rather than at file load.
    """
    from generate_file_list import main as generate_main
    generate_main()
126 | |
def utilization_handler(args):
    """Handle ``wecu utilisation``: sample hardware usage, then plot it.

    Runs ``./get_hardware_util.sh`` for ``args.seconds`` seconds (120 when the
    flag is absent or falsy), then calls ``generate_hardware_graph`` with
    ``args.output_graph_filename``.
    """
    from graph_hardware_usage import generate_hardware_graph

    duration_seconds = args.seconds if args.seconds else 120
    os.system('./get_hardware_util.sh {}'.format(duration_seconds))
    generate_hardware_graph(args.output_graph_filename)
136 | |
def run_mapred(mapper, reducer, cores):
    """Run ``./run_mapreduce.sh`` with the given mapper, reducer and parallelism.

    :param mapper: path to the map-phase executable.
    :param reducer: path to the reduce-phase executable.
    :param cores: concurrent tasks per worker (an int, or a numeric string as
        read from the cores file).
    """
    import shlex

    # Use the `cores` parameter. The body previously referenced an undefined
    # name (cores_per_worker), so every `wecu mapred` raised NameError.
    os.system('./run_mapreduce.sh {} {} {}'.format(
        shlex.quote(str(cores)), shlex.quote(mapper), shlex.quote(reducer)))
140 | |
# Top-level parser
parser = argparse.ArgumentParser(description='Wee CommonCrawl Utility (WECU) is a CLI tool which allows running scan-and-count workloads on Common Crawl data')
subparsers = parser.add_subparsers(help='A sub-command to be executed')

# Cluster Setup parser
parser_setup = subparsers.add_parser('setup', help='Setup the framework to operate on an HDInsight cluster')
parser_setup.add_argument('password', type=str, help='Password to the cluster - used to setup passwordless communication')
parser_setup.add_argument('--check_files', action="store_true", help='Use this flag to check that all the required configuration files are in place')
parser_setup.add_argument('--check_nodes_up', action='store_true', help='Check if the worker machines are responsive')
parser_setup.set_defaults(handler=setup_handler)

# Show cluster configuration parser
parser_list = subparsers.add_parser('list', help='List configuration')
parser_list.add_argument('object_to_list', type=str, choices=['machines', 'input_files'], help='Choose whether to list machines or (summary) of currently selected input files')
parser_list.add_argument('--all', action='store_true', help='Show a list of all input files instead of a summary (Can be used alongside input_files argument only)')
parser_list.set_defaults(handler=list_handler)

# Remote command execution parser
execute_list = subparsers.add_parser('execute', help='Execute arbitrary command on all worker machines in the cluster in parallel')
execute_list.add_argument('command', type=str, help='The shell command to run on every worker machine')
execute_list.add_argument('--transfer_file', nargs='+', help="Provide files which should be transferred to the remote workers before the execution starts.")
execute_list.set_defaults(handler=execute_handler)

# MapReduce jobs parser
mapred_list = subparsers.add_parser('mapred', help="Execute mapreduce jobs using the provided mapper and reducer executable")
mapred_list.add_argument('mapper', type=str, help='Path to the map phase executable')
mapred_list.add_argument('reducer', type=str, help='Path to the reduce phase executable')
mapred_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")

mapred_list.set_defaults(handler=mapred_handler)

# Scan-and-count parser
sac_list = subparsers.add_parser('sac', help='Execute scan-and-count (SAC) workloads directly from the command line')
sac_list.add_argument('pattern', type=str, nargs='+', help='One or more strings to scan for and count (treated as regular expressions with --regex)')
sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
sac_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
sac_list.set_defaults(handler=sac_handler)

# Generate sample parser
generate_parser = subparsers.add_parser('generate-sample', help='Generate a sample of a chosen Common Crawl month')
generate_parser.set_defaults(handler=generate_handler)

# Generate utilization graph parser
utilization_graph = subparsers.add_parser('utilisation', help='Generate CPU utilisation graph and files')
utilization_graph.add_argument('output_graph_filename', type=str, help='The path to the location of the output graph')
utilization_graph.add_argument('--seconds', type=int, help='Provide this flag to change how long the utilisation is measured for (the default is 120 seconds).')
utilization_graph.set_defaults(handler=utilization_handler)

# No arguments at all: show the help text instead of doing nothing.
if len(sys.argv) < 2:
    parser.print_help()
    sys.exit(0)

args = parser.parse_args()

# Sub-commands are optional in Python 3's argparse, so an argument list that
# names no sub-command parses without setting `handler`. Show the help text
# rather than crashing with AttributeError.
handler = getattr(args, 'handler', None)
if handler is None:
    parser.print_help()
    sys.exit(0)
handler(args)