v2.0.5 Latest

Concurrent Processing Documentation

Documentation for thread.ConcurrentProcessing.

Why Concurrent Processing?

Concurrent Processing is used to speed up the processing of large datasets by splitting the workload across multiple threads.

Traditionally, this is achieved with a for loop.

my_dataset = [] # Large dataset
def my_data_processor(data: Data_In) -> Data_Out:
  ...
 
processed_data = []
for data in my_dataset:
  # Append each result instead of overwriting the list
  processed_data.append(my_data_processor(data))
 
print(processed_data) # Processed data

While this is simple and adequate for a small dataset, it is not ideal for large datasets, especially when runtime matters. By using thread.ConcurrentProcessing, we can split the large dataset into multiple chunks and process the chunks concurrently.

💡

Concurrent processing is not true parallelism. Learn more here.

How It Works

Determine Thread Count

The number of threads used is determined by the following formula:

thread_count = min(max_threads, len(dataset))

This ensures that the number of threads used will always be less than or equal to the length of the dataset, which prevents redundant threads from being initialized for small datasets.
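As a quick illustration of the formula above, the following sketch computes the thread count for a small and a large dataset. The helper name resolve_thread_count is hypothetical and is not part of the library.

```python
def resolve_thread_count(max_threads: int, dataset_length: int) -> int:
    """Hypothetical helper mirroring thread_count = min(max_threads, len(dataset))."""
    return min(max_threads, dataset_length)


small = resolve_thread_count(8, 3)     # dataset is shorter than max_threads
large = resolve_thread_count(8, 1000)  # dataset is longer than max_threads
print(small, large)  # 3 8
```

With only 3 items, 3 threads are enough, so the remaining 5 are never created.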

Chunking

The dataset is split as evenly as possible into chunks, preserving the order of data. Chunks follow the structure:

chunks = [[1, 2, 3, ...], [50, 51, 52, ...], ...]

Let $N$ be the length of the dataset and let $M$ be the number of threads.

The individual chunk lengths decrease down the chunk list. The length of each chunk can be either $\lfloor N/M + 0.5 \rfloor + 1$ or $N/M$.

The chunks generated are generators, meaning they will not take up much memory.
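The chunking step can be sketched as follows. This is a minimal illustration of one common even-splitting scheme, not the library's actual implementation, and it does not claim to reproduce the length formula above exactly. The helper name split_into_chunks is hypothetical. Earlier chunks receive any leftover items, so chunk lengths never increase down the list, and each chunk is a generator that reads items on demand.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def split_into_chunks(dataset: Sequence[T], chunk_count: int) -> List[Iterator[T]]:
    """Hypothetical helper: split `dataset` into ordered, lazily evaluated chunks."""
    if not 0 < chunk_count <= len(dataset):
        raise ValueError("chunk_count must be between 1 and len(dataset)")

    base, extra = divmod(len(dataset), chunk_count)
    chunks: List[Iterator[T]] = []
    start = 0
    for index in range(chunk_count):
        # The first `extra` chunks take one more item than the rest.
        size = base + (1 if index < extra else 0)
        # A generator expression avoids copying the slice into a new list.
        chunks.append(dataset[i] for i in range(start, start + size))
        start += size
    return chunks


chunks = split_into_chunks(list(range(1, 11)), 3)
print([list(chunk) for chunk in chunks])  # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
```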

Importing the class

import thread
thread.ConcurrentProcessing
 
from thread import ConcurrentProcessing

Quick Start

There are 2 main ways of initializing a concurrent processing object.

On-Demand

You can create a simple process by initializing thread.ConcurrentProcessing and passing the function and dataset.

def my_data_processor(Data_In) -> Data_Out: ...
 
# Recommended way
my_processor = ConcurrentProcessing(
  function = my_data_processor,
  dataset = [i for i in range(0, n)]
)
 
# OR
# Not the recommended way
my_processor = ConcurrentProcessing(my_data_processor, [i for i in range(0, n)])

It can be run by invoking the start() method.

my_processor.start()

Decorated Function

You can decorate a function with thread.processor, which uses thread.ConcurrentProcessing. Each time the decorated function is invoked, it automatically runs in a new thread and returns a thread.ConcurrentProcessing object.

A decorated function's signature is overwritten: the first argument is replaced so that it requires a sequence of the Data_In type.

import thread
 
@thread.processor
def my_target(Data_In, arg1, arg2, *, arg3: bool = False) -> Data_Out: ...
 
dataset: Sequence[Data_In]
worker = my_target(dataset, arg1, arg2, arg3 = True) # thread.ConcurrentProcessing()
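To make the signature change concrete, here is a minimal stand-in written with the standard library only. It is a sketch under stated assumptions, not the library's code: sketch_processor and SketchWorker are hypothetical names, and the stand-in uses a single background thread rather than chunked threads. It shows how a function written for one item can be called with a whole sequence as its first argument, with the remaining arguments forwarded unchanged.

```python
import threading
from functools import wraps
from typing import Any, Callable, List, Sequence


class SketchWorker:
    """Hypothetical worker that applies `function` to every item in a background thread."""

    def __init__(self, function: Callable[..., Any], dataset: Sequence[Any],
                 args: tuple, kwargs: dict) -> None:
        self.results: List[Any] = []
        self._thread = threading.Thread(
            target=self._run, args=(function, dataset, args, kwargs)
        )

    def _run(self, function, dataset, args, kwargs) -> None:
        # The item becomes the first argument; extra arguments are forwarded as-is.
        self.results = [function(item, *args, **kwargs) for item in dataset]

    def start(self) -> None:
        self._thread.start()

    def join(self) -> None:
        self._thread.join()


def sketch_processor(function: Callable[..., Any]) -> Callable[..., SketchWorker]:
    """Hypothetical decorator: the wrapped function now takes a sequence first."""

    @wraps(function)
    def wrapper(dataset: Sequence[Any], *args: Any, **kwargs: Any) -> SketchWorker:
        worker = SketchWorker(function, dataset, args, kwargs)
        worker.start()  # runs automatically on every call
        return worker

    return wrapper


@sketch_processor
def scale(value: int, factor: int, *, offset: int = 0) -> int:
    return value * factor + offset


worker = scale([1, 2, 3], 10, offset=1)
worker.join()
print(worker.results)  # [11, 21, 31]
```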

Did you know?

Decorators can take in keyword arguments that change the behavior of the thread.

import thread
 
@thread.processor(name = 'my_thread', suppress_errors = True)
def my_target(): ...

See the full list of arguments here

Compatibility

Data processing is usually achieved with external libraries like pandas. However, native support is limited to dataset objects that implement both the __len__() and __getitem__() methods.

This is primarily because:

  • The __len__() method is used to determine the length of the dataset through a len(dataset) call.
  • The __getitem__() method is used to access items of the dataset through the dataset[index] syntax.

This is also why thread.ConcurrentProcessing does not support Generator objects or Iterator objects out of the box.
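The requirement above can be illustrated with a small runtime check. This is only a sketch: the protocol names SupportsLength and SupportsGetItem and the helper is_natively_supported are hypothetical and may differ from what the library defines internally.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SupportsLength(Protocol):
    def __len__(self) -> int: ...


@runtime_checkable
class SupportsGetItem(Protocol):
    def __getitem__(self, index: int) -> Any: ...


def is_natively_supported(dataset: object) -> bool:
    """Hypothetical check: True only if both __len__ and __getitem__ exist."""
    return isinstance(dataset, SupportsLength) and isinstance(dataset, SupportsGetItem)


print(is_natively_supported([1, 2, 3]))               # True
print(is_natively_supported((x for x in range(3))))   # False, generators have neither method
print(is_natively_supported(iter([1, 2, 3])))         # False, iterators have neither method
```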

Work-around

We now non-natively support almost all dataset types.

We stopped explicitly supporting the Sequence type and instead now use Protocols to check if __len__() or __getitem__() are implemented. We also added context-specific optional/required _length and _get_value arguments when initializing thread.ConcurrentProcessing.

You can find out more about the valid _length and _get_value arguments here.

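Mapping

  • Has __len__ and __getitem__: _length optional, _get_value optional
  • Has neither __len__ nor __getitem__: _length required, _get_value required
  • Does not have __len__ but has __getitem__: _length required, _get_value optional
  • Has __len__ but does not have __getitem__: _length optional, _get_value required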

Example

Now you do not have to pre-convert the dataset to a supported dataset type.

from thread import ConcurrentProcessing
 
myDataFrame: ...
 
process = ConcurrentProcessing(
  function = lambda x: x + 1,
  dataset = myDataFrame,
  _length = myDataFrame.getLength(),
  _get_value = lambda d, i: d.getIndex(i)
)

Static type checking will reflect whether _length and _get_value are required or optional depending on the dataset type.
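The way these two arguments could be resolved at runtime is sketched below. The helper resolve_accessors and the toy Table class are hypothetical and exist only for illustration; the library's own logic may differ. The sketch falls back to len(dataset) and dataset[index] when they are available, and otherwise insists on the explicit arguments.

```python
from typing import Any, Callable, Optional, Tuple, Union


def resolve_accessors(
    dataset: Any,
    _length: Optional[Union[int, Callable[[Any], int]]] = None,
    _get_value: Optional[Callable[[Any, int], Any]] = None,
) -> Tuple[int, Callable[[Any, int], Any]]:
    """Hypothetical helper: pick a length and an item getter for `dataset`."""
    if _length is None:
        if not hasattr(dataset, "__len__"):
            raise TypeError("_length is required when the dataset has no __len__()")
        length = len(dataset)
    elif callable(_length):
        length = _length(dataset)  # evaluated once, up front
    else:
        length = _length

    if _get_value is None:
        if not hasattr(dataset, "__getitem__"):
            raise TypeError("_get_value is required when the dataset has no __getitem__()")
        _get_value = lambda d, i: d[i]
    return length, _get_value


class Table:
    """Toy dataset with neither __len__ nor __getitem__ (illustrative only)."""

    def __init__(self, rows):
        self._rows = list(rows)

    def getLength(self) -> int:
        return len(self._rows)

    def getIndex(self, index: int):
        return self._rows[index]


table = Table([10, 20, 30])
length, get_value = resolve_accessors(
    table,
    _length=table.getLength(),
    _get_value=lambda d, i: d.getIndex(i),
)
print(length, [get_value(table, i) for i in range(length)])  # 3 [10, 20, 30]
```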

Initialization

This section covers the required and optional arguments for initializing a concurrent process.

Required

function (Data_In, *args, **kwargs) -> Data_Out

This should be a function that takes in an item from the dataset, with or without additional arguments, and returns Data_Out.

Arguments and keyword arguments passed to the function, excluding the first argument, can be supplied through args and kwargs. Data_Out will be written to the generated thread's Thread._returned_value and can be accessed via ConcurrentProcessing.results or ConcurrentProcessing.get_return_values().

function can be passed as the first argument to ConcurrentProcessing.__init__(), although it is recommended to use only keyword arguments.

import thread
 
thread.ConcurrentProcessing(lambda x: x + 1, [])
thread.ConcurrentProcessing(function = lambda x: x + 1, dataset = [])
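The following standard-library sketch shows the calling convention described above: each dataset item is passed as the first argument, extra positional and keyword arguments are forwarded, and results are collected in dataset order. The function process_in_threads is a hypothetical stand-in, not the library's implementation, and its contiguous slicing is only one possible way to divide the work.

```python
import threading
from typing import Any, Callable, List, Sequence


def process_in_threads(
    function: Callable[..., Any],
    dataset: Sequence[Any],
    *args: Any,
    max_threads: int = 8,
    **kwargs: Any,
) -> List[Any]:
    """Hypothetical stand-in: apply `function` to each item using several threads."""
    total = len(dataset)
    if total == 0:
        return []
    thread_count = min(max_threads, total)
    results: List[Any] = [None] * total

    def work(start: int, stop: int) -> None:
        for index in range(start, stop):
            # Each thread writes only to its own indices, so order is preserved.
            results[index] = function(dataset[index], *args, **kwargs)

    threads = [
        threading.Thread(
            target=work,
            args=(k * total // thread_count, (k + 1) * total // thread_count),
        )
        for k in range(thread_count)
    ]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()
    return results


def add(value: int, amount: int, *, scale: int = 1) -> int:
    return (value + amount) * scale


print(process_in_threads(add, [1, 2, 3], 10, scale=2))  # [22, 24, 26]
```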

Best Practices

While you can use a lambda function, it is best to use a normal function for your LSP/Linter to infer types.

from thread import ConcurrentProcessing
 
worker = ConcurrentProcessing(function = lambda x: x + 1, dataset = [1, 2, 3])
worker.start()
worker.join()
 
worker.results # This will be inferred as Unknown by your LSP/Linter

from thread import ConcurrentProcessing
 
def my_target(x: int) -> int:
  return x + 1
 
worker = ConcurrentProcessing(function = my_target, dataset = [1, 2, 3])
worker.start()
worker.join()
 
worker.results # This will be inferred as a list[int]

dataset Dataset[Data_In]

This should be an iterable sequence of data. Each item is passed as the first argument to function.

This can be of any type if you pass the corresponding _length and _get_value arguments. See here for more details.

import thread
 
def my_function(x: int) -> int:
  ...
 
thread.ConcurrentProcessing(function = my_function, dataset = [1, 2, 3])
thread.ConcurrentProcessing(function = my_function, dataset = ('hi')) # This will be highlighted by your LSP/Linter

Optional

max_threads int (default: 8)

This is the maximum number of threads that will be created by thread.ConcurrentProcessing.

⚠️

This value is not always the number of threads created. See here for more details.

_get_value (Dataset, int) -> Data_In (default: None)

⚠️
This can be a required argument depending on the dataset type. See here for more details.

This is invoked every time a value is retrieved from the dataset.

from thread import ConcurrentProcessing
 
dataset: MyDatasetType = ...
 
ConcurrentProcessing(
  function = my_function,
  dataset = dataset,
  _get_value = lambda d, index: d[index],
)
 
ConcurrentProcessing(
  function = my_function,
  dataset = dataset,
  _get_value = lambda d, index: d.getIndex(index),
)

_length int | (Dataset) -> int (default: len(dataset))

⚠️
This can be a required argument depending on the dataset type. See here for more details.

This is the length of the dataset that will be processed by thread.ConcurrentProcessing.

This is invoked only once when thread.ConcurrentProcessing is initialized.

from thread import ConcurrentProcessing
 
dataset: MyDatasetType = ...
 
ConcurrentProcessing(
  function = my_function,
  dataset = dataset,
  _length = 5,
)
 
def get_length(dataset: MyDatasetType) -> int: ...
ConcurrentProcessing(
  function = my_function,
  dataset = dataset,
  _length = get_length,
)

*args / **kwargs (default: None) Any / Mapping[str, Any]

These arguments are passed to thread.Thread.__init__(), then to threading.Thread.__init__().

If kwargs contains an argument named args, it will automatically be removed from kwargs and joined with ConcurrentProcessing.__init__().args.

See thread.Thread documentation for more details.
See threading documentation for more details.
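The handling of a stray args entry in kwargs can be pictured with the sketch below. The helper merge_args is hypothetical, and the order in which the two groups are joined (positional arguments first) is an assumption made for illustration rather than documented behavior.

```python
from typing import Any, Dict, Tuple


def merge_args(
    args: Tuple[Any, ...], kwargs: Dict[str, Any]
) -> Tuple[Tuple[Any, ...], Dict[str, Any]]:
    """Hypothetical helper: fold kwargs['args'] into the positional arguments."""
    remaining = dict(kwargs)                  # copy so the caller's mapping is untouched
    extra = tuple(remaining.pop("args", ()))  # remove 'args' from kwargs if present
    return args + extra, remaining            # assumed order: positional first


merged_args, merged_kwargs = merge_args((1, 2), {"args": (3,), "name": "worker"})
print(merged_args, merged_kwargs)  # (1, 2, 3) {'name': 'worker'}
```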

Properties

Attributes

These are attributes of the thread.ConcurrentProcessing class.

results List[Data_Out]

This is a list of the data that was returned by the function in thread.ConcurrentProcessing.

💡

Exceptions Raised

Raised when the thread is still running and cannot invoke the method. You can wait for the thread to terminate by calling `Thread.join()` or check the status with `Thread.status`.

status thread.ThreadStatus

This is the current status of the thread.

These Are The Possible Values

This means that the thread is idle and ready to be run.

Methods

These are methods of the thread.ConcurrentProcessing class.

start () -> None

This starts the processing.

Simply invoke ConcurrentProcessing.start() on a ConcurrentProcessing object.

import thread
 
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.start()

💡

Exceptions Raised

Raised when the thread is still running and cannot invoke the method. You can wait for the thread to terminate by calling `Thread.join()` or check the status with `Thread.status`.

is_alive () -> bool

This indicates whether the threads are still alive.

Simply invoke ConcurrentProcessing.is_alive() on a ConcurrentProcessing object.

import thread
 
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.is_alive()

💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.

get_return_values () -> List[Data_Out]

This halts the current thread execution until the processing completes, then returns the values returned by function.

Simply invoke ConcurrentProcessing.get_return_values() on a ConcurrentProcessing object.

import thread
 
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.get_return_values()

💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.

join (timeout: float = None) -> None

This halts the current thread execution until the ConcurrentProcessing completes or exceeds the timeout. A None value for timeout will have the same effect as passing float("inf") as a timeout.

Simply invoke ConcurrentProcessing.join() on a ConcurrentProcessing object.

import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.join(5)
worker.join()

💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.
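Because join() returns None whether or not the timeout was exceeded, a caller has to check liveness afterwards to tell the two cases apart. The sketch below demonstrates this with the standard library's threading.Thread, whose join(timeout) has the same shape. Treating it as a stand-in for ConcurrentProcessing.join() is an assumption made for illustration.

```python
import threading
import time

# A plain standard-library thread stands in for a ConcurrentProcessing worker here.
worker = threading.Thread(target=time.sleep, args=(0.5,))
worker.start()

worker.join(0.05)                  # stops waiting after roughly 0.05 seconds
still_running = worker.is_alive()  # True: the timeout expired before the work finished

worker.join()                      # no timeout: waits until the thread finishes
finished = not worker.is_alive()   # True

print(still_running, finished)
```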

kill (yielding: bool = False, timeout: float = 5) -> bool

This schedules the threads to be killed.

If yielding is True, it halts the current thread execution until the threads are killed or the timeout is exceeded. Similar to ConcurrentProcessing.join(), a None value for timeout will have the same effect as passing float("inf") as a timeout.

Simply invoke ConcurrentProcessing.kill() on a ConcurrentProcessing object.

import thread
 
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.kill(True, 10)
worker.kill(False)
worker.kill()

💡

Exceptions Raised

Raised when the thread is not initialized and cannot invoke the method. You can initialize the thread by calling `Thread.__init__()`.

⚠️

This only schedules the threads to be killed, and does not immediately kill the threads.

This means that if function has a long time.sleep() call, the thread will only be killed after that call returns and execution moves on to the next line.
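The delayed effect can be observed with a CPython-specific sketch. It assumes, without confirmation from this page, a mechanism similar to CPython's PyThreadState_SetAsyncExc, which asks the interpreter to raise an exception inside another thread the next time that thread executes Python code. The helper schedule_kill is hypothetical and is not the library's kill() method.

```python
import ctypes
import threading
import time


def schedule_kill(target: threading.Thread) -> bool:
    """Hypothetical helper: ask CPython to raise SystemExit inside `target`."""
    modified = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(target.ident), ctypes.py_object(SystemExit)
    )
    return modified == 1  # exactly one thread state should have been affected


def slow_worker() -> None:
    time.sleep(0.3)        # a blocking call: the pending exception waits for it to return
    while True:            # keeps running Python code until the exception is delivered
        time.sleep(0.01)


worker = threading.Thread(target=slow_worker, daemon=True)
started = time.monotonic()
worker.start()
time.sleep(0.05)                    # let the worker enter its long sleep
scheduled = schedule_kill(worker)   # returns immediately; nothing is killed yet
worker.join(timeout=5)
elapsed = time.monotonic() - started

print(scheduled, worker.is_alive(), elapsed >= 0.25)
```

The kill request is accepted right away, but the worker only stops once its blocking call has returned, which is why the elapsed time is never shorter than the sleep itself.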