Source code for swot_simulator.dispatch
# Copyright (c) 2021 CNES/JPL
#
# All rights reserved. Use of this source code is governed by a
# BSD-style license that can be found in the LICENSE file.
"""
Dispatch task on free workers
=============================
"""
from typing import Any, Callable, Iterator, List, Set
import time
import dask.distributed
def _available_workers(client: dask.distributed.Client) -> Set[str]:
"""Get the list of available workers.
Args:
client (dask.distributed.Client): Client connected to the Dask
cluster.
Returns:
list: The list of available workers.
"""
while True:
info = client.scheduler_info()
result = set(info['workers']) - set(
k for k, v in client.processing().items() if v)
if result:
return result
time.sleep(0.1)
[docs]def compute(client: dask.distributed.Client, func: Callable, seq: Iterator,
*args, **kwargs) -> List[Any]:
"""Distribute the execution of functions to free workers, i.e. those who do
not perform any tasks.
Args:
client (dask.distributed.Client): Client connected to the Dask
cluster.
func (callable): Function to execute
seq (iterable): The sequence of arguments handled by ``func``.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
list: The result of the execution of the functions.
"""
completed = dask.distributed.as_completed()
workers = set()
result = []
iterate = True
# As long as there is data to traverse in the iterator.
while iterate:
# As long as there are free workers
while completed.count() < len(client.scheduler_info()['workers']):
try:
if not workers:
workers = _available_workers(client)
item = next(seq)
completed.add(
client.submit(func,
item,
*args,
workers=workers.pop(),
allow_other_workers=False,
**kwargs))
except StopIteration:
iterate = False
break
# The computation queue is full, we consume the finished jobs to be
# able to continue.
if iterate:
try:
result += client.gather(completed.next_batch())
except StopIteration:
pass
result += [item.result() for item in completed]
return result