diff master/src/wecu/wecu.py~ @ 58:a3edba8dab11

move to right place in tree
author Henry S. Thompson <ht@markup.co.uk>
date Thu, 28 May 2020 09:56:42 +0000
parents master/wecu/wecu.py~@ac1a20e627a9
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/master/src/wecu/wecu.py~	Thu May 28 09:56:42 2020 +0000
@@ -0,0 +1,186 @@
+#!/usr/bin/python
+import os
+import sys
+import argparse
+
+HOME=os.getenv('HOME')
+DEFAULT_HOSTS='hosts'
+DEFAULT_CORES='cores.txt'
+DEFAULT_WD=os.getcwd()
+
+try:
+    with open('%s/.wecu'%HOME) as conf:
+        for l in conf:
+            if l[0]!='#':
+                eval(l.rstrip())
+except IOError as e:
+    raise e
+    pass
+
+
+HOSTS_FILEPATH=os.getenv('WECU_HOSTS',DEFAULT_HOSTS)
+CORES_FILEPATH=os.getenv('WECU_CORES',DEFAULT_CORES)
+WORK_DIR=os.getenv('WECU_WD',DEFAULT_WD)
+
+
+def setup_handler(args):
+    if args.check_files:
+        if os.path.exists(HOSTS_FILEPATH) and os.path.exists(CORES_FILEPATH):
+            print("Config ok!")
+        else:
+            print("WECU is not configured! Run `wecu setup`")
+        
+        return
+    
+    if args.check_nodes_up:
+        command  = 'cat %s | '%HOSTS_FILEPATH
+        command += 'parallel --will-cite '
+        command += '"nc -z {} 22 2> /dev/null; '
+        command += 'if [ $? -eq 0 ]; '
+        command += 'then '
+        command += '    echo Node {} is OK; '
+        command += 'else'
+        command += '    echo Node {} is down; '
+        command += 'fi"'
+
+        os.system(command)
+        return
+
+    # Perform config
+    os.system('./setup.sh {}'.format(args.password))
+
+def list_handler(args):
+    if args.object_to_list == 'machines':
+        os.system('cat %s'%HOSTS_FILEPATH)    
+        print('')
+    elif args.object_to_list == 'input_files':
+        if args.all:
+            os.system('cat input_paths')
+            return
+
+        print('Crawl name:')
+        os.system('cat crawl_name.txt')
+        print
+        print('Number of input files:')
+        os.system('wc -l < input_paths')
+
+def execute_handler(args):
+    command =  "time parallel --onall "
+    command += "--sshloginfile %s "%HOSTS_FILEPATH
+    command += "--retries 3 "
+    
+    to_run_on_remote = ""
+
+    if args.transfer_file:
+        for filename in args.transfer_file:
+            command += "--transferfile {} ".format(filename)
+        
+            to_run_on_remote += "chmod +x {}; ".format(filename)
+    
+    to_run_on_remote += args.command
+
+    command += "--will-cite "
+    command += "--workdir %s "%WORK_DIR
+    command += "eval ::: '{}' 2>&1 | grep -v \"Authorized uses only\"\n".format(to_run_on_remote)
+
+    os.system('bash -c "%s"'%command)
+
+def num_cores(args):
+    if args.jobs_per_worker is None:
+        with open(CORES_FILEPATH) as cf:
+            return cf.readline().rstrip()
+    else:
+        return args.jobs_per_worker
+
+def mapred_handler(args):
+    cores = num_cores(args)
+    run_mapred(args.mapper, args.reducer, cores)
+
+def sac_handler(args):
+    regex_str = 'false'
+    if args.regex:
+        regex_str = 'true'
+
+    patterns_str = ' '.join(['"{}"'.format(x) for x in args.pattern])
+
+    cores_per_worker = num_cores(args)
+
+    if args.by_file:
+        os.system('./run_sac.sh {} by-file {} {}'.format(cores_per_worker, regex_str, patterns_str))
+        return
+
+
+    os.system('./run_sac.sh {} aggregate {} {}'.format(cores_per_worker, regex_str, patterns_str))
+
+def generate_handler(args):
+    os.system('python3 generate_file_list.py')
+
+def utilization_handler(args):
+    from graph_hardware_usage import generate_hardware_graph
+
+    duration_seconds = 120
+    if args.seconds:
+        duration_seconds = args.seconds
+    
+    os.system('./get_hardware_util.sh {}'.format(duration_seconds))
+    generate_hardware_graph(args.output_graph_filename)
+
+def run_mapred(mapper, reducer, cores):
+
+    os.system('./run_mapreduce.sh {} {} {}'.format(cores_per_worker, mapper, reducer))
+
+# Top-level parser
+parser = argparse.ArgumentParser(description='Wee CommonCrawl Utility (WECU) is a CLI tool which allows running scan-and-count workloads on Common Crawl data without')
+subparsers = parser.add_subparsers(help='A sub-command to be executed')
+
+# Cluster Setup parser
+parser_setup = subparsers.add_parser('setup', help='Setup the framework to operate on an HDInsight cluster')
+parser_setup.add_argument('password', type=str, help='Password to the cluster - used to setup passwordless communication')
+parser_setup.add_argument('--check_files', action="store_true", help='Use this flag to check that all the required configuration files are in place')
+parser_setup.add_argument('--check_nodes_up', action='store_true', help='Check if the worker machines are responsive')
+parser_setup.set_defaults(handler=setup_handler)
+
+# Show cluster configuration parser
+parser_list = subparsers.add_parser('list', help='List configuration')
+parser_list.add_argument('object_to_list', type=str, choices=['machines', 'input_files'], help='Choose whether to list machines or (summary) of currently selected input files')
+parser_list.add_argument('--all', action='store_true', help='Show a list of all input files instead of a summary (Can be used alongside input_files argument only)')
+parser_list.set_defaults(handler=list_handler)
+
+# Remote command execution parser
+execute_list = subparsers.add_parser('execute', help='Execute arbitrary command on all worker machines in the cluster in parallel')
+execute_list.add_argument('command', type=str)
+execute_list.add_argument('--transfer_file', nargs='+', help="Provide files which should be transferred to the remote workers before the executions starts.")
+execute_list.set_defaults(handler=execute_handler)
+
+# MapReduce jobs parser
+mapred_list = subparsers.add_parser('mapred', help="Execute mapreduce jobs using the provided mapper and reducer executable")
+mapred_list.add_argument('mapper', type=str, help='Path to the map phase executable')
+mapred_list.add_argument('reducer', type=str, help='Path to the reduce phase executable')
+mapred_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
+
+mapred_list.set_defaults(handler=mapred_handler)
+
+# Scan-and-count parser
+sac_list = subparsers.add_parser('sac', help='Execute scan-and-count (SAC) workloads directly from the command line')
+sac_list.add_argument('pattern', type=str, nargs='+')
+sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
+sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
+sac_list.add_argument('--jobs-per-worker', type=int, help="By deafult the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
+sac_list.set_defaults(handler=sac_handler)
+
+# Generate sample parser
+generate_parser = subparsers.add_parser('generate-sample', help='Generate a sample of a chosen Common Crawl month')
+generate_parser.set_defaults(handler=generate_handler)
+
+# Generate utilization graph parser
+utilization_graph = subparsers.add_parser('utilisation', help='Generate CPU utilisation graph and files')
+utilization_graph.add_argument('output_graph_filename', type=str, help='The path to the location of the output graph')
+utilization_graph.add_argument('--seconds', type=int, help='Provide this flag to change how long the utilisation is measure for (the default is 120 seconds).')
+utilization_graph.set_defaults(handler=utilization_handler)
+
+if(len(sys.argv) < 2):
+    parser.print_help()
+    sys.exit(0)
+
+args = parser.parse_args()
+args.handler(args)