
webdataset's Introduction


%matplotlib inline
import matplotlib.pyplot as plt
import torch.utils.data
import torch.nn
from random import randrange
import os
os.environ["WDS_VERBOSE_CACHE"] = "1"
os.environ["GOPEN_VERBOSE"] = "0"

The WebDataset Format

WebDataset format files are tar files, with two conventions:

  • within each tar file, files that belong together and make up a training sample share the same basename when stripped of all filename extensions
  • the shards of a tar file are numbered like something-000000.tar to something-012345.tar, usually specified using brace notation something-{000000..012345}.tar

You can find a longer, more detailed specification of the WebDataset format in the WebDataset Format Specification

WebDataset can read files from local disk or from any pipe, which allows it to access files using common cloud object stores. WebDataset can also read concatenated MsgPack and CBORs sources.
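As a sketch (the bucket paths below are placeholders, and the exact command depends on which CLI tool you have installed), a pipe: URL wraps any shell command that writes a tar stream to stdout:

import webdataset as wds

# stream shards over HTTP with curl (placeholder URL)
ds_http = wds.WebDataset("pipe:curl -s -L https://example.com/data/shard-{000000..000999}.tar")

# stream shards from S3 with the aws CLI (placeholder bucket)
ds_s3 = wds.WebDataset("pipe:aws s3 cp s3://my-bucket/shard-{000000..000999}.tar -")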

The WebDataset representation allows writing purely sequential I/O pipelines for large scale deep learning. This is important for achieving high I/O rates from local storage (3x-10x for local drives compared to random access) and for using object stores and cloud storage for training.

The WebDataset format represents images, movies, audio, etc. in their native file formats, making the creation of WebDataset format data as easy as just creating a tar archive. Because data is aligned on predictable boundaries, WebDataset also works well with block deduplication.

Standard tools can be used for accessing and processing WebDataset-format files.

bucket = "https://storage.googleapis.com/webdataset/testdata/"
dataset = "publaynet-train-{000000..000009}.tar"

url = bucket + dataset
!curl -s {url} | tar tf - | sed 10q
PMC4991227_00003.json
PMC4991227_00003.png
PMC4537884_00002.json
PMC4537884_00002.png
PMC4323233_00003.json
PMC4323233_00003.png
PMC5429906_00004.json
PMC5429906_00004.png
PMC5592712_00002.json
PMC5592712_00002.png
tar: stdout: write error

Note that in these .tar files, we have pairs of .json and .png files; each such pair makes up a training sample.
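As a minimal sketch of how such pairs can be written with the library's TarWriter (the samples iterable and file names here are made up for illustration; the default writer passes raw bytes through and serializes dicts stored under a json field):

import webdataset as wds

# samples: a hypothetical iterable of (png_bytes, annotation_dict) pairs
with wds.TarWriter("publaynet-sample-000000.tar") as sink:
    for i, (png_bytes, annotation) in enumerate(samples):
        sink.write({
            "__key__": f"sample{i:06d}",  # shared basename for the pair
            "png": png_bytes,             # already-encoded PNG, stored as-is
            "json": annotation,           # dict, serialized to JSON by the writer
        })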

WebDataset Libraries

There are several libraries supporting the WebDataset format:

  • webdataset for Python3 (includes the wids library), this repository
  • Webdataset.jl a Julia implementation
  • tarp, a Golang implementation and command line tool
  • Ray Data sources and sinks

The webdataset library can be used with PyTorch, Tensorflow, and Jax.

The webdataset Library

The webdataset library is an implementation of PyTorch IterableDataset (or a mock implementation thereof if you aren't using PyTorch). It implements a form of stream processing. Some of its features are:

  • large scale parallel data access through sharding
  • high performance disk I/O due to purely sequential reads
  • latency insensitive due to big fat pipes
  • no local storage required
  • instant startup for training jobs
  • only requires reading from file descriptors/network streams, no special APIs
  • its API encourages high performance I/O pipelines
  • scalable from tiny desktop datasets to petascale datasets
  • provides local caching if desired
  • requires no dataset metadata; any collection of shards can be read and used instantly

The main limitations people run into are related to the fact that IterableDataset is less commonly used in PyTorch and some existing code may not support it as well, and that achieving an exactly balanced number of training samples across many compute nodes for a fixed epoch size is tricky; for multinode training, webdataset is usually used with shard resampling.
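A hedged sketch of that last point (the resampled argument appears in recent webdataset releases; check your installed version):

import webdataset as wds

# resampled=True draws shards with replacement, giving each node/worker an
# effectively infinite stream; with_epoch() then declares a nominal epoch
# length so every node processes the same number of samples per epoch
dataset = wds.WebDataset(url, resampled=True).shuffle(1000).with_epoch(10000)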

There are two interfaces, the concise "fluid" interface and a longer "pipeline" interface. We'll show examples using the fluid interface, which is usually what you want.

import webdataset as wds
pil_dataset = wds.WebDataset(url).shuffle(1000).decode("pil").to_tuple("png", "json")

The resulting datasets are standard PyTorch IterableDataset instances.

isinstance(pil_dataset, torch.utils.data.IterableDataset)
True
for image, json in pil_dataset:
    break
plt.imshow(image)
<matplotlib.image.AxesImage at 0x7f73806db970>

png

We can add onto the existing pipeline for augmentation and data preparation.

import torchvision.transforms as transforms
from PIL import Image

preproc = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    lambda x: 1-x,
])

def preprocess(sample):
    image, json = sample
    try:
        label = json["annotations"][0]["category_id"]
    except:
        label = 0
    return preproc(image), label

dataset = pil_dataset.map(preprocess)

for image, label in dataset:
    break
plt.imshow(image.numpy().transpose(1, 2, 0))
<matplotlib.image.AxesImage at 0x7f7375fc2230>

png

WebDataset is just an instance of a standard IterableDataset. It's a single-threaded way of iterating over a dataset. Since image decompression and data augmentation can be compute intensive, PyTorch usually uses the DataLoader class to parallelize data loading and preprocessing. WebDataset is fully compatible with the standard DataLoader.
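For example (batch size and worker count are arbitrary here):

loader = torch.utils.data.DataLoader(dataset, batch_size=16, num_workers=4)

for images, labels in loader:
    # images: a (16, 3, 224, 224) float tensor; labels: a (16,) tensor
    break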

Here are a number of notebooks showing how to use WebDataset for image classification and LLM training:

The wds-notes notebook contains some additional documentation and information about the library.

The webdataset Pipeline API

The wds.WebDataset fluid interface is just a convenient shorthand for writing down pipelines. The underlying pipeline is an instance of the wds.DataPipeline class, and you can construct data pipelines explicitly, similar to the way you use nn.Sequential inside models.

dataset = wds.DataPipeline(
    wds.SimpleShardList(url),

    # at this point we have an iterator over all the shards
    wds.shuffle(100),

    # add wds.split_by_node here if you are using multiple nodes
    wds.split_by_worker,

    # at this point, we have an iterator over the shards assigned to each worker
    wds.tarfile_to_samples(),

    # this shuffles the samples in memory
    wds.shuffle(1000),

    # this decodes the images and json
    wds.decode("pil"),
    wds.to_tuple("png", "json"),
    wds.map(preprocess),
    wds.shuffle(1000),
    wds.batched(16)
)

batch = next(iter(dataset))
batch[0].shape, batch[1].shape
(torch.Size([16, 3, 224, 224]), (16,))
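Since this pipeline already batches with wds.batched(16), a DataLoader wrapped around it should be told not to batch again; a minimal sketch:

loader = torch.utils.data.DataLoader(dataset, batch_size=None, num_workers=4)

for images, labels in loader:
    # batches of 16 produced inside the pipeline are passed through
    break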

The wids Library for Indexed WebDatasets

Installing the webdataset library installs a second library called wids. This library provides fully indexed/random access to the same datasets that webdataset accesses using iterators/streaming.

Like the webdataset library, wids is highly scalable and provides efficient access to very large datasets. Being indexed, it is easily backwards compatible with existing data pipelines based on indexed datasets, including precise epochs for multinode training. The library comes with its own ChunkedSampler and DistributedChunkedSampler classes, which provide shuffling across nodes while still preserving enough locality of reference for efficient training.

Internally, the library uses an mmap-based tar file reader implementation; this allows very fast access without precomputed indexes, and it also means that shards and the equivalent of "shuffle buffers" are shared in memory between workers on the same machine.

This additional power comes at some cost: the library requires a small metadata file that lists all the shards in a dataset and the number of samples contained in each, the library requires local storage for as many shards as there are I/O workers on a node, it uses shared memory and mmap, and the availability of indexing makes it easy to accidentally use inefficient access patterns.

Generally, the recommendation is to use webdataset for all data generation, data transformation, and training code, and to use wids only if you need fully random access to datasets (e.g., for browsing or sparse sampling), need an index-based sampler, or are converting tricky legacy code.

import wids

train_url = "https://storage.googleapis.com/webdataset/fake-imagenet/imagenet-train.json"

dataset = wids.ShardListDataset(train_url)

sample = dataset[1900]

print(sample.keys())
print(sample[".txt"])
plt.imshow(sample[".jpg"])
dict_keys(['.cls', '.jpg', '.txt', '__key__', '__dataset__', '__index__', '__shard__', '__shardindex__'])
a high quality color photograph of a dog


base: https://storage.googleapis.com/webdataset/fake-imagenet name: imagenet-train nfiles: 1282 nbytes: 31242280960 samples: 128200 cache: /tmp/_wids_cache

<matplotlib.image.AxesImage at 0x7f7373669e70>

png

There are several examples of how to use wids in the examples directory.
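A hedged sketch of hooking a wids dataset into a standard DataLoader (the transformation below assumes the fake-imagenet keys shown above; adapt it to your own data):

import torch.utils.data
import torchvision.transforms as transforms
import wids

dataset = wids.ShardListDataset(train_url)

preproc = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

def to_tensors(sample):
    # convert a decoded wids sample into something default_collate can batch
    return preproc(sample[".jpg"]), sample[".cls"]

dataset.add_transformation(to_tensors)

# plain shuffling works because wids datasets are indexed (map-style); for
# better locality of reference, swap in wids.ChunkedSampler or
# DistributedChunkedSampler as the sampler (constructor options vary by version)
loader = torch.utils.data.DataLoader(dataset, batch_size=16, shuffle=True, num_workers=4)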

Note that the APIs between webdataset and wids are not fully consistent:

  • wids keeps the extension's "." in the keys, while webdataset removes it (".txt" vs "txt")
  • wids doesn't have a fully fluid interface, and add_transformation just adds to a list of transformations
  • webdataset currently can't read the wids JSON specifications

Installation and Documentation

$ pip install webdataset

For the Github version:

$ pip install git+https://github.com/tmbdev/webdataset.git

Here are some videos talking about WebDataset and large scale deep learning:

Dependencies

The WebDataset library only requires PyTorch, NumPy, and a small library called braceexpand.

WebDataset loads a few additional libraries dynamically only when they are actually needed and only in the decoder:

  • PIL/Pillow for image decoding
  • torchvision, torchvideo, torchaudio for image/video/audio decoding
  • msgpack for MessagePack decoding
  • the curl command line tool for accessing HTTP servers
  • the Google/Amazon/Azure command line tools for accessing cloud storage buckets

Loading of one of these libraries is triggered by configuring a decoder that attempts to decode content in the given format and encountering a file in that format during decoding. (Eventually, the torch... dependencies will be refactored into those libraries.)

webdataset's Issues

Serious drop in data loading speed observed

Has anyone else noticed download issues with webdataset, about a 10x drop in data loading speed? I'm observing slowdowns with everything (on GCS at least), for example also with @tmbdev's openimages dataset...

Please see the gist here. Should be ready to run.

In my earlier benchmarks I was able to get about 2000 img/s with 8 processes with the above script (50 minibatches of size 256 in about 6.5 s), but now I'm getting about 400 img/s top.

I'm on a GCP VM. Tried downgrading various libraries, spun up a VM from an image from last July etc... gsutil multiprocessing downloads from GCS are still very fast (~570MiB/s => 11400 openimages img/s)

I'm totally confused what's going on!

Installing awscli using pip install awscli and then using this code:

'pipe:aws s3 cp s3://bucket/file.tar -'

seems to work. However, is it efficient?
What are the best options for when we do/don't have sudo access?
Why doesn't the s3cmd get work?

often the "broken pipe" error occurs at some point throughout the training, more frequent with the aws, but happened with google storage as well.

I am also getting the broken pipe message when using awscli but only for specific files. It's strange since I can easily download those files using awscli manually.

Originally posted by @Nilabhra in #21 (comment)

Reading multiple shards in parallel, while treating each shard as a sequence of data (e.g. video/audio)

Hi,

This is a very nice project and thanks for sharing it!

I'm wondering if there is a way to read multiple shards in parallel, while treating each shard as a sequence of data, such as video or audio. For example, assuming there are two .tar files each including the frames of a video. The code snippet I used for loading them in parallel is:

ds = wds.Dataset([
    "video-0.tar", 
    "video-1.tar"]).decode()

batch_size=2
dataset = (
        wds.dataset.Processor(ds, wds.map, my_sample_decoder) 
        .batched(batch_size))

dataloader = torch.utils.data.DataLoader(dataset, num_workers=2, batch_size=None,)

for sample in dataloader:
    # process each batched sample#

The sequence of frames loaded in each iteration are: [frame_0_in_video_0, frame_1_in_video_0], then [frame_0_in video_1, frame_1_in_video_1], then [frame_2_in_video_0, frame_3_in_video_0] and so on, while the behavior I was trying to implement is: [frame_0_in_video_0, frame_0_in_video_1], then [frame_1_in_video_0, frame_1_in_video_1] and so on.

Is there a reason the decode function excludes tif images?

Hi.

I'm trying to use webdataset to access my training samples more efficiently (avoid the 100000 files problem on my disk).
After a while I realised that the decode("pil") method cannot decode tif images (my source is a massive orthophoto geotiff >320GB that I tile into 512x512px tifs)? I tried to follow the docs' decode notebook to build a custom tif decoder but have not had any success there... As a test, I used PIL.Image directly to read my tifs and it converts them happily.
Is there a reason this is not allowed in the Webdataset codebase (the auto decode function does not list tif as a supported format)...?

Are there any more examples of custom decode implementations?

Cheers,
C
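A hedged sketch of one workaround, assuming your webdataset version accepts plain callables as decoder handlers (a handler receives (key, data) and returns None to let the remaining handlers try); url here is a placeholder:

import io

import numpy as np
from PIL import Image
import webdataset as wds

def tif_decoder(key, data):
    # only handle .tif/.tiff entries; everything else falls through
    if not key.endswith((".tif", ".tiff")):
        return None
    return np.asarray(Image.open(io.BytesIO(data)))

dataset = wds.WebDataset(url).decode(tif_decoder, "pil")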

Include cache related args in Dataset class

Feature Request

The wds.WebDataset class lets you define cache_dir, cache_size, cache_name, and cache_verbose args. It would be helpful to be able to provide these when instantiating classes directly with wds.Dataset. Something like this:

from webdataset import Dataset

ds = Dataset(urls, cache_dir='my_cache/')

Motivation

I have been subclassing wds.Dataset and would like my subclasses to have the ability to cache. I chose this as the parent class for my use case as I noticed it's just wrapping calls to wds.WebDataset.

Potential Solution

I'll submit a simple PR that does this w/o any breaking changes, but if I am instead mistaken and missing functionality from elsewhere, let me know! 😄

Is MultiDataset a complete replacement for torch.utils.data.DataLoader?

When using webdataset with pytorch-lightning, I discovered that if I pass dataloaders to pytorch-lightning as instances of MultiDataset, training will stall on epoch 0. Once I changed the dataloaders to be instances of torch.utils.data.DataLoader instead, the pytorch-lightning trainer behaved as expected.

Is MultiDataset supposed to completely replace torch.utils.data.DataLoader? If so, is there a way to make it work with pytorch-lightning?

True random access (i.e. Dataset vs. IterableDataset)

Hi, this is a really neat library! However it would be nice to have a simpler interface to TAR files that allows random access, instead of enforcing sequential access. Let me explain why.

The most common use-case for academic labs such as mine are not terabyte-scale remote datasets where sharding is common, but several gigabytes datasets with a few million small images.

So, a dataset fits in a single machine; but due to automatic scheduling in a cluster we cannot have all datasets in all machines. In this context, the easiest solution is to copy the dataset to the local machine at the start of training. However, millions of small files really strain the filesystem (both for copying and accessing during training). So copying and reading from a single TAR file would be ideal -- and WebDataset seems (on the surface) to do this well.

But the constraints of the IterableDataset are maybe a step too far. We now have to make decisions about how many shards to use, sizes of rolling buffers for shuffling, etc. This adds a ton of friction, and the uneasy feeling that now the samples are not sufficiently IID for training if you make a wrong decision. Compare this to the ease of training with tons of small images in a filesystem.

I was trying to use WebDataset and get colleagues to adopt it, but this is a big wall for adoption. Uncompressed TAR files allow random access. Could this not be a simpler entry-point for WebDataset? A user who wants to scale things up would find it easier to adapt then to the IterableDataset, but I think that many users would be perfectly happy with the random access version, which is much less restrictive.

torchvision warning

I get the following warning when using the torch_video decoder:
torchvision/io/video.py:116: UserWarning: The pts_unit 'pts' gives wrong results and will be removed in a follow-up version. Please use pts_unit 'sec'.

The solution would be to change line 221 in webdataset/autodecode.py:

return torchvision.io.read_video(fname)

to:

return torchvision.io.read_video(fname, pts_unit='sec')

Cannot specify ten/tb when using TarWriter

I found that a numpy array will be encoded to a bytearray, which conflicts with the assertion that the data is bytes.

sample = {
        "__key__": f"{i:05}",
        "tb": numpy_array,
        "cls": label
}

What is the purpose of the library, if you have to download the whole dataset?

The imagenet example (https://github.com/tmbdev/pytorch-imagenet-wds) seems to demonstrate that the library is completely useless.

First of all, what is a "shard"? Why do I need to "shard" a dataset? What is this about, where is the documentation for that?

Second, how can a library called "WEB Dataset" demand that you download the dataset in order to "shard" it and only then use it?

I need to train a model with imagenet, but I do not have hard drive space to download it. It seems that webdataset doesn't help me at all. What does this say about multi-terabyte datasets? Either the example was horrible, or the library is very questionable in terms of achieving what it aims to achieve.

Usage with s3 image buckets

Hi,

I'm trying to find a solution to stream images from s3 buckets in PyTorch, something like: bucket/{image_ids}.jpg for tons of images.
Suppose I cannot change the way the data is stored, is it possible to use this project to stream from s3 buckets?

Thank you!

What is the meaning of `maxsize` argument for `ShardWriter`?

I have a hard time figuring out what ShardWriter(..., maxsize=32e6) actually means. I assume it's the size of each shard, but it's not consistent with my observations. I guess it's measured in bytes, in which case 32e6 = 32MB.

I stored small png images and each shard was consistently larger than 32MB (from 60 to 80MB). I moved to tar.gz with compress=True and the size started to closely match 32MB which is already odd.
Later I had to move to raw pth files for my examples, and each shard was around 9MB... Tried uncompressed version and it jumped to 39MB.

I guess the png case could be caused by internal fragmentation, because each file was ~1KB (< 4KB), but I'm not sure if that's an issue in a tar archive. Compression would be highly ineffective for png so I guess it would at most remove the internal fragmentation which led to accurate size.
In the case of the pth files I'm puzzled. The compression rate is much higher, since raw image arrays have a lot of redundancy, so it makes sense that a 32MB archive could be compressed down to 9MB. On the other hand, I have no idea why the uncompressed version is slightly bigger?

allow starting shard numbering above 0

I am capturing a persistent stream of data over long periods of time and saving it to shards. Occasionally the script dies and I need to restart it. However, then it will overwrite the original files. It would be nice to be able to start from some number > 0 in these cases. The fix is relatively easy.

On this line in the ShardWriter class (the constructor of the ShardWriter class), just add an optional parameter shard_num with default value 0 (i.e. shard_num=0), and then in the constructor (line 302) just set the shard to this number: self.shard = shard_num.
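A hedged workaround that avoids touching ShardWriter internals altogether is to manage the numbering yourself with TarWriter (samples is assumed to be an iterable of sample dicts with a __key__ field; the maxcount handling below is a simplified stand-in for what ShardWriter does):

import webdataset as wds

def write_shards(samples, pattern="out-%06d.tar", start_shard=42, maxcount=10000):
    """Write samples to numbered shards, starting at start_shard instead of 0."""
    shard, count, sink = start_shard, 0, None
    for sample in samples:
        if sink is None or count >= maxcount:
            if sink is not None:
                sink.close()
            sink = wds.TarWriter(pattern % shard)
            shard, count = shard + 1, 0
        sink.write(sample)
        count += 1
    if sink is not None:
        sink.close()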

Memory Issue when using Multidataset + AIStore + Horovod

Problem description:
Possible memory leak / memory release issue or multiprocessing issue when using Multidataset instead of the pytorch Dataloader.
Setup:
AIstore locally as the storage backend.
Webdataset:

url = "{}{}-{{000000..{:06d}}}.tar".format(
            ais_path, self._config['Datahandler']['name']['train'], num_shards)
url = f"pipe:curl -L -s {url} || true"
...
_ds = wds.Dataset(url, length = num_batches, shard_selection = worker_urls)\
                              .shuffle(self._config['Datahandler']['shuffle_buffer_size'])\
                              .decode('pil')\
                              .to_tuple("jpg;png;pyd;txt;ppm", "json;cls")\
                              .map_tuple(self._model_module.transform_train, self._model_module.transform_target)
...
dataloader = wds.MultiDataset(_ds, workers = num_workers, nominal=nominal).batched(batch_size)

Training backend: PyTorch Lightning with horovod as ddp strategy.
Num workers: 4/gpu
Num Gpus: 2-16 (configurable)

So what happens is that the training starts and the Multidatasets are set up in each of the horovod/mpirun spawned processes according to the configuration. Then the memory starts filling up and I am able to identify that somehow there is a whole bunch of python processes (of the script that does the training and which is called via horovodrun -np python <path_to_entry_point.py>) that do not seem to finish but each occupy around 1-2 GB of memory.
They never seem to be joined/finished, and hence eventually the memory of the machine will be full, after which swap will be filled and the whole training process obviously stalls.

I tried changing the order of calling shuffling as shown in the webdataset repository unittest of the Multidataset but it did not change the behavior.

Is there something I am missing here? Btw. training with ResizedDataset and the PyTorch dataloader works fine, even though the Dataloader throws a bunch of warnings because of inconsistent dataset lengths returned (but I think you mention this strange behavior in the Readme and that's why there is Multidataset).

How to undersample a class with WebDataset?

Hello,

I'm using WebDataset to store and load ~400 tar files containing 40k samples, with 89% of the data being class 0 and the other 11% class 1. I want to undersample the class 0 to obtain a balanced dataset, and I can't seem to do it. I've tried to use WeightedRandomSampler but it doesn't work with IterableDataset, so what is the best approach to do this with a WebDataset?
I'd like to avoid recreating tar files with only 10% of my class 0 because ideally I'd like to sample 10% of my class 0 randomly when the Dataloader is created.

I'm open to all ideas, thanks in advance!
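A hedged sketch of one approach via rejection sampling inside the pipeline, assuming the fluid .select() filter is available in your webdataset version (the urls variable and the "cls"/"jpg" field names are placeholders):

import random
import webdataset as wds

KEEP_FRACTION = 0.11 / 0.89   # keep roughly as many class-0 samples as class-1

def keep(sample):
    # assumes the class label is stored under "cls" and decodes to an int
    if int(sample["cls"]) == 1:
        return True
    return random.random() < KEEP_FRACTION

dataset = wds.WebDataset(urls).decode().select(keep).to_tuple("jpg", "cls")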

Specify named parameter to WebDataset constructor

I think you have to make this transformation for all your WebDataset class initialisation.

ds = wds.WebDataset("testdata/imagenet-000000.tgz", 1000, decoder="pil")
=>
ds = wds.WebDataset("testdata/imagenet-000000.tgz", size=1000, decoder="pil")

ShardWriter for numpy arrays

Simple question: if I am using a custom Pytorch dataset that returns a few numpy arrays (read from HDF5 files), how should I set up ShardWriter? I get "no handler for data" when I pass them in normally (encoder=True below), and "ValueError: data doesn't map to a bytes after encoding (<class 'numpy.ndarray'>)" when encoder=False. Looking through the code, it looks like tenbin should be used, I'm just not sure how to use it. Do the dictionary keys have to correspond to handler extensions? Can they not be custom key names? What do I do for multiple numpy arrays?

with wds.ShardWriter('shards/shard-%06d.tar',maxcount=1000,encoder=False) as sink:
    for idx,(X,target,index,weight) in enumerate(dataset):
        sink.write({
            '__key__': "%06d"%index,
            'data': X,
            'target': target,
            'weight': weight
            })

Using MultiDataset and ResizedDataset simultaneously gives rise to an AttributeError

Hello,
when I used ResizedDataset and MultiDataset, I encountered the following error:

# Used code
import webdataset as wds
dataset = wds.Dataset(url).shuffle(100).decode()
dataset = wds.ResizedDataset(dataset, 5)
loader = wds.MultiDataset(dataset, workers=2, nominal=5, pin_memory=False).shuffle(2).batched(2)
for sample in loader:
   ...
Process Process-1:
Traceback (most recent call last):
...
  File ".../webdataset/multi.py", line 64, in _parallel_job
    D("job", i, "done", dataset.sample_urls)
AttributeError: 'ResizedDataset' object has no attribute 'sample_urls'
Process Process-2:
Traceback (most recent call last):
...
  File ".../webdataset/multi.py", line 64, in _parallel_job
    D("job", i, "done", dataset.sample_urls)
AttributeError: 'ResizedDataset' object has no attribute 'sample_urls'

Please check out this issue.
Thank you.

Issue next release?

Is the current version suitable for release?

I'm trying to package a webdataset dependent app for google's ai-platform, and it can install from pip, but not from git.

Thanks!

Help in creating tar archive for webdataset using tarfile python package

Hello,

Could someone help me with what the equivalent to (from your docs) -

$ tar --sort=name -cf dataset.tar dataset/

would be using the tarfile Python package. The above command is able to collate the files with a common basename into one group with a common key. It seems to me that this is taken care of by the --sort=name param.

I am trying to create a similar tar archive using the tarfile python package. This is the code that I'm using so far -

     with tarfile.open(archive_path, "w") as tar:
         tar.add(dataset_path, arcname=os.path.basename(dataset_path))

But this doesn't group the files with a common basename into one key, as is required by webdataset to process a sample.

(I've already tried Webdataset's TarWriter, but with a large dataset like the one I'm using, my system runs out of memory when I create a tar archive using TarWriter)

Any help is appreciated, thanks!
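A minimal sketch with the standard tarfile module: add the files one at a time in sorted order, which reproduces the effect of --sort=name and keeps files with a common basename adjacent, while streaming from disk rather than building the archive in memory (paths are placeholders):

import os
import tarfile

dataset_path = "dataset"      # directory containing the sample files
archive_path = "dataset.tar"

with tarfile.open(archive_path, "w") as tar:
    # sorting keeps e.g. sample0.json and sample0.png next to each other,
    # which is what webdataset needs to group them into one sample
    for name in sorted(os.listdir(dataset_path)):
        tar.add(os.path.join(dataset_path, name), arcname=name)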

npy support

In many cases reading tensor data using numpy.load is many times faster than loading a pickle file. Would it be possible to add it to the basic_handlers?

Not compatible with python 3.8+?

When trying to install the library I'm getting the following error. Is that supposed to happen (is Python 3.6 a hard requirement), or is it a bug?

Requirement already satisfied: dataclasses in /envs/pfe_submit/lib/python3.8/site-packages/dataclasses-0.8-py3.8.egg (from torch->webdataset==0.1.49) (0.8)
ERROR: Package 'dataclasses' requires a different Python: 3.8.5 not in '>=3.6, <3.7'

Best,
Bruno

[Question] DALI and object detection

Hi, does webdataset support NVIDIA DALI? If so, are there any examples? If not, do you happen to have an idea of how the data loading speed would compare (say for example using V100's and dali.ops.COCOReader)?


Context for my question if you have the time:

I'm looking for the fastest way to load data in PyTorch for a very large (2TB) object detection dataset.
I'm using AWS, and my required region is limited to S3, EBS, or EFS for data storage.

My first consideration was DALI + sharded COCO files as in this example, however, since my dataset is very large, I suffer from large transfer overhead when starting training jobs (I use SageMaker, so the entire training set is transferred from S3 to a mounted EBS volume when training is kicked off [or alternatively, a slower and more expensive persistent EFS volume can be mounted]).

I would like to set up a named pipe to stream data from S3 to my training instances (distributed horovod), but am unsure how fast the data loading would compare to the DALI+COCO option. Any advice would be greatly appreciated.

How should I write the targets such that it is automatically recognized as a torch.LongTensor instead of Int?

First of all, thanks for the amazing lib! I hope it makes it into PyTorch core soon.

I see that the imagenet example performs no casting to long in order to work with nn.CrossEntropyLoss:
https://github.com/tmbdev/pytorch-imagenet-wds/blob/master/main-wds.py#L397

How is this behavior achieved? I don't have access to the imagenet dataset right now, so I don't really know what the type of ds.targets is.

I have tried defining an identity function that returns a np.int64 (long) copy of the input, but that seems to break automatic batching (yielding a list of scalar tensors).

Thanks in advance!

How exactly does shuffling work in webdataset?

I found this project today and it looks very promising for my work. However, I can't really find information about how shuffling is handled with webdataset.

I see that I can do shuffling as follows:

dataset = (
    wds.Dataset(url)
    .shuffle(100))

However, I'm uncertain what exactly the 100 means in this case, and similarly the 1000 that I see in another place in your examples. When I check in the code I find this definition of shuffle, which unfortunately doesn't make me any wiser to the details:

    def shuffle(self, size, rng=None, **kw):
        """Shuffle the data."""
        if size == 0:
            return self
        if rng is None:
            rng = random.Random()
        self.rng = rng
        self.reseed_hook = self.reseed_rng
        self.shard_shuffle = Shuffler(rng)
        self.pipeline.append(filters.shuffle(size, rng=rng, **kw))
        return self

In my general use case I have about 2.1 billion datapoints, which I want to save and start training over. I want to ensure that the shuffling done in webdataset actually enables a complete sampling of all datapoints before any datapoints are repeated in training.

Error in reading shards.

When I generate shards via the following code:

def generate_sample():
    x = np.random.normal(0.0, 0.25, size=(1000, 128))
    y = [['a', 'b'][np.random.randint(0, 2)] for _ in range(1000)]
    return x, y
shard_writer = wds.ShardWriter("data/shard-%03d.tar", maxcount=500)
for i, (x, y) in enumerate(zip(*generate_sample())):
    entry = {'__key__': "entry-%03d" % i,
             'x.pyd': x,
             'y.txt': y
            }
    shard_writer.write(entry)

And then try to read the shards back with:

path = "data/shard-{000..001}.tar"
dataset = wds.Dataset(path).decode().shuffle(100).to_tuple("x.pyd", "y.txt").batched(32)
batchgen = torch.utils.data.DataLoader(dataset, num_workers=1,)
for i, batch in enumerate(batchgen):
    pass

I get the following error:

    raise ValueError(f"didn't find {keys} in {list(a.keys())}")
ValueError: didn't find ['y.txt'] in ['__key__', 'x.pyd']

Is there a simple example that shows creation and use of a Webdataset from python dicts?

This library looked quite simple at first, but I'm now realizing that there are very few examples of its usage online, nothing on stackoverflow, and I'm finding myself stuck on something I thought would be straightforward.

Like probably a lot of Pytorch users, I'm starting with an array of dicts. The array looks like this:

[
{"source": dict_of_source_data, 
"target": dict_of_target_data},

{"source": dict_of_source_data, 
"target": dict_of_target_data},

{"source": dict_of_source_data, 
"target": dict_of_target_dat"},
.....  ]

My understanding is that it should be straightforward to use webdataset.TarWriter to make a .tar of this data.

I start with this example: "Direct Conversion of Any Dataset", here: https://github.com/tmbdev/webdataset/blob/master/docs/creating.ipynb

I read: "Here is a quick way of converting an existing dataset into a WebDataset; this will store all tensors as Python pickles:"

Here is the example code:

dataset = torchvision.datasets.MNIST(root="./temp", download=True)
sink = wds.TarWriter("mnist.tar")
for index, (input, output) in enumerate(dataset):
    if index%1000==0:
        print(f"{index:6d}", end="\r", flush=True, file=sys.stderr)
    sink.write({
        "__key__": "sample%06d" % index,
        "input.pyd": input,
        "output.pyd": output,
    })
sink.close()

My first question is -- why is the extension pyd used here?

Isn't it conventional to use the .pkl extension for pickle files?
https://stackoverflow.com/questions/40433474/preferred-or-most-common-file-extension-for-a-python-pickle

In any case, here is the code that I constructed:

import webdataset as wds

sink = wds.TarWriter('~/workspace/vox-py/corpora/small-10.tar')
for my_unique_id in my_unique_ids:
        my_dicts = make_my_dicts(my_unique_id)
        for index, my_dict in enumerate(my_dicts):
            # my_dict has sub-dicts, my_dict['source'], my_dict['target']
            print(f"{my_unique_id:8d}" + "-" + str(index), end="\r", flush=True, file=sys.stderr)
            sink.write({
                "__key__": "%08d" % my_unique_id + "-" + str(index),
                "source.pyd": track['source'],
                "target.pyd": track['target'],
            })
sink.close()

After doing this, I go to the output directory, and run tar tvf small-10.tar

This outputs:

-r--r--r-- bigdata/bigdata 7686150 2021-01-03 15:56 00002160-0.source.pyd
-r--r--r-- bigdata/bigdata    6848 2021-01-03 15:56 00002160-0.target.pyd
-r--r--r-- bigdata/bigdata 7686150 2021-01-03 15:56 00002160-1.source.pyd
-r--r--r-- bigdata/bigdata    6848 2021-01-03 15:56 00002160-1.target.pyd
-r--r--r-- bigdata/bigdata 7686150 2021-01-03 15:56 00002160-2.source.pyd
-r--r--r-- bigdata/bigdata    6848 2021-01-03 15:56 00002160-2.target.pyd
-r--r--r-- bigdata/bigdata 7686150 2021-01-03 15:56 00002160-3.source.pyd
-r--r--r-- bigdata/bigdata    6848 2021-01-03 15:56 00002160-3.target.pyd
-r--r--r-- bigdata/bigdata 7686150 2021-01-03 15:56 00002160-4.source.pyd
-r--r--r-- bigdata/bigdata    6848 2021-01-03 15:56 00002160-4.target.pyd

Everything looks good.

Now I want to try to iterate through these, and see if I can get these dict instances back.

This writing example is not paired with an example for reading the data....
https://github.com/tmbdev/webdataset/blob/master/docs/creating.ipynb

... so I go here:

https://github.com/tmbdev/webdataset/blob/master/docs/gettingstarted.ipynb

..and I see this code:

dataset = wds.WebDataset(url)

for sample in islice(dataset, 0, 3):
    for key, value in sample.items():
        print(key, repr(value)[:50])
    print()

So, with my .tar, I try this code:

from itertools import islice
import webdataset as wds

dataset = wds.WebDataset('~/workspace/vox-py/corpora/small-10.tar')
for sample in islice(dataset, 0, 3):
    for key, value in sample.items():
        print(key, repr(value)[:50])
        print()

and the console prints....

__key__ '00002160-0'

source.pyd b'\x80\x03}q\x00(X\n\x00\x00\x00trackIndexq\x01G\x

target.pyd b'\x80\x03}q\x00(X\n\x00\x00\x00trackIndexq\x01G\x

__key__ '00002160-1'

source.pyd b'\x80\x03}q\x00(X\n\x00\x00\x00trackIndexq\x01G?\

target.pyd b'\x80\x03}q\x00(X\n\x00\x00\x00trackIndexq\x01G?\

__key__ '00002160-2'

source.pyd b'\x80\x03}q\x00(X\n\x00\x00\x00trackIndexq\x01G@\

target.pyd b'\x80\x03}q\x00(X\n\x00\x00\x00trackIndexq\x01G@\

This looks like I am getting close! I see that for each instance, there is (what looks like) binary data associated with each source.pyd and target.pyd!

But how would I convert these back to a dict?

Supposedly these are in pickle format, right?

So I try this code:

from itertools import islice
import webdataset as wds

dataset = wds.WebDataset('~/workspace/vox-py/corpora/small-10.tar')
for sample in islice(dataset, 0, 3):
    for key, value in sample.items():
        print(key, pickle.load(value))
        print()

... and then I get this error:

  File "/workspace/vox-py/src/training/training.py", line 14, in run_training
    print(key,  pickle.load(value))
TypeError: file must have 'read' and 'readline' attributes

I think now I probably need to do something with decode, right? Because I have to tell Webdataset what format to output?

I tried to understand what is going on here... https://github.com/tmbdev/webdataset/blob/master/docs/decoding.ipynb

But I got lost, and I'm really not sure if I need to make a custom decoder?

Surely this is a very simple use case.....does anyone have an example of writing and then reading simple python dicts with Webdataset?

thanks
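For reference, a hedged sketch of getting the dicts back: pickle.load expects a file object (hence the TypeError above), so either unpickle the raw bytes with pickle.loads, or call .decode(), whose default handlers are assumed to treat .pyd as a pickle:

import os
import pickle
from itertools import islice
import webdataset as wds

path = os.path.expanduser("~/workspace/vox-py/corpora/small-10.tar")

# option 1: unpickle the raw bytes by hand
for sample in islice(wds.WebDataset(path), 0, 3):
    source = pickle.loads(sample["source.pyd"])
    target = pickle.loads(sample["target.pyd"])
    print(sample["__key__"], type(source), type(target))

# option 2: let the default decoders handle .pyd
for sample in islice(wds.WebDataset(path).decode(), 0, 3):
    print(sample["__key__"], type(sample["source.pyd"]))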

TypeError: rename_() got multiple values for argument 'data'

Error for examples

dataset = (
        wds.Dataset(url)
        .shuffle(100)
        .decode()
        .rename(image="jpg;png", data="json")
        .to_tuple("image", "data")
   )
   
   for image, data in islice(dataset, 0, 3):
       print(image.shape, image.dtype, type(data))
       

TypeError: rename_() got multiple values for argument 'data'

Workaround :
Change

.rename(image="jpg;png", data="json")
.to_tuple("image", "data")

to

.rename(image="jpg;png", info="json")
.to_tuple("image", "info")

Path separator not decoded correctly

I tried to open a local tar file in Windows 10, but the path separator was decoded incorrectly.

from itertools import islice
import os
import webdataset as wds
dataset_path = os.path.join('dataset', 'dataset.tar')
dataset = wds.WebDataset(dataset_path)
for batch in islice(dataset, 3):
    print(batch.keys())

This should open the dataset as normal; however, this throws a FileNotFoundError with the following message: FileNotFoundError: [Errno 2] No such file or directory: 'datasetdataset.tar'

Manually changing the dataset_path variable to

from itertools import islice
import webdataset as wds
dataset_path = '/'.join(['dataset', 'dataset.tar'])
dataset = wds.WebDataset(dataset_path)
for batch in islice(dataset, 3):
    print(batch.keys())

opens the dataset correctly.

sequence support?

Suppose a dataset is composed of sequences of samples. In other words, 128 sequences of 8 images, so each set of 8 images is a sequence that belongs together, in order.

When my dataloader reads from this dataset, assuming I'm shuffling, I want to randomize the order of the sequences, but the order of files within a sequence must be coherent and ordered.

As far as I can tell, such a construct isn't supported yet?

ImportError: No module named PIL

Install

pip install webdataset==0.1.49

Code

import webdataset
...

when I run this program, it throws

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/raman/miniconda2/envs/temp/lib/python3.8/site-packages/webdataset/__init__.py", line 28, in <module>
    from .writer import ShardWriter, TarWriter, torch_dumps
  File "/home/raman/miniconda2/envs/temp/lib/python3.8/site-packages/webdataset/writer.py", line 17, in <module>
    import PIL
ModuleNotFoundError: No module named 'PIL'

Should we install PIL separately, or is this a bug?

Creating a Webdataset .tar file

I want to use the webdataset format to load data faster than usual. First of all, I have a folder with images and a DataFrame with annotations for every image.

I ran into an issue after creating the webdataset via TarWriter: the PyTorch dataloader doesn't see any files and returns an empty-file error.

here is my code:

    #define Tarwriter 
    sink = wds.TarWriter('MyDataset.tar', encoder=True)
    #extracting all images name 
    basenames = [f for f in os.listdir(path) if f.endswith(".jpg")]
    #Take only up to 20000 images
    for basename in tqdm.tqdm(islice(basenames, 0, 20000)):
        with open (f'./images/{basename}', 'rb') as stream:
            image = stream.read()
        #annotations preprocessing
        annotation = np.array(annotations[annotations[1]==int(basename[:-4])].iloc[:, 1:11])
        annotation = annotation/np.sum(annotation)
        #json file
        dataset = {
            "__key__":basename[:-4],
            "jpg": image,
            "cls": annotation
        }
        sink.write(dataset)
    sink.close()

After creating, I tried to check it
!curl -s MyDataset.tar | tar tf - | sed 10q
tar: This does not look like a tar archive
tar: Exiting with failure status due to previous errors

also another way:

url = f"pipe:curl -L -s MyDataset.tar || true"

dataset = (
    wds.Dataset(url)
    .shuffle(100)
    .decode("pil")
    .rename(image="jpg;png", data="json")
    .to_tuple("image", "data")
)

dataloader = torch.utils.data.DataLoader(dataset, num_workers=4, batch_size=16)
images, targets = next(iter(dataloader))
images.shape

Response:
File "/home/adil/anaconda3/envs/pytorch/lib/python3.7/tarfile.py", line 2304, in next
raise ReadError("empty file")
tarfile.ReadError: empty file

I am using
Ubuntu 18.04
Visual Studio

Environment Anaconda

Thanks in advance!

Creating Webdataset from tar files

I am looking for some help/advice to shard a video dataset into tar files and use it with webdataset. In a folder I have a number of XX.mp4 (video), XX.info.json (script and action labels) and XX.mp4.json (video metadata) files. If I make a tar file out of each sample (i.e. tar -cf 000.tar XX.mp4 XX.info.json XX.mp4.json) so that I can use webdataset().decode(), and then pass the files as ds = wds.Dataset("/Users/../../data/tardb/{000..010}.tar").decode(), then I can see the right keys in

for sample in ds:
    for key, value in sample.items():
        print(key, repr(value)[:50])
    break

output	: dict_keys(['__key__', 'mp4.json', 'info.json', 'mp4'])

If I have multiple folders, where each folder contains a number of samples that share the same base name as above (XX.mp4, XX.info.json and XX.mp4.json), and I then tar each folder and name it sequentially, such as

tar -cf "db-{file_index}.tar" "db-{file_index}"

then with ds = wds.Dataset("/Users/../../data/db-{0..10}.tar").decode(), the only key I see in the sample is info.json

dict_keys(['__key__', 'info.json'])

I was wondering what I am missing in this process / what a better way of making a webdataset out of such a data setting would be. Thanks.

Cache file not created on certain files.

Caching consistently fails on certain .tar files when iterating over the dataset. The constructed cachefile remains in the temporary filepath. Upon a sniff test of the problematic archives, there seems to be no apparent problem such as a missing file for a key or corrupted files. I manually iterated over the tars, extracted all the keys and compared them to the wds iteration.

I've also created a Jupyter Colab here, demonstrating the issue.
https://colab.research.google.com/drive/1WNx5gqjcPpk-WLZsEu7XJmNWs9JRWpI9?usp=sharing

Unable to reach maximum download speed

I have an image dataset of about 250 1GiB tars on GCS, which I'm loading by using urls = f'pipe:gsutil cat {urls}'.

I tested the maximum download speed with gsutil -m cp ... and got about 750MiB/s with 96 processes (on a 96 vCPU GCP VM), which would amount to about 15000 images/s. I'm testing the data I/O only, no accelerators.

However, using wds.MultiDataset (96 workers) with basic decode, to_tuple etc steps (and minimal cropping with albumentations) and batching to 256 results in much slower download & processing, about 2500 images/s (this is actually already really fast but I'd like to try to get it even faster :D ).

So I'm not bottlenecked by download speed, and CPU utilization for the 96 processes shows <30%, so I'm not bottlenecked by the CPU either...

I profiled the code with pyinstrument and the bottleneck seems to be somewhere in the batching:
(pyinstrument profile screenshot)

I'm working on non-public data, but basically same thing happens with the docs openimages example.

So given that gsutil -m results in fast download speeds but webdataset doesn't, and both use python (?) multiprocessing, maybe gsutil is doing something more efficiently here? I don't really know the multiprocessing etc. libraries well at all, so I'm at a bit of a loss here...

What is the recommended way of using webdataset with pytorch-lightning and ddp?

I am trying to build a LightningDataModule using webdataset, however I have encountered some difficulties implementing it, since previously I have never used IterableDataset.

The first thing I tried is this example in webdataset-examples: In LightningDataModule, I defined a setup() method which defines train/val/test datasets with wds.Dataset, and in train/val/test_dataloader() DataLoader instances are initialized with those datasets. This led to duplicate batching.

Then after googling a bit, I found this comment and another comment, so apparently the options are either letting the dataset itself do the batching, or using wds.MultiDataset. Just replacing DataLoader in train/val/test_dataloader() with MultiDataset resulted in the same batch duplication, so I suppose something is wrong. What is the recommended method for ddp in general?

TarWriter Error when given full pathname

Hello,

I am running into an error when providing the entire pathname of the destination tar file to the TarWriter function:

sink = wds.TarWriter('D:/dataset/dataset.tar', encoder=False)

ValueError: objectio: D:/dataset/dataset.tar: no handler found for d (known: file gs http https az)

However, this works:

sink = wds.TarWriter('dataset.tar', encoder=False)

How can I specify where I want my tar file written?

Thanks in advance!

Decoupling Pipelines from Training Nodes

Hello and thanks for the awesome project! I have been using it a ton and am successfully using it with azure blob and azcopy.

I wanted to ask a question about potentially decoupling the data pipeline from the training node (where gpu(s) live(s)).

With single GPU or multiple GPU nodes, often I cannot get enough CPU power to keep up with the GPU. I have thought about decoupling the data pipelines from the gpu. What I am imagining is anything from a designated large cpu instance to a kubernetes cluster that sends the batch results from webdataset to the training node. This is outside the scope of webdataset, but I figured you might have some insight into this realm.

In my mind, the cpu data pipeline instance/cluster should focus on pulling down data from cloud storage, applying transforms, and prepping data for the dataloader.

Some ideas so far:

Any thoughts? With this approach, the only limitation on arbitrary batch per second rates is network bandwidth and number of cpus on server/cluster.

Open to a PR for compressed tar input?

This is a really nice library! I just came across it after going down a rabbit hole looking into distributed training with TorchElastic and planning how data loading would work. (Most tutorials I see now are either bundling the data into the docker image, creating a K8s volume and then running a job to pre-fetch the data before training, or creating an NFS volume).

I was wondering if you'd be open to a PR for adding compressed tar files as input (.tar.gz, etc.). I think that this would be pretty easy to add but wanted to see if there were any objections before looking into it.

aws storage

The google cloud storage buckets work well (for the most part) with this framework. Is there any suggested/preferred methodology to utilize amazon s3 storage with this? (Putting tar files in that bucket and using the URL to load them on a local machine.)

Reading in two images

Hello,
I have a webdataset with entries of the following form:

  • id.image.png
  • id.depth.png
  • id.intrinsics.pyd

The image is a RGB image and depth is a uint16 grayscale image.

How can I use .decode in a way that I get a [H, W, C] uint8 array for image and a [H, W] uint16 array for depth?
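A hedged sketch using a custom decoder callable keyed on the field name (this assumes your webdataset version accepts plain (key, data) callables in .decode(); url is a placeholder and the field names follow the layout above):

import io

import numpy as np
from PIL import Image
import webdataset as wds

def decode_rgbd(key, data):
    if key.endswith("image.png"):
        # RGB image -> [H, W, 3] uint8
        return np.asarray(Image.open(io.BytesIO(data)).convert("RGB"))
    if key.endswith("depth.png"):
        # 16-bit grayscale PNG -> [H, W] uint16 (Pillow opens it in a 16-bit mode)
        return np.asarray(Image.open(io.BytesIO(data)))
    return None   # fall through to the default handlers (e.g. for .pyd)

dataset = wds.WebDataset(url).decode(decode_rgbd).to_tuple("image.png", "depth.png")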

Hierarchical index/ subsampling issue

Hi! First off, really great to bump into this project, seems awesome :)

I may have a bit of a problem though, because I need to sample 2 (or more) image/target frames per video clip. So images and target .npy files are of form ('{id}.{n}.jpg', '{id}.{n}.npy') with multiple n values for each id (maybe 10 to 100).

I'm now actually first sampling a random id, then sampling 2 different n values inside the dataset .__getitem__() method, and then streaming the (image, target) pairs on demand from GCS. There are lots of small files, which cause quite a bit of overhead, so I figured Webdataset could be a great solution for this.

But I'm not sure how to tackle this subsampling issue with Webdataset... should I define 1 shard per id? Something else?

EDIT: no, 1 shard per id would clearly not work, since I don't want lots of data in a minibatch to come from the same shard. Hmm...

Multiple images in a sample?

I searched but couldn't find an answer to this ...

Suppose I have a dataset where there are multiple files with the same extension within a sample, such as a stereo dataset where there's a left and a right image, but each shares the same basename (by necessity with webdataset) and the same file extension. Is this construct supported?

I was thinking of hacking the file extension to add a key, such as image0.png_left and image0.png_right, and then I'd just have to process these files, knowing what the special extensions mean.
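A hedged sketch of the usual convention: the part of the member name after the first dot is the field name, so the two images can get distinct compound extensions while keeping a real .png suffix (names and byte variables below are illustrative):

import webdataset as wds

# writing: left.png and right.png become two fields of the same sample
with wds.TarWriter("stereo-000000.tar") as sink:
    sink.write({
        "__key__": "sample000000",
        "left.png": left_png_bytes,    # already-encoded PNG bytes (placeholder)
        "right.png": right_png_bytes,
    })

# reading: address the two images by their field names
dataset = wds.WebDataset("stereo-000000.tar").decode("pil").to_tuple("left.png", "right.png")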

Resized Dataset for multiprocessing data loading

I'm trying to use webdataset for a distributed Pytorch XLA POC. I tried implementing the ResizedDataset class but started receiving many errors like the following after ~40 training steps. Any ideas how to fix this?

Error:
2021-03-15 16:16:21 10.164.0.29 [0] /anaconda3/envs/torch-xla-1.7/lib/python3.6/site-packages/torch/utils/data/dataloader.py:447: UserWarning: Length of IterableDataset <webdataset.dataset.ResizedDataset object at 0x7f5f978c2438> was reported to be 78 (when accessing len(dataloader)), but 205 samples have been fetched. For multiprocessing data-loading, this could be caused by not properly configuring the IterableDataset replica at each worker. Please see https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset for examples.

Training Dataloader code snippet:

def my_node_splitter(urls):
    """Split urls_ correctly per accelerator node
    :param urls:
    :return: slice of urls_
    """
    rank=xm.get_ordinal()
    num_replicas=xm.xrt_world_size()

    urls_this = urls[rank::num_replicas]
    
    return urls_this

normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

def make_train_loader(img_dim, shuffle=20000, batch_size=FLAGS.batch_size):
    
    num_dataset_instances = xm.xrt_world_size() * FLAGS.num_workers
    epoch_size = trainsize // num_dataset_instances
    # num_batches = (epoch_size + batch_size - 1) // batch_size
    num_batches = epoch_size // batch_size

    image_transform = transforms.Compose(
        [
            transforms.RandomResizedCrop(img_dim),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ]
    )
    # something like "nodesplitter=lambda l: l[node_index::num_nodes]"
    # length=num_batches
    dataset = (
        wds.WebDataset("pipe:gsutil cat gs://tpu-demo-eu-west/imagenet-wds/wds-data/shards/imagenet-train-{000000..001281}.tar", 
        splitter=wds.split_by_worker, nodesplitter=my_node_splitter, shardshuffle=True, length=epoch_size)
        .shuffle(shuffle)
        .decode("pil") # handler=wds.warn_and_continue
        .to_tuple("ppm;jpg;jpeg;png", "cls") # handler=wds.warn_and_continue
        .map_tuple(image_transform, identity) # handler=wds.warn_and_continue
        .batched(batch_size)
        )

    dataset = wds.ResizedDataset(dataset, epoch_size, nominal=num_batches)

    loader = torch.utils.data.DataLoader(dataset, batch_size=None, shuffle=False, num_workers=FLAGS.num_workers)
    return loader
