From 34527a5a5904571ad1222526a04ba7203499e8fc Mon Sep 17 00:00:00 2001 From: zclok010 Date: Thu, 1 Aug 2019 20:12:50 +0800 Subject: [PATCH 1/3] Add Linux node manager execution filter samples --- .../README.md | 101 +++++++++ .../filters/AddDataIoCommand.py | 20 ++ .../filters/AdjustMpiCommand.py | 23 +++ .../filters/AdjustTaskAffinity.py | 32 +++ .../filters/OnJobEnd.sh | 1 + .../filters/OnJobTaskStart.sh | 20 ++ .../filters/OnTaskStart.sh | 20 ++ .../filters/ResolveUserName.py | 50 +++++ .../filters/ResolveUserNameAndDoMount.py | 191 ++++++++++++++++++ 9 files changed, 458 insertions(+) create mode 100644 Linux node manager execution filters/README.md create mode 100644 Linux node manager execution filters/filters/AddDataIoCommand.py create mode 100644 Linux node manager execution filters/filters/AdjustMpiCommand.py create mode 100644 Linux node manager execution filters/filters/AdjustTaskAffinity.py create mode 100644 Linux node manager execution filters/filters/OnJobEnd.sh create mode 100644 Linux node manager execution filters/filters/OnJobTaskStart.sh create mode 100644 Linux node manager execution filters/filters/OnTaskStart.sh create mode 100644 Linux node manager execution filters/filters/ResolveUserName.py create mode 100644 Linux node manager execution filters/filters/ResolveUserNameAndDoMount.py diff --git a/Linux node manager execution filters/README.md b/Linux node manager execution filters/README.md new file mode 100644 index 0000000..cf9f338 --- /dev/null +++ b/Linux node manager execution filters/README.md @@ -0,0 +1,101 @@ +# Microsoft HPC Pack 2016 Linux Node Manager Execution Filter Sample + +Execution filter on Linux compute nodes allows cluster admin to plugin customized scripts to be executed (under root) on Linux compute node during different stage of job/task execution. + +## Usage + +Download the `filters` directory to `/opt/hpcnodemanager/` on each Linux compute node, add execution permission to the scripts and modify them on demand. 
+ +There are three execution filter scripts as entry point, which read json format input from stdin, modify it and write it to stdout. + +1. `OnJobTaskStart.sh` is called when a new job (or a task) is dispatched from scheduler to current Linux compute node. + + The json format stdin is like: + + ```json + { + "m_Item1": { + "JobId": number, + "ResIds": array of number, + "TaskId": number + }, + "m_Item2": { + "affinity": array of number, + "commandLine": string, + "environmentVariables": object, + "inputFiles": string, + "outputFiles": string, + "role": number, + "stderr": string, + "stdin": string, + "stdout": string, + "taskRequeueCount": number, + "workingDirectory": string + }, + "m_Item3": username as string, + "m_Item4": password as string, + "m_Item5": SSH private key as string, + "m_Item6": SSH public key as string + } + ``` + +2. `OnTaskStart.sh` is called when a new task is dispatched from scheduler to current Linux compute node. + + The json format stdin is like: + + ```json + { + "m_Item1": { + "JobId": number, + "ResIds": array of number, + "TaskId": number + }, + "m_Item2": { + "affinity": array of number, + "commandLine": string, + "environmentVariables": object, + "inputFiles": string, + "outputFiles": string, + "role": number, + "stderr": string, + "stdin": string, + "stdout": string, + "taskRequeueCount": number, + "workingDirectory": string + }, + } + ``` + +3. `OnJobEnd.sh` is called when a job ends. + + The json format stdin is like: + + ```json + { + "JobId": number, + "JobInfo": object, + "TaskId": number + } + ``` + +## Samples Introduction + +1. ResolveUserName.py + + This execution filter is a sample script to compose an Active Directory user name. For an Active Directory integrated Linux environment, it is necessary to compose the RunAs user with different settings, such as: 'winbind seperator' set in /etc/samba/smb.conf for Winbind or 're_expression' set in /etc/sssd/sssd.conf for SSSD to ensure HPC jobs are run as the correct user. 
# python2.7, python3
# Execution filter step: wrap the task command line with HpcData download /
# upload commands so input files are fetched before the command runs and
# output files are uploaded after it finishes.

