crumpets.broker module
- class crumpets.broker.BufferManager(batch_size, buffer_specs)
Bases: object
BufferManager is a compatibility class that replaces the SharedDictManager for cases where shared memory is not used by the pipeline. It creates buffers from buffer specs for use with the BufferWorker.
- next()
Return the dictionary of buffers as defined by buffer specs.
- Returns
buffer dictionary
- class crumpets.broker.BufferWorker(buffer_manager=None, **kwargs)
Bases: crumpets.broker.Worker
Base class for workers that use constant-size buffers.
- Parameters
buffer_manager – Dict of buffer specs (shape, dtype, fill_value). fill_value is optional and defaults to 0. It must be either a scalar or an iterable whose length equals the number of channels in the respective image.
param_groups – Dict of fixed parameter dicts, used in conjunction with the buffers that have the same key.
kwargs – Passed to broker.Worker.
- add_buffer(key, buf)
Register a new buffer with the worker.
- Parameters
key – name of the buffer
buf – buffer spec or array to use as template
- add_params(key, params, default=None)
Add a parameter group to the worker.
- Parameters
key – name of the parameters
params – parameter object, usually dictionary
default – default value to use if params is None
- get_buffer_manager()
Returns the current buffer manager. May be None.
- Returns
BufferManager or SharedBufferManager object
- abstract prepare(sample, batch, buffers)
Implement this method to define the behavior of the BufferWorker subclass. Results must be written to buffers and/or batch object.
- Parameters
sample – individual sample object to process
batch – the batch object the sample belongs to; append values to its lists as necessary
buffers – output buffers to use for this sample
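A minimal sketch of a prepare implementation, written as a standalone function so it runs without crumpets installed. It assumes each sample is a dict with hypothetical "image" and "label" keys, that buffers maps each key to a per-sample view of the batch buffer, and that batch behaves like a dict of lists; none of these layouts are confirmed by the source.

```python
import numpy as np


def prepare(sample, batch, buffers):
    """Hypothetical prepare(): write one sample into its buffers and the batch."""
    # Copy the image into the preallocated output slot for this sample.
    # The "image" and "label" keys are assumptions, not part of crumpets.
    buffers["image"][...] = sample["image"]
    # Non-array values are appended to lists on the batch object.
    batch.setdefault("labels", []).append(sample["label"])


# Usage: a buffer for 2 RGB 4x4 images; each sample writes into its own slice.
image_buffer = np.zeros((2, 3, 4, 4), dtype=np.uint8)
batch = {}
samples = [
    {"image": np.full((3, 4, 4), 7, dtype=np.uint8), "label": 0},
    {"image": np.full((3, 4, 4), 9, dtype=np.uint8), "label": 1},
]
for i, sample in enumerate(samples):
    # image_buffer[i] is a view, so writes land in the shared batch buffer.
    prepare(sample, batch, {"image": image_buffer[i]})
```

Writing into views of one preallocated array is what keeps the buffers constant-size: no per-sample allocation happens inside prepare.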
- process(request)
Implement this method to define worker behavior. It can return an iterable to create several batches from one input, either by returning an iterable directly or by defining a generator with the yield keyword.
- Parameters
request – multipart zmq message from Producer to process
- Returns
iterable of zmq messages to send to Consumer
- class crumpets.broker.Consumer(result_address, recv_timeout=2000, queue_length=3, bind=True, io_threads=1)
Bases: crumpets.broker.ThreadedConsumerBase
Basic threaded Consumer that receives and unpacks msgpack messages.
- class crumpets.broker.ConsumerBase(result_address, recv_timeout=1000, queue_length=3, bind=True, io_threads=1)
Bases: object
Abstract base class for Consumers, the final pipeline stage. Implement the _transform method to define subclass behavior.
- class crumpets.broker.Dispatcher(worker_template, nworkers, work_addresses, result_addresses, control_address, daemon=None, gpu_augmentation=None)
Bases: object
The Dispatcher creates worker processes from a worker template, starts and stops them, and monitors their status.
- Parameters
worker_template – instance of Worker subclass to use as template for workers; copy.copy is used to create as many objects as needed
nworkers – number of worker processes to start
work_addresses – list of work addresses to use; cycled through
result_addresses – list of result addresses to use; cycled through
control_address – control address workers can send status updates on
daemon – daemon flag for processes, see multiprocessing.Process
gpu_augmentation – bool passed to workers; True disables CPU augmentations for which GPU versions are available in randomizer; if None, worker_template.gpu_augmentation is used
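The parameter descriptions above imply round-robin assignment: workers are shallow copies of the template, and address lists are cycled through. The sketch below illustrates that reading. make_workers, DummyWorker and the ipc:// addresses are hypothetical; only the set_addresses(work, result, control) signature mirrors the Worker documentation in this module.

```python
import copy
import itertools


class DummyWorker:
    """Hypothetical stand-in for a Worker subclass."""

    def set_addresses(self, work, result, control):
        # Mirrors the documented Worker.set_addresses(work, result, control).
        self.work, self.result, self.control = work, result, control


def make_workers(worker_template, nworkers, work_addresses,
                 result_addresses, control_address):
    """Copy the template nworkers times, assigning addresses round-robin."""
    work_cycle = itertools.cycle(work_addresses)
    result_cycle = itertools.cycle(result_addresses)
    workers = []
    for _ in range(nworkers):
        worker = copy.copy(worker_template)  # one shallow copy per worker
        worker.set_addresses(next(work_cycle), next(result_cycle),
                             control_address)
        workers.append(worker)
    return workers


workers = make_workers(DummyWorker(), 5,
                       ["ipc://work-0", "ipc://work-1"],
                       ["ipc://result-0"],
                       "ipc://control")
```

With two work addresses and five workers, the addresses alternate 0, 1, 0, 1, 0, while every worker shares the single result address.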
- class crumpets.broker.Pipeline(worker_template, nworkers, iterable, batch_size, work_addresses, result_addresses, producer_kwargs=None, control_address=None, gpu_augmentation=None)
Bases: object
- class crumpets.broker.Producer(work_addresses, iterable, batch, queue_length=8, io_threads=1)
Bases: crumpets.broker.ProducerBase
Producer implementation that reads sequentially from arbitrary iterable objects. Items must be msgpack messages that the workers understand.
- Parameters
iterable – iterable of msgpack messages
batch – batch size for workers
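One plausible reading of the batch parameter is that consecutive items are grouped into requests of up to batch items each. The sketch below shows only that grouping step, under that assumption; yield_batches is a hypothetical helper, and msgpack serialization and zmq sending are omitted.

```python
from itertools import islice


def yield_batches(iterable, batch):
    """Read items sequentially and group them into lists of up to `batch`."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, batch))
        if not chunk:  # iterable exhausted
            return
        yield chunk


print(list(yield_batches(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

The last request may be shorter than batch when the number of items is not a multiple of it.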
- class crumpets.broker.ProducerBase(work_addresses, daemon=True, queue_length=8, io_threads=1)
Bases: multiprocessing.context.Process
Abstract base class for producer processes. Producers are the first stage of the pre-processing pipeline: they load data into memory and supply it to workers. Implement the yield_requests method to customize their behavior.
- Parameters
work_addresses – List of worker addresses the producer pushes work to; cycled through for load balancing
daemon – Flag whether this Producer is a daemon process; see multiprocessing.Process
queue_length – Length of send queue per worker socket
io_threads – Number of IO threads to use; 1 is fine for almost all cases
- class crumpets.broker.Proxy(in_address, out_address, queue_length=1, daemon=True)
Bases: multiprocessing.context.Process
Utility class that receives and redirects zmq PULL/PUSH streams.
- class crumpets.broker.ThreadedConsumerBase(result_address, recv_timeout=2000, queue_length=3, bind=True, io_threads=1)
Bases: crumpets.broker.ConsumerBase
Abstract base class for Consumers that receive and transform data on a separate thread. Implement the _transform method to define subclass behavior.
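The threaded-consumer pattern can be sketched with the standard library: a background thread pulls raw messages, applies _transform, and hands results to the caller through a bounded queue. ThreadedTransformer is hypothetical; it reads from a plain Python iterable instead of a zmq socket, and reusing queue_length as the queue bound is an assumption about what that parameter controls.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the input stream


class ThreadedTransformer:
    """Hypothetical stand-in: transform messages on a background thread."""

    def __init__(self, source, queue_length=3):
        self._source = source
        # Bounded queue: the background thread blocks once it is
        # queue_length items ahead of the consuming thread.
        self._queue = queue.Queue(maxsize=queue_length)
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _transform(self, message):
        # Subclasses would override this; here we just decode bytes.
        return message.decode()

    def _run(self):
        for message in self._source:
            self._queue.put(self._transform(message))
        self._queue.put(_DONE)

    def __iter__(self):
        while True:
            item = self._queue.get()
            if item is _DONE:
                return
            yield item


results = list(ThreadedTransformer([b"a", b"b", b"c"]))
```

Moving the transform off the main thread lets decoding overlap with whatever the caller does with each result.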
- class crumpets.broker.Worker(timeout=1000, daemon=True, gpu_augmentation=False)
Bases: object
Abstract base class for workers. Implement the process method to define the behavior of subclasses.
Note
set_addresses must be called before starting a worker. The TurboDataLoader does this for you.
- Parameters
timeout – zmq socket timeout in milliseconds
daemon – daemon flag for the worker process
gpu_augmentation – set GPU augmentation flag
- abstract process(data)
Implement this method to define worker behavior. It can return an iterable to create several batches from one input, either by returning an iterable directly or by defining a generator with the yield keyword.
- Parameters
data – multipart zmq message from Producer to process
- Returns
iterable of zmq messages to send to Consumer
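A minimal sketch of a generator-style process method that turns one input into several outgoing messages. SplitWorker is hypothetical and does not subclass crumpets.broker.Worker, so it runs without crumpets or zmq; multipart messages are modeled as lists of bytes frames, and chunk_size is an illustrative parameter.

```python
class SplitWorker:
    """Hypothetical worker whose process() is a generator."""

    def __init__(self, chunk_size=2):
        self.chunk_size = chunk_size

    def process(self, data):
        # data models a multipart message as a list of frames.
        # Yield one outgoing message per chunk, so a single input
        # produces several batches.
        for start in range(0, len(data), self.chunk_size):
            yield data[start:start + self.chunk_size]


worker = SplitWorker(chunk_size=2)
messages = list(worker.process([b"a", b"b", b"c"]))
# messages == [[b"a", b"b"], [b"c"]]
```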
- set_addresses(work, result, control)
Set all required zmq addresses. Required before run can be invoked.
- Parameters
work – address where work is received on
result – results are pushed to this address
control – control messages are sent here, e.g., exceptions that occurred during processing
- set_gpu_augmentation(val)
Set the gpu_augmentation flag to the given value. True disables all CPU augmentations for which a GPU version is available. Note that this does not by itself activate GPU augmentation; that requires the randomizer module, which the TurboDataLoader usually takes care of.
- Parameters
val – boolean flag
- crumpets.broker.make_buffer(batchsize, shape, dtype, fill_value)
Create an array for a given batch size and buffer spec. Resulting array has shape = (batchsize,) + shape.
- Parameters
batchsize – size of the first dimension
shape – remaining shape of the array
dtype – numpy dtype of the array
fill_value – array comes pre-filled with this value
- Returns
array
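The documented contract, an array of shape (batchsize,) + shape pre-filled with fill_value, can be sketched with NumPy. This sketch handles scalar fill values only; per-channel iterables would need the broadcasting shape that make_fill_value produces, and the real implementation may differ.

```python
import numpy as np


def make_buffer(batchsize, shape, dtype, fill_value):
    """Sketch: array of shape (batchsize,) + shape, pre-filled with fill_value."""
    arr = np.empty((batchsize,) + tuple(shape), dtype=dtype)
    arr[...] = fill_value  # a scalar broadcasts to every element
    return arr


buf = make_buffer(4, (3, 8, 8), np.uint8, 0)
print(buf.shape)  # (4, 3, 8, 8)
```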
- crumpets.broker.make_bufferspec(buf)
Turn a numpy.ndarray into a buffer specification.
- Parameters
buf – np.ndarray or buffer spec
- Returns
tuple(shape, dtype, fill_value)
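A sketch of the conversion, under two assumptions not confirmed by the source: an array template describes a single sample (no batch dimension) and yields the default fill value 0, and a spec given as (shape, dtype) gets fill_value 0, matching the BufferWorker note that fill_value is optional and defaults to 0.

```python
import numpy as np


def make_bufferspec(buf):
    """Sketch: normalize an ndarray or spec tuple to (shape, dtype, fill_value)."""
    if isinstance(buf, np.ndarray):
        # Assumption: the template's own shape is the per-sample shape.
        return buf.shape, buf.dtype, 0
    shape, dtype, *rest = buf
    fill_value = rest[0] if rest else 0  # fill_value is optional
    return tuple(shape), np.dtype(dtype), fill_value


spec = make_bufferspec(np.zeros((3, 4), dtype=np.float32))
# spec == ((3, 4), dtype('float32'), 0)
```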
- crumpets.broker.make_fill_value(shape, dtype, fill_value: Union[int, float, Iterable] = 0)
Create a numpy array for a given fill value. This array can be used to fill any array of the given shape and dtype, e.g., arr[:] = make_fill_value(arr.shape, arr.dtype, 17) will set all elements of arr to 17.
Note: An implicit first dimension for the batch size is added.
fill_value can be a scalar or an iterable. Iterables are padded with unit dimensions until they match the number of dimensions of the given shape, e.g.:
>>> make_fill_value((3, 224, 224), np.uint8, (1, 2, 3))
array([[[[1]], [[2]], [[3]]]], dtype=uint8)
The resulting fill value array has shape (1, 3, 1, 1).
- Parameters
shape – array shape
dtype – array dtype
fill_value – optional fill value(s)
- Returns
fill value array
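A NumPy sketch that reproduces the documented example: a leading batch dimension of 1, then the fill values, then trailing unit dimensions up to the rank of shape. Returning an all-ones shape for scalars is an assumption; the result broadcasts against any array of shape (batch,) + shape.

```python
import numpy as np


def make_fill_value(shape, dtype, fill_value=0):
    """Sketch: broadcastable fill array with an implicit leading batch dim."""
    values = np.asarray(fill_value, dtype=dtype)
    if values.ndim == 0:
        # Scalar: a single element with one unit dim per axis plus batch.
        return values.reshape((1,) * (len(shape) + 1))
    # Iterable: batch dim, then the values, then unit dims to match shape.
    n_unit = len(shape) - values.ndim
    return values.reshape((1,) + values.shape + (1,) * n_unit)


fill = make_fill_value((3, 224, 224), np.uint8, (1, 2, 3))
print(fill.shape)  # (1, 3, 1, 1)

# Fill a batch of 2 images channel by channel via broadcasting.
arr = np.zeros((2, 3, 4, 4), dtype=np.uint8)
arr[:] = make_fill_value((3, 4, 4), np.uint8, (1, 2, 3))
```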