Worker ID functionality #261

Open · wants to merge 15 commits into master

loky/__init__.py (2 additions, 1 deletion)
@@ -13,13 +13,14 @@
 from .reusable_executor import get_reusable_executor
 from .cloudpickle_wrapper import wrap_non_picklable_objects
 from .process_executor import BrokenProcessPool, ProcessPoolExecutor
+from .worker_id import get_worker_id


 __all__ = ["get_reusable_executor", "cpu_count", "wait", "as_completed",
            "Future", "Executor", "ProcessPoolExecutor",
            "BrokenProcessPool", "CancelledError", "TimeoutError",
            "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED",
-           "wrap_non_picklable_objects", "set_loky_pickler"]
+           "wrap_non_picklable_objects", "set_loky_pickler", "get_worker_id"]


 __version__ = '3.0.0.dev0'
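
The new export gives task code a way to ask which pool slot it is running in. A minimal usage sketch, assuming this branch of loky (the task function which_worker is made up for illustration; get_worker_id returns -1 outside a loky worker, per loky/worker_id.py below):

from loky import get_reusable_executor, get_worker_id

def which_worker(i):
    # Returns the pool-local worker ID of the process running this task.
    return get_worker_id()

if __name__ == '__main__':
    executor = get_reusable_executor(max_workers=2)
    print(sorted(set(executor.map(which_worker, range(8)))))  # e.g. [0, 1]
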
loky/process_executor.py (27 additions, 3 deletions)
@@ -521,6 +521,9 @@ def weakref_cb(_,
         # A list of the ctx.Process instances used as workers.
         self.processes = executor._processes

+        # A list of worker IDs for each process
+        self.process_worker_ids = executor._process_worker_ids
+
         # A ctx.Queue that will be filled with _CallItems derived from
         # _WorkItems for processing by the process workers.
         self.call_queue = executor._call_queue
@@ -668,6 +671,7 @@ def process_result_item(self, result_item):
             # itself: we should not mark the executor as broken.
             with self.processes_management_lock:
                 p = self.processes.pop(result_item, None)
+                self.process_worker_ids.pop(result_item, None)

             # p can be None if the executor is concurrently shutting down.
             if p is not None:
@@ -760,7 +764,10 @@ def kill_workers(self):
         # terminates descendant workers of the children in case there is some
         # nested parallelism.
         while self.processes:
-            _, p = self.processes.popitem()
+            pid = list(self.processes.keys())[0]
+            p = self.processes.pop(pid, None)
+            self.process_worker_ids.pop(pid, None)
+
             mp.util.debug('terminate process {}'.format(p.name))
             try:
                 recursive_terminate(p)
@@ -962,6 +969,8 @@ def __init__(self, max_workers=None, job_reducers=None,
         if context is None:
             context = get_context()
         self._context = context
+        if env is None:
+            env = {}
         self._env = env

         if initializer is not None and not callable(initializer):
@@ -983,8 +992,10 @@
         # Map of pids to processes
         self._processes = {}

+        # Map of pids to process worker IDs
+        self._process_worker_ids = {}
+
         # Internal variables of the ProcessPoolExecutor
-        self._processes = {}
         self._queue_count = 0
         self._pending_work_items = {}
         self._running_work_items = []
@@ -1076,16 +1087,29 @@ def _adjust_process_count(self):
                     self._initargs, self._processes_management_lock,
                     self._timeout, worker_exit_lock, _CURRENT_DEPTH + 1)
             worker_exit_lock.acquire()
+
+            worker_id = -1
+            if _CURRENT_DEPTH == 0:
+                used_ids = set(self._process_worker_ids.values())
+                available_ids = set(range(self._max_workers)) - used_ids
+                if len(available_ids):
+                    worker_id = available_ids.pop()
+
             try:
                 # Try to spawn the process with some environment variable to
                 # overwrite but it only works with the loky context for now.
+                env = self._env
+                if _CURRENT_DEPTH == 0 and self._env is not None:
+                    env = self._env.copy()
+                    env['LOKY_WORKER_ID'] = str(worker_id)
                 p = self._context.Process(target=_process_worker, args=args,
-                                          env=self._env)
+                                          env=env)
             except TypeError:
                 p = self._context.Process(target=_process_worker, args=args)
             p._worker_exit_lock = worker_exit_lock
             p.start()
             self._processes[p.pid] = p
+            self._process_worker_ids[p.pid] = worker_id
             mp.util.debug('Adjust process count : {}'.format(self._processes))

     def _ensure_executor_running(self):
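
To summarize the allocation logic in _adjust_process_count: a new process takes any ID in range(max_workers) that no live process currently holds, so a respawned worker recycles the slot freed by the one it replaces, while nested executors (depth > 0) keep the sentinel -1. A standalone sketch of that scheme (the helper name _pick_worker_id is hypothetical):

def _pick_worker_id(live_ids, max_workers):
    # live_ids: the worker IDs currently held by running processes.
    available = set(range(max_workers)) - set(live_ids)
    return available.pop() if available else -1

assert _pick_worker_id({0, 2, 3}, 4) == 1
assert _pick_worker_id({0, 1}, 2) == -1
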
loky/worker_id.py (new file: 8 additions)
@@ -0,0 +1,8 @@
+import os
+
+
+def get_worker_id():
+    wid = os.environ.get('LOKY_WORKER_ID', None)
+    if wid is None:
+        return -1
+    return int(wid)
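
The round trip works in two halves: the parent exports LOKY_WORKER_ID into the child's environment when it spawns the process (see _adjust_process_count above), and get_worker_id reads it back from inside the worker. Simulating both halves in a single process, under the implementation above:

import os

os.environ['LOKY_WORKER_ID'] = '3'   # the parent's half, done at spawn time
from loky.worker_id import get_worker_id
assert get_worker_id() == 3          # the worker's half, read at call time

del os.environ['LOKY_WORKER_ID']
assert get_worker_id() == -1         # outside a loky worker the variable is unset
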
tests/test_worker_id.py (new file: 31 additions)
@@ -0,0 +1,31 @@
+import os
+import time
+import pytest
+import numpy as np
+from collections import defaultdict
+from loky import get_reusable_executor, get_worker_id
+
+
+def random_sleep(k):
+    rng = np.random.RandomState(seed=k)
+    duration = rng.uniform(0, 0.05)
+    t0 = time.time()
+    time.sleep(duration)
+    t1 = time.time()
+    wid = get_worker_id()
+    return (wid, t0, t1)
+
+
+def test_worker_ids():
+    """Test that worker IDs are always unique, with re-use over time"""
+    executor = get_reusable_executor(max_workers=4, timeout=2)
+    results = executor.map(random_sleep, range(100))
+
+    all_intervals = defaultdict(list)
+    for wid, t0, t1 in results:
+        all_intervals[wid].append((t0, t1))
+
+    for intervals in all_intervals.values():
+        intervals = sorted(intervals)
+        for i in range(len(intervals) - 1):
+            assert intervals[i + 1][0] >= intervals[i][1]
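
The final loop encodes the uniqueness invariant: for any single worker ID, the sorted (start, end) spans of its tasks must be pairwise disjoint, i.e. no two tasks ever ran under the same ID at the same time. The same check written as a small predicate (the name _no_overlap is made up for illustration):

def _no_overlap(intervals):
    intervals = sorted(intervals)
    return all(start2 >= end1
               for (_, end1), (start2, _) in zip(intervals, intervals[1:]))

assert _no_overlap([(0.0, 0.1), (0.1, 0.2)])
assert not _no_overlap([(0.0, 0.2), (0.1, 0.3)])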