import json
import sys

# Path of the HpcData client used for staging task input/output files.
HpcDataClient = '/opt/HpcData/HpcDataClient.exe'


def add_data_io_commands(data):
    """Wrap the task command line of a job context with data staging commands.

    data: job/task context dict; the 'm_Item2' entry holds the process start
    info. Mutates and returns the same dict.

    The resulting shell command is:
      download || exit 191 && (original); ec=$? && upload || exit 192 && exit $ec
    i.e. exit 191 on download failure, 192 on upload failure, otherwise the
    original command's exit code is preserved.
    """
    processStartInfo = data['m_Item2']
    commandLine = processStartInfo.get('commandLine')
    if commandLine:
        inputFiles = processStartInfo.get('inputFiles')
        outputFiles = processStartInfo.get('outputFiles')
        if outputFiles and not outputFiles.isspace():
            commandUpload = '{} upload /source:. /dest:{} /overwrite'.format(
                HpcDataClient, outputFiles)
            commandLine = '({}); ec=$? && {} || exit 192 && exit $ec'.format(
                commandLine, commandUpload)
        if inputFiles and not inputFiles.isspace():
            commandDownload = '{} download /source:{} /dest:. /overwrite'.format(
                HpcDataClient, inputFiles)
            commandLine = '{} || exit 191 && {}'.format(commandDownload, commandLine)
        processStartInfo['commandLine'] = commandLine
    return data


if __name__ == '__main__':
    # BUG FIX: guard the stdin/stdout handling so importing this module does
    # not block reading stdin; behavior as a script is unchanged.
    print(json.dumps(add_data_io_commands(json.load(sys.stdin))))
# python2.7, python3
# Execution filter step: rewrite "mpirun "/"mpiexec " in the task command line
# so the mpiexec from CCP_MPI_SOURCE is used, sourcing mpivars.sh or adding a
# machinefile option when the environment asks for it.

import json
import sys


def adjust_mpi_command(data):
    """Rewrite the MPI launcher in the task command line of a job context.

    data: job/task context dict; mutated in place and returned.
    CCP_MPI_SOURCE may point at an mpirun/mpiexec binary, an MPI bin
    directory (with or without trailing slash), or an mpivars.sh script.
    """
    processStartInfo = data['m_Item2']
    commandLine = processStartInfo.get('commandLine')
    environment = processStartInfo['environmentVariables']
    mpiSource = environment.get('CCP_MPI_SOURCE')
    if commandLine and mpiSource:
        if mpiSource.endswith('/mpirun') or mpiSource.endswith('/mpiexec'):
            # Path of a launcher binary: use the sibling mpiexec.
            mpiCommand = '/'.join(mpiSource.split('/')[:-1]) + '/mpiexec'
        elif mpiSource.endswith('/'):
            mpiCommand = '{}mpiexec'.format(mpiSource)
        elif mpiSource.endswith('/mpivars.sh'):
            # Environment script: source it so mpiexec is found on PATH.
            mpiCommand = 'source {}; mpiexec'.format(mpiSource)
        else:
            mpiCommand = '{}/mpiexec'.format(mpiSource)
        mpiCommand += ' '
        if 'CCP_MPI_HOSTFILE_FORMAT' in environment:
            mpiCommand += '-machinefile $CCP_MPI_HOSTFILE '
        processStartInfo['commandLine'] = commandLine.replace(
            'mpiexec ', mpiCommand).replace('mpirun ', mpiCommand)
    return data


if __name__ == '__main__':
    # BUG FIX: guard the stdin/stdout handling so importing this module does
    # not block reading stdin; behavior as a script is unchanged.
    print(json.dumps(adjust_mpi_command(json.load(sys.stdin))))
# python2.7, python3
# Execution filter step: remap task affinity from sequential core numbering to
# the core numbering reported by lscpu's NUMA topology, so pinned cores line
# up with the real NUMA layout.

import json
import math
import subprocess
import sys

# Width of one affinity word: 'affinity' is an array of signed 64 bit integers.
AFFINITY_BITS = 64


