multiprocess_data_loader

allennlp.data.data_loaders.multiprocess_data_loader


MultiProcessDataLoader

@DataLoader.register("multiprocess")
class MultiProcessDataLoader(DataLoader):
 | def __init__(
 |     self,
 |     reader: DatasetReader,
 |     data_path: DatasetReaderInput,
 |     *,
 |     batch_size: int = None,
 |     drop_last: bool = False,
 |     shuffle: bool = False,
 |     batch_sampler: BatchSampler = None,
 |     batches_per_epoch: int = None,
 |     num_workers: int = 0,
 |     max_instances_in_memory: int = None,
 |     start_method: str = "fork",
 |     cuda_device: Optional[Union[int, str, torch.device]] = None,
 |     quiet: bool = False,
 |     collate_fn: DataCollator = DefaultDataCollator()
 | ) -> None

The MultiProcessDataLoader is a DataLoader that's optimized for AllenNLP experiments.

See Using your reader with multi-process or distributed data loading for more information on how to optimize your DatasetReader for use with this DataLoader.

Parameters

  • reader : DatasetReader
    A DatasetReader used to load instances from the data_path.

  • data_path : DatasetReaderInput
    Passed to DatasetReader.read().

    Note

    In a typical AllenNLP configuration file, the reader and data_path parameters don't get an entry under the data_loader. The reader is constructed separately from the corresponding dataset_reader params, and the data_path is taken from the train_data_path, validation_data_path, or test_data_path.

  • batch_size : int, optional (default = None)
    When batch_sampler is unspecified, this option can be combined with drop_last and shuffle to control automatic batch sampling.

  • drop_last : bool, optional (default = False)
    When batch_sampler is unspecified, this option can be combined with batch_size and shuffle to control automatic batch sampling.

    If True, the last batch will be dropped if it doesn't contain a full batch_size number of Instances.

  • shuffle : bool, optional (default = False)
    When batch_sampler is unspecified, this option can be combined with batch_size and drop_last to control automatic batch sampling.

  • batch_sampler : BatchSampler, optional (default = None)
    A BatchSampler to handle batching. This option is mutually exclusive with batch_size, drop_last, and shuffle.

  • batches_per_epoch : int, optional (default = None)
    If specified, exactly batches_per_epoch batches will be generated with each call to __iter__().

  • num_workers : int, optional (default = 0)
    The number of workers to use to read Instances in parallel. If num_workers = 0, everything is done in the main process. Otherwise num_workers workers are forked or spawned (depending on the value of start_method), each of which calls read() on their copy of the reader.

    This means that in order for multi-process loading to be efficient when num_workers > 1, the reader needs to implement manual_multiprocess_sharding.

    Warning

    Multi-processing code in Python is complicated! We highly recommend you read the short Best practices and Common issues sections below before using this option.

  • max_instances_in_memory : int, optional (default = None)
    If not specified, all instances will be read and cached in memory for the duration of the data loader's life. This is generally ideal when your data can fit in memory during training. However, when your datasets are too big, using this option will turn on lazy loading, where only max_instances_in_memory instances are processed at a time.

    Note

    This setting will affect how a batch_sampler is applied. If max_instances_in_memory is None, the sampler will be applied to all Instances. Otherwise the sampler will be applied to only max_instances_in_memory Instances at a time.

    Therefore when using this option with a sampler, you should generally set it to a multiple of the sampler's batch_size (if it has one).

  • start_method : str, optional (default = "fork")
    The start method used to spin up workers.

    On Linux or OS X, "fork" usually has the lowest overhead for starting workers but could potentially lead to dead-locks if you're using lower-level libraries that are not fork-safe.

    If you run into these issues, try using "spawn" instead.

  • cuda_device : Optional[Union[int, str, torch.device]], optional (default = None)
    If given, batches will automatically be put on this device.

    Note

    This should typically not be set in an AllenNLP configuration file. The Trainer will automatically call set_target_device() before iterating over batches.

  • quiet : bool, optional (default = False)
    If True, tqdm progress bars will be disabled.

  • collate_fn : DataCollator, optional (default = DefaultDataCollator)
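The note on max_instances_in_memory above can be made concrete with a small pure-Python sketch. It does not use AllenNLP and is not the library's implementation: `sequential_sampler` and `batch_sizes` are hypothetical names standing in for a batch_sampler and for the loader's chunking. It only shows why a chunk size that is not a multiple of the batch size tends to produce short batches in the middle of an epoch.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Optional


def sequential_sampler(chunk: List[int], batch_size: int) -> Iterator[List[int]]:
    """Hypothetical stand-in for a batch_sampler: consecutive groups of batch_size."""
    for start in range(0, len(chunk), batch_size):
        yield chunk[start:start + batch_size]


def batch_sizes(
    instances: Iterable[int],
    batch_size: int,
    max_instances_in_memory: Optional[int] = None,
) -> List[int]:
    """Return the size of each batch when the sampler sees one chunk at a time."""
    stream = iter(instances)
    sizes: List[int] = []
    while True:
        # islice(stream, None) reads everything, mirroring the "no limit" case.
        chunk = list(islice(stream, max_instances_in_memory))
        if not chunk:
            break
        sizes.extend(len(batch) for batch in sequential_sampler(chunk, batch_size))
    return sizes


print(batch_sizes(range(10), batch_size=4))                             # [4, 4, 2]
print(batch_sizes(range(10), batch_size=4, max_instances_in_memory=8))  # [4, 4, 2]
print(batch_sizes(range(10), batch_size=4, max_instances_in_memory=6))  # [4, 2, 4]
```

With a chunk of 6 instances and a batch size of 4, every chunk ends in a partial batch, which is the situation the recommendation to use a multiple of the batch size avoids.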

Best practices

  • Large datasets

    If your dataset is too big to fit into memory (a common problem), you'll need to load it lazily. This is done by simply setting the max_instances_in_memory parameter to a non-zero integer. The optimal value depends on your use case.

    If you're using a batch_sampler, you will generally get better samples by setting max_instances_in_memory to a higher number - such as 10 to 100 times your batch size - since this determines how many Instances your batch_sampler gets to sample from at a time.

    If you're not using a batch_sampler then this number is much less important. Setting it to 2 to 10 times your batch size is a reasonable value.

    Keep in mind that using max_instances_in_memory generally results in a slower training loop unless you load data in worker processes by setting the num_workers option to a non-zero integer (see below). That way data loading won't block the main process.

  • Performance

    The quickest way to increase the performance of data loading is to adjust the num_workers parameter. num_workers determines how many workers are used to read Instances from your DatasetReader. By default, this is set to 0, which means everything is done in the main process.

    Before trying to set num_workers to a non-zero number, you should make sure your DatasetReader is optimized for use with multi-process data loading.
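As a rough illustration of what "optimized for multi-process data loading" can mean, the sketch below splits the reading work so that each worker processes only its own share of the raw lines. It is plain Python with no AllenNLP imports; `shard_for_worker` and `read_shard` are hypothetical names, and the mechanism AllenNLP itself uses for sharding is not shown here.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def shard_for_worker(lines: Iterable[str], worker_id: int, num_workers: int) -> Iterator[str]:
    """Hypothetical helper: yield every num_workers-th item, starting at worker_id."""
    return islice(lines, worker_id, None, num_workers)


def read_shard(lines: List[str], worker_id: int, num_workers: int) -> List[str]:
    """Stand-in for a reader's read(): only this worker's lines are processed."""
    # The expensive per-instance work (here just str.upper) runs once per line
    # across all workers, instead of once per line in every worker.
    return [line.upper() for line in shard_for_worker(lines, worker_id, num_workers)]


lines = ["a", "b", "c", "d", "e"]
shards = [read_shard(lines, worker_id, num_workers=2) for worker_id in range(2)]
print(shards)  # [['A', 'C', 'E'], ['B', 'D']]
```

The shards are disjoint and together cover the whole input, so adding workers divides the reading cost rather than repeating it.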

Common issues

  • Dead-locks

    Multiprocessing code in Python is complicated, especially code that involves lower-level libraries which may spawn their own threads. If you run into dead-locks while using num_workers > 0, there are two simple work-arounds that usually fix the issue.

    The first work-around is to disable parallelism for these low-level libraries. For example, setting the environment variables OMP_NUM_THREADS=1 and TOKENIZERS_PARALLELISM=0 will do so for PyTorch and NumPy (for CPU operations) and HuggingFace Tokenizers, respectively.

    Alternatively, changing the start_method to "spawn" (when available, depending on your OS) may fix your issues without disabling parallelism for other libraries.

    See issue #4848 for more info.

    Dead-locks could also be caused by running out of shared memory (see below).

  • Shared memory restrictions

    Tensors are passed between processes using shared memory, and some systems impose strict limits on the allowed size of shared memory.

    Luckily this is simple to debug and simple to fix.

    First, to verify that this is your issue just watch your shared memory as your data loader runs. For example, run watch -n 0.3 'df -h | grep shm'.

    If you see your shared memory grow until it maxes out, then you either need to decrease max_instances_in_memory or increase your system's ulimit.

    If you're using Docker, you can increase the shared memory available on a container by running it with the option --ipc=host or by setting --shm-size.

    See issue #4847 for more info.
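Both work-arounds above can also be applied or checked from Python. The following is a minimal sketch under two assumptions: the environment variables are set at the very top of the entry script, before the affected libraries are imported, and shared memory is mounted at /dev/shm, as is typical on Linux. `shared_memory_usage` is a hypothetical helper name, not part of AllenNLP.

```python
import os
import shutil
from typing import Optional, Tuple

# Assumption: this runs before importing torch, numpy, or tokenizers, so the
# libraries see these values when they initialize.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "0"


def shared_memory_usage(path: str = "/dev/shm") -> Optional[Tuple[int, int]]:
    """Return (used, total) bytes for the shared-memory mount, or None if absent."""
    if not os.path.isdir(path):
        return None  # e.g. systems where /dev/shm does not exist
    usage = shutil.disk_usage(path)
    return usage.used, usage.total


# Calling this periodically while the data loader runs gives roughly the same
# information as watching `df -h | grep shm`.
print(shared_memory_usage())
```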

index_with

class MultiProcessDataLoader(DataLoader):
 | ...
 | def index_with(self, vocab: Vocabulary) -> None

__iter__

class MultiProcessDataLoader(DataLoader):
 | ...
 | def __iter__(self) -> Iterator[TensorDict]

iter_instances

class MultiProcessDataLoader(DataLoader):
 | ...
 | def iter_instances(self) -> Iterator[Instance]

set_target_device

class MultiProcessDataLoader(DataLoader):
 | ...
 | def set_target_device(self, device: torch.device) -> None

WorkerError

class WorkerError(Exception):
 | def __init__(
 |     self,
 |     original_err_repr: str,
 |     traceback: List[str]
 | ) -> None

An error raised when a worker fails.