view master/src/wecu/wecu.py @ 61:cfaf5223b071

trying to get my own mapper working
author Henry S. Thompson <ht@markup.co.uk>
date Sun, 31 May 2020 12:06:44 +0000
parents 5fdca5baa4e9
children 892e1c0240e1
line wrap: on
line source

#!/usr/bin/python3
import os
import sys
import argparse

HOME=os.getenv('HOME')
DEFAULT_HOSTS='hosts'
DEFAULT_CORES='cores.txt'
DEFAULT_WD=os.getcwd()

# Optional per-user configuration.  Every non-blank line of ~/.wecu that does
# not start with '#' is executed as Python in this module's namespace, so it can
# override the DEFAULT_* values above, e.g.  DEFAULT_HOSTS='/path/to/hosts'.
# exec (not eval) is needed because eval rejects assignment statements.
# NOTE(review): this executes arbitrary code from the user's own config file;
# never point it at untrusted input.
try:
    with open('%s/.wecu'%HOME) as conf:
        for raw_line in conf:
            # strip() so indented lines do not raise IndentationError and
            # blank lines are skipped instead of being executed.
            line = raw_line.strip()
            if line and not line.startswith('#'):
                exec(line)
except FileNotFoundError:
    # No config file is fine: the defaults above (and WECU_* env vars) apply.
    # Other I/O errors (e.g. permissions) still propagate.
    pass


# Environment variables take precedence over the (possibly overridden) defaults.
HOSTS_FILEPATH=os.getenv('WECU_HOSTS',DEFAULT_HOSTS)
CORES_FILEPATH=os.getenv('WECU_CORES',DEFAULT_CORES)
WORK_DIR=os.getenv('WECU_WD',DEFAULT_WD)


def setup_handler(args):
    """Handle ``wecu setup``.

    Exactly one action is taken, in this priority order:

    * ``--check_files``: report whether HOSTS_FILEPATH and CORES_FILEPATH exist.
    * ``--check_nodes_up``: for every host listed in HOSTS_FILEPATH, probe TCP
      port 22 with ``nc -z`` (in parallel) and print whether the node answers.
    * otherwise: run ``setup.sh`` with the cluster password.
    """
    import shlex

    if args.check_files:
        if os.path.exists(HOSTS_FILEPATH) and os.path.exists(CORES_FILEPATH):
            print("Config ok!")
        else:
            print("WECU is not configured! Run `wecu setup`")
        return

    if args.check_nodes_up:
        # Test nc's exit status directly with ``if`` rather than via ``$?``.
        # The old script was wrapped in double quotes, so the *local* shell
        # expanded ``$?`` (to 0) before parallel ever ran it, and every node
        # was reported as OK.  Single-quoting the script (shlex.quote) also
        # keeps the local shell from touching it at all.
        probe = ('if nc -z {} 22 2> /dev/null; '
                 'then echo Node {} is OK; '
                 'else echo Node {} is down; '
                 'fi')
        command = 'cat {} | parallel --will-cite {}'.format(
            shlex.quote(HOSTS_FILEPATH), shlex.quote(probe))
        os.system(command)
        return

    # Perform config.  Quote the password so shell metacharacters in it
    # (spaces, $, !, &, quotes, ...) reach setup.sh verbatim.
    os.system('setup.sh {}'.format(shlex.quote(args.password)))

def list_handler(args):
    """Handle ``wecu list``.

    * ``machines``: print HOSTS_FILEPATH followed by a blank line.
    * ``input_files --all``: print the whole ``input_paths`` file.
    * ``input_files``: print the crawl name (from ``crawl_name.txt``), a blank
      line, and the number of lines in ``input_paths``.
    """
    if args.object_to_list == 'machines':
        os.system('cat %s'%HOSTS_FILEPATH)
        print('')
    elif args.object_to_list == 'input_files':
        if args.all:
            os.system('cat input_paths')
            return

        # flush=True: os.system children write straight to the file
        # descriptor, so buffered Python output could otherwise appear after
        # theirs when stdout is a pipe.
        print('Crawl name:', flush=True)
        os.system('cat crawl_name.txt')
        # Previously a bare ``print`` (a Python 2 statement, a no-op in
        # Python 3), so the separating blank line was never printed.
        print()
        print('Number of input files:', flush=True)
        os.system('wc -l < input_paths')

