crumpets.dataloader module

class crumpets.dataloader.Consumer(result_address, control_address, recv_timeout=1000, bind=True)[source]

Bases: object

A Consumer retrieves and forwards processed samples from workers.

Parameters
  • result_address – address to retrieve processed samples from; workers send their results to it

  • control_address – address to retrieve control messages from, such as exceptions raised in other processes

  • recv_timeout – time in milliseconds to wait before another receive attempt is made

  • bind – bind addresses instead of connecting to them

retrieve()[source]
retrieve_data()[source]
set_buffer_manager(buffer_manager)[source]
start()[source]

Starts the sample retriever thread and listens on the control stream.

stop()[source]

Stops all threads opened by this consumer.
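
A minimal usage sketch (the addresses below are hypothetical placeholders, and the behavior of retrieve() is assumed from its name; both depend on how the surrounding broker pipeline is set up):

consumer = Consumer(
    result_address='ipc:///tmp/crumpets-results',   # hypothetical address
    control_address='ipc:///tmp/crumpets-control',  # hypothetical address
    recv_timeout=1000,
    bind=True,
)
consumer.start()  # starts the retriever thread and listens on the control stream
try:
    result = consumer.retrieve()  # assumed: returns the next processed result
finally:
    consumer.stop()  # stops all threads opened by this consumer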

class crumpets.dataloader.Slicer(iterable)[source]

Bases: object

class crumpets.dataloader.TurboDataLoader(iterable, batch_size, worker_template, nworkers, length=None, num_mini_batches=1, start_iteration=0, shared_memory=True)[source]

Bases: object

TurboDataLoader provides fast parallel loading and processing of input data. Use TorchTurboDataLoader for a version that supports GPUs and PyTorch tensors.

Always use the loader inside a with statement; otherwise workers and the consumer are not started and stopped correctly.

`TurboDataLoader`s are intended to be used as iterators. Each iteration yields the following data structure:

By default iteration starts at 0 and counts the number of batches that the loader has yielded. The list contains as many mini-batches as specified by num_mini_batches. Note that the number of samples across all mini-batches is equal to batch_size, i.e., batch_size must be divisible by num_mini_batches. Finally, each mini-batch is a dictionary of key-value pairs produced by the workers. E.g., a ClassificationWorker produces the keys ‘image’, ‘label’, and ‘augmentation’: ‘image’ and ‘label’ are arrays, while ‘augmentation’ is a list with one dictionary per sample in the batch containing the parameters used to create that sample.
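
Schematically, one iteration thus yields the iteration counter together with a list of mini-batch dictionaries, roughly like this (a sketch based on the description above; the exact keys depend on the worker used):

iteration, [
    {'image': ..., 'label': ..., 'augmentation': [...]},  # mini-batch 1
    ...                                                    # num_mini_batches dictionaries in total
]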

Example usage:

model = make_some_model()
with loader:
    for epoch in range(epochs):
        for iteration, mini_batch in loader:
            for sample in mini_batch:
                sample = model(sample)
                images = sample['image']
                ...

Depending on its parameters, a TurboDataLoader starts several processes, some of which cannot be started with the standard “fork” start method that Python uses on *nix systems. This can result in a crash with an obscure error message. Loaders therefore need to be guarded so that they are only started from the main module, i.e.:

if __name__ == "__main__":
    # stuff: build the loader, model, etc.
    with loader:
        ...  # other stuff: training loop etc.
Parameters
  • iterable – An iterable providing a sample per iteration.

  • batch_size – The number of samples per batch.

  • worker_template – An actual worker instance that determines the kind of processing. Must inherit from crumpets.broker.Worker.

  • nworkers – Number of workers processing the samples simultaneously. worker_template is copied to create them.

  • length – Specifies the length of the dataset. Defaults to the actual length of iterable (if available). If a value different from the default is given, the number of iterations per epoch is adjusted accordingly.

  • num_mini_batches – Number of mini_batches per batch.

  • start_iteration – Start the iteration counter from this number. Useful when resuming training.

  • shared_memory – Whether to use shared memory to transfer data from workers. If 0 or False, shared memory is disabled. If True, 2*nworkers shared buffers will be used. If any number > 0, that number of buffers will be used. A value of 1 is strongly discouraged, as it can lead to deadlocks. Permanently storing values returned by a loader may also cause deadlocks.
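
A construction sketch tying these parameters together (ClassificationWorker is the worker type mentioned above; its constructor arguments, its import path, and the samples iterable are placeholders, not part of this module):

from crumpets.dataloader import TurboDataLoader
# ClassificationWorker import omitted here; any crumpets.broker.Worker subclass works

if __name__ == "__main__":
    samples = make_sample_iterable()    # placeholder: any iterable yielding one sample per step
    worker = ClassificationWorker(...)  # placeholder constructor arguments
    loader = TurboDataLoader(
        samples,
        batch_size=64,
        worker_template=worker,
        nworkers=4,
        num_mini_batches=2,  # each batch of 64 is split into 2 mini-batches of 32
        shared_memory=True,  # 2*nworkers shared buffers
    )
    with loader:
        for iteration, mini_batch in loader:
            ...              # process mini-batches as in the example above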

set_epoch_iterations(iterations)[source]

Set number of iterations in one epoch. Does not modify length.

Parameters
  • iterations – number of iterations per epoch

set_length(length)[source]

Set the length of the enclosed iterable. Modifies epoch_iterations accordingly.

Parameters
  • length – len(iterable)

start()[source]

Start the processing pipeline.

stop()[source]

Stop the processing pipeline.

crumpets.dataloader.make_addresses(uid, pipeline, numbers=(('work', 1), ('consume', 1)))[source]
crumpets.dataloader.remove_files(files)[source]
crumpets.dataloader.remove_ipc_handles(handles)[source]