Mercurial > hg > cc > azure
diff master/src/wecu/wecu.py~ @ 58:a3edba8dab11
move to right place in tree
author | Henry S. Thompson <ht@markup.co.uk> |
---|---|
date | Thu, 28 May 2020 09:56:42 +0000 |
parents | master/wecu/wecu.py~@ac1a20e627a9 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/master/src/wecu/wecu.py~ Thu May 28 09:56:42 2020 +0000 @@ -0,0 +1,186 @@ +#!/usr/bin/python +import os +import sys +import argparse + +HOME=os.getenv('HOME') +DEFAULT_HOSTS='hosts' +DEFAULT_CORES='cores.txt' +DEFAULT_WD=os.getcwd() + +try: + with open('%s/.wecu'%HOME) as conf: + for l in conf: + if l[0]!='#': + eval(l.rstrip()) +except IOError as e: + raise e + pass + + +HOSTS_FILEPATH=os.getenv('WECU_HOSTS',DEFAULT_HOSTS) +CORES_FILEPATH=os.getenv('WECU_CORES',DEFAULT_CORES) +WORK_DIR=os.getenv('WECU_WD',DEFAULT_WD) + + +def setup_handler(args): + if args.check_files: + if os.path.exists(HOSTS_FILEPATH) and os.path.exists(CORES_FILEPATH): + print("Config ok!") + else: + print("WECU is not configured! Run `wecu setup`") + + return + + if args.check_nodes_up: + command = 'cat %s | '%HOSTS_FILEPATH + command += 'parallel --will-cite ' + command += '"nc -z {} 22 2> /dev/null; ' + command += 'if [ $? -eq 0 ]; ' + command += 'then ' + command += ' echo Node {} is OK; ' + command += 'else' + command += ' echo Node {} is down; ' + command += 'fi"' + + os.system(command) + return + + # Perform config + os.system('./setup.sh {}'.format(args.password)) + +def list_handler(args): + if args.object_to_list == 'machines': + os.system('cat %s'%HOSTS_FILEPATH) + print('') + elif args.object_to_list == 'input_files': + if args.all: + os.system('cat input_paths') + return + + print('Crawl name:') + os.system('cat crawl_name.txt') + print + print('Number of input files:') + os.system('wc -l < input_paths') + +def execute_handler(args): + command = "time parallel --onall " + command += "--sshloginfile %s "%HOSTS_FILEPATH + command += "--retries 3 " + + to_run_on_remote = "" + + if args.transfer_file: + for filename in args.transfer_file: + command += "--transferfile {} ".format(filename) + + to_run_on_remote += "chmod +x {}; ".format(filename) + + to_run_on_remote += args.command + + command += "--will-cite " + command += "--workdir %s "%WORK_DIR + command += "eval ::: '{}' 2>&1 | grep -v \"Authorized uses only\"\n".format(to_run_on_remote) + + os.system('bash -c "%s"'%command) + +def num_cores(args): + if args.jobs_per_worker is None: + with open(CORES_FILEPATH) as cf: + return cf.readline().rstrip() + else: + return args.jobs_per_worker + +def mapred_handler(args): + cores = num_cores(args) + run_mapred(args.mapper, args.reducer, cores) + +def sac_handler(args): + regex_str = 'false' + if args.regex: + regex_str = 'true' + + patterns_str = ' '.join(['"{}"'.format(x) for x in args.pattern]) + + cores_per_worker = num_cores(args) + + if args.by_file: + os.system('./run_sac.sh {} by-file {} {}'.format(cores_per_worker, regex_str, patterns_str)) + return + + + os.system('./run_sac.sh {} aggregate {} {}'.format(cores_per_worker, regex_str, patterns_str)) + +def generate_handler(args): + os.system('python3 generate_file_list.py') + +def utilization_handler(args): + from graph_hardware_usage import generate_hardware_graph + + duration_seconds = 120 + if args.seconds: + duration_seconds = args.seconds + + os.system('./get_hardware_util.sh {}'.format(duration_seconds)) + generate_hardware_graph(args.output_graph_filename) + +def run_mapred(mapper, reducer, cores): + + os.system('./run_mapreduce.sh {} {} {}'.format(cores_per_worker, mapper, reducer)) + +# Top-level parser +parser = argparse.ArgumentParser(description='Wee CommonCrawl Utility (WECU) is a CLI tool which allows running scan-and-count workloads on Common Crawl data without') +subparsers = parser.add_subparsers(help='A sub-command to be executed') + +# Cluster Setup parser +parser_setup = subparsers.add_parser('setup', help='Setup the framework to operate on an HDInsight cluster') +parser_setup.add_argument('password', type=str, help='Password to the cluster - used to setup passwordless communication') +parser_setup.add_argument('--check_files', action="store_true", help='Use this flag to check that all the required configuration files are in place') +parser_setup.add_argument('--check_nodes_up', action='store_true', help='Check if the worker machines are responsive') +parser_setup.set_defaults(handler=setup_handler) + +# Show cluster configuration parser +parser_list = subparsers.add_parser('list', help='List configuration') +parser_list.add_argument('object_to_list', type=str, choices=['machines', 'input_files'], help='Choose whether to list machines or (summary) of currently selected input files') +parser_list.add_argument('--all', action='store_true', help='Show a list of all input files instead of a summary (Can be used alongside input_files argument only)') +parser_list.set_defaults(handler=list_handler) + +# Remote command execution parser +execute_list = subparsers.add_parser('execute', help='Execute arbitrary command on all worker machines in the cluster in parallel') +execute_list.add_argument('command', type=str) +execute_list.add_argument('--transfer_file', nargs='+', help="Provide files which should be transferred to the remote workers before the executions starts.") +execute_list.set_defaults(handler=execute_handler) + +# MapReduce jobs parser +mapred_list = subparsers.add_parser('mapred', help="Execute mapreduce jobs using the provided mapper and reducer executable") +mapred_list.add_argument('mapper', type=str, help='Path to the map phase executable') +mapred_list.add_argument('reducer', type=str, help='Path to the reduce phase executable') +mapred_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.") + +mapred_list.set_defaults(handler=mapred_handler) + +# Scan-and-count parser +sac_list = subparsers.add_parser('sac', help='Execute scan-and-count (SAC) workloads directly from the command line') +sac_list.add_argument('pattern', type=str, nargs='+') +sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions") +sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead") +sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.") +sac_list.set_defaults(handler=sac_handler) + +# Generate sample parser +generate_parser = subparsers.add_parser('generate-sample', help='Generate a sample of a chosen Common Crawl month') +generate_parser.set_defaults(handler=generate_handler) + +# Generate utilization graph parser +utilization_graph = subparsers.add_parser('utilisation', help='Generate CPU utilisation graph and files') +utilization_graph.add_argument('output_graph_filename', type=str, help='The path to the location of the output graph') +utilization_graph.add_argument('--seconds', type=int, help='Provide this flag to change how long the utilisation is measure for (the default is 120 seconds).') +utilization_graph.set_defaults(handler=utilization_handler) + +if(len(sys.argv) < 2): + parser.print_help() + sys.exit(0) + +args = parser.parse_args() +args.handler(args)