comparison master/src/wecu/wecu.py @ 58:a3edba8dab11

move to right place in tree
author Henry S. Thompson <ht@markup.co.uk>
date Thu, 28 May 2020 09:56:42 +0000
parents master/wecu/wecu.py@ac1a20e627a9
children 5fdca5baa4e9
comparison
equal deleted inserted replaced
57:ac1a20e627a9 58:a3edba8dab11
1 #!/usr/bin/python3
2 import os
3 import sys
4 import argparse
5
HOME = os.getenv('HOME')
DEFAULT_HOSTS = 'hosts'
DEFAULT_CORES = 'cores.txt'
DEFAULT_WD = os.getcwd()

# Per-user configuration: every non-blank, non-comment line of $HOME/.wecu is
# evaluated as a Python expression before the WECU_* environment variables are
# read below.
try:
    with open('%s/.wecu' % HOME) as conf:
        for line in conf:
            statement = line.strip()
            # Blank lines and comment lines (including indented ones) would
            # make eval() raise SyntaxError, so skip them.
            if not statement or statement.startswith('#'):
                continue
            # SECURITY NOTE(review): eval() runs arbitrary code taken from the
            # config file. Only acceptable while ~/.wecu is writable solely by
            # the invoking user; consider a declarative KEY=VALUE format.
            eval(statement)
except IOError:
    # NOTE(review): a missing/unreadable ~/.wecu is fatal, as before. The
    # DEFAULT_* fallbacks suggest the file may have been meant to be
    # optional -- confirm before relaxing this.
    raise


# Effective settings: environment variables win over the built-in defaults.
HOSTS_FILEPATH = os.getenv('WECU_HOSTS', DEFAULT_HOSTS)
CORES_FILEPATH = os.getenv('WECU_CORES', DEFAULT_CORES)
WORK_DIR = os.getenv('WECU_WD', DEFAULT_WD)
24
25
def setup_handler(args):
    """Handle the ``setup`` sub-command.

    Depending on the flags in *args* this does exactly one of:

    * ``args.check_files``: report whether both HOSTS_FILEPATH and
      CORES_FILEPATH exist;
    * ``args.check_nodes_up``: probe TCP port 22 of every entry in the hosts
      file with ``nc`` (fanned out through GNU ``parallel``) and print one
      status line per node;
    * otherwise: run ``setup.sh`` with ``args.password`` as its only argument.
    """
    # Function-scope import, in the style of the other handlers in this file.
    import shlex

    if args.check_files:
        if os.path.exists(HOSTS_FILEPATH) and os.path.exists(CORES_FILEPATH):
            print("Config ok!")
        else:
            print("WECU is not configured! Run `wecu setup`")
        return

    if args.check_nodes_up:
        # Quote the path so a hosts file whose name contains spaces or shell
        # metacharacters is still read correctly.
        command = 'cat %s | ' % shlex.quote(HOSTS_FILEPATH)
        command += 'parallel --will-cite '
        command += '"nc -z {} 22 2> /dev/null; '
        command += 'if [ $? -eq 0 ]; '
        command += 'then '
        command += '  echo Node {} is OK; '
        command += 'else'
        command += '  echo Node {} is down; '
        command += 'fi"'

        os.system(command)
        return

    # Perform config.  The password is quoted so that spaces, quotes, `$`, `;`
    # etc. reach setup.sh as a single literal argument instead of being
    # interpreted by the shell.
    # NOTE(review): the password is still visible in the process list.
    os.system('setup.sh {}'.format(shlex.quote(args.password)))
51
def list_handler(args):
    """Handle the ``list`` sub-command.

    ``args.object_to_list`` selects what is shown:

    * ``'machines'``: the contents of the hosts file (HOSTS_FILEPATH);
    * ``'input_files'``: with ``args.all`` the whole ``input_paths`` file,
      otherwise a summary made of the contents of ``crawl_name.txt`` and the
      line count of ``input_paths``.

    ``input_paths`` and ``crawl_name.txt`` are looked up relative to the
    current working directory.
    """
    # Function-scope import, in the style of the other handlers in this file.
    import shlex

    if args.object_to_list == 'machines':
        os.system('cat %s' % shlex.quote(HOSTS_FILEPATH))
        print('')
    elif args.object_to_list == 'input_files':
        if args.all:
            os.system('cat input_paths')
            return

        # Labels are flushed immediately so that they stay in order with the
        # output written directly by the child processes when stdout is a
        # pipe or a file.
        print('Crawl name:', flush=True)
        os.system('cat crawl_name.txt')
        # Was a bare `print`, which is a no-op expression in Python 3; the
        # blank separator line was never emitted.
        print(flush=True)
        print('Number of input files:', flush=True)
        os.system('wc -l < input_paths')