def parse_numa_core_ids(lscpu_output):
    """Return the core ids listed per NUMA node, in listing order.

    lscpu_output: text of ``lscpu | grep NUMA``; relevant lines look like
    ``NUMA node0 CPU(s): 0-3,8-11``.
    """
    numaCoreId = []
    for line in lscpu_output.splitlines():
        if line.startswith('NUMA') and 'CPU(s):' in line:
            for coreId in line.split()[-1].split(','):
                if '-' in coreId:
                    # BUG FIX: map() is not subscriptable on python3;
                    # unpack the range endpoints explicitly.
                    begin, end = [int(part) for part in coreId.split('-')]
                    numaCoreId += list(range(begin, end + 1))
                else:
                    numaCoreId.append(int(coreId))
    return numaCoreId


def remap_affinity(affinity, numaCoreId):
    """Remap an affinity bit mask through numaCoreId.

    affinity: list of signed 64 bit integers; bit k of the concatenated
    little-endian bit string selects sequential core k, which is mapped to
    real core numaCoreId[k]. Returns the remapped list of signed 64 bit
    integers, padded to whole 64 bit words.
    """
    bits = [bit
            for word in affinity
            for bit in list('{:064b}'.format((2 ** AFFINITY_BITS - 1) & word))[::-1]]
    mappedCoreIds = set(numaCoreId[k] for k in range(len(bits)) if bits[k] == '1')
    paddedLength = int(math.ceil(float(len(numaCoreId)) / AFFINITY_BITS) * AFFINITY_BITS)
    mappedBits = ['1' if coreId in mappedCoreIds else '0' for coreId in range(paddedLength)]
    # Convert each 64 bit little-endian group back to a signed integer:
    # parse the low 63 bits, then subtract 2^63 when the sign bit is set.
    return [int(''.join(mappedBits[i * AFFINITY_BITS:(i + 1) * AFFINITY_BITS - 1][::-1]), 2)
            - int(mappedBits[(i + 1) * AFFINITY_BITS - 1]) * 2 ** (AFFINITY_BITS - 1)
            for i in range(len(mappedBits) // AFFINITY_BITS)]


def adjust_task_affinity(data, numaCoreId):
    """Remap 'affinity' and CCP_COREIDS of a job context through numaCoreId."""
    processStartInfo = data['m_Item2']
    if numaCoreId:
        affinity = processStartInfo.get('affinity')
        if affinity:
            processStartInfo['affinity'] = remap_affinity(affinity, numaCoreId)
        ccpCoreIds = processStartInfo['environmentVariables'].get('CCP_COREIDS')
        if ccpCoreIds:
            processStartInfo['environmentVariables']['CCP_COREIDS'] = ' '.join(
                str(numaCoreId[originalCoreId]) for originalCoreId in map(int, ccpCoreIds.split()))
    return data


if __name__ == '__main__':
    # BUG FIX: check_output returns bytes on python3; decode before parsing
    # (startswith with a str argument on bytes raises TypeError).
    numaInfo = subprocess.check_output('lscpu | grep NUMA', shell=True).decode('ascii', 'replace')
    print(json.dumps(adjust_task_affinity(json.load(sys.stdin), parse_numa_core_ids(numaInfo))))
#!/bin/bash
# Execution filter entry point, called when a new job (or task) is dispatched
# to this Linux compute node. Reads the job context json from stdin, pipes it
# through the filter chain and writes the modified json to stdout. On failure
# the input and stderr are kept under ERROR_LOG for troubleshooting.

filter_dir=/opt/hpcnodemanager/filters
input=$(cat)

# Extract ids for the log file name (best effort; the json is not fully parsed).
job_id=$(echo "$input" | grep -o '"JobId":[[:digit:]]*' | awk -F: '{print $NF}')
task_id=$(echo "$input" | grep -o '"TaskId":[[:digit:]]*' | awk -F: '{print $NF}')
requeue_count=$(echo "$input" | grep -o '"taskRequeueCount":[[:digit:]]*' | awk -F: '{print $NF}')

log_dir=$filter_dir/ERROR_LOG
mkdir -p "$log_dir"
log_prefix=$log_dir/$job_id.$task_id.$requeue_count
log_input=$log_prefix.input
log_error=$log_prefix.error

# BUG FIX: quote "$input" -- an unquoted expansion word-splits and
# glob-expands the json, corrupting whitespace inside json string values.
echo "$input" | (\
python "$filter_dir"/AdjustTaskAffinity.py | \
python "$filter_dir"/AdjustMpiCommand.py | \
python "$filter_dir"/AddDataIoCommand.py \
) 2>"$log_error"
error_code=$?

if [ ! -s "$log_error" ] && [ "$error_code" -eq 0 ]; then
    rm -f "$log_error"
else
    # Keep the input for troubleshooting and propagate the failure.
    echo "$input" >"$log_input"
    exit "$error_code"
fi
#!/bin/bash
# Execution filter entry point, called when a new task is dispatched to this
# Linux compute node. Reads the task context json from stdin, pipes it
# through the filter chain and writes the modified json to stdout. On failure
# the input and stderr are kept under ERROR_LOG for troubleshooting.

filter_dir=/opt/hpcnodemanager/filters
input=$(cat)

# Extract ids for the log file name (best effort; the json is not fully parsed).
job_id=$(echo "$input" | grep -o '"JobId":[[:digit:]]*' | awk -F: '{print $NF}')
task_id=$(echo "$input" | grep -o '"TaskId":[[:digit:]]*' | awk -F: '{print $NF}')
requeue_count=$(echo "$input" | grep -o '"taskRequeueCount":[[:digit:]]*' | awk -F: '{print $NF}')

log_dir=$filter_dir/ERROR_LOG
mkdir -p "$log_dir"
log_prefix=$log_dir/$job_id.$task_id.$requeue_count
log_input=$log_prefix.input
log_error=$log_prefix.error

# BUG FIX: quote "$input" -- an unquoted expansion word-splits and
# glob-expands the json, corrupting whitespace inside json string values.
echo "$input" | (\
python "$filter_dir"/AdjustTaskAffinity.py | \
python "$filter_dir"/AdjustMpiCommand.py | \
python "$filter_dir"/AddDataIoCommand.py \
) 2>"$log_error"
error_code=$?

if [ ! -s "$log_error" ] && [ "$error_code" -eq 0 ]; then
    rm -f "$log_error"
else
    # Keep the input for troubleshooting and propagate the failure.
    echo "$input" >"$log_input"
    exit "$error_code"
fi
#!/usr/bin/env python
# python2.7, python3
# Hpc Execution Filter Sample - Compose Customized Active Directory User Name
#
# In an Active Directory integrated Linux environment the RunAs user may have
# to be composed according to local settings -- e.g. 'winbind separator' in
# /etc/samba/smb.conf for Winbind, or 're_expression' in /etc/sssd/sssd.conf
# for SSSD -- so that HPC jobs run as the correct local user.
#
# Return codes:
#   0  success
#   1  incorrect invocation

import json
import sys


def ComposeAdUserName(domainName, userName):
    """Compose the local account name for an AD domain\\user pair.

    Default form is "domain.user" (matches 'winbind separator' set to '.').
    Adapt as needed, e.g. "{0}@{1}".format(userName, domainName).
    """
    composedUserName = "{0}.{1}".format(domainName, userName)
    return composedUserName


def Main():
    """Read the job context json from stdin, fix up m_Item3 (the RunAs user),
    and write the context back to stdout."""
    jsonData = json.loads(sys.stdin.readline())

    # The scheduler passes the user as domain\username by default; anything
    # else is passed through unchanged.
    composedUserName = jsonData["m_Item3"]
    runAsUserInfo = composedUserName.split('\\')
    if len(runAsUserInfo) == 2:
        composedUserName = ComposeAdUserName(runAsUserInfo[0], runAsUserInfo[1])

    jsonData["m_Item3"] = composedUserName

    # BUG FIX: use the print function so the filter also works under python3
    # (the original py2-only print statement is a SyntaxError on python3).
    print(json.dumps(jsonData))

    sys.exit(0)


if __name__ == '__main__':
    Main()
#!/usr/bin/env python
# python2.7, python3
# Hpc Execution Filter Sample - Compose Customized Active Directory User Name
# and mount an SMB share for non-admin users.
#
# Reuses ResolveUserName.py to compose the Active Directory user name, then
# mounts //SmbShareBasePath/UserName onto ~/share for the RunAs user, with
# uid/gid set to that user and file_mode/dir_mode both 0755.
#
# NOTE: this sample reads the user's password from the job context to perform
# the mount -- make sure that is acceptable under your security policies
# before using it in production environments.
#
# Return codes follow mount(8):
#   0 success; 1 incorrect invocation or permissions; 2 system error;
#   4 internal mount bug; 8 user interrupt; 16 problems with /etc/mtab;
#   32 mount failure; 64 some mount succeeded. See man 8 mount.

import json
import os
import pwd
import string
import subprocess
import sys
import time

# Base SMB share path; each user mounts "<SmbShareBasePath>/<userName>".
SmbShareBasePath = "//[SmbShareSever]/SmbShareDemo"


def MountSmbShare(smbSharePath, targetPath, domainName, userName, password, uid, gid, fileMode="0755", dirMode="0755"):
    """Mount an SMB share at targetPath for the given user.

    Retries up to 3 times; returns mount's exit code (0 on success or when
    another process mounted the target concurrently).
    """
    retCode = 0
    if not os.path.ismount(targetPath):
        maxRetry = 3
        while maxRetry > 0:
            retCode = Run("mount -t cifs {0} {1} -o domain={2},username={3},password='{4}',uid={5},gid={6},file_mode={7},dir_mode={8}".format(
                smbSharePath, targetPath, domainName, userName, password, uid, gid, fileMode, dirMode))
            # Treat it as success if another process won the race and mounted it.
            if retCode == 0 or os.path.ismount(targetPath):
                retCode = 0
                break
            maxRetry -= 1
            time.sleep(1)
    return retCode


# Run command facilities: backport subprocess.check_output for python 2.6.
if not hasattr(subprocess, 'check_output'):
    def check_output(*popenargs, **kwargs):
        """Backport from the subprocess module of python 2.7."""
        if 'stdout' in kwargs:
            raise ValueError('stdout argument not allowed, it will be overridden.')
        process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
        output, unused_err = process.communicate()
        retcode = process.poll()
        if retcode:
            cmd = kwargs.get("args")
            if cmd is None:
                cmd = popenargs[0]
            raise subprocess.CalledProcessError(retcode, cmd, output=output)
        return output

    class CalledProcessError(Exception):
        """Raised when a command run via check_output exits non-zero."""
        def __init__(self, returncode, cmd, output=None):
            self.returncode = returncode
            self.cmd = cmd
            self.output = output

        def __str__(self):
            return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)

    subprocess.check_output = check_output
    subprocess.CalledProcessError = CalledProcessError


def Run(cmd, chk_err=True):
    """Run cmd in a shell and return its exit code."""
    retcode, out = RunGetOutput(cmd, chk_err)
    return retcode


def RunGetOutput(cmd, chk_err=True):
    """Run cmd in a shell; return (exit_code, combined stdout+stderr text)."""
    try:
        output = subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
    # BUG FIX: 'except X, e' is py2-only syntax; use 'as' for python3.
    except subprocess.CalledProcessError as e:
        if chk_err:
            Error('CalledProcessError. Error Code is ' + str(e.returncode))
            Error('CalledProcessError. Command result was ' + (e.output[:-1]).decode('latin-1'))
        return e.returncode, e.output.decode('latin-1')
    return 0, output.decode('latin-1')
# End of run command facilities.


# Logging facilities; useful for troubleshooting, comment out when
# performance is considered more important.
LocalTime = time.localtime()
ExecutionFilterSampleLogFile = "./ExecutionFilter_ResolveUserAndMount_%04u%02u%02u-%02u%02u%02u.log" % (
    LocalTime.tm_year, LocalTime.tm_mon, LocalTime.tm_mday, LocalTime.tm_hour, LocalTime.tm_min, LocalTime.tm_sec)


def LogWithPrefix(prefix, message):
    """Append each line of message to the log file with a timestamp and prefix."""
    t = time.localtime()
    stamp = "%04u/%02u/%02u %02u:%02u:%02u " % (t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec)
    stamp += prefix
    for line in message.split('\n'):
        # Keep printable characters only so the log stays ascii-clean.
        # BUG FIX: py2's filter() returned a str here; join explicitly and
        # drop the .encode() so the write also works on python3.
        line = stamp + ''.join(ch for ch in line if ch in string.printable)
        try:
            with open(ExecutionFilterSampleLogFile, "a") as F:
                F.write(line + "\n")
        except IOError:
            pass


def Log(message):
    LogWithPrefix("INFO: ", message)


def Error(message):
    LogWithPrefix("ERROR: ", message)


def Warn(message):
    LogWithPrefix("WARNING: ", message)
# End of logging facilities.


def Main():
    """Read the job context json from stdin, resolve the AD user name, mount
    the user's SMB share when the user is not an HPC administrator, and write
    the (possibly updated) context back to stdout."""
    # Deferred import so the self-contained helpers above can be used without
    # ResolveUserName.py being present on sys.path.
    import ResolveUserName

    retCode = 0
    jsonData = json.loads(sys.stdin.readline())
    try:
        # The scheduler passes the user as domain\username by default.
        composedUserName = jsonData["m_Item3"]
        runAsUserInfo = composedUserName.split('\\')
        if len(runAsUserInfo) < 2:
            Error("Illegal input runAsUser: {0}, be sure the input is hpc job context in json format.".format(composedUserName))
            sys.exit(1)
        domainName = runAsUserInfo[0]
        userName = runAsUserInfo[1]

        # Resolve the right Active Directory user name.
        composedUserName = ResolveUserName.ComposeAdUserName(domainName, userName)

        # CCP_ISADMIN tells whether the RunAs user is an HPC administrator;
        # only non-admin users get the share mounted.
        isAdmin = "0"
        try:
            isAdmin = jsonData["m_Item2"]["environmentVariables"]["CCP_ISADMIN"]
        except KeyError:
            pass

        if isAdmin == "0":
            # Ensure the user exists and its home directory is usable.
            retCode = Run("mkhomedir_helper {0}".format(composedUserName))
            if retCode != 0:
                Error("No such user: {0}, or home directory for this user cannot be used or generated properly.".format(composedUserName))
                sys.exit(1)

            pwdInfo = pwd.getpwnam(composedUserName)
            uid = pwdInfo.pw_uid
            gid = pwdInfo.pw_gid
            homeDir = pwdInfo.pw_dir

            # Get the password; please note the security risk here.
            password = jsonData["m_Item4"]

            # Mount the user's SMB share under the home directory.
            smbSharePath = "{0}/{1}".format(SmbShareBasePath, userName)
            targetPath = "{0}/share".format(homeDir)
            retCode = Run("mkdir -p {0}".format(targetPath))
            if retCode != 0:
                Error("Cannot find and create mount target path: {0}".format(targetPath))
                sys.exit(1)

            retCode = MountSmbShare(smbSharePath, targetPath, domainName, userName, password, uid, gid)

        # Set composedUserName (for admins and non-admins alike).
        jsonData["m_Item3"] = composedUserName

    except KeyError:
        # Please check whether the script is used correctly.
        Error("Please check whether the script is used correctly, and ensure it get right format job context json.")
        retCode = 1

    # Return the result through stdout.
    # BUG FIX: print function so the filter also works under python3.
    print(json.dumps(jsonData))

    # Log("ExecutionFilter finished with retCode:{0}".format(retCode))
    sys.exit(retCode)


if __name__ == '__main__':
    Main()
like: - ```json + ``` { "m_Item1": { "JobId": number, @@ -43,7 +43,7 @@ There are three execution filter scripts as entry point, which read json format The json format stdin is like: - ```json + ``` { "m_Item1": { "JobId": number, @@ -70,7 +70,7 @@ There are three execution filter scripts as entry point, which read json format The json format stdin is like: - ```json + ``` { "JobId": number, "JobInfo": object, diff --git a/Linux node manager execution filters/filters/AddDataIoCommand.py b/Linux node manager execution filters/filters/AddDataIoCommand.py index fe1adb3..91885b5 100644 --- a/Linux node manager execution filters/filters/AddDataIoCommand.py +++ b/Linux node manager execution filters/filters/AddDataIoCommand.py @@ -2,19 +2,19 @@ import sys, json -j = json.load(sys.stdin) - +data = json.load(sys.stdin) +processStartInfo = data['m_Item2'] HpcDataClient = '/opt/HpcData/HpcDataClient.exe' -commandLine = j['m_Item2'].get('commandLine') +commandLine = processStartInfo.get('commandLine') if commandLine: - inputFiles = j['m_Item2'].get('inputFiles') - outputFiles = j['m_Item2'].get('outputFiles') + inputFiles = processStartInfo.get('inputFiles') + outputFiles = processStartInfo.get('outputFiles') if outputFiles and not outputFiles.isspace(): commandUpload = '{} upload /source:. /dest:{} /overwrite'.format(HpcDataClient, outputFiles) commandLine = '({}); ec=$? && {} || exit 192 && exit $ec'.format(commandLine, commandUpload) if inputFiles and not inputFiles.isspace(): commandDownload = '{} download /source:{} /dest:. 
/overwrite'.format(HpcDataClient, inputFiles) commandLine = '{} || exit 191 && {}'.format(commandDownload, commandLine) - j['m_Item2']['commandLine'] = commandLine + processStartInfo['commandLine'] = commandLine -print(json.dumps(j)) \ No newline at end of file +print(json.dumps(data)) \ No newline at end of file diff --git a/Linux node manager execution filters/filters/AdjustMpiCommand.py b/Linux node manager execution filters/filters/AdjustMpiCommand.py index cdb77dc..3406285 100644 --- a/Linux node manager execution filters/filters/AdjustMpiCommand.py +++ b/Linux node manager execution filters/filters/AdjustMpiCommand.py @@ -2,10 +2,10 @@ import sys, json -j = json.load(sys.stdin) - -commandLine = j['m_Item2'].get('commandLine') -mpiSource = j['m_Item2']['environmentVariables'].get('CCP_MPI_SOURCE') +data = json.load(sys.stdin) +processStartInfo = data['m_Item2'] +commandLine = processStartInfo.get('commandLine') +mpiSource = processStartInfo['environmentVariables'].get('CCP_MPI_SOURCE') if commandLine and mpiSource: if mpiSource.endswith('/mpirun') or mpiSource.endswith('/mpiexec'): mpiCommand = '/'.join(mpiSource.split('/')[:-1]) + '/mpiexec' @@ -16,8 +16,8 @@ else: mpiCommand = '{}/mpiexec'.format(mpiSource) mpiCommand += ' ' - if 'CCP_MPI_HOSTFILE_FORMAT' in j['m_Item2']['environmentVariables']: + if 'CCP_MPI_HOSTFILE_FORMAT' in processStartInfo['environmentVariables']: mpiCommand += '-machinefile $CCP_MPI_HOSTFILE ' - j['m_Item2']['commandLine'] = commandLine.replace('mpiexec ', mpiCommand).replace('mpirun ', mpiCommand) + processStartInfo['commandLine'] = commandLine.replace('mpiexec ', mpiCommand).replace('mpirun ', mpiCommand) -print(json.dumps(j)) \ No newline at end of file +print(json.dumps(data)) \ No newline at end of file diff --git a/Linux node manager execution filters/filters/AdjustTaskAffinity.py b/Linux node manager execution filters/filters/AdjustTaskAffinity.py index 2aac758..b78c87c 100644 --- a/Linux node manager execution 
filters/filters/AdjustTaskAffinity.py +++ b/Linux node manager execution filters/filters/AdjustTaskAffinity.py @@ -2,8 +2,8 @@ import sys, json, subprocess, math -j = json.load(sys.stdin) - +data = json.load(sys.stdin) +processStartInfo = data['m_Item2'] numaInfo = subprocess.check_output('lscpu | grep NUMA', shell = True) numaCoreId = [] for line in numaInfo.splitlines(): @@ -18,15 +18,15 @@ AFFINITY_BITS = 64 if numaCoreId: - affinity = j['m_Item2'].get('affinity') # This is an array of signed 64 bit number, which will be converted to bit array format for adjusting core id. + affinity = processStartInfo.get('affinity') # This is an array of signed 64 bit number, which will be converted to bit array format for adjusting core id. if affinity: affinityList = [bit for int64 in affinity for bit in list('{:064b}'.format((2 ** AFFINITY_BITS - 1) & int64))[::-1]] mappedCoreIds = set([numaCoreId[coreId] for coreId in range(len(affinityList)) if affinityList[coreId] == '1']) mappedAffinityList = ['1' if coreId in mappedCoreIds else '0' for coreId in range(int(math.ceil(float(len(numaCoreId)) / AFFINITY_BITS) * AFFINITY_BITS))] - j['m_Item2']['affinity'] = [int(''.join(mappedAffinityList[i * AFFINITY_BITS : (i + 1) * AFFINITY_BITS - 1][::-1]), 2) - int(mappedAffinityList[(i + 1) * AFFINITY_BITS - 1]) * 2 ** (AFFINITY_BITS - 1) for i in range(len(mappedAffinityList) // AFFINITY_BITS)] + processStartInfo['affinity'] = [int(''.join(mappedAffinityList[i * AFFINITY_BITS : (i + 1) * AFFINITY_BITS - 1][::-1]), 2) - int(mappedAffinityList[(i + 1) * AFFINITY_BITS - 1]) * 2 ** (AFFINITY_BITS - 1) for i in range(len(mappedAffinityList) // AFFINITY_BITS)] - ccpCoreIds = j['m_Item2']['environmentVariables'].get('CCP_COREIDS') + ccpCoreIds = processStartInfo['environmentVariables'].get('CCP_COREIDS') if ccpCoreIds: - j['m_Item2']['environmentVariables']['CCP_COREIDS'] = ' '.join([str(numaCoreId[originalCoreId]) for originalCoreId in map(int, ccpCoreIds.split())]) + 
processStartInfo['environmentVariables']['CCP_COREIDS'] = ' '.join([str(numaCoreId[originalCoreId]) for originalCoreId in map(int, ccpCoreIds.split())]) -print(json.dumps(j)) \ No newline at end of file +print(json.dumps(data)) \ No newline at end of file diff --git a/Linux node manager execution filters/filters/OnJobEnd.sh b/Linux node manager execution filters/filters/OnJobEnd.sh index 5d5b306..b629fda 100644 --- a/Linux node manager execution filters/filters/OnJobEnd.sh +++ b/Linux node manager execution filters/filters/OnJobEnd.sh @@ -1 +1,3 @@ +#!/bin/bash +# This execution filter do nothing but writing stdin to stdout cat \ No newline at end of file From 69d06e6616832321cfe174c0ba7ee48ab7bed006 Mon Sep 17 00:00:00 2001 From: zclok010 Date: Fri, 20 Sep 2019 15:57:15 +0800 Subject: [PATCH 3/3] Update README of Microsoft HPC Pack 2016 Linux Node Manager Execution Filter Sample --- .../README.md | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/Linux node manager execution filters/README.md b/Linux node manager execution filters/README.md index 3ec309d..60aa7f5 100644 --- a/Linux node manager execution filters/README.md +++ b/Linux node manager execution filters/README.md @@ -6,6 +6,30 @@ Execution filter on Linux compute nodes allows cluster admin to plugin customize Download the `filters` directory to `/opt/hpcnodemanager/` on each Linux compute node, add execution permission to the scripts and modify them on demand. +### Shortcut + +Clusrun can be used to deploy the filters to Linux compute nodes in an HPC Pack cluster. 
+ +- Clusrun via HPC Pack Cluster Manager with command: + +```bash +cd /opt/hpcnodemanager && curl https://codeload.github.com/Azure-Samples/hpcpack-samples/tar.gz/master | tar -xz --strip=2 hpcpack-samples-master/'Linux node manager execution filters' && chmod +x filters/* +``` + +- Clusrun via CMD with command: + +```CMD +clusrun /nodegroup:LinuxNodes cd /opt/hpcnodemanager ^&^& curl https://codeload.github.com/Azure-Samples/hpcpack-samples/tar.gz/master ^| tar -xz --strip=2 hpcpack-samples-master/'Linux node manager execution filters' ^&^& chmod +x filters/* +``` + +- Clusrun via Powershell with command: + +```Powershell +clusrun /nodegroup:LinuxNodes cd /opt/hpcnodemanager `&`& curl https://codeload.github.com/Azure-Samples/hpcpack-samples/tar.gz/master `| tar -xz --strip=2 hpcpack-samples-master/'Linux node manager execution filters' `&`& chmod +x filters/* +``` + +### Description + There are three execution filter scripts as entry point, which read json format input from stdin, modify it and write it to stdout. 1. `OnJobTaskStart.sh` is called when a new job (or a task) is dispatched from scheduler to current Linux compute node.