Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/basic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
strategy:
matrix:
python:
- 2.7.18
# - 2.7.18
- 3.6.15
- 3.9.17

Expand Down Expand Up @@ -53,7 +53,7 @@ jobs:
strategy:
matrix:
python:
- 2.7.18
# - 2.7.18
- 3.6.15
- 3.9.17

Expand All @@ -77,7 +77,7 @@ jobs:
strategy:
matrix:
python:
- 2.7.18
# - 2.7.18
- 3.6.15
- 3.9.17

Expand Down
14 changes: 14 additions & 0 deletions Pilot/dirac-pilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
getCommand,
pythonPathCheck,
)
from Pilot.proxyTools import revokePilotToken
except ImportError:
from pilotTools import (
Logger,
Expand All @@ -49,6 +50,7 @@
getCommand,
pythonPathCheck,
)
from proxyTools import revokePilotToken
############################

if __name__ == "__main__":
Expand Down Expand Up @@ -124,3 +126,15 @@
if remote:
log.buffer.flush()
sys.exit(-1)

log.info("Pilot tasks finished.")

if pilotParams.jwt:
if not pilotParams.isLegacyPilot:
log.info("Revoking pilot token.")
revokePilotToken(
pilotParams.diracXServer,
pilotParams.pilotUUID,
pilotParams.jwt,
pilotParams.clientID
)
16 changes: 15 additions & 1 deletion Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,20 @@ def __init__(self, pilotParams):

@logFinalizer
def execute(self):
"""Calls dirac-admin-add-pilot"""
"""Calls dirac-admin-add-pilot

Deprecated in DIRAC V8, new mechanism in V9 and DiracX."""

if self.pp.jwt:
if not self.pp.isLegacyPilot:
self.log.warn("Skipping module, normally it is already done via DiracX secret-exchange.")
return

# If we're here, this is a legacy pilot with a DiracX token embedded in it.
# TODO: See if we do a dirac-admin-add-pilot in DiracX for legacy pilots
else:
# If we're here, this is a DIRAC only pilot without diracX token embedded in it.
pass

if not self.pp.pilotReference:
self.log.warn("Skipping module, no pilot reference found")
Expand Down Expand Up @@ -1232,3 +1245,4 @@ def execute(self):
"""Standard entry point to a pilot command"""
self._setNagiosOptions()
self._runNagiosProbes()

99 changes: 97 additions & 2 deletions Pilot/pilotTools.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,23 @@ def load_module_from_path(module_name, path_to_module):
basestring = str

try:
from Pilot.proxyTools import getVO
from Pilot.proxyTools import (
getVO,
BaseRequest,
TokenBasedRequest,
extract_diracx_payload,
refreshPilotToken,
refreshUserToken
)
except ImportError:
from proxyTools import getVO
from proxyTools import (
getVO,
BaseRequest,
TokenBasedRequest,
extract_diracx_payload,
refreshPilotToken,
refreshUserToken
)

try:
FileNotFoundError # pylint: disable=used-before-assignment
Expand Down Expand Up @@ -908,10 +922,14 @@ def __init__(self):
self.site = ""
self.setup = ""
self.configServer = ""
self.diracXServer = ""
self.ceName = ""
self.ceType = ""
self.queueName = ""
self.gridCEType = ""
self.pilotSecret = ""
self.clientID = ""
self.jwt = {}
# maxNumberOfProcessors: the number of
# processors allocated to the pilot which the pilot can allocate to one payload
# used to set payloadProcessors unless other limits are reached (like the number of processors on the WN)
Expand Down Expand Up @@ -946,6 +964,7 @@ def __init__(self):
self.pilotCFGFile = "pilot.json"
self.pilotLogging = False
self.loggerURL = None
self.isLegacyPilot = False
self.loggerTimerInterval = 0
self.loggerBufsize = 1000
self.pilotUUID = "unknown"
Expand Down Expand Up @@ -996,6 +1015,7 @@ def __init__(self):
("y:", "CEType=", "CE Type (normally InProcess)"),
("z", "pilotLogging", "Activate pilot logging system"),
("C:", "configurationServer=", "Configuration servers to use"),
("", "diracx_URL=", "DiracX Server URL to use"),
("D:", "disk=", "Require at least <space> MB available"),
("E:", "commandExtensions=", "Python modules with extra commands"),
("F:", "pilotCFGFile=", "Specify pilot CFG file"),
Expand All @@ -1021,6 +1041,8 @@ def __init__(self):
("", "preinstalledEnvPrefix=", "preinstalled pilot environment area prefix"),
("", "architectureScript=", "architecture script to use"),
("", "CVMFS_locations=", "comma-separated list of CVMS locations"),
("", "pilotSecret=", "secret that the pilot uses with DiracX"),
("", "clientID=", "client id used by DiracX to revoke a token"),
)

