# Source code for tensiometer.utilities.subprocess_runner

"""
This file contains some utilities that are useful to run a function inside an independent sub-process.

When the subprocess is finished the memory is fully released, which is useful to avoid memory leaks, especially from tensorflow.
"""

###########################################################################################
# Initial imports:

import copy
import datetime
import multiprocessing as mp
import time
from functools import wraps

import psutil

###########################################################################################
# Default Settings:

# These settings define the default behavior of the `run_in_process` decorator.

# Defaults used by `run_in_process` for every option the caller does not override.
default_settings = dict(
    # Whether to run the function in a subprocess.
    subprocess=True,
    # Level of feedback provided: 0 - None, 1 - Minimal, 2 - Medium, 3 - Full.
    feedback_level=1,
    # Multiprocessing context: 'fork', 'spawn', or 'forkserver'.
    context='fork',
    # Enable or disable monitoring of the subprocess.
    monitoring=True,
    # Frequency of monitoring updates, in seconds.
    monitoring_frequency=1,
    # Maximum allowed runtime in seconds. None means no timeout.
    # The timeout is checked once per monitoring interval, so it is only
    # enforced with the granularity of `monitoring_frequency`.
    timeout=None,
)

###########################################################################################
# Hard-Coded Settings:

# These constants define formatting options for feedback messages.

# Indentation prefix prepended to first-level feedback messages.
feedback_offset = '  '
# Indentation prefix intended for nested feedback messages.
# NOTE(review): not referenced by the code visible in this section; confirm usage elsewhere.
feedback_offset_2 = '   | '
# Separator line printed between sections of the feedback output.
feedback_separator = '****************************************************************'

###########################################################################################
# `run_in_process` Decorator:
# This decorator modifies a function to optionally run it in a separate subprocess
# with configurable feedback and monitoring.

def _subprocess_rss_mb(pid):
    """
    Return the resident set size of process ``pid`` in MB, or ``None`` if unavailable.

    The monitored process can exit between the liveness check and the memory
    query; psutil then raises one of its ``psutil.Error`` subclasses. A missed
    sample must not abort the run, so the failure is reported as ``None``.

    :param pid: process id to sample.
    :returns: resident memory in MB, or ``None`` when the process cannot be queried.
    """
    try:
        return psutil.Process(pid).memory_info().rss / 1024 / 1024
    except psutil.Error:
        return None