def execute_handler(args):
    """Handle ``wecu execute``: run ``args.command`` on every worker.

    Builds a GNU parallel ``--onall`` invocation over the hosts in
    HOSTS_FILEPATH (``--retries 3``, ``--workdir`` WORK_DIR).  Each file
    given with ``--transfer_file`` is transferred with ``--transferfile`` and
    ``chmod +x``-ed before the command runs.  Combined stdout/stderr is piped
    through ``grep -v "Authorized uses only"``.  The pipeline is timed with
    bash's ``time``.

    The user command is quoted once with shlex.quote and passed to bash
    directly (no intermediate ``sh -c "..."`` layer).  As a result, ``$VAR``,
    quotes and the grep pattern all reach their intended consumer unmodified.
    The old double-quote wrapping expanded ``$VAR`` locally, broke on
    commands containing ``'``, and reduced the grep pattern to ``Authorized``.
    """
    import shlex
    import subprocess

    command = "time parallel --onall "
    command += "--sshloginfile %s " % shlex.quote(HOSTS_FILEPATH)
    command += "--retries 3 "

    to_run_on_remote = ""
    for filename in args.transfer_file or []:
        quoted = shlex.quote(filename)
        command += "--transferfile {} ".format(quoted)
        # Transferred files are typically scripts the command will invoke.
        to_run_on_remote += "chmod +x {}; ".format(quoted)
    to_run_on_remote += args.command

    command += "--will-cite "
    command += "--workdir %s " % shlex.quote(WORK_DIR)
    command += 'eval ::: {} 2>&1 | grep -v "Authorized uses only"'.format(
        shlex.quote(to_run_on_remote))

    # A list argv means no extra shell re-parses (and re-expands) `command`.
    subprocess.call(['bash', '-c', command])

def num_cores(args):
    """Return how many concurrent tasks to run per worker.

    An explicit ``--jobs-per-worker`` value (even 0) is returned unchanged as
    given.  Otherwise the first line of CORES_FILEPATH, with trailing
    whitespace removed, is returned as a string.
    """
    requested = args.jobs_per_worker
    if requested is not None:
        return requested
    with open(CORES_FILEPATH) as cores_file:
        first_line = cores_file.readline()
    return first_line.rstrip()

def mapred_handler(args):
    """Handle ``wecu mapred``.

    Forwards the user's mapper and reducer executables to run_mapred, using
    num_cores(args) concurrent jobs per worker.
    """
    run_mapred(
        mapper=args.mapper,
        reducer=args.reducer,
        cores=num_cores(args),
    )

def sac_handler(args):
    """Handle ``wecu sac``: launch a scan-and-count job via ``run_sac.sh``.

    run_sac.sh receives, in order:

    1. the jobs per worker (num_cores(args));
    2. HOSTS_FILEPATH;
    3. WORK_DIR;
    4. the mapper (``sac_mapper.py`` unless ``--mapper`` is given);
    5. optionally the two words ``-f <filter>`` (nothing at all when
       ``--filter`` is absent);
    6. ``by-file`` or ``aggregate``;
    7. ``true``/``false`` for ``--regex``;
    8. each pattern as its own argument.
    """
    import shlex

    regex_str = 'true' if args.regex else 'false'

    # Quote each pattern so the local shell passes it through untouched.
    # The previous "..." wrapping let the shell expand $, `...` and \-escapes,
    # which regular expressions routinely contain.
    patterns_str = ' '.join(shlex.quote(pattern) for pattern in args.pattern)

    cores_per_worker = num_cores(args)

    mapper = 'sac_mapper.py' if args.mapper is None else args.mapper
    # Keep the original layout: no filter contributes no arguments at all.
    filter_str = ('' if args.filter is None
                  else '-f ' + shlex.quote(args.filter))
    mode = 'by-file' if args.by_file else 'aggregate'

    os.system('run_sac.sh {} {} {} {} {} {} {} {}'.format(
        cores_per_worker,
        shlex.quote(HOSTS_FILEPATH),
        shlex.quote(WORK_DIR),
        shlex.quote(mapper),
        filter_str,
        mode,
        regex_str,
        patterns_str))

def generate_handler(args):
    """Handle ``wecu generate-sample`` by calling generate_file_list.main().

    *args* is unused.  The project module is imported only when this
    sub-command actually runs.
    """
    from generate_file_list import main as generate_file_list_main
    generate_file_list_main()

def utilization_handler(args):
    """Handle ``wecu utilisation``.

    Runs ``./get_hardware_util.sh`` with the measurement duration in seconds
    (``--seconds``, or 120 when the flag is absent or 0).  It then calls
    generate_hardware_graph with ``args.output_graph_filename``, the output
    path named on the command line.
    """
    from graph_hardware_usage import generate_hardware_graph

    seconds = args.seconds or 120
    os.system('./get_hardware_util.sh {}'.format(seconds))
    generate_hardware_graph(args.output_graph_filename)

