Mercurial > hg > cc > azure
view master/src/wecu/wecu.py @ 62:892e1c0240e1
added more robust (I hope) error handling,
got reducer working with support for choosing dict or tsv output
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Tue, 02 Jun 2020 17:35:07 +0000 |
parents | cfaf5223b071 |
children | d46c8b12fc04 |
line wrap: on
line source
#!/usr/bin/python3
"""Wee CommonCrawl Utility (WECU) command-line front end.

Sub-commands (wired up with argparse) set up the cluster, list its
configuration, run an arbitrary command on every worker (via GNU parallel),
launch map-reduce and scan-and-count jobs through helper shell scripts, and
graph hardware utilisation.

Configuration: the DEFAULT_* values below may be overridden by statements in
``~/.wecu``; the WECU_HOSTS, WECU_CORES and WECU_WD environment variables take
precedence over both.
"""
import argparse
import os
import shlex
import sys

HOME = os.getenv('HOME')
DEFAULT_HOSTS = 'hosts'
DEFAULT_CORES = 'cores.txt'
DEFAULT_WD = os.getcwd()

# Optional per-user config.  Each non-blank line not starting with '#' is
# executed as a Python statement in this module's namespace, presumably to
# override one of the DEFAULT_* values above (e.g. DEFAULT_HOSTS='/x/hosts').
# exec rather than eval: eval only accepts expressions, so an assignment line
# raised SyntaxError.  The file is run as code, so it must be trusted; it
# lives in the invoking user's own home directory.
try:
    with open('%s/.wecu' % HOME) as conf:
        for line in conf:
            line = line.strip()
            if line and not line.startswith('#'):
                exec(line, globals())
except FileNotFoundError:
    # No ~/.wecu: fall back to the defaults / environment variables.  Any
    # other I/O error (e.g. permission denied) still propagates.
    pass

HOSTS_FILEPATH = os.getenv('WECU_HOSTS', DEFAULT_HOSTS)
CORES_FILEPATH = os.getenv('WECU_CORES', DEFAULT_CORES)
WORK_DIR = os.getenv('WECU_WD', DEFAULT_WD)


def build_setup_command(password):
    """Return the shell command that runs setup.sh with *password*.

    The password is shell-quoted so characters such as spaces, '$' or ';'
    reach setup.sh verbatim instead of being interpreted by the shell.
    """
    return 'setup.sh {}'.format(shlex.quote(password))


def setup_handler(args):
    """Handle `wecu setup`.

    With --check_files, report whether the hosts and cores files exist.
    With --check_nodes_up, probe TCP port 22 on every host in the hosts file.
    Otherwise run setup.sh with the supplied cluster password.
    """
    if args.check_files:
        if os.path.exists(HOSTS_FILEPATH) and os.path.exists(CORES_FILEPATH):
            print("Config ok!")
        else:
            print("WECU is not configured! Run `wecu setup`")
        return

    if args.check_nodes_up:
        # One nc probe per host line, run through GNU parallel.
        command = 'cat %s | ' % shlex.quote(HOSTS_FILEPATH)
        command += 'parallel --will-cite '
        command += '"nc -z {} 22 2> /dev/null; '
        command += 'if [ $? -eq 0 ]; '
        command += 'then '
        command += ' echo Node {} is OK; '
        command += 'else'
        command += ' echo Node {} is down; '
        command += 'fi"'
        os.system(command)
        return

    # Perform config
    os.system(build_setup_command(args.password))


def list_handler(args):
    """Handle `wecu list`: show the hosts file or the selected input files."""
    if args.object_to_list == 'machines':
        os.system('cat %s' % shlex.quote(HOSTS_FILEPATH))
        print('')
    elif args.object_to_list == 'input_files':
        if args.all:
            os.system('cat input_paths')
            return
        # flush=True so our text is written before the subprocess output
        # even when stdout is a pipe (block-buffered).
        print('Crawl name:', flush=True)
        os.system('cat crawl_name.txt')
        # A bare `print` (Python 2 leftover) was a no-op in Python 3.
        print('')
        print('Number of input files:', flush=True)
        os.system('wc -l < input_paths')