66
def execute_handler(args):
    """Handle the ``execute`` sub-command.

    Builds a GNU ``parallel --onall`` invocation that runs ``args.command``
    on every machine listed in HOSTS_FILEPATH, inside WORK_DIR, and runs it
    through ``bash -c`` (``time`` is used as the bash keyword).  Files named
    in ``args.transfer_file`` are passed via ``--transferfile`` and a
    ``chmod +x`` for each is prepended to the remote command.  Output lines
    containing "Authorized uses only" are filtered out.
    """
    # Function-scope import, in the style of the other handlers in this file.
    import shlex

    command = "time parallel --onall "
    command += "--sshloginfile %s " % shlex.quote(HOSTS_FILEPATH)
    command += "--retries 3 "

    to_run_on_remote = ""

    # `--transfer_file` is optional, so args.transfer_file may be None.
    for filename in args.transfer_file or []:
        quoted_name = shlex.quote(filename)
        command += "--transferfile {} ".format(quoted_name)
        to_run_on_remote += "chmod +x {}; ".format(quoted_name)

    to_run_on_remote += args.command

    command += "--will-cite "
    command += "--workdir %s " % shlex.quote(WORK_DIR)
    # Quote the remote command as one word: a user command containing a
    # single quote used to terminate the hand-written '...' wrapper early.
    command += "eval ::: {} 2>&1 | grep -v {}\n".format(
        shlex.quote(to_run_on_remote),
        shlex.quote("Authorized uses only"))

    # The whole pipeline is passed to bash as ONE quoted argument.  The old
    # `bash -c "%s"` wrapper was closed by the first inner double quote, which
    # silently reduced the grep pattern to `Authorized` and stripped double
    # quotes from the user's command.
    os.system('bash -c %s' % shlex.quote(command))
87
def num_cores(args):
    """Return how many concurrent jobs each worker should run.

    An explicit ``args.jobs_per_worker`` is returned unchanged.  When it is
    ``None`` the first line of CORES_FILEPATH is returned instead, as a string
    with trailing whitespace removed.
    """
    requested = args.jobs_per_worker
    if requested is not None:
        return requested

    with open(CORES_FILEPATH) as cores_file:
        first_line = cores_file.readline()
        return first_line.rstrip()
94
def mapred_handler(args):
    """Handle the ``mapred`` sub-command.

    Resolves the per-worker job count with num_cores() and passes it, together
    with ``args.mapper`` and ``args.reducer``, to run_mapred().
    """
    jobs_per_worker = num_cores(args)
    run_mapred(mapper=args.mapper,
               reducer=args.reducer,
               cores=jobs_per_worker)
98
def sac_handler(args):
    """Handle the ``sac`` (scan-and-count) sub-command.

    Runs ``run_sac.sh`` with, in order: the per-worker job count, the hosts
    file path, the working directory, the output mode (``by-file`` when
    ``args.by_file`` is set, ``aggregate`` otherwise), ``true``/``false`` for
    ``args.regex``, and every pattern wrapped in double quotes.
    """
    regex_flag = 'true' if args.regex else 'false'

    # NOTE(review): patterns are only wrapped in double quotes, so a pattern
    # that itself contains `"`, `$` or a backtick is still interpreted by the
    # shell.
    quoted_patterns = ' '.join('"{}"'.format(pattern) for pattern in args.pattern)

    jobs_per_worker = num_cores(args)

    # Both output modes take the same arguments; only the mode word differs.
    if args.by_file:
        template = 'run_sac.sh {} {} {} by-file {} {}'
    else:
        template = 'run_sac.sh {} {} {} aggregate {} {}'

    os.system(template.format(jobs_per_worker,
                              HOSTS_FILEPATH,
                              WORK_DIR,
                              regex_flag,
                              quoted_patterns))
122
def generate_handler(args):
    """Handle the ``generate-sample`` sub-command.

    Imports the ``generate_file_list`` module on demand and calls its
    ``main()``; *args* is not used.
    """
    import generate_file_list as sample_generator
    sample_generator.main()
