Python Multiprocessing Support

    Multiprocessing timeouts and memory limits

    The GlasswallProcessManager class is designed to manage multiprocessing with a designated timeout and memory limit for each file being processed by the Glasswall engine.
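    The idea of a per-process time limit can be illustrated with the Python standard library alone. This is an analogy, not how GlasswallProcessManager is implemented: the hypothetical helper run_with_timeout runs a snippet of Python in a child process with subprocess.run, kills it if it exceeds timeout_seconds, and records values comparable to the TaskResult exit_code, timed_out and elapsed_time attributes. Memory limits are not modelled in this sketch.

```python
import subprocess
import sys
import time


def run_with_timeout(code: str, timeout_seconds: float) -> dict:
    """Run `code` in a child Python process, killing it after `timeout_seconds`.

    Hypothetical helper for illustration; not part of the glasswall package.
    """
    start_time = time.time()
    try:
        completed = subprocess.run([sys.executable, "-c", code], timeout=timeout_seconds)
        exit_code, timed_out = completed.returncode, False
    except subprocess.TimeoutExpired:
        # subprocess.run kills and reaps the child before re-raising TimeoutExpired
        exit_code, timed_out = None, True
    end_time = time.time()
    return {
        "exit_code": exit_code,  # 0 = success, None if this helper killed the process
        "timed_out": timed_out,
        "elapsed_time": end_time - start_time,
    }


if __name__ == "__main__":
    print(run_with_timeout("print('quick task')", timeout_seconds=5))
    print(run_with_timeout("import time; time.sleep(10)", timeout_seconds=1))
```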

    The GlasswallProcessManager consumes Task objects, which must be created and added to the queue with process_manager.queue_task(). A Task object consists of a function to call (func), plus the positional arguments (args) and keyword arguments (kwargs) to pass to that function.
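    The shape of a Task can be sketched as a small dataclass. SketchTask and its run() method are hypothetical stand-ins for illustration only, not the glasswall.multiprocessing.Task API; they show the assumed contract that a worker calls func(*args, **kwargs) and keeps the return value.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple


@dataclass
class SketchTask:
    """Hypothetical stand-in for a Task: a function plus its arguments."""

    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def run(self) -> Any:
        # Assumed contract: the worker calls func(*args, **kwargs) and keeps the return value
        return self.func(*self.args, **self.kwargs)


def greet(name: str, punctuation: str = ".") -> str:
    return f"Hello, {name}{punctuation}"


task = SketchTask(func=greet, args=("Glasswall",), kwargs=dict(punctuation="!"))
print(task.run())  # Hello, Glasswall!
```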

    The GlasswallProcessManager either produces a list of TaskResult objects once all processing has completed, or yields individual TaskResult objects as they complete. A TaskResult object contains attributes describing the processing of its task, including the outcome, timing and memory usage.

    TaskResult Attributes

    task: Task
    success: bool  # True if function did not raise an exception
    result: Any  # function return value
    exception: Union[Exception, None]  # the exception raised by the function
    exit_code: Union[int, None]  # multiprocessing.Process.exitcode, 0 = success
    timeout_seconds: Optional[float]  # time limit for each process
    memory_limit_in_gib: Optional[float]  # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
    start_time: float  # uses time.time(), current time in seconds since the Epoch
    end_time: float  # uses time.time(), current time in seconds since the Epoch
    elapsed_time: float  # end_time - start_time
    timed_out: bool  # terminated for exceeding the time limit: 'timeout_seconds'
    max_memory_used_in_gib: float  # the highest recorded memory usage of the process
    out_of_memory: bool  # terminated for exceeding the memory limit: 'memory_limit_in_gib'
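    Because these are ordinary attributes, results can be summarised with plain Python. The sketch below assumes only the attribute names listed above; summarise and bytes_to_gib are hypothetical helpers, and SimpleNamespace objects with made-up values stand in for real TaskResult objects.

```python
from types import SimpleNamespace

BYTES_PER_GIB = 1024 ** 3  # 1 gibibyte, as used by memory_limit_in_gib


def bytes_to_gib(num_bytes: int) -> float:
    return num_bytes / BYTES_PER_GIB


def summarise(task_results) -> dict:
    """Aggregate TaskResult-like objects using the attribute names listed above."""
    task_results = list(task_results)
    return {
        "succeeded": sum(1 for r in task_results if r.success),
        "timed_out": sum(1 for r in task_results if r.timed_out),
        "out_of_memory": sum(1 for r in task_results if r.out_of_memory),
        "peak_memory_gib": max((r.max_memory_used_in_gib for r in task_results), default=0.0),
        "total_elapsed_time": sum(r.elapsed_time for r in task_results),
    }


# Made-up stand-ins for real TaskResult objects
results = [
    SimpleNamespace(success=True, timed_out=False, out_of_memory=False,
                    max_memory_used_in_gib=0.06, elapsed_time=1.5),
    SimpleNamespace(success=False, timed_out=True, out_of_memory=False,
                    max_memory_used_in_gib=0.17, elapsed_time=5.0),
]
print(summarise(results))
```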
    

    Producing a list of TaskResult objects once processing has completed

    In this example, tasks are queued and processed in parallel up to the maximum number of workers, which by default is equal to the number of logical CPUs in the system.

    After all tasks are queued, processing begins automatically when the GlasswallProcessManager context exits. Once all tasks have completed, the process_manager.task_results list attribute is populated with TaskResult objects describing the processing results.

    This example then iterates over process_manager.task_results in a for loop and prints each TaskResult object.

    import os
    import time
    
    import glasswall
    from glasswall.multiprocessing import GlasswallProcessManager, Task
    
    
    INPUT_DIRECTORY = r"C:\gwpw\input"
    OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
    LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"
    
    glasswall.config.logging.console.setLevel("CRITICAL")
    EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
    gw_policy = glasswall.content_management.policies.Editor(default="sanitise")
    
    
    def worker_function(*args, **kwargs):
        EDITOR.export_file(*args, **kwargs)
    
    
    def main():
        start_time = time.time()
        input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
        with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
            for input_file in input_files:
                relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
                output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"
    
                task = Task(
                    func=worker_function,
                    args=tuple(),
                    kwargs=dict(
                        input_file=input_file,
                        output_file=output_file,
                        content_management_policy=gw_policy,
                    ),
                )
                process_manager.queue_task(task)
    
        for task_result in process_manager.task_results:
            print(task_result)
    
        print(f"Elapsed: {time.time() - start_time} seconds")
    
    
    if __name__ == "__main__":
        main()
    
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
    Elapsed: 6.226853370666504 seconds
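    The pattern of queueing work inside the with block and running it all when the context exits can be sketched with the standard library. DeferredRunner is a hypothetical class that uses threads for brevity, whereas GlasswallProcessManager uses separate processes with time and memory limits; the sketch also stores bare return values in queue order rather than TaskResult objects.

```python
from concurrent.futures import ThreadPoolExecutor


class DeferredRunner:
    """Hypothetical sketch: queue work inside `with`, run it all when the block exits.

    Threads are used for brevity; GlasswallProcessManager uses separate processes.
    """

    def __init__(self, max_workers=None):
        self.max_workers = max_workers
        self._queue = []
        self.task_results = []

    def queue_task(self, func, *args, **kwargs):
        # Only record the call; nothing runs yet
        self._queue.append((func, args, kwargs))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
                futures = [pool.submit(func, *args, **kwargs) for func, args, kwargs in self._queue]
                # Results are gathered in queue order; .result() re-raises any task exception
                self.task_results = [future.result() for future in futures]
        return False  # never suppress exceptions raised in the with block


with DeferredRunner(max_workers=2) as runner:
    for n in range(5):
        runner.queue_task(pow, n, 2)
print(runner.task_results)  # [0, 1, 4, 9, 16]
```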
    

    Yielding individual TaskResult objects as they are completed

    This example uses the external library tqdm to visualise progress during processing.

    Tasks are queued and processed in parallel up to the maximum number of workers, which by default is equal to the number of logical CPUs in the system.

    After all tasks are queued, processing begins within the GlasswallProcessManager context when the process_manager.as_completed() generator method is invoked. Each TaskResult object is yielded as soon as its task completes, so results can be accessed as they become available rather than after all tasks have finished. In this mode, the process_manager.task_results list attribute is not populated.

    As each task is completed, this example prints the yielded TaskResult object.

    import os
    import time
    
    from tqdm import tqdm
    
    import glasswall
    from glasswall.multiprocessing import GlasswallProcessManager, Task
    
    
    INPUT_DIRECTORY = r"C:\gwpw\input"
    OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
    LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"
    
    glasswall.config.logging.console.setLevel("CRITICAL")
    EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
    gw_policy = glasswall.content_management.policies.Editor(default="sanitise")
    
    
    def worker_function(*args, **kwargs):
        EDITOR.export_file(*args, **kwargs)
    
    
    def main():
        start_time = time.time()
        input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
        with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
            for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
                relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
                output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"
    
                task = Task(
                    func=worker_function,
                    args=tuple(),
                    kwargs=dict(
                        input_file=input_file,
                        output_file=output_file,
                        content_management_policy=gw_policy,
                    ),
                )
                process_manager.queue_task(task)
    
            for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
                print(task_result)
    
        print(f"Elapsed: {time.time() - start_time} seconds")
    
    
    if __name__ == "__main__":
        main()
    
    Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
    Processing tasks:   0%|                                                                   | 0/3 [00:00<?, ?it/s]
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
    Processing tasks:  33%|███████████████████▋                                       | 1/3 [00:04<00:08,  4.08s/it]
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
    Processing tasks:  67%|███████████████████████████████████████▎                   | 2/3 [00:05<00:02,  2.46s/it]
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
    Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00,  2.11s/it] 
    Elapsed: 6.356592416763306 seconds
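    The as_completed() behaviour described above, where results arrive in completion order rather than queue order, can be sketched with concurrent.futures.as_completed from the standard library. iter_completed and slow_square are hypothetical helpers, and threads are used for brevity instead of the separate processes GlasswallProcessManager uses.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def iter_completed(func, arg_tuples, max_workers=None):
    """Hypothetical generator: yield each return value as soon as its call finishes."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for args in arg_tuples]
        for future in as_completed(futures):
            yield future.result()


def slow_square(n, delay):
    time.sleep(delay)
    return n * n


# Queued as 3, 1, 2, but the job with the shortest delay finishes first
for result in iter_completed(slow_square, [(3, 0.6), (1, 0.0), (2, 0.3)], max_workers=3):
    print(result)
```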
    

    Note that while the GlasswallProcessManager can handle large amounts of data returned from the worker_function, holding this data in memory can quickly exhaust the available RAM. Where possible, avoid returning data from the worker_function and rely on file-to-file processing instead.
    If writing files to disk is undesirable, or the worker function must return the file bytes, we recommend the following steps:

    • Limit max_workers so that at least 4 GiB of memory is available for each process.
    • Use the as_completed() generator.
    • Do not retain file bytes after they are yielded from as_completed(), so that the Python garbage collector can free the memory once the bytes are no longer referenced.
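    The last point can be illustrated with a consumer that keeps only small summaries (size and SHA-256 digest) instead of the bytes themselves. consume and fake_results are hypothetical; fake_results stands in for a generator such as as_completed() and yields made-up (name, payload) pairs.

```python
import hashlib


def consume(results):
    """Hypothetical consumer: record size and digest of each payload, never the payload."""
    summary = {}
    for name, payload in results:
        if payload is not None:
            summary[name] = (len(payload), hashlib.sha256(payload).hexdigest())
        # On the next iteration `payload` is rebound, so the previous bytes object is no
        # longer referenced (unless stored elsewhere) and its memory can be reclaimed
    return summary


def fake_results():
    # Stand-in generator yielding made-up payloads one at a time
    yield "a.doc", b"PK" + b"\x00" * 10
    yield "b.pdf", None
    yield "c.doc", b"PK" + b"\x00" * 20


print(consume(fake_results()))
```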

    Yielding file bytes in file to memory mode and limiting max_workers

    This example uses the external library tqdm to visualise progress during processing.

    The worker_function has been modified to return the result of EDITOR.export_file, which will be either the export zip file's bytes, or None.

    max_workers is limited based on the number of logical CPUs and the available RAM.

    Tasks are queued and processed in parallel up to the specified number of workers.

    After all tasks are queued, processing begins within the GlasswallProcessManager context when the process_manager.as_completed() generator method is invoked. Each TaskResult object is yielded as soon as its task completes, so results can be accessed as they become available rather than after all tasks have finished. In this mode, the process_manager.task_results list attribute is not populated.

    As each task is completed, this example prints the yielded TaskResult object, and if the task_result.result attribute is populated, it also prints information on the file size of the export zip file.

    import os
    import time
    
    from tqdm import tqdm
    
    import glasswall
    from glasswall.multiprocessing import GlasswallProcessManager, Task
    from glasswall.multiprocessing.memory_usage import get_available_memory_gib
    
    
    INPUT_DIRECTORY = r"C:\gwpw\input"
    OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
    LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"
    
    glasswall.config.logging.console.setLevel("CRITICAL")
    EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
    gw_policy = glasswall.content_management.policies.Editor(default="sanitise")
    
    
    def worker_function(*args, **kwargs):
        return EDITOR.export_file(*args, **kwargs)
    
    
    def main():
        start_time = time.time()
        available_memory_gib = get_available_memory_gib()
        print(f"Available memory: {available_memory_gib} GiB")
        # Use the lower of the logical CPU count and the number of whole 4 GiB slices of available memory,
        # with a minimum of one worker
        cpu_count = os.cpu_count() or 1
        max_workers = max(1, int(min(cpu_count, available_memory_gib // 4)))
        print(f"Max workers: {max_workers}")
    
        input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
        with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
            for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
                # No output_file specified, export_file will run in file to memory mode
                task = Task(
                    func=worker_function,
                    args=tuple(),
                    kwargs=dict(
                        input_file=input_file,
                        content_management_policy=gw_policy,
                    ),
                )
                process_manager.queue_task(task)
    
            for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
                print(task_result)
                # Do something with export zip file bytes in memory
                if task_result.result:
                    print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
                # When task_result is rebound on the next iteration, this TaskResult (and its bytes) is no longer referenced and its memory can be freed
    
        print(f"Elapsed: {time.time() - start_time} seconds")
    
    
    if __name__ == "__main__":
        main()
    
    Available memory: 13.020416259765625 GiB
    Max workers: 3
    Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
    Processing tasks:   0%|                                                                   | 0/3 [00:00<?, ?it/s]
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
    Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
    Processing tasks:  33%|███████████████████▋                                       | 1/3 [00:04<00:09,  4.99s/it]
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
    Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
    Processing tasks:  67%|███████████████████████████████████████▎                   | 2/3 [00:05<00:02,  2.25s/it]
    TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
    Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
    Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00,  2.23s/it] 
    Elapsed: 6.706916809082031 seconds
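    The max_workers calculation in this example can be factored into a small function that is easy to test. compute_max_workers is a hypothetical helper; it clamps the result to at least 1 so that a machine with less than 4 GiB of available memory still gets one worker rather than zero.

```python
import os


def compute_max_workers(cpu_count: int, available_memory_gib: float, gib_per_worker: float = 4.0) -> int:
    """Hypothetical helper: lower of CPU count and whole `gib_per_worker` slices of RAM, at least 1."""
    by_memory = int(available_memory_gib // gib_per_worker)
    return max(1, min(cpu_count, by_memory))


print(compute_max_workers(os.cpu_count() or 1, 13.020416259765625))
```

    With 8 logical CPUs (an assumed value) and the 13.02 GiB of available memory reported in the output above, compute_max_workers(8, 13.02) returns 3, because only three whole 4 GiB slices fit.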
    

    API Documentation

    https://glasswall-python-wrapper-documentation.glasswall.com/

