multiprocess_data_loader
allennlp.data.data_loaders.multiprocess_data_loader
MultiProcessDataLoader¶
@DataLoader.register("multiprocess")
class MultiProcessDataLoader(DataLoader):
| def __init__(
| self,
| reader: DatasetReader,
| data_path: DatasetReaderInput,
| *,
| batch_size: int = None,
| drop_last: bool = False,
| shuffle: bool = False,
| batch_sampler: BatchSampler = None,
| batches_per_epoch: int = None,
| num_workers: int = 0,
| max_instances_in_memory: int = None,
| start_method: str = "fork",
| cuda_device: Optional[Union[int, str, torch.device]] = None,
| quiet: bool = False,
| collate_fn: DataCollator = DefaultDataCollator()
| ) -> None
The `MultiProcessDataLoader` is a `DataLoader` that's optimized for AllenNLP experiments. See Using your reader with multi-process or distributed data loading for more information on how to optimize your `DatasetReader` for use with this `DataLoader`.
Parameters¶
- reader : DatasetReader
  A `DatasetReader` used to load instances from the `data_path`.
- data_path : DatasetReaderInput
  Passed to `DatasetReader.read()`.
  Note: In a typical AllenNLP configuration file, the `reader` and `data_path` parameters don't get an entry under the `data_loader`. The `reader` is constructed separately from the corresponding `dataset_reader` params, and the `data_path` is taken from the `train_data_path`, `validation_data_path`, or `test_data_path`.
- batch_size : int, optional (default = None)
  When `batch_sampler` is unspecified, this option can be combined with `drop_last` and `shuffle` to control automatic batch sampling.
- drop_last : bool, optional (default = False)
  When `batch_sampler` is unspecified, this option can be combined with `batch_size` and `shuffle` to control automatic batch sampling. If `True`, the last batch will be dropped if it doesn't contain a full `batch_size` number of `Instance`s.
- shuffle : bool, optional (default = False)
  When `batch_sampler` is unspecified, this option can be combined with `batch_size` and `drop_last` to control automatic batch sampling.
- batch_sampler : BatchSampler, optional (default = None)
  A `BatchSampler` to handle batching. This option is mutually exclusive with `batch_size`, `drop_last`, and `shuffle`.
- batches_per_epoch : int, optional (default = None)
  If specified, exactly `batches_per_epoch` batches will be generated with each call to `__iter__()`.
- num_workers : int, optional (default = 0)
  The number of workers to use to read `Instance`s in parallel. If `num_workers = 0`, everything is done in the main process. Otherwise `num_workers` workers are forked or spawned (depending on the value of `start_method`), each of which calls `read()` on their copy of the `reader`. This means that in order for multi-process loading to be efficient when `num_workers > 1`, the `reader` needs to implement `manual_multiprocess_sharding`.
  Warning: Multi-processing code in Python is complicated! We highly recommend you read the short Best practices and Common issues sections below before using this option.
- max_instances_in_memory : int, optional (default = None)
  If not specified, all instances will be read and cached in memory for the duration of the data loader's life. This is generally ideal when your data can fit in memory during training. However, when your datasets are too big, using this option will turn on lazy loading, where only `max_instances_in_memory` instances are processed at a time.
  Note: This setting will affect how a `batch_sampler` is applied. If `max_instances_in_memory` is `None`, the sampler will be applied to all `Instance`s. Otherwise the sampler will be applied to only `max_instances_in_memory` `Instance`s at a time. Therefore, when using this option with a sampler, you should generally set it to a multiple of the sampler's `batch_size` (if it has one).
- start_method : str, optional (default = "fork")
  The start method used to spin up workers. On Linux or OS X, "fork" usually has the lowest overhead for starting workers but could potentially lead to dead-locks if you're using lower-level libraries that are not fork-safe. If you run into these issues, try using "spawn" instead.
- cuda_device : Optional[Union[int, str, torch.device]], optional (default = None)
  If given, batches will automatically be put on this device.
  Note: This should typically not be set in an AllenNLP configuration file. The `Trainer` will automatically call `set_target_device()` before iterating over batches.
- quiet : bool, optional (default = False)
  If `True`, tqdm progress bars will be disabled.
- collate_fn : DataCollator, optional (default = DefaultDataCollator)
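
To make the options above concrete, here is a minimal construction sketch. The reader class and data path are stand-ins for illustration; substitute your own `DatasetReader` and file. Remember that `batch_size`, `shuffle`, and `drop_last` cannot be combined with a `batch_sampler`.

```python
from allennlp.data.data_loaders import MultiProcessDataLoader
from allennlp.data.dataset_readers import TextClassificationJsonReader

# Stand-in reader and path for illustration only.
reader = TextClassificationJsonReader()
loader = MultiProcessDataLoader(
    reader,
    "data/train.jsonl",   # hypothetical path, passed to reader.read()
    batch_size=32,        # automatic batch sampling ...
    shuffle=True,         # ... mutually exclusive with batch_sampler
    drop_last=False,
)
```

In a configuration file, these same options go under the `data_loader` entry, while the reader and data path come from the `dataset_reader` and `train_data_path` / `validation_data_path` / `test_data_path` keys, as noted above.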
Best practices¶
- Large datasets
  If your dataset is too big to fit into memory (a common problem), you'll need to load it lazily. This is done by simply setting the `max_instances_in_memory` parameter to a non-zero integer. The optimal value depends on your use case.
  If you're using a `batch_sampler`, you will generally get better samples by setting `max_instances_in_memory` to a higher number - such as 10 to 100 times your batch size - since this determines how many `Instance`s your `batch_sampler` gets to sample from at a time.
  If you're not using a `batch_sampler` then this number is much less important. Setting it to 2 to 10 times your batch size is a reasonable value.
  Keep in mind that using `max_instances_in_memory` generally results in a slower training loop unless you load data in worker processes by setting the `num_workers` option to a non-zero integer (see below). That way data loading won't block the main process. (A sketch combining both options follows this list.)
- Performance
  The quickest way to increase the performance of data loading is to adjust the `num_workers` parameter. `num_workers` determines how many workers are used to read `Instance`s from your `DatasetReader`. By default, this is set to 0, which means everything is done in the main process.
  Before trying to set `num_workers` to a non-zero number, you should make sure your `DatasetReader` is optimized for use with multi-process data loading.
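
As a rough illustration of this advice, the sketch below (the reader and path are again stand-ins) turns on lazy loading with a batch sampler, sizes `max_instances_in_memory` at about 10x the batch size, and moves reading into worker processes:

```python
from allennlp.data.data_loaders import MultiProcessDataLoader
from allennlp.data.dataset_readers import TextClassificationJsonReader
from allennlp.data.samplers import BucketBatchSampler

# Stand-in reader; for num_workers > 1 to help, the reader should support
# manual multi-process sharding as described in the num_workers docs above.
reader = TextClassificationJsonReader()
loader = MultiProcessDataLoader(
    reader,
    "data/train.jsonl",                               # hypothetical path
    batch_sampler=BucketBatchSampler(batch_size=32),
    max_instances_in_memory=32 * 10,                  # ~10x batch size for the sampler to draw from
    num_workers=2,                                    # don't block the main process on reading
)
```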
Common issues¶
- Dead-locks
  Multiprocessing code in Python is complicated! Especially code that involves lower-level libraries which may be spawning their own threads. If you run into dead-locks while using `num_workers > 0`, luckily there are two simple work-arounds which usually fix the issue (both are sketched after this list).
  The first work-around is to disable parallelism for these low-level libraries. For example, setting the environment variables `OMP_NUM_THREADS=1` and `TOKENIZERS_PARALLELISM=0` will do so for PyTorch and Numpy (for CPU operations) and HuggingFace Tokenizers, respectively.
  Alternatively, changing the `start_method` to "spawn" (when available, depending on your OS) may fix your issues without disabling parallelism for other libraries. See issue #4848 for more info.
  Dead-locks could also be caused by running out of shared memory (see below).
- Shared memory restrictions
  Tensors are passed between processes using shared memory, and some systems impose strict limits on the allowed size of shared memory. Luckily this is simple to debug and simple to fix.
  First, to verify that this is your issue, just watch your shared memory as your data loader runs. For example, run `watch -n 0.3 'df -h | grep shm'`.
  If you're seeing your shared memory blow up until it maxes out, then you either need to decrease `max_instances_in_memory` or increase your system's `ulimit`.
  If you're using Docker, you can increase the shared memory available on a container by running it with the option `--ipc=host` or by setting `--shm-size`. See issue #4847 for more info.
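
Both dead-lock work-arounds can also be applied from Python. A sketch, assuming a script that constructs the loader itself (the reader and path are placeholders; the environment variables must be set before the affected libraries initialize their thread pools, so exporting them in your shell is often the most reliable approach):

```python
import os

# Work-around 1: disable low-level parallelism before torch / tokenizers spin up threads.
os.environ["OMP_NUM_THREADS"] = "1"         # PyTorch / Numpy CPU thread pools
os.environ["TOKENIZERS_PARALLELISM"] = "0"  # HuggingFace Tokenizers

from allennlp.data.data_loaders import MultiProcessDataLoader
from allennlp.data.dataset_readers import TextClassificationJsonReader

# Work-around 2: use the "spawn" start method instead of the default "fork".
reader = TextClassificationJsonReader()     # stand-in reader
loader = MultiProcessDataLoader(
    reader,
    "data/train.jsonl",                     # hypothetical path
    batch_size=32,
    num_workers=2,
    start_method="spawn",
)
```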
index_with¶
class MultiProcessDataLoader(DataLoader):
| ...
| def index_with(self, vocab: Vocabulary) -> None
__iter__¶
class MultiProcessDataLoader(DataLoader):
| ...
| def __iter__(self) -> Iterator[TensorDict]
iter_instances¶
class MultiProcessDataLoader(DataLoader):
| ...
| def iter_instances(self) -> Iterator[Instance]
set_target_device¶
class MultiProcessDataLoader(DataLoader):
| ...
| def set_target_device(self, device: torch.device) -> None
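
Putting these methods together, a typical lifecycle looks roughly like this (the reader, path, and device are placeholders):

```python
import torch

from allennlp.data.data_loaders import MultiProcessDataLoader
from allennlp.data.dataset_readers import TextClassificationJsonReader
from allennlp.data.vocabulary import Vocabulary

reader = TextClassificationJsonReader()     # stand-in reader
loader = MultiProcessDataLoader(reader, "data/train.jsonl", batch_size=32)  # hypothetical path

# Build a vocabulary from the raw instances, then index the loader with it.
vocab = Vocabulary.from_instances(loader.iter_instances())
loader.index_with(vocab)

# Optionally move batches onto a device; the Trainer normally calls this for you.
if torch.cuda.is_available():
    loader.set_target_device(torch.device("cuda:0"))

# __iter__() yields TensorDict batches ready to feed to a model.
for batch in loader:
    pass  # e.g. model(**batch)
```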
WorkerError¶
class WorkerError(Exception):
| def __init__(
| self,
| original_err_repr: str,
| traceback: List[str]
| ) -> None
An error raised when a worker fails.