This is an engine that schedules tasks to workers: separate processes dedicated to executing certain atoms, possibly running on other machines, connected via AMQP (or other supported kombu transports).
Note
This engine is under active development. It is usable and works, but it is missing some features (please check the blueprint page for known issues and plans) that would make it more production ready.
There are two communicating sides, the executor (and its associated engine derivative) and the worker, which communicate through a proxy component. The proxy is designed to accept messages from, and publish messages into, a named exchange.
Let’s consider how communication between an executor and a worker happens. First, the engine resolves all atom dependencies and schedules the atoms that can be performed at the moment. This uses the same scheduling and dependency resolution logic that is used for every other engine type. Then the atoms which can be executed immediately (ones that depend on the outputs of other tasks will be executed when that output is ready) are executed by the worker-based engine executor in the following manner:
Note
Failure objects are not directly JSON-serializable (they contain references to tracebacks, which are not serializable), so they are converted to dicts before sending and converted back from dicts after receiving, on both the executor and worker sides. This translation is lossy: the traceback can’t be fully retained because its contents include internal interpreter references and details.
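The dict conversion described in this note can be pictured with a small, standard-library-only sketch. It is not TaskFlow's own failure code: the `FailureRecord` class and its methods are hypothetical names, and only the field names are borrowed from the message examples in this document. On Python 3 the recorded type names differ from the older examples here (there is no `StandardError`).

```python
import json
import traceback
from dataclasses import asdict, dataclass


@dataclass
class FailureRecord:
    """Hypothetical JSON-friendly stand-in for a failure object."""
    exc_type_names: list
    exception_str: str
    traceback_str: str
    version: int = 1

    @classmethod
    def from_exception(cls, exc):
        # Keep the exception class and its Exception-derived bases,
        # most specific first.
        names = [c.__name__ for c in type(exc).__mro__
                 if issubclass(c, Exception)]
        # Only the formatted text survives; the traceback object does not.
        tb_text = "".join(traceback.format_tb(exc.__traceback__))
        return cls(names, str(exc), tb_text)

    def to_dict(self):
        return asdict(self)

    @classmethod
    def from_dict(cls, data):
        return cls(**data)


try:
    raise RuntimeError("Woot!")
except RuntimeError as e:
    record = FailureRecord.from_exception(e)

# Simulate sending over the wire and receiving on the other side.
wire = json.dumps(record.to_dict())
restored = FailureRecord.from_dict(json.loads(wire))
```

The round trip keeps the type names, the message, and the formatted traceback text, but not the live traceback object, which is why the translation is lossy.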
Additionally, the following parameters are added to the request message:
Example:
{
    "action": "execute",
    "arguments": {
        "x": 111
    },
    "task_cls": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_name": "taskflow.tests.utils.TaskOneArgOneReturn",
    "task_version": [
        1,
        0
    ]
}
When reverting:
{
    "action": "revert",
    "arguments": {},
    "failures": {
        "taskflow.tests.utils.TaskWithFailure": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "result": [
        "failure",
        {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
            "version": 1
        }
    ],
    "task_cls": "taskflow.tests.utils.TaskWithFailure",
    "task_name": "taskflow.tests.utils.TaskWithFailure",
    "task_version": [
        1,
        0
    ]
}
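As a rough illustration of the two request shapes shown above, the following sketch assembles them from plain Python values. `build_request` is a hypothetical helper, not part of TaskFlow. It only mirrors the keys visible in the examples and assumes that `result` and `failures` appear on revert requests only.

```python
import json


def build_request(action, task_cls, task_name, task_version, arguments,
                  result=None, failures=None):
    """Assemble a request dict shaped like the examples (hypothetical)."""
    if action not in ("execute", "revert"):
        raise ValueError("unsupported action: %r" % (action,))
    request = {
        "action": action,
        "arguments": arguments,
        "task_cls": task_cls,
        "task_name": task_name,
        "task_version": list(task_version),
    }
    # Assumption: only revert requests carry the earlier result/failures.
    if result is not None:
        request["result"] = result
    if failures is not None:
        request["failures"] = failures
    return request


request = build_request(
    "execute",
    task_cls="taskflow.tests.utils.TaskOneArgOneReturn",
    task_name="taskflow.tests.utils.TaskOneArgOneReturn",
    task_version=(1, 0),
    arguments={"x": 111},
)
# Serialize the way the examples are displayed: sorted keys, indented.
payload = json.dumps(request, indent=4, sort_keys=True)
```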
When running:
{
    "data": {},
    "state": "RUNNING"
}
When progressing:
{
    "details": {
        "progress": 0.5
    },
    "event_type": "update_progress",
    "state": "EVENT"
}
When succeeded:
{
    "data": {
        "result": 666
    },
    "state": "SUCCESS"
}
When failed:
{
    "data": {
        "result": {
            "exc_type_names": [
                "RuntimeError",
                "StandardError",
                "Exception"
            ],
            "exception_str": "Woot!",
            "traceback_str": " File \"/homes/harlowja/dev/os/taskflow/taskflow/engines/action_engine/executor.py\", line 56, in _execute_task\n result = task.execute(**arguments)\n File \"/homes/harlowja/dev/os/taskflow/taskflow/tests/utils.py\", line 165, in execute\n raise RuntimeError('Woot!')\n",
            "version": 1
        }
    },
    "state": "FAILURE"
}
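One way a consumer of these responses might branch on the `state` field is sketched below. `summarize_response` is a hypothetical helper written for illustration, and it assumes messages have already been decoded from JSON into dicts shaped like the four examples above.

```python
import json


def summarize_response(response):
    """Return a (kind, value) pair for one decoded response message."""
    state = response["state"]
    if state == "RUNNING":
        return ("running", None)
    if state == "EVENT":
        # Events name their type and carry a "details" dict.
        return (response["event_type"], response["details"])
    if state == "SUCCESS":
        return ("success", response["data"]["result"])
    if state == "FAILURE":
        # The result is a failure dict, not a live exception object.
        return ("failure", response["data"]["result"]["exception_str"])
    raise ValueError("unexpected response state: %r" % (state,))


progress = summarize_response(json.loads(
    '{"details": {"progress": 0.5},'
    ' "event_type": "update_progress", "state": "EVENT"}'))
success = summarize_response(json.loads(
    '{"data": {"result": 666}, "state": "SUCCESS"}'))
```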
WAITING - Request placed on queue (or other kombu message bus/transport) but not yet consumed.
PENDING - Worker accepted the request, which is now waiting to be run by the worker's executor (threads, processes, or other).
FAILURE - Worker failed after running the request (due to a task exception), or no worker started executing it (by moving the request into the RUNNING state) within the specified time span (this defaults to 60 seconds unless overridden).
RUNNING - The worker's executor (using threads, processes...) has started to run the requested task. Once this state is reached, the request timeout no longer applies, since it cannot be determined whether a task is just taking a long time or has failed.
SUCCESS - Worker finished running the task without an exception.
Note
During the WAITING and PENDING stages the engine keeps track of how long the request has been alive. If a timeout is reached, the request automatically transitions to FAILURE and any further transitions from a worker are disallowed (for example, if a worker later accepts the request and sets the task to PENDING, this transition is logged and ignored). The timeout can be adjusted by setting the engine transition_timeout option to a higher or lower value, or removed completely by setting it to None. In the future this will be made more dynamic by implementing the blueprints associated with failover and info/resilience.
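The timeout rule in this note can be sketched as a small state tracker. This is an illustrative model, not the engine's code: `TrackedRequest`, the `clock` parameter, and the transition table are assumptions inferred from the state descriptions above. Only the state names, the 60 second default, and the `None` setting come from the text.

```python
import time

WAITING, PENDING, RUNNING, SUCCESS, FAILURE = (
    "WAITING", "PENDING", "RUNNING", "SUCCESS", "FAILURE")

# Assumed table of allowed moves (inferred, not taken from TaskFlow).
_ALLOWED = {
    WAITING: {PENDING, FAILURE},
    PENDING: {RUNNING, FAILURE},
    RUNNING: {SUCCESS, FAILURE},
    SUCCESS: set(),
    FAILURE: set(),
}


class TrackedRequest:
    """Hypothetical engine-side record of one request."""

    def __init__(self, transition_timeout=60.0, clock=time.monotonic):
        self._timeout = transition_timeout
        self._clock = clock
        self._created = clock()
        self.state = WAITING

    def expired(self):
        # The timeout only applies before the task starts RUNNING.
        if self._timeout is None or self.state not in (WAITING, PENDING):
            return False
        return self._clock() - self._created >= self._timeout

    def transition(self, new_state):
        """Apply a move; return True if accepted, False if ignored."""
        if self.expired():
            self.state = FAILURE
        if new_state not in _ALLOWED[self.state]:
            return False
        self.state = new_state
        return True


# A late acceptance after the timeout is ignored.
now = [0.0]
late = TrackedRequest(transition_timeout=60.0, clock=lambda: now[0])
now[0] = 61.0
late_accepted = late.transition(PENDING)

# Once RUNNING, the timeout no longer applies.
now[0] = 0.0
timely = TrackedRequest(transition_timeout=60.0, clock=lambda: now[0])
timely.transition(PENDING)
timely.transition(RUNNING)
now[0] = 3600.0
finished = timely.transition(SUCCESS)
```

Injecting the clock keeps the sketch deterministic; a real tracker would simply use the default monotonic clock.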
To use the worker based engine a set of workers must first be established on remote machines. These workers must be provided with a list of the task objects, task names, or module names (or entrypoints that can be examined for valid tasks) that they can respond to. This is done so that arbitrary code execution is not possible.
For complete parameters and object usage please visit Worker.
Example:
from taskflow.engines.worker_based import worker as w

# Transport URL, exchange, topic, and the tasks this worker may run.
config = {
    'url': 'amqp://guest:guest@localhost:5672//',
    'exchange': 'test-exchange',
    'topic': 'test-tasks',
    'tasks': ['tasks:TestTask1', 'tasks:TestTask2'],
}
worker = w.Worker(**config)
worker.run()
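The reason for handing workers an explicit task list (so that arbitrary code execution is not possible) can be illustrated with a simple allow-list check. This is a sketch only, not the worker's real lookup logic: `is_allowed` is a hypothetical function, and it assumes that a `'module:Class'` entry names a single class, that a bare `'module'` entry allows every class in that module, and that requests name the class with a dotted path as in the request examples.

```python
def is_allowed(task_cls, allowed):
    """Check a dotted task class name against allowed entries."""
    for entry in allowed:
        if ":" in entry:
            # 'module:Class' permits exactly one class.
            module, _, name = entry.partition(":")
            if task_cls == "%s.%s" % (module, name):
                return True
        elif task_cls.rpartition(".")[0] == entry:
            # A bare module name permits any class directly inside it.
            return True
    return False


allowed = ['tasks:TestTask1', 'tasks:TestTask2']
known = is_allowed('tasks.TestTask1', allowed)
unknown = is_allowed('os.system', allowed)
```

A request naming anything outside the list is refused before any import or call happens, which is the property the task list is meant to guarantee.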
To use the worker based engine a flow must be constructed (which contains tasks that are visible on remote machines) and the specific worker based engine entrypoint must be selected. Certain configuration options must also be provided so that the transport backend can be configured and initialized correctly. Otherwise the usage should be mostly transparent (and is nearly identical to using any other engine type).
For complete parameters and object usage please see WorkerBasedActionEngine.
Example with amqp transport:
import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            url='amqp://guest:guest@localhost:5672//',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'])
eng.run()
Example with filesystem transport:
import taskflow.engines
from taskflow.patterns import linear_flow as lf

flow = lf.Flow('simple-linear').add(...)
eng = taskflow.engines.load(flow, engine='worker-based',
                            exchange='test-exchange',
                            topics=['topic1', 'topic2'],
                            transport='filesystem',
                            transport_options={
                                'data_folder_in': '/tmp/in',
                                'data_folder_out': '/tmp/out',
                            })
eng.run()
Additional supported keyword arguments: