Concurrent Processing Documentation
Documentation for thread.ConcurrentProcessing.
Why Concurrent Processing?
Concurrent Processing is used to speed up the processing of large datasets by splitting the workload across multiple threads.
Traditionally, this is done sequentially with a for loop:
my_dataset = [] # Large dataset
def my_data_processor(data): # Data_In -> Data_Out
    ...
processed_data = []
for data in my_dataset:
    processed_data.append(my_data_processor(data))
print(processed_data) # Processed data
While this is simple and good enough for small datasets, it is not ideal for large datasets, especially when runtime matters.
By using thread.ConcurrentProcessing, we can split the large dataset into multiple chunks and process the chunks concurrently.
Concurrent processing is not true parallelism. Learn more here.
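Whether threads help depends on the kind of work. In standard CPython builds, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads mainly speed up work that waits on I/O. The sketch below uses only the standard library's concurrent.futures.ThreadPoolExecutor, not thread.ConcurrentProcessing, and simulates I/O with time.sleep(). The function names are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_io_task(x: int) -> int:
    # Simulates an I/O-bound call (e.g. a network request) with a sleep
    time.sleep(0.2)
    return x * 2


def run_sequential(data: List[int]) -> List[int]:
    return [fake_io_task(x) for x in data]


def run_threaded(data: List[int], max_threads: int = 8) -> List[int]:
    # pool.map returns results in the same order as the input
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        return list(pool.map(fake_io_task, data))


if __name__ == "__main__":
    data = list(range(8))
    start = time.perf_counter()
    run_sequential(data)
    print(f"sequential: {time.perf_counter() - start:.2f}s")  # at least 8 * 0.2s
    start = time.perf_counter()
    run_threaded(data)
    print(f"threaded:   {time.perf_counter() - start:.2f}s")  # close to 0.2s, the sleeps overlap
```

For CPU-bound Python code, the same threaded version would generally not finish faster than the sequential one.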
How It Works
Determine Thread Count
The number of threads used is determined by the following formula:
thread_count = min(max_threads, len(dataset))
This ensures that the number of threads used is always less than or equal to the length of the dataset, which prevents redundant threads from being initialized for small datasets.
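The formula can be checked with a small stand-alone helper. thread_count is a hypothetical name used only for illustration. The default of 8 mirrors the documented max_threads default.

```python
def thread_count(dataset_length: int, max_threads: int = 8) -> int:
    # Never start more threads than there are items to process
    return min(max_threads, dataset_length)


print(thread_count(3))         # 3: only three items, so only three threads
print(thread_count(1000))      # 8: capped by max_threads
print(thread_count(1000, 16))  # 16
```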
Chunking
The dataset is split as evenly as possible into chunks, preserving the order of data. Chunks follow the structure:
chunks = [[1, 2, 3, ...], [50, 51, 52, ...], ...]
Let $N$ be the length of the dataset and let $T$ be the number of threads.
The chunk lengths never increase down the chunk list: each chunk has a length of either $\lceil N/T \rceil$ or $\lfloor N/T \rfloor$.
The chunks are generators, so they do not take up much memory.
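Here is a minimal sketch of this chunking scheme, written with the standard library only. It assumes two things: the extra items go to the first chunks, which makes chunk lengths decrease down the list as described above, and thread_count is at least 1. chunk_bounds and make_chunks are hypothetical helpers, not part of the thread package.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunk_bounds(length: int, thread_count: int) -> List[range]:
    # base = floor(N / T); the first `extra` chunks get one more item, i.e. ceil(N / T)
    base, extra = divmod(length, thread_count)
    bounds = []
    start = 0
    for i in range(thread_count):
        size = base + (1 if i < extra else 0)
        bounds.append(range(start, start + size))
        start += size
    return bounds


def make_chunks(dataset: Sequence[T], thread_count: int) -> List[Iterator[T]]:
    # Each chunk is a generator that reads items by index only when consumed,
    # so the dataset is never copied up front
    return [(dataset[i] for i in indices) for indices in chunk_bounds(len(dataset), thread_count)]


chunks = make_chunks(list(range(1, 11)), 3)
print([list(chunk) for chunk in chunks])  # [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
```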
Importing the class
import thread
thread.ConcurrentProcessing
# OR
from thread import ConcurrentProcessing
Quick Start
There are 2 main ways of initializing a concurrent processing object.
On-Demand
You can create a simple process by initializing thread.ConcurrentProcessing and passing the function and dataset.
from thread import ConcurrentProcessing
def my_data_processor(data): ... # Data_In -> Data_Out
n = 10 # Example dataset size
# Recommended way
my_processor = ConcurrentProcessing(
    function = my_data_processor,
    dataset = [i for i in range(0, n)]
)
# OR
# Not the recommended way
my_processor = ConcurrentProcessing(my_data_processor, [i for i in range(0, n)])
It can be run by invoking the start() method.
my_processor.start()
Decorated Function
You can decorate a function with thread.processor, which uses thread.ConcurrentProcessing under the hood.
When the decorated function is invoked, it will automatically be run in a new thread each time and return a thread.ConcurrentProcessing object.
A decorated function's signature is overwritten so that its first argument becomes a sequence of Data_In values.
import thread
@thread.processor
def my_target(Data_In, arg1, arg2, *, arg3: bool = False) -> Data_Out: ...
dataset: Sequence[Data_In]
worker = my_target(dataset, arg1, arg2, arg3 = True) # thread.ConcurrentProcessing()
Did you know?
Decorators can take in keyword arguments that change the behavior of the thread.
import thread
@thread.processor(name = 'my_thread', suppress_errors = True)
def my_target(): ...
See the full list of arguments here
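To illustrate the signature change, here is a simplified, hypothetical processor decorator built on concurrent.futures. It differs from thread.processor, which returns a thread.ConcurrentProcessing object: this sketch blocks and returns the list of results directly. The max_threads keyword is an assumption made for the example. It supports both the bare @processor form and the @processor(...) form with keyword arguments.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def processor(func=None, *, max_threads: int = 8):
    """Hypothetical decorator: turns f(item, *args, **kwargs) into f(dataset, *args, **kwargs)."""

    def decorate(f):
        @functools.wraps(f)
        def wrapper(dataset, *args, **kwargs):
            workers = max(1, min(max_threads, len(dataset)))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                # The extra arguments are forwarded to every call; map keeps input order
                return list(pool.map(lambda item: f(item, *args, **kwargs), dataset))

        return wrapper

    # Bare @processor passes the function directly; @processor(...) does not
    return decorate(func) if func is not None else decorate


@processor
def scale(x: int, factor: int, *, offset: int = 0) -> int:
    return x * factor + offset


@processor(max_threads=2)
def square(x: int) -> int:
    return x * x


print(scale([1, 2, 3], 10, offset=1))  # [11, 21, 31]
print(square([1, 2, 3]))               # [1, 4, 9]
```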
Compatibility
Data processing is usually achieved with external libraries like pandas.
However, dataset objects that do not implement both the __len__() and __getitem__() methods are not natively supported.
This is primarily because:
- The __len__() method is used to determine the length of the dataset via len(dataset).
- The __getitem__() method is used to access items in the dataset via dataset[index].
This is also why thread.ConcurrentProcessing does not support Generator objects or Iterator objects out of the box.
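A plain generator shows the limitation, since it implements neither method. Converting it to a list is the simplest work-around, at the cost of holding every item in memory at once.

```python
def squares():
    # A generator: values are produced lazily, one at a time
    for i in range(5):
        yield i * i


gen = squares()
print(hasattr(gen, "__len__"), hasattr(gen, "__getitem__"))  # False False

try:
    len(gen)
except TypeError as exc:
    print(f"TypeError: {exc}")  # len() is not defined for generators

# Work-around: materialize the generator so it supports len() and indexing
data = list(gen)
print(len(data), data[2])  # 5 4
```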
Work-around
We now support most dataset types, though not natively.
We stopped explicitly supporting the Sequence type and now use Protocols to check whether __len__() and __getitem__() are implemented.
We also added context-specific optional/required _length and _get_value arguments when initializing thread.ConcurrentProcessing.
You can find out more about the valid _length and _get_value arguments here.
Mapping
| Dataset | _length | _get_value |
|---|---|---|
| Has __len__ and __getitem__ | Optional | Optional |
| Has neither __len__ nor __getitem__ | Required | Required |
| Has __getitem__ but not __len__ | Required | Optional |
| Has __len__ but not __getitem__ | Optional | Required |
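The mapping above can be expressed with typing.Protocol and runtime_checkable. This sketch shows the idea only. HasLength, HasGetItem and required_arguments are hypothetical names, and the library's own protocols may be defined differently. Note that isinstance() against a runtime-checkable protocol only checks that the methods exist, not their signatures.

```python
from typing import Any, Dict, Protocol, runtime_checkable


@runtime_checkable
class HasLength(Protocol):
    def __len__(self) -> int: ...


@runtime_checkable
class HasGetItem(Protocol):
    def __getitem__(self, index: int) -> Any: ...


def required_arguments(dataset: Any) -> Dict[str, bool]:
    # _length is only needed when len(dataset) is unavailable,
    # _get_value only when dataset[index] is unavailable
    return {
        "_length": not isinstance(dataset, HasLength),
        "_get_value": not isinstance(dataset, HasGetItem),
    }


print(required_arguments([1, 2, 3]))     # {'_length': False, '_get_value': False}
print(required_arguments({1, 2, 3}))     # {'_length': False, '_get_value': True}
print(required_arguments(iter([1, 2])))  # {'_length': True, '_get_value': True}
```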
Example
Now you do not have to pre-convert the dataset to a supported dataset type.
from thread import ConcurrentProcessing
myDataFrame: ...
process = ConcurrentProcessing(
function = lambda x: x + 1,
dataset = myDataFrame,
_length = myDataFrame.getLength(),
_get_value = lambda d, i: d.getIndex(i)
)
Static type checking will reflect whether _length and _get_value are required or optional depending on the dataset type.
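The example above can be made runnable with a stand-in dataset class. MyDataFrame is hypothetical; its getLength() and getIndex() methods mirror the names in the example. process() is a simplified stand-in for thread.ConcurrentProcessing: it submits one task per index instead of splitting the dataset into chunks.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


class MyDataFrame:
    """Hypothetical dataset with neither __len__ nor __getitem__."""

    def __init__(self, rows: List[int]) -> None:
        self._rows = rows

    def getLength(self) -> int:
        return len(self._rows)

    def getIndex(self, i: int) -> int:
        return self._rows[i]


def process(function: Callable[[Any], Any], dataset: Any, _length: int,
            _get_value: Callable[[Any, int], Any], max_threads: int = 8) -> List[Any]:
    # Items are fetched through _get_value, so the dataset never needs __getitem__
    def work(index: int) -> Any:
        return function(_get_value(dataset, index))

    with ThreadPoolExecutor(max_workers=max(1, min(max_threads, _length))) as pool:
        # range(_length) replaces len(dataset); map keeps the original order
        return list(pool.map(work, range(_length)))


df = MyDataFrame([10, 20, 30])
print(process(lambda x: x + 1, df, _length=df.getLength(),
              _get_value=lambda d, i: d.getIndex(i)))  # [11, 21, 31]
```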
Initialization
This section covers the required and optional arguments for initializing a concurrent process.
Required
function
(Data_In, *args, **kwargs) -> Data_Out
This should be a function that takes in a single item from the dataset, with or without additional arguments, and returns Data_Out.
Any arguments and keyword arguments besides the first can be passed to the function through args and kwargs.
Data_Out will be written to the generated thread's Thread._returned_value and can be accessed via ConcurrentProcessing.results or ConcurrentProcessing.get_return_values().
function can be passed as the first positional argument to ConcurrentProcessing.__init__(), although using only keyword arguments is recommended.
import thread
thread.ConcurrentProcessing(lambda x: x + 1, [])
thread.ConcurrentProcessing(function = lambda x: x + 1, dataset = [])
Best Practices
While you can use a lambda function, it is better to use a regular, type-annotated function so that your LSP/Linter can infer types.
from thread import ConcurrentProcessing
worker = ConcurrentProcessing(function = lambda x: x + 1, dataset = [1, 2, 3])
worker.start()
worker.join()
worker.results # This will be inferred as Unknown by your LSP/Linter
from thread import ConcurrentProcessing
def my_target(x: int) -> int:
    return x + 1
worker = ConcurrentProcessing(function = my_target, dataset = [1, 2, 3])
worker.start()
worker.join()
worker.results # This will be inferred as a list[int]
dataset
Dataset[Data_In]
This should be an iterable sequence of data; each item is passed as the first argument to function.
This can be of any type if you pass the corresponding _length and _get_value arguments. See here for more details.
import thread
def my_function(x: int) -> int:
    ...
thread.ConcurrentProcessing(function = my_function, dataset = [1, 2, 3])
thread.ConcurrentProcessing(function = my_function, dataset = ('hi')) # This will be highlighted by your LSP/Linter
Optional
max_threads
int
(default: 8)
This is the maximum number of threads that will be created by thread.ConcurrentProcessing.
This value is not always the number of threads created. See here for more details.
_get_value
(Dataset, int) -> Data_In
(default: None)
Required or optional depending on the dataset type. See here for more details.
This is invoked every time a value is retrieved from the dataset.
from thread import ConcurrentProcessing
dataset: MyDatasetType = ...
ConcurrentProcessing(
function = my_function,
dataset = dataset,
_get_value = lambda d, index: d[index],
)
ConcurrentProcessing(
function = my_function,
dataset = dataset,
_get_value = lambda d, index: d.getIndex(index),
)
_length
int | (Dataset) -> int
(default: len(dataset))
Required or optional depending on the dataset type. See here for more details.
This is the length of the dataset that will be processed by thread.ConcurrentProcessing.
This is invoked only once when thread.ConcurrentProcessing is initialized.
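As a sketch, an int-or-callable _length could be resolved once like this, falling back to len(dataset) when it is omitted. resolve_length is a hypothetical helper, not the library's code.

```python
from typing import Any, Callable, Optional, Union


def resolve_length(dataset: Any,
                   _length: Optional[Union[int, Callable[[Any], int]]] = None) -> int:
    if _length is None:
        # Default: fall back to the dataset's own __len__
        return len(dataset)
    if callable(_length):
        # A callable receives the dataset and is invoked exactly once here
        return _length(dataset)
    return _length


print(resolve_length([1, 2, 3]))                                    # 3
print(resolve_length(object(), 5))                                  # 5
print(resolve_length({"rows": [1, 2]}, lambda d: len(d["rows"])))  # 2
```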
from thread import ConcurrentProcessing
dataset: MyDatasetType = ...
ConcurrentProcessing(
function = my_function,
dataset = dataset,
_length = 5,
)
def get_length(dataset: MyDatasetType) -> int: ...
ConcurrentProcessing(
function = my_function,
dataset = dataset,
_length = get_length,
)
*args / **kwargs
Any / Mapping[str, Any]
(default: None)
These are passed to thread.Thread.__init__(), then to threading.Thread.__init__().
If kwargs contains an argument named args, it will automatically be removed from kwargs and merged with the args passed to ConcurrentProcessing.__init__().
See thread.Thread documentation for more details.
See the threading documentation for more details.
Properties
Attributes
These are attributes of thread.ConcurrentProcessing class.
results
List[Data_Out]
This is a list of the values returned by function in thread.ConcurrentProcessing.
status
thread.ThreadStatus
This is the current status of the thread.
Methods
These are methods of thread.ConcurrentProcessing class.
start
() -> None
This starts the processing.
Simply invoke ConcurrentProcessing.start() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.start()
is_alive
() -> bool
This indicates whether the threads are still alive.
Simply invoke ConcurrentProcessing.is_alive() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.is_alive()
get_return_values
() -> List[Data_Out]
This halts the current thread execution until the processing completes, then returns the list of values returned by function.
Simply invoke ConcurrentProcessing.get_return_values() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.get_return_values()
join
(timeout: float = None) -> None
This halts the current thread execution until the processing completes or the timeout is exceeded.
A None value for timeout will have the same effect as passing float("inf") as a timeout.
Simply invoke ConcurrentProcessing.join() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.join(5)
worker.join()
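Joining several threads under one overall timeout needs a shared deadline; otherwise each thread would get the full timeout. The hypothetical join_all helper below sketches this with threading.Thread.join(). Unlike ConcurrentProcessing.join(), which returns None, it returns whether every thread finished. It also treats float("inf") like None, because passing an infinite timeout straight to threading.Thread.join() may fail.

```python
import threading
import time
from typing import Iterable, Optional


def join_all(threads: Iterable[threading.Thread], timeout: Optional[float] = None) -> bool:
    """Join every thread within one shared deadline; return True if all have finished."""
    threads = list(threads)
    # float("inf") behaves like None: wait with no deadline
    if timeout == float("inf"):
        timeout = None
    deadline = None if timeout is None else time.monotonic() + timeout
    for t in threads:
        if deadline is None:
            t.join()
        else:
            # Each join only gets the time left before the shared deadline
            t.join(max(0.0, deadline - time.monotonic()))
    return not any(t.is_alive() for t in threads)


workers = [threading.Thread(target=time.sleep, args=(0.1,)) for _ in range(3)]
for w in workers:
    w.start()
print(join_all(workers, timeout=2))  # True
```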
kill
(yielding: bool = False, timeout: float = 5) -> bool
This schedules the threads to be killed.
If yielding is True, it halts the current thread execution until the threads are killed or the timeout is exceeded.
Similar to ConcurrentProcessing.join(), a None value for timeout will have the same effect as passing float("inf") as a timeout.
Simply invoke ConcurrentProcessing.kill() on a ConcurrentProcessing object.
import thread
worker = thread.ConcurrentProcessing(function = my_func, dataset = [1, 2, 3])
worker.kill(True, 10)
worker.kill(False)
worker.kill()
This only schedules the threads to be killed; it does not kill them immediately.
This means that if function makes a long time.sleep() call, the thread will only be killed after that call returns and execution moves on to the next line.
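One well-known way to get this "stop at the next line" behavior in CPython is a per-thread trace function installed with sys.settrace(). Once a kill has been requested, it raises SystemExit on the next line event. The KillableThread class below is a hypothetical sketch of that technique, not necessarily how thread.ConcurrentProcessing is implemented. The meaning of its bool return value (whether the thread has stopped) is also an assumption. Because time.sleep() runs as a single C call, no line events fire while it blocks, which is why a long sleep delays the kill. Tracing also slows the thread down and can conflict with debuggers or coverage tools that use sys.settrace() themselves.

```python
import sys
import threading
import time


class KillableThread(threading.Thread):
    """Hypothetical sketch: a thread that can be asked to stop at its next line."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._kill_requested = threading.Event()

    def run(self) -> None:
        # sys.settrace only affects the calling thread, i.e. this worker
        sys.settrace(self._trace_calls)
        super().run()

    def _trace_calls(self, frame, event, arg):
        # Called when a new frame starts; return the per-line tracer for it
        return self._trace_lines if event == "call" else None

    def _trace_lines(self, frame, event, arg):
        if event == "line" and self._kill_requested.is_set():
            # Unwinds the thread; threading silently ignores SystemExit in threads
            raise SystemExit
        return self._trace_lines

    def kill(self, yielding: bool = False, timeout: float = 5) -> bool:
        """Schedule the kill; if yielding, wait up to `timeout` seconds for it."""
        self._kill_requested.set()
        if yielding:
            self.join(timeout)
        return not self.is_alive()


def busy_loop() -> None:
    while True:
        time.sleep(0.01)  # a new line event fires each time the sleep returns


worker = KillableThread(target=busy_loop, daemon=True)
worker.start()
time.sleep(0.05)
killed = worker.kill(yielding=True, timeout=2)
print(killed)  # True
```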