crumpets.broker module

class crumpets.broker.BufferManager(batch_size, buffer_specs)[source]

Bases: object

BufferManager is a compatibility class that replaces the SharedDictManager for cases where shared memory is not used by the pipeline. It creates buffers from buffer specs for use with the BufferWorker.

next()[source]

Return the dictionary of buffers as defined by buffer specs.

Returns

buffer dictionary

static pack(obj)[source]

Pack an object using msgpack. Any shared objects are replaced by references.

Parameters

obj – object to pack

Returns

msgpack message bytes

static unpack(data)[source]

Unpack an msgpack message. Any shared object references are replaced with the object.

Parameters

data – msgpack message bytes

Returns

unpacked object

class crumpets.broker.BufferWorker(buffer_manager=None, **kwargs)[source]

Bases: crumpets.broker.Worker

Base class for workers that use constant-size buffers.

Parameters
  • buffer_manager – Dict of buffer specs (shape, dtype, fill_value). fill_value is optional and defaults to 0. It must be either a scalar or iterable of length equal to the number of channels in the respective image.

  • param_groups – Dict of fixed parameter dicts. To be used in conjunction with buffers of the same key.

  • kwargs – Passed to broker.Worker.

add_buffer(key, buf)[source]

Register a new buffer with the worker.

Parameters
  • key – name of the buffer

  • buf – buffer spec or array to use as template

add_params(key, params, default=None)[source]

Add a parameter group to the worker.

Parameters
  • key – name of the parameters

  • params – parameter object, usually dictionary

  • default – default value to use if params is None

get_buffer_manager()[source]

Return the current buffer manager. May be None.

Returns

BufferManager or SharedBufferManager object

abstract prepare(sample, batch, buffers)[source]

Implement this method to define the behavior of the BufferWorker subclass. Results must be written to buffers and/or batch object.

Parameters
  • sample – individual sample object to process

  • batch – the object the sample belongs to; append values to lists as necessary

  • buffers – output buffers to use for this sample
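The contract described above can be sketched as follows. This is a minimal illustration that does not import crumpets: the "image" and "label" keys, the sample layout, and the use of plain lists in place of array buffers are all assumptions made for the example.

```python
def prepare(sample, batch, buffers):
    """Hypothetical prepare: fill the output buffer and record the label."""
    # Fixed-size results are written into the preallocated slot for this sample.
    pixels = sample["image"]
    buffers["image"][:len(pixels)] = pixels
    # Values without a fixed-size buffer are appended to lists in the batch.
    batch.setdefault("label", []).append(sample["label"])


# Stand-ins for what the worker would supply (assumed, not crumpets objects).
batch = {}
buffers = {"image": [0, 0, 0, 0]}  # one sample's slot, pre-filled with 0
prepare({"image": [7, 8, 9], "label": 3}, batch, buffers)
```

Nothing is returned: the results live in buffers and batch, which is why the method must write to them in place.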

process(request)[source]

Implement this method to define worker behavior. The method can return an iterable, or be written as a generator using the yield keyword, to create several batches from one input.

Parameters

request – multipart zmq message from Producer to process

Returns

iterable of zmq messages to send to Consumer

set_buffer_manager(buffer_manager)[source]

Set the buffer manager to be used by this worker. Can be None, in which case a BufferManager will be created as necessary.

Parameters

buffer_manager – a BufferManager or SharedBufferManager object, or None

class crumpets.broker.Consumer(result_address, recv_timeout=2000, queue_length=3, bind=True, io_threads=1)[source]

Bases: crumpets.broker.ThreadedConsumerBase

Basic threaded Consumer that receives and unpacks msgpack messages.

class crumpets.broker.ConsumerBase(result_address, recv_timeout=1000, queue_length=3, bind=True, io_threads=1)[source]

Bases: object

Abstract base class for Consumers, the final pipeline stage. Implement the _transform method to define subclass behavior.

retrieve()[source]

retrieve_data()[source]

stop()[source]

class crumpets.broker.Dispatcher(worker_template, nworkers, work_addresses, result_addresses, control_address, daemon=None, gpu_augmentation=None)[source]

Bases: object

The Dispatcher creates worker processes from a worker template, starts and stops them, and monitors their status.

Parameters
  • worker_template – instance of Worker subclass to use as template for workers; copy.copy is used to create as many objects as needed

  • nworkers – number of worker processes to start

  • work_addresses – list of work addresses to use; cycled through

  • result_addresses – list of result addresses to use; cycled through

  • control_address – control address workers can send status updates on

  • daemon – daemon flag for processes, see multiprocessing.Process

  • gpu_augmentation – bool passed to workers; True disables CPU augmentations for which GPU versions are available in the randomizer; if None, worker_template.gpu_augmentation is used
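The copy-and-cycle behavior described for worker_template, work_addresses and result_addresses can be sketched as below. This is not the Dispatcher's code: ToyWorker, make_workers and the address strings are hypothetical, and calling set_addresses on each copy is an assumption about how addresses are handed out.

```python
import copy
from itertools import cycle


class ToyWorker:
    """Hypothetical stand-in for a Worker subclass."""

    def __init__(self):
        self.addresses = None

    def set_addresses(self, work, result, control):
        self.addresses = (work, result, control)


def make_workers(template, nworkers, work_addresses, result_addresses,
                 control_address):
    """Copy the template nworkers times, cycling through the address lists."""
    work_cycle = cycle(work_addresses)
    result_cycle = cycle(result_addresses)
    workers = []
    for _ in range(nworkers):
        worker = copy.copy(template)  # shallow copy of the template
        worker.set_addresses(next(work_cycle), next(result_cycle),
                             control_address)
        workers.append(worker)
    return workers


workers = make_workers(
    ToyWorker(), 3,
    ["ipc://work-0", "ipc://work-1"],  # made-up addresses
    ["ipc://result-0"],
    "ipc://control",
)
```

With three workers and two work addresses, the third worker wraps around to the first work address. Because copy.copy is shallow, mutable attributes on the template are shared between copies.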

active()[source]

True if any workers are alive.

start()[source]

stop()[source]

terminate()[source]

class crumpets.broker.Pipeline(worker_template, nworkers, iterable, batch_size, work_addresses, result_addresses, producer_kwargs=None, control_address=None, gpu_augmentation=None)[source]

Bases: object

start()[source]

stop()[source]

class crumpets.broker.Producer(work_addresses, iterable, batch, queue_length=8, io_threads=1)[source]

Bases: crumpets.broker.ProducerBase

Producer implementation that reads sequentially from arbitrary iterable objects. Items must be msgpack messages that are understood by the workers.

Parameters
  • iterable – iterable of msgpack messages

  • batch – batch size for workers
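One plausible reading of the iterable and batch parameters is that the producer groups consecutive items into requests of batch items each. The sketch below illustrates that reading only; the standalone yield_requests function, its signature, and stopping when the iterable is exhausted are assumptions, not the library's implementation.

```python
from itertools import islice


def yield_requests(iterable, batch):
    """Hypothetical grouping of items into requests of `batch` items each."""
    iterator = iter(iterable)
    while True:
        request = list(islice(iterator, batch))
        if not request:
            return  # assumed: stop once the iterable is exhausted
        yield request


# Byte strings stand in for msgpack messages.
requests = list(yield_requests([b"m0", b"m1", b"m2", b"m3", b"m4"], batch=2))
```

The last request is shorter when the number of items is not a multiple of batch.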

yield_requests()[source]

class crumpets.broker.ProducerBase(work_addresses, daemon=True, queue_length=8, io_threads=1)[source]

Bases: multiprocessing.context.Process

Abstract base class for producer processes. Producers are the first stage of the pre-processing pipeline that load data into memory and supply it to workers. Implement the yield_requests method to customize its behavior.

Parameters
  • work_addresses – List of worker addresses the producer pushes work to; cycled through for load balancing

  • daemon – Flag whether this Producer is a daemon process; see multiprocessing.Process

  • queue_length – Length of send queue per worker socket

  • io_threads – Number of IO threads to use; 1 is fine for almost all cases

run()[source]

Method to be run in sub-process; can be overridden in sub-class

stop()[source]

yield_requests()[source]

class crumpets.broker.Proxy(in_address, out_address, queue_length=1, daemon=True)[source]

Bases: multiprocessing.context.Process

Utility class that receives and redirects zmq PULL/PUSH streams.

run()[source]

Method to be run in sub-process; can be overridden in sub-class

class crumpets.broker.ThreadedConsumerBase(result_address, recv_timeout=2000, queue_length=3, bind=True, io_threads=1)[source]

Bases: crumpets.broker.ConsumerBase

Abstract base class for Consumers that receive and transform data on a separate thread. Implement the _transform method to define subclass behavior.

retrieve()[source]

stop()[source]

class crumpets.broker.Value(*_, **__)[source]

Bases: object

class crumpets.broker.Worker(timeout=1000, daemon=True, gpu_augmentation=False)[source]

Bases: object

Abstract base class for workers. Implement the process method to define the behavior of subclasses.

Note

set_addresses must be called before starting a worker. The TurboDataLoader does this for you.

Parameters
  • timeout – zmq socket timeout in milliseconds

  • daemon – set daemon flag - used in process

  • gpu_augmentation – set GPU augmentation flag

inner()[source]

abstract process(data)[source]

Implement this method to define worker behavior. The method can return an iterable, or be written as a generator using the yield keyword, to create several batches from one input.

Parameters

data – multipart zmq message from Producer to process

Returns

iterable of zmq messages to send to Consumer
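A generator-style process method might look like the sketch below, which produces two batches from a single input. It is written as a plain function without crumpets, and the "reversed frames" variant is only a stand-in for a second result such as an augmented copy.

```python
def process(data):
    """Hypothetical process: yield two batches from one multipart message."""
    # First batch: the frames as received.
    yield list(data)
    # Second batch: the same frames in reverse order, standing in for
    # a differently processed variant of the same input.
    yield list(reversed(data))


# Byte strings stand in for the frames of a multipart zmq message.
results = list(process([b"a", b"b", b"c"]))
```

Returning a list of the two batches instead of yielding them would satisfy the same contract, since both are iterables.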

run()[source]

Starts the worker process.

set_addresses(work, result, control)[source]

Set all required zmq addresses. Required before run can be invoked.

Parameters
  • work – address where work is received on

  • result – results are pushed to this address

  • control – control messages are sent here, e.g., exceptions that occurred while processing

set_gpu_augmentation(val)[source]

Set the gpu_augmentation flag to the given value. True disables all CPU augmentations for which a GPU version is available. Note that this does not by itself activate GPU augmentation; that requires a randomizer module, which the TurboDataLoader usually takes care of.

Parameters

val – boolean flag

stop()[source]

Stops the worker process.

crumpets.broker.make_buffer(batchsize, shape, dtype, fill_value)[source]

Create an array for a given batch size and buffer spec. Resulting array has shape = (batchsize,) + shape.

Parameters
  • batchsize – size of the first dimension

  • shape – remaining shape of the array

  • dtype – numpy dtype of the array

  • fill_value – array comes pre-filled with this value

Returns

array

crumpets.broker.make_bufferspec(buf)[source]

Turn a numpy.ndarray into a buffer specification.

Parameters

buf – np.ndarray or buffer spec

Returns

tuple(shape, dtype, fill_value)

crumpets.broker.make_fill_value(shape, dtype, fill_value: Union[int, float, Iterable] = 0)[source]

Create a numpy array for a given fill value. This array can be used to fill any array of the given shape and dtype, e.g., arr[:] = make_fill_value(arr.shape, arr.dtype, 17) will set all elements of arr to 17.

Note: An implicit first dimension for the batch size is added.

fill_value can be a scalar or iterable. Iterables are padded with unit dimensions until they match the number of dimensions of the given shape, e.g.:

>>> make_fill_value((3, 224, 224), np.uint8, (1, 2, 3))
array([[[[1]], [[2]], [[3]]]], dtype=uint8)

The resulting fill value array has shape (1, 3, 1, 1).

Parameters
  • shape – array shape

  • dtype – array dtype

  • fill_value – optional fill value(s)

Returns

fill value array
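The padding rule for iterable fill values can be illustrated without numpy using nested lists. This is a sketch of the shape logic only, not the library's implementation: pad_fill_value and nested_shape are hypothetical helpers, and scalar fill values and dtype handling are left out.

```python
def pad_fill_value(shape, fill_value):
    """Nest an iterable fill value so it lines up with (batch,) + shape."""
    ndim = len(shape)
    padded = []
    for value in fill_value:
        # Wrap each entry in (ndim - 1) singleton lists: unit trailing dims.
        for _ in range(ndim - 1):
            value = [value]
        padded.append(value)
    # One more level for the implicit leading batch dimension.
    return [padded]


def nested_shape(obj):
    """Shape of a regularly nested list, read along the first elements."""
    shape = []
    while isinstance(obj, list):
        shape.append(len(obj))
        obj = obj[0]
    return tuple(shape)


result = pad_fill_value((3, 224, 224), (1, 2, 3))
result_shape = nested_shape(result)
```

Here result has the same nesting as the array in the example above, and result_shape is (1, 3, 1, 1). Every axis other than the channel axis has length 1, so the value can be broadcast over a whole batch of images.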

crumpets.broker.unpack(obj)[source]