Commit c80fcaa

chrisburr authored and web-flow committed
sweep: DIRACGrid#5469 Fixes for the executor framework
1 parent 806bb96 commit c80fcaa

5 files changed, +11 -6 lines changed

.git-blame-ignore-revs

+3
@@ -2,3 +2,6 @@
 c5981031194f8d995148db09586179ce0d91bf95
 45ddde1137f63427b587c6bcecaddbd86020a2c8
 75eb00b9f222aeab5b149b6676c0c05ef8cccac3
+
+# pylint fixes
+2dea5280d4b05d6c373fd6cd882dec043b16b7b2

docs/source/DeveloperGuide/AddingNewComponents/DevelopingExecutors/index.rst

+4 -4
@@ -2,7 +2,7 @@
 Developing Executors
 ======================================
 
-The *Executor framework* is designed around two components. The *Executor Mind* knows how to retrieve, store and dispatch tasks. And *Executors* are the working processes that know what to do depending on the task type. Each *Executor* is an independent process that connects to the *Mind* and waits for tasks to be sent to them by the . The mechanism used to connect the *Executors* to the is described in section . A diagram of both components can been seen in the diagram.
+The *Executor framework* is designed around two components. The *Executor Mind* knows how to retrieve, store and dispatch tasks. And *Executors* are the working processes that know what to do depending on the task type. Each *Executor* is an independent process that connects to the *Mind* and waits for tasks to be sent to it. The mechanism used to connect to the *Executors* is described in section :ref:`about stable connections <stable_connections>`. A diagram of both components can been seen in the diagram.
 
 .. figure:: ExecutorsSchema.png
    :width: 400px
@@ -13,11 +13,11 @@ The *Mind* is a *DIRAC* service. It is the only component of the *Executor* fram
 
 When the *Mind* receives a task that has been properly processed by an *Executor*, the result will have to be stored in the database. But before storing it in the database the *Mind* needs to check that the task has not been modified by anyone else while the executor was processing it. To do so, the *Mind* has to store a task state in memory and check that this task state has not been modified before committing the result back to the database. The task state will be different for each type of task and has to be defined in each case.
 
-When an *Executor* process starts it will connect to the *Mind* and send a list of task types it can process. The acts as task scheduler and dispatcher. When the *Mind* has a task to be processed it will look for an idle *Executor* that can process that task type. If there is no idle *Executor* or no can process that task type, the *Mind* will internally queue the task in memory. As soon a an *Executor* connects or becomes idle, the *Mind* will pop a task from one of the queues that the can process and send the task to it. If the *Executor* manages to process the task, the *Mind* will store back the result of the task and then it will try to fill the again with a new task. If the *Executor* disconnects while processing a task, the *Mind* will assume that the has crashed and will reschedule the task to prevent any data loss.
+When an *Executor* process starts it will connect to the *Mind* and send a list of task types it can process. The acts as task scheduler and dispatcher. When the *Mind* has a task to be processed it will look for an idle *Executor* that can process that task type. If there is no idle *Executor* or no can process that task type, the *Mind* will internally queue the task in memory. As soon a an *Executor* connects or becomes idle, the *Mind* will pop a task from one of the queues that the can process and send the task to it. If the *Executor* manages to process the task, the *Mind* will store back the result of the task and then it will try to fill the *Executor* again with a new task. If the *Executor* disconnects while processing a task, the *Mind* will assume that the *Executor* has crashed and will reschedule the task to prevent any data loss.
 
 Tasks may need to go through several different steps before being completely processed. This can easily be accomplished by having one task type for each step the task has to go through. Each *Executor* can then publish what task types it knows how to process. For each step the task has to go through, the *Mind* will send the task to an *Executor* that can process that type of task, receive and store the result, change the task to the next type and then send the task to the next *Executor*. The *Mind* will repeat this mechanism until the task has gone through all the types.
 
-This architecture allows to add and remove *Executors* at any time. If the removed *Executor* was being processing a task, the *Mind* will send the task to another *Executor*. If the task throughput is not enough *Executors* can be started and the *Mind* will send them tasks to process. Although *Executors* can be added and removed at any time, the *Mind* is still a single point of failure. If the *Mind* stops working the whole system will stop working.
+This architecture allows to add and remove *Executors* at any time. If the removed *Executor* was processing a task, the *Mind* will send the task to another *Executor*. If the task throughput is not enough, new *Executors* can be started and the *Mind* will send them tasks to process. Although *Executors* can be added and removed at any time, the *Mind* is still a single point of failure. If the *Mind* stops working the whole system will stop working.
 
 Implementing an Executor module
 ================================
@@ -31,7 +31,7 @@ Implementing an executor module is quite straightforward. It just needs 4 method
 All *Executor* modules need to know to which mind they have to connect. In the *initialize* method we define the mind to which the module
 will connect. This method can also have any other initialization required by the *Executor*.
 
-Funciton *processTask* does the task processing. It receives the task to be processed already deserialized. Once the work it's done it can
+Function *processTask* does the task processing. It receives the task to be processed already deserialized. Once the work is done it can
 to return the modified task or just and empty *S_OK*.
 
 The last two methods provide the knowledge on how to serialize and deserialize tasks when receiving and sending them to the *Mind*.
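
Reviewer note: the dispatching behaviour described in the documentation hunks above (queue a task when no suitable *Executor* is idle, hand out work when one connects or finishes, reschedule when one disconnects mid-task) can be illustrated with a small toy model. This is only a sketch under stated assumptions: `ToyMind` and all of its methods are hypothetical names invented for illustration and are not part of DIRAC or of this commit.

```python
from collections import defaultdict, deque


class ToyMind:
    """Hypothetical in-memory dispatcher; NOT the DIRAC implementation."""

    def __init__(self):
        self.queues = defaultdict(deque)  # task type -> queued task ids
        self.executors = {}               # executor id -> set of task types
        self.busy = {}                    # executor id -> (task type, task id)

    def add_executor(self, ex_id, task_types):
        # A connecting executor announces the task types it can process.
        self.executors[ex_id] = set(task_types)
        self._fill(ex_id)

    def add_task(self, task_type, task_id):
        # Queue the task, then offer it to the first idle, capable executor.
        self.queues[task_type].append(task_id)
        for ex_id, types in self.executors.items():
            if ex_id not in self.busy and task_type in types:
                self._fill(ex_id)
                break

    def task_done(self, ex_id):
        # The executor finished its task: try to give it a new one.
        self.busy.pop(ex_id)
        self._fill(ex_id)

    def remove_executor(self, ex_id):
        # A disconnect while busy is treated as a crash: reschedule the task.
        self.executors.pop(ex_id)
        pending = self.busy.pop(ex_id, None)
        if pending is not None:
            task_type, task_id = pending
            self.queues[task_type].appendleft(task_id)
            for other in self.executors:
                self._fill(other)

    def _fill(self, ex_id):
        # Give an idle executor the oldest queued task of a type it supports.
        if ex_id in self.busy:
            return
        for task_type in sorted(self.executors[ex_id]):
            if self.queues[task_type]:
                self.busy[ex_id] = (task_type, self.queues[task_type].popleft())
                return


mind = ToyMind()
mind.add_task("ping", "t1")       # no executor yet, so the task is queued
mind.add_executor("A", ["ping"])  # A connects and receives t1
mind.add_executor("B", ["ping"])  # B connects but stays idle
mind.remove_executor("A")         # A drops mid-task; t1 is rescheduled to B
```

The toy keeps everything in one process and ignores task state checks, serialization and the database, all of which the real *Mind* has to handle.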

docs/source/DeveloperGuide/Systems/Framework/stableconns/index.rst

+2
@@ -1,3 +1,5 @@
+.. _stable_connections:
+
 ==========================
 DISET Stable connections
 ==========================

src/DIRAC/Core/Base/ExecutorMindHandler.py

+1 -1
@@ -157,7 +157,7 @@ def conn_connected(self, trid, identity, kwargs):
 
     def conn_drop(self, trid):
         self.__eDispatch.removeExecutor(trid)
-        return S_OK()
+        return self.srv_disconnect(trid)
 
     auth_msg_TaskDone = ["all"]
163163

src/DIRAC/Core/Utilities/ExecutorDispatcher.py

+1 -1
@@ -620,7 +620,7 @@ def __taskFreezeCallback(self, taskId, taskObj, eType):
     def __getNextExecutor(self, taskId):
         try:
             eTask = self.__tasks[taskId]
-        except IndexError:
+        except KeyError:
             msg = "Task %s was deleted prematurely while being dispatched" % taskId
             self.__log.error("Task was deleted prematurely while being dispatched", "%s" % taskId)
             return S_ERROR(msg)
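
Reviewer note: the switch from `IndexError` to `KeyError` matters if `self.__tasks` is a dictionary keyed by task id, which the change suggests but this diff does not show. A minimal standalone sketch of the difference, using a hypothetical `tasks` dict and hypothetical helper names rather than the DIRAC code:

```python
# Hypothetical stand-in for a mapping keyed by task id.
tasks = {"task-1": "payload"}


def lookup(task_id):
    """Missing dict keys raise KeyError, so this handler catches them."""
    try:
        return "found", tasks[task_id]
    except KeyError:
        return "missing", None


def lookup_with_index_error(task_id):
    """IndexError is raised by sequences, so this handler never fires for a dict."""
    try:
        return "found", tasks[task_id]
    except IndexError:
        return "missing", None


status, _ = lookup("task-2")

try:
    lookup_with_index_error("task-2")
    escaped = False
except KeyError:
    # The KeyError passed straight through the `except IndexError` clause.
    escaped = True
```

Under that assumption, the old handler would have let the exception propagate instead of returning the intended `S_ERROR`.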