def build_execute_command(command, transfer_files=None):
    """Return the shell pipeline that runs *command* on every worker.

    Each file in *transfer_files* is sent with --transferfile and made
    executable on the remote side before *command* runs.  Remote output has
    lines containing "Authorized uses only" filtered out.
    """
    parts = ['time parallel --onall',
             '--sshloginfile %s' % shlex.quote(HOSTS_FILEPATH),
             '--retries 3']
    remote = ''
    for filename in transfer_files or ():
        quoted = shlex.quote(filename)
        parts.append('--transferfile %s' % quoted)
        remote += 'chmod +x %s; ' % quoted
    remote += command
    parts.append('--will-cite')
    parts.append('--workdir %s' % shlex.quote(WORK_DIR))
    # shlex.quote (not a bare '...') so a quote inside the command is safe.
    parts.append('eval ::: %s' % shlex.quote(remote))
    return ' '.join(parts) + ' 2>&1 | grep -v "Authorized uses only"'


def execute_handler(args):
    """Handle `wecu execute`: run args.command on all workers in parallel."""
    command = build_execute_command(args.command, args.transfer_file)
    # Pass the pipeline to bash as one quoted argument.  The old
    # 'bash -c "%s"' wrapping let the local /bin/sh expand $-variables in the
    # remote command and split the grep pattern at its inner double quotes.
    os.system('bash -c %s' % shlex.quote(command))


def num_cores(args):
    """Return the concurrent jobs per worker.

    Uses --jobs-per-worker if given, otherwise the first line of the cores
    file (returned as a string, trailing whitespace removed).
    """
    if args.jobs_per_worker is not None:
        return args.jobs_per_worker
    with open(CORES_FILEPATH) as cf:
        return cf.readline().rstrip()


def mapred_handler(args):
    """Handle `wecu mapred`: run the given mapper/reducer pair."""
    run_mapred(args.mapper, args.reducer, num_cores(args))


def build_sac_command(args, cores_per_worker):
    """Return the shell command that launches run_sac.sh for `wecu sac`.

    run_sac.sh receives, in order: cores per worker, hosts file, work dir,
    mapper, the optional ``-f FILTER`` and ``-k NUMKEYS`` words (omitted when
    not given), the output mode, 'true'/'false' for --regex, then the
    patterns.  User-supplied values are shell-quoted so that e.g. '$' in a
    regex is passed literally rather than expanded by the local shell.
    """
    mapper = 'sac_mapper.py' if args.mapper is None else args.mapper
    filter_opt = ('' if args.filter is None
                  else '-f %s' % shlex.quote(args.filter))
    keys_opt = '' if args.numKeys is None else '-k %s' % args.numKeys
    # --by-file wins if both --by-file and --dict are given.
    if args.by_file:
        mode = 'by-file'
    elif args.dict:
        mode = 'dict'
    else:
        mode = 'aggregate'
    regex_str = 'true' if args.regex else 'false'
    patterns_str = ' '.join(shlex.quote(p) for p in args.pattern)
    return 'run_sac.sh {} {} {} {} {} {} {} {} {}'.format(
        shlex.quote(str(cores_per_worker)),
        shlex.quote(HOSTS_FILEPATH),
        shlex.quote(WORK_DIR),
        shlex.quote(mapper),
        filter_opt,
        keys_opt,
        mode,
        regex_str,
        patterns_str)


def sac_handler(args):
    """Handle `wecu sac`: run a scan-and-count job via run_sac.sh."""
    os.system(build_sac_command(args, num_cores(args)))


def generate_handler(args):
    """Handle `wecu generate-sample` by delegating to generate_file_list."""
    import generate_file_list
    generate_file_list.main()


def utilization_handler(args):
    """Handle `wecu utilisation`: collect usage data, then draw the graph.

    Measures for args.seconds seconds when given (and non-zero), otherwise
    for 120 seconds.
    """
    from graph_hardware_usage import generate_hardware_graph
    duration_seconds = 120
    if args.seconds:
        duration_seconds = args.seconds
    os.system('./get_hardware_util.sh {}'.format(duration_seconds))
    generate_hardware_graph(args.output_graph_filename)


def build_mapred_command(mapper, reducer, cores):
    """Return the shell command that launches ./run_mapreduce.sh."""
    return './run_mapreduce.sh {} {} {}'.format(
        shlex.quote(str(cores)), shlex.quote(mapper), shlex.quote(reducer))


def run_mapred(mapper, reducer, cores):
    """Run ./run_mapreduce.sh with *cores* concurrent jobs per worker.

    This previously referenced an undefined name ``cores_per_worker`` and so
    always raised NameError.
    """
    os.system(build_mapred_command(mapper, reducer, cores))