def run_in_process(**kwargs):
    """
    Decorator to run a function in a subprocess with optional feedback,
    monitoring, and timeout capabilities.

    Options not supplied fall back to ``default_settings``; keyword arguments
    that are not known settings are ignored.

    :param subprocess: whether to execute in a subprocess. If ``False`` the
        decorated function is returned unchanged.
    :param feedback_level: verbosity level from 0 to 3.
    :param context: multiprocessing context (``fork``, ``spawn`` or ``forkserver``).
    :param monitoring: enable monitoring of the subprocess.
    :param monitoring_frequency: interval, in seconds, for monitoring updates.
        It is also the polling interval of the wait loop.
    :param timeout: maximum runtime in seconds; enables monitoring when set.
    :returns: wrapped function with subprocess capabilities.
    :raises ValueError: if supplied configuration values are invalid.
    """
    # Start from a private copy of the defaults and apply the known overrides.
    settings = copy.deepcopy(default_settings)
    for key, value in kwargs.items():
        if key in settings:
            settings[key] = value

    # Validate settings.
    if not isinstance(settings['subprocess'], bool):
        raise ValueError('subprocess must be a boolean.')
    if not isinstance(settings['feedback_level'], int):
        raise ValueError('feedback_level must be an integer.')
    if settings['feedback_level'] not in [0, 1, 2, 3]:
        raise ValueError('feedback_level must be 0, 1, 2, or 3.')
    if settings['context'] not in ['fork', 'spawn', 'forkserver']:
        raise ValueError("context must be 'fork', 'spawn', or 'forkserver'.")
    if not isinstance(settings['monitoring'], bool):
        raise ValueError('monitoring must be a boolean.')
    if not isinstance(settings['monitoring_frequency'], int):
        raise ValueError('monitoring_frequency must be an integer (in seconds).')
    if settings['timeout'] is not None and not isinstance(settings['timeout'], int):
        raise ValueError('timeout must be an integer (in seconds).')

    # Expand settings into individual variables.
    subprocess = settings['subprocess']
    feedback_level = settings['feedback_level']
    context = settings['context']
    monitoring = settings['monitoring']
    monitoring_frequency = settings['monitoring_frequency']
    timeout = settings['timeout']

    # If a timeout is set, monitoring must be enabled.
    if timeout is not None:
        monitoring = True

    def decorator(func):
        """
        Inner decorator to wrap the target function.

        :param func: target function to decorate.
        :returns: wrapped function respecting the configured subprocess options.
        """
        if not subprocess:
            # If not running in a subprocess, return the original function.
            return func

        @wraps(func)
        def wrapper(*args, **kwargs):
            """
            Wrapper function to run the target function in a subprocess.

            :param args: positional arguments for the target function.
            :param kwargs: keyword arguments for the target function.
            :returns: result of the target function.
            :raises TimeoutError: if the configured timeout elapses.
            :raises RuntimeError: if the subprocess exits cleanly without
                sending back a result.
            :raises Exception: if the subprocess exits with a non-zero code,
                or re-raised from the target function (the formatted child
                traceback is attached as the ``traceback`` attribute).
            """
            # Record the start time of the process.
            global_start_time = datetime.datetime.now()
            func_name = func.__name__

            # Provide initial feedback based on the feedback level.
            if feedback_level > 0:
                print(feedback_separator, flush=True)
                print(f'* Running subprocess for function: {func_name}', flush=True)
                print(feedback_offset + f'Start time: {global_start_time.strftime("%Y-%m-%d %H:%M:%S")}', flush=True)
                print(feedback_separator, flush=True)
            if feedback_level > 1:
                print('* Settings:', flush=True)
                for key, value in settings.items():
                    print(feedback_offset + f'- {key}: {value}', flush=True)
                print(feedback_separator, flush=True)
            if feedback_level > 2:
                print('* Function arguments :', args, flush=True)
                print('* Function keyword args :', kwargs, flush=True)
                print(feedback_separator, flush=True)

            # Define the target function to be executed in the subprocess.
            # NOTE(review): this is a nested closure; the 'spawn' and
            # 'forkserver' contexts pickle the process target, which is
            # expected to fail for local functions - confirm before relying
            # on contexts other than 'fork'.
            def target_function(pipe_conn, *args, **kwargs):
                """
                Execute the target function in a subprocess and send the result back.

                :param pipe_conn: pipe connection for inter-process communication.
                :param args: positional arguments for the target function.
                :param kwargs: keyword arguments for the target function.
                """
                try:
                    # Execute the function and send the result through the pipe.
                    pipe_conn.send(func(*args, **kwargs))
                except Exception as e:
                    # Send exceptions through the pipe for error handling.
                    # The traceback object itself cannot cross the process
                    # boundary, so its formatted text is attached instead.
                    import traceback
                    e.traceback = traceback.format_exc()
                    pipe_conn.send(e)
                finally:
                    pipe_conn.close()

            # Set up and start the subprocess.
            ctx = mp.get_context(context)
            parent_conn, child_conn = ctx.Pipe()
            # create the process:
            process = ctx.Process(target=target_function, args=(child_conn, *args), kwargs=kwargs)
            if feedback_level > 2:
                print('* Sub-process created.', flush=True)
            process.start()
            if feedback_level > 2:
                print('* Sub-process started.', flush=True)

            try:
                # initial memory usage:
                peak_memory = 0.0
                if monitoring:
                    initial_memory = _subprocess_rss_mb(process.pid)
                    if initial_memory is None:
                        # The process may already be gone; start the peak from zero.
                        initial_memory = 0.0
                    if feedback_level > 1:
                        print('* Initial memory usage:', initial_memory, 'MB', flush=True)
                        print(feedback_separator, flush=True)
                    # initial_memory is already expressed in MB.
                    peak_memory = initial_memory

                # Monitor the process and handle timeout.
                initial_time = time.time() if monitoring else None
                while True:
                    # exit by process status:
                    _process_status = process.is_alive()
                    if feedback_level > 2:
                        print('* Process is alive:', _process_status, flush=True)
                    if not _process_status:
                        if feedback_level > 2:
                            print(f'* Process exited with code {process.exitcode}.', flush=True)
                        if process.exitcode == 0:
                            break
                        raise Exception('Process exited with code %d' % process.exitcode)
                    # monitoring:
                    if monitoring:
                        current_memory = _subprocess_rss_mb(process.pid)
                        if current_memory is not None:
                            if feedback_level > 2:
                                print('* Current memory usage:', current_memory, 'MB', flush=True)
                            peak_memory = max(peak_memory, current_memory)
                    # break if pipe is full: the child may be blocked in send()
                    # until the parent reads, so data must be drained before joining.
                    if parent_conn.poll():
                        if feedback_level > 2:
                            print('* Breaking, sub-process pipe is full.', flush=True)
                        break
                    # break by timeout:
                    if timeout and time.time() - initial_time > timeout:
                        process.terminate()
                        # Reap the terminated child; bounded so that a child
                        # ignoring the signal cannot block the caller.
                        process.join(1.0)
                        raise TimeoutError("Process timed out.")
                    # sleep for monitoring frequency:
                    time.sleep(monitoring_frequency)

                # A child that exits cleanly without sending anything (e.g. the
                # function raised SystemExit) would make recv() block forever.
                if not parent_conn.poll():
                    raise RuntimeError('Process exited without returning a result.')

                # Receive the result from the pipe
                result = parent_conn.recv()
                if feedback_level > 2:
                    print('* Result received.', flush=True)

                # Wait for the process to finish and get the result:
                process.join()
                if feedback_level > 2:
                    print('* Process joined.', flush=True)
            finally:
                # Release both pipe ends held by the parent on every exit path.
                parent_conn.close()
                child_conn.close()

            # monitoring stats if needed:
            if monitoring:
                final_time = time.time()
                total_time = final_time - initial_time
                if feedback_level > 0:
                    print(feedback_separator, flush=True)
                    print(f'* Total time elapsed: {total_time:.2f} seconds', flush=True)
                    print(f'* Peak memory usage: {peak_memory:.2f} MB', flush=True)
                    print(feedback_separator, flush=True)

            # Raise exceptions received from the subprocess.
            if isinstance(result, Exception):
                raise result
            return result

        return wrapper

    return decorator