def run_mapred(mapper, reducer, cores):
    """Run ``./run_mapreduce.sh <cores> <mapper> <reducer>``.

    :param mapper: path to the map-phase executable.
    :param reducer: path to the reduce-phase executable.
    :param cores: concurrent jobs per worker (int or numeric string).
    :returns: the value returned by os.system for the script.
    """
    import shlex

    # Previously formatted an undefined name ``cores_per_worker``, so every
    # ``wecu mapred`` call raised NameError before running anything.
    return os.system('./run_mapreduce.sh {} {} {}'.format(
        cores, shlex.quote(mapper), shlex.quote(reducer)))

# Top-level parser
# NOTE(review): the original description ended with a dangling "without";
# restore the intended ending if it is known.
parser = argparse.ArgumentParser(description='Wee CommonCrawl Utility (WECU) is a CLI tool which allows running scan-and-count workloads on Common Crawl data.')
subparsers = parser.add_subparsers(help='A sub-command to be executed')

# Each sub-parser stores its handler function in args.handler; the entry
# point dispatches by calling args.handler(args).

# Cluster Setup parser
parser_setup = subparsers.add_parser('setup', help='Setup the framework to operate on an HDInsight cluster')
parser_setup.add_argument('password', type=str, help='Password to the cluster - used to setup passwordless communication')
parser_setup.add_argument('--check_files', action="store_true", help='Use this flag to check that all the required configuration files are in place')
parser_setup.add_argument('--check_nodes_up', action='store_true', help='Check if the worker machines are responsive')
parser_setup.set_defaults(handler=setup_handler)

# Show cluster configuration parser
parser_list = subparsers.add_parser('list', help='List configuration')
parser_list.add_argument('object_to_list', type=str, choices=['machines', 'input_files'], help='Choose whether to list the machines or (a summary of) the currently selected input files')
parser_list.add_argument('--all', action='store_true', help='Show a list of all input files instead of a summary (can be used alongside the input_files argument only)')
parser_list.set_defaults(handler=list_handler)

# Remote command execution parser
execute_list = subparsers.add_parser('execute', help='Execute arbitrary command on all worker machines in the cluster in parallel')
execute_list.add_argument('command', type=str, help='The shell command to run on each worker machine')
execute_list.add_argument('--transfer_file', nargs='+', help="Provide files which should be transferred to the remote workers before the execution starts.")
execute_list.set_defaults(handler=execute_handler)

# MapReduce jobs parser
mapred_list = subparsers.add_parser('mapred', help="Execute mapreduce jobs using the provided mapper and reducer executable")
mapred_list.add_argument('mapper', type=str, help='Path to the map phase executable')
mapred_list.add_argument('reducer', type=str, help='Path to the reduce phase executable')
mapred_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")

mapred_list.set_defaults(handler=mapred_handler)

# Scan-and-count parser
sac_list = subparsers.add_parser('sac', help='Execute scan-and-count (SAC) workloads directly from the command line')
sac_list.add_argument('pattern', type=str, nargs='+', help='One or more strings to count (treated as regular expressions with --regex)')
sac_list.add_argument('--regex', action="store_true", help="Provide this flag to indicate that the provided strings should be treated as regular expressions")
sac_list.add_argument('--by-file', action="store_true", help="Provide this flag to indicate that the output should not be aggregated and displayed per file instead")
sac_list.add_argument('--mapper', type=str, help="Supply a bespoke mapper for use in place of sac_mapper.py")
sac_list.add_argument('--filter', type=str, help="Supply a filter on the unzipped warc file ahead of the mapper")
sac_list.add_argument('--jobs-per-worker', type=int, help="By default the number of concurrent tasks is set to the number of available logical cores. Provide this flag to set a different number of concurrent tasks.")
sac_list.set_defaults(handler=sac_handler)

# Generate sample parser
generate_parser = subparsers.add_parser('generate-sample', help='Generate a sample of a chosen Common Crawl month')
generate_parser.set_defaults(handler=generate_handler)

# Generate utilization graph parser
utilization_graph = subparsers.add_parser('utilisation', help='Generate CPU utilisation graph and files')
utilization_graph.add_argument('output_graph_filename', type=str, help='The path to the location of the output graph')
utilization_graph.add_argument('--seconds', type=int, help='Provide this flag to change how long the utilisation is measured for (the default is 120 seconds).')
utilization_graph.set_defaults(handler=utilization_handler)

# Entry point: with a sub-command, parse and dispatch to the handler it
# registered via set_defaults; with no arguments at all, show usage and exit 0
# instead of failing on the missing handler.
if len(sys.argv) >= 2:
    args = parser.parse_args()
    args.handler(args)
else:
    parser.print_help()
    sys.exit(0)