# Possibly get Setup and JSON URL/filename from command line
Expand All @@ -1047,6 +1069,73 @@ def __init__(self):
self.installEnv["X509_USER_PROXY"] = self.certsLocation
os.environ["X509_USER_PROXY"] = self.certsLocation

try:
self.__get_diracx_jwt()
except Exception as e:
self.log.error("Error setting DiracX: %s" % e)
# Remove all settings to prevent using it.
self.diracXServer = None
self.pilotSecret = None
self.loggerURL = None
self.jwt = {}
self.log.error("Won't use DiracX.")

def __get_diracx_jwt(self):
# Pilot auth: two cases
# 1. Has a secret (DiracX Pilot), exchange for a token
# 2. Legacy Pilot, has a proxy with a DiracX section in it (extract the jwt from it)
if self.pilotUUID and self.pilotSecret and self.diracXServer:
self.log.info("Fetching JWT in DiracX (URL: %s)" % self.diracXServer)

config = BaseRequest(
"%s/api/auth/secret-exchange" % (
self.diracXServer
),
os.getenv("X509_CERT_DIR"),
self.pilotUUID
)

try:
self.jwt = config.executeRequest({
"pilot_stamp": self.pilotUUID,
"pilot_secret": self.pilotSecret
})
except HTTPError as e:
self.log.error("Request failed: %s" % str(e))
self.log.error("Could not fetch pilot tokens.")
if e.code == 401:
# First test if the error occurred because of "bad pilot_stamp"
# If so, this pilot is in the vacuum case
# So we redo auth, but this time with the right data for vacuum cases
self.log.error("Retrying with vacuum case data...")
self.jwt = config.executeRequest({
"pilot_stamp": self.pilotUUID,
"pilot_secret": self.pilotSecret,
"vo": self.wnVO,
"grid_type": self.gridCEType,
"grid_site": self.site,
"status": "Running"
})
else:
raise RuntimeError("Can't be a vacuum case.")

self.log.info("Fetched the pilot token with the pilot secret.")
self.isLegacyPilot = False
elif self.pilotUUID and self.diracXServer:
# Try to extract a token for proxy
self.log.info("Trying to extract diracx token from proxy.")

cert = os.getenv("X509_USER_PROXY")
if cert:
with open(cert, "rb") as fp:
self.jwt = extract_diracx_payload(fp.read())
self.isLegacyPilot = True
self.log.info("Successfully extracted token from proxy.")
else:
raise RuntimeError("Could not locate a proxy via X509_USER_PROXY")
else:
self.log.info("PilotUUID, pilotSecret, and diracXServer are needed to support DiracX.")

def __setSecurityDir(self, envName, dirLocation):
"""Set the environment variable of the `envName`, and add it also to the Pilot Parameters

Expand Down Expand Up @@ -1151,6 +1240,8 @@ def __initCommandLine2(self):
self.keepPythonPath = True
elif o in ("-C", "--configurationServer"):
self.configServer = v
elif o == "--diracx_URL":
self.diracXServer = v
elif o in ("-G", "--Group"):
self.userGroup = v
elif o in ("-x", "--execute"):
Expand Down Expand Up @@ -1224,6 +1315,10 @@ def __initCommandLine2(self):
self.architectureScript = v
elif o == "--CVMFS_locations":
self.CVMFS_locations = v.split(",")
elif o == "--pilotSecret":
self.pilotSecret = v
elif o == "--clientID":
self.clientID = v

def __loadJSON(self):
"""
Expand Down
Loading