2222from urllib .parse import urlencode
2323from urllib .request import urlopen
2424
25- from proxyTools import getVO
26-
27- # Utilities functions
25+ from proxyTools import BaseRequest , extract_diracx_payload , getVO
2826
2927
3028def load_module_from_path (module_name , path_to_module ):
@@ -884,10 +882,14 @@ def __init__(self):
884882 self .setup = ""
885883 self .configServer = ""
886884 self .preferredURLPatterns = ""
885+ self .diracXServer = ""
887886 self .ceName = ""
888887 self .ceType = ""
889888 self .queueName = ""
890889 self .gridCEType = ""
890+ self .pilotSecret = ""
891+ self .clientID = ""
892+ self .jwt = {}
891893 # maxNumberOfProcessors: the number of
892894 # processors allocated to the pilot which the pilot can allocate to one payload
893895 # used to set payloadProcessors unless other limits are reached (like the number of processors on the WN)
@@ -922,6 +924,7 @@ def __init__(self):
922924 self .pilotCFGFile = "pilot.json"
923925 self .pilotLogging = False
924926 self .loggerURL = None
927+ self .isLegacyPilot = False
925928 self .loggerTimerInterval = 0
926929 self .loggerBufsize = 1000
927930 self .pilotUUID = "unknown"
@@ -978,6 +981,7 @@ def __init__(self):
978981 ("y:" , "CEType=" , "CE Type (normally InProcess)" ),
979982 ("z" , "pilotLogging" , "Activate pilot logging system" ),
980983 ("C:" , "configurationServer=" , "Configuration servers to use" ),
984+ ("" , "diracx_URL=" , "DiracX Server URL to use" ),
981985 ("D:" , "disk=" , "Require at least <space> MB available" ),
982986 ("E:" , "commandExtensions=" , "Python modules with extra commands" ),
983987 ("F:" , "pilotCFGFile=" , "Specify pilot CFG file" ),
@@ -1015,6 +1019,8 @@ def __init__(self):
10151019 ),
10161020 ("" , "architectureScript=" , "architecture script to use" ),
10171021 ("" , "CVMFS_locations=" , "comma-separated list of CVMS locations" ),
1022+ ("" , "pilotSecret=" , "secret that the pilot uses with DiracX" ),
1023+ ("" , "clientID=" , "client id used by DiracX to revoke a token" ),
10181024 )
10191025
10201026 # Possibly get Setup and JSON URL/filename from command line
@@ -1041,6 +1047,74 @@ def __init__(self):
10411047 self .installEnv ["X509_USER_PROXY" ] = self .certsLocation
10421048 os .environ ["X509_USER_PROXY" ] = self .certsLocation
10431049
1050+ try :
1051+ self .__get_diracx_jwt ()
1052+ except Exception as e :
1053+ self .log .error ("Error setting DiracX: %s" % e )
1054+ # Remove all settings to prevent using it.
1055+ self .diracXServer = None
1056+ self .pilotSecret = None
1057+ self .loggerURL = None
1058+ self .jwt = {}
1059+ self .log .error ("Won't use DiracX." )
1060+
1061+ def __get_diracx_jwt (self ):
1062+ # Pilot auth: two cases
1063+ # 1. Has a secret (DiracX Pilot), exchange for a token
1064+ # 2. Legacy Pilot, has a proxy with a DiracX section in it (extract the jwt from it)
1065+ if self .pilotUUID and self .pilotSecret and self .diracXServer :
1066+ self .log .info ("Fetching JWT in DiracX (URL: %s)" % self .diracXServer )
1067+
1068+ config = BaseRequest (
1069+ "%s/api/auth/secret-exchange" % (self .diracXServer ),
1070+ os .getenv ("X509_CERT_DIR" ),
1071+ self .pilotUUID ,
1072+ )
1073+
1074+ try :
1075+ self .jwt = config .executeRequest (
1076+ {"pilot_stamp" : self .pilotUUID , "pilot_secret" : self .pilotSecret }
1077+ )
1078+ except HTTPError as e :
1079+ self .log .error ("Request failed: %s" % str (e ))
1080+ self .log .error ("Could not fetch pilot tokens." )
1081+ if e .code == 401 :
1082+ # First test if the error occurred because of "bad pilot_stamp"
1083+ # If so, this pilot is in the vacuum case
1084+ # So we redo auth, but this time with the right data for vacuum cases
1085+ self .log .error ("Retrying with vacuum case data..." )
1086+ self .jwt = config .executeRequest (
1087+ {
1088+ "pilot_stamp" : self .pilotUUID ,
1089+ "pilot_secret" : self .pilotSecret ,
1090+ "vo" : self .wnVO ,
1091+ "grid_type" : self .gridCEType ,
1092+ "grid_site" : self .site ,
1093+ "status" : "Running" ,
1094+ }
1095+ )
1096+ else :
1097+ raise RuntimeError ("Can't be a vacuum case." )
1098+
1099+ self .log .info ("Fetched the pilot token with the pilot secret." )
1100+ self .isLegacyPilot = False
1101+ elif self .pilotUUID and self .diracXServer :
1102+ # Try to extract a token for proxy
1103+ self .log .info ("Trying to extract diracx token from proxy." )
1104+
1105+ cert = os .getenv ("X509_USER_PROXY" )
1106+ if cert :
1107+ with open (cert , "rb" ) as fp :
1108+ self .jwt = extract_diracx_payload (fp .read ())
1109+ self .isLegacyPilot = True
1110+ self .log .info ("Successfully extracted token from proxy." )
1111+ else :
1112+ raise RuntimeError ("Could not locate a proxy via X509_USER_PROXY" )
1113+ else :
1114+ self .log .info (
1115+ "PilotUUID, pilotSecret, and diracXServer are needed to support DiracX."
1116+ )
1117+
10441118 def __setSecurityDir (self , envName , dirLocation ):
10451119 """Set the environment variable of the `envName`, and add it also to the Pilot Parameters
10461120
@@ -1152,6 +1226,8 @@ def __initCommandLine2(self):
11521226 self .keepPythonPath = True
11531227 elif o in ("-C" , "--configurationServer" ):
11541228 self .configServer = v
1229+ elif o == "--diracx_URL" :
1230+ self .diracXServer = v
11551231 elif o in ("-G" , "--Group" ):
11561232 self .userGroup = v
11571233 elif o in ("-x" , "--execute" ):
@@ -1225,6 +1301,10 @@ def __initCommandLine2(self):
12251301 self .architectureScript = v
12261302 elif o == "--CVMFS_locations" :
12271303 self .CVMFS_locations = v .split ("," )
1304+ elif o == "--pilotSecret" :
1305+ self .pilotSecret = v
1306+ elif o == "--clientID" :
1307+ self .clientID = v
12281308
12291309 def __loadJSON (self ):
12301310 """
0 commit comments