# Top-level parser
# NOTE(review): the description below ends mid-sentence ("... without");
# the intended completion is unknown, so the text is left unchanged.
parser = argparse.ArgumentParser(description='Wee CommonCrawl Utility (WECU) is a CLI tool which allows running scan-and-count workloads on Common Crawl data without')
subparsers = parser.add_subparsers(help='A sub-command to be executed')

# Cluster Setup parser
parser_setup = subparsers.add_parser('setup', help='Setup the framework to operate on an HDInsight cluster')
parser_setup.add_argument('password', type=str, help='Password to the cluster - used to setup passwordless communication')
parser_setup.add_argument('--check_files', action="store_true", help='Use this flag to check that all the required configuration files are in place')
parser_setup.add_argument('--check_nodes_up', action='store_true', help='Check if the worker machines are responsive')
parser_setup.set_defaults(handler=setup_handler)
# Show cluster configuration parser
parser_list = subparsers.add_parser('list', help='List configuration')
parser_list.add_argument(
    'object_to_list', type=str, choices=['machines', 'input_files'],
    help='Choose whether to list the machines or a summary of the currently selected input files')
parser_list.add_argument(
    '--all', action='store_true',
    help='Show a list of all input files instead of a summary (can only be used with the input_files argument)')
parser_list.set_defaults(handler=list_handler)

# Remote command execution parser
execute_list = subparsers.add_parser('execute', help='Execute arbitrary command on all worker machines in the cluster in parallel')
execute_list.add_argument('command', type=str)
execute_list.add_argument(
    '--transfer_file', nargs='+',
    help="Provide files which should be transferred to the remote workers before execution starts.")
execute_list.set_defaults(handler=execute_handler)

# Help text shared by the mapred and sac --jobs-per-worker options.
_JOBS_PER_WORKER_HELP = (
    "By default the number of concurrent tasks is set to the number of "
    "available logical cores. Provide this flag to set a different number "
    "of concurrent tasks.")

# MapReduce jobs parser
mapred_list = subparsers.add_parser('mapred', help="Execute mapreduce jobs using the provided mapper and reducer executable")
mapred_list.add_argument('mapper', type=str, help='Path to the map phase executable')
mapred_list.add_argument('reducer', type=str, help='Path to the reduce phase executable')
mapred_list.add_argument('--jobs-per-worker', type=int, help=_JOBS_PER_WORKER_HELP)
mapred_list.set_defaults(handler=mapred_handler)

# Scan-and-count parser
sac_list = subparsers.add_parser('sac', help='Execute scan-and-count (SAC) workloads directly from the command line')
sac_list.add_argument('pattern', type=str, nargs='+')
sac_list.add_argument(
    '--regex', action="store_true",
    help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
sac_list.add_argument(
    '--by-file', action="store_true",
    help="Provide this flag to indicate that the output should not be aggregated but displayed per file instead")
sac_list.add_argument(
    '--dict', action="store_true",
    help="Provide this flag to indicate that the output should be aggregated and displayed in the form 'res={dict}'")
sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
sac_list.add_argument('--numKeys', type=int, help="Depth of key list, default 1")
sac_list.add_argument('--jobs-per-worker', type=int, help=_JOBS_PER_WORKER_HELP)
sac_list.set_defaults(handler=sac_handler)

# Generate sample parser
generate_parser = subparsers.add_parser('generate-sample', help='Generate a sample of a chosen Common Crawl month')
generate_parser.set_defaults(handler=generate_handler)

# Generate utilization graph parser
utilization_graph = subparsers.add_parser('utilisation', help='Generate CPU utilisation graph and files')
utilization_graph.add_argument('output_graph_filename', type=str, help='The path to the location of the output graph')
utilization_graph.add_argument(
    '--seconds', type=int,
    help='Provide this flag to change how long the utilisation is measured for (the default is 120 seconds).')
utilization_graph.set_defaults(handler=utilization_handler)

# No arguments at all: show usage rather than an argparse error.
if len(sys.argv) < 2:
    parser.print_help()
    sys.exit(0)

args = parser.parse_args()
# Sub-commands are optional in argparse, so arguments may parse without one
# being selected; report that cleanly instead of raising AttributeError.
handler = getattr(args, 'handler', None)
if handler is None:
    parser.error('a sub-command is required')
handler(args)