126
def utilization_handler(args):
    """Handle the ``utilisation`` sub-command.

    Runs ``./get_hardware_util.sh`` with the measurement duration in seconds
    (``args.seconds``, or 120 when it is unset or zero) and then calls
    ``generate_hardware_graph`` with ``args.output_graph_filename``.
    """
    from graph_hardware_usage import generate_hardware_graph

    # A missing (None) or zero --seconds falls back to two minutes.
    measurement_window = args.seconds or 120

    os.system('./get_hardware_util.sh {}'.format(measurement_window))
    generate_hardware_graph(args.output_graph_filename)
136
def run_mapred(mapper, reducer, cores):
    """Run ``./run_mapreduce.sh`` with the given job count, mapper and reducer.

    Args:
        mapper: value passed as the script's second argument.
        reducer: value passed as the script's third argument.
        cores: number of concurrent jobs per worker, passed as the first
            argument.
    """
    # The original formatted the undefined name `cores_per_worker`, so every
    # call raised NameError; the parameter is called `cores`.
    os.system('./run_mapreduce.sh {} {} {}'.format(cores, mapper, reducer))
140
# Top-level parser
# TODO(review): the description below ends mid-sentence ("... without"); the
# remainder of the sentence is not recoverable from this file.
parser = argparse.ArgumentParser(description='Wee CommonCrawl Utility (WECU) is a CLI tool which allows running scan-and-count workloads on Common Crawl data without')
subparsers = parser.add_subparsers(help='A sub-command to be executed')

# Cluster Setup parser
parser_setup = subparsers.add_parser('setup', help='Setup the framework to operate on an HDInsight cluster')
parser_setup.add_argument('password', type=str, help='Password to the cluster - used to setup passwordless communication')
parser_setup.add_argument('--check_files', action="store_true", help='Use this flag to check that all the required configuration files are in place')
parser_setup.add_argument('--check_nodes_up', action='store_true', help='Check if the worker machines are responsive')
parser_setup.set_defaults(handler=setup_handler)

# Show cluster configuration parser
parser_list = subparsers.add_parser('list', help='List configuration')
parser_list.add_argument('object_to_list', type=str, choices=['machines', 'input_files'], help='Choose whether to list machines or (summary) of currently selected input files')
parser_list.add_argument('--all', action='store_true', help='Show a list of all input files instead of a summary (Can be used alongside input_files argument only)')
parser_list.set_defaults(handler=list_handler)

# Remote command execution parser
execute_list = subparsers.add_parser('execute', help='Execute arbitrary command on all worker machines in the cluster in parallel')
execute_list.add_argument('command', type=str)
execute_list.add_argument('--transfer_file', nargs='+', help="Provide files which should be transferred to the remote workers before the executions starts.")
execute_list.set_defaults(handler=execute_handler)

# MapReduce jobs parser
mapred_list = subparsers.add_parser('mapred', help="Execute mapreduce jobs using the provided mapper and reducer executable")
mapred_list.add_argument('mapper', type=str, help='Path to the map phase executable')
mapred_list.add_argument('reducer', type=str, help='Path to the reduce phase executable')
mapred_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")

mapred_list.set_defaults(handler=mapred_handler)

# Scan-and-count parser
sac_list = subparsers.add_parser('sac', help='Execute scan-and-count (SAC) workloads directly from the command line')
sac_list.add_argument('pattern', type=str, nargs='+')
sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
sac_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
sac_list.set_defaults(handler=sac_handler)

# Generate sample parser
generate_parser = subparsers.add_parser('generate-sample', help='Generate a sample of a chosen Common Crawl month')
generate_parser.set_defaults(handler=generate_handler)

# Generate utilization graph parser
utilization_graph = subparsers.add_parser('utilisation', help='Generate CPU utilisation graph and files')
utilization_graph.add_argument('output_graph_filename', type=str, help='The path to the location of the output graph')
utilization_graph.add_argument('--seconds', type=int, help='Provide this flag to change how long the utilisation is measured for (the default is 120 seconds).')
utilization_graph.set_defaults(handler=utilization_handler)

# No arguments at all: show the help text instead of an argparse error.
if len(sys.argv) < 2:
    parser.print_help()
    sys.exit(0)

args = parser.parse_args()

# Sub-commands are optional for argparse on Python 3, so an argv such as
# `wecu --` parses successfully without ever setting `handler`; treat that
# like the no-argument case rather than failing with AttributeError.
handler = getattr(args, 'handler', None)
if handler is None:
    parser.print_help()
    sys.exit(0)

handler(args)