
litdata's Introduction


Blazingly fast, distributed streaming of training data from any cloud storage

⚡ Welcome to LitData

With LitData, users can transform and optimize their data in cloud storage environments efficiently and intuitively, at any scale.

Once optimized, efficient distributed training becomes practical regardless of where the data is located, enabling users to seamlessly stream data of any size to one or multiple machines.

LitData supports images, text, video, audio, geo-spatial, and multimodal data types. It is already adopted by frameworks such as LitGPT to pretrain LLMs, and it integrates smoothly with PyTorch Lightning, Lightning Fabric, and PyTorch.

Runnable templates published on the Lightning.AI Platform are listed at the end and are reproducible in one click.

Table of Contents

Getting Started

Installation

Install LitData with pip

pip install litdata

Install LitData with the extras

pip install 'litdata[extras]'

Quick Start

1. Prepare Your Data

Convert your raw dataset into LitData Optimized Streaming Format using the optimize operator.

Here is an example with some random images.

import numpy as np
from litdata import optimize
from PIL import Image


# Store random images into the data chunks
def random_images(index):
    data = {
        "index": index, # int data type
        "image": Image.fromarray(np.random.randint(0, 256, (32, 32, 3), np.uint8)), # PIL image data type
        "class": np.random.randint(10), # numpy array data type
    }
    # The data is serialized into bytes and stored into data chunks by the optimize operator.
    return data # The data is serialized into bytes and stored into data chunks by the optimize operator.

if __name__ == "__main__":
    optimize(
        fn=random_images,  # The function applied over each input.
        inputs=list(range(1000)),  # Provide any inputs. The fn is applied on each item.
        output_dir="my_optimized_dataset",  # The directory where the optimized data are stored.
        num_workers=4,  # The number of workers. The inputs are distributed among them.
        chunk_bytes="64MB"  # The maximum number of bytes to write into a data chunk.
    )

The optimize operator supports any data structure and type: serialize whatever you want. The optimized data is stored under the output directory my_optimized_dataset.

2. Upload your Data to Cloud Storage

Cloud providers such as AWS, Google Cloud, and Azure provide command-line clients to upload your data to their storage solutions.

Here is how to upload the optimized dataset using the AWS CLI to AWS S3.

⚡ aws s3 cp --recursive my_optimized_dataset s3://my-bucket/my_optimized_dataset

3. Use StreamingDataset

Then, the Streaming Dataset can read the data directly from AWS S3.

from litdata import StreamingDataset
from torch.utils.data import DataLoader

# Remote path where full dataset is stored
input_dir = 's3://my-bucket/my_optimized_dataset'

# Create the Streaming Dataset
dataset = StreamingDataset(input_dir, shuffle=True)

# Access any elements of the dataset
sample = dataset[50]
img = sample['image']
cls = sample['class']

# Create PyTorch DataLoader and iterate over it to train your AI models.
dataloader = DataLoader(dataset)

Key Features

Multi-GPU / Multi-Node Support

The StreamingDataset and StreamingDataLoader automatically ensure that each rank receives the same quantity of varied batches of data, so distributed training works out of the box with your favorite frameworks (PyTorch Lightning, Lightning Fabric, or PyTorch).
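
Below is a minimal sketch of how this looks in practice under a typical DDP launch (e.g. torchrun or a Lightning Trainer/Fabric); this is an assumption about a standard setup, not an excerpt from the docs. No DistributedSampler is needed, since the dataset shards the samples across ranks for you.

from litdata import StreamingDataset, StreamingDataLoader

# Every rank constructs the same dataset/dataloader; per-rank sharding happens internally.
dataset = StreamingDataset("s3://my-bucket/my_optimized_dataset", shuffle=True, drop_last=True)
dataloader = StreamingDataLoader(dataset, batch_size=64, num_workers=4)

for batch in dataloader:
    ...  # each rank receives its own, non-overlapping batches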

Here is an illustration showing how the Streaming Dataset works with multi-node / multi-GPU under the hood.

Access any item

Access the data you need, whenever you need it, regardless of where it is stored.

from litdata import StreamingDataset

dataset = StreamingDataset("s3://my-bucket/my-data") # data are stored in the cloud

print(len(dataset)) # display the length of your data

print(dataset[42]) # show the 42nd element of the dataset

Use any data transforms

Subclass the StreamingDataset and override its __getitem__ method to add any extra data transformations.

from litdata import StreamingDataset, StreamingDataLoader
import torchvision.transforms.v2.functional as F

class ImagenetStreamingDataset(StreamingDataset):

    def __getitem__(self, index):
        image = super().__getitem__(index)
        return F.resize(image, (224, 224))

dataset = ImagenetStreamingDataset(...)
dataloader = StreamingDataLoader(dataset, batch_size=4)

for batch in dataloader:
    print(batch.shape)
    # Out: (4, 3, 224, 224)

The Map Operator

The map operator can be used to apply a function over a list of inputs.

Here is an example where the map operator is used to apply a resize_image function over a folder of large images.

import os

from litdata import map
from PIL import Image

# Note: Inputs could also refer to files on s3 directly.
input_dir = "my_large_images"
inputs = [os.path.join(input_dir, f) for f in os.listdir(input_dir)]

# resize_image takes one of the inputs (image_path) and the output directory.
# Files written to output_dir are persisted.
def resize_image(image_path, output_dir):
    output_image_path = os.path.join(output_dir, os.path.basename(image_path))
    Image.open(image_path).resize((224, 224)).save(output_image_path)
  
map(
    fn=resize_image,
    inputs=inputs, 
    output_dir="s3://my-bucket/my_resized_images",
)

Easy Data Mixing with the Combined Streaming Dataset

Easily experiment with dataset mixtures using the CombinedStreamingDataset class.

As an example, this mixture of Slimpajama & StarCoder was used in the TinyLLAMA project to pretrain a 1.1B Llama model on 3 trillion tokens.

from litdata import StreamingDataset, CombinedStreamingDataset
from litdata.streaming.item_loader import TokensLoader
from tqdm import tqdm
import os
from torch.utils.data import DataLoader

train_datasets = [
    StreamingDataset(
        input_dir="s3://tinyllama-template/slimpajama/train/",
        item_loader=TokensLoader(block_size=2048 + 1), # Optimized loader for tokens used by LLMs 
        shuffle=True,
        drop_last=True,
    ),
    StreamingDataset(
        input_dir="s3://tinyllama-template/starcoder/",
        item_loader=TokensLoader(block_size=2048 + 1), # Optimized loader for tokens used by LLMs 
        shuffle=True,
        drop_last=True,
    ),
]

# Mix SlimPajama data and Starcoder data with these proportions:
weights = (0.693584, 0.306416)
combined_dataset = CombinedStreamingDataset(datasets=train_datasets, seed=42, weights=weights)

train_dataloader = DataLoader(combined_dataset, batch_size=8, pin_memory=True, num_workers=os.cpu_count())

# Iterate over the combined datasets
for batch in tqdm(train_dataloader):
    pass

Pause & Resume Made Simple

LitData provides a stateful Streaming DataLoader, i.e. you can pause and resume your training whenever you want.

Info: The Streaming DataLoader was used by Lit-GPT to pretrain LLMs. Restarting from an older checkpoint was critical to pretraining the full model, due to several failures (network issues, CUDA errors, etc.).

import os
import torch
from litdata import StreamingDataset, StreamingDataLoader

dataset = StreamingDataset("s3://my-bucket/my-data", shuffle=True)
dataloader = StreamingDataLoader(dataset, num_workers=os.cpu_count(), batch_size=64)

# Restore the dataLoader state if it exists
if os.path.isfile("dataloader_state.pt"):
    state_dict = torch.load("dataloader_state.pt")
    dataloader.load_state_dict(state_dict)

# Iterate over the data
for batch_idx, batch in enumerate(dataloader):
  
    # Store the state every 1000 batches
    if batch_idx % 1000 == 0:
        torch.save(dataloader.state_dict(), "dataloader_state.pt")

Support Profiling

The StreamingDataLoader supports profiling of your data loading process. Simply use the profile_batches argument to specify the number of batches you want to profile:

from litdata import StreamingDataset, StreamingDataLoader

StreamingDataLoader(..., profile_batches=5)

This generates a Chrome trace called result.json. To visualize it, open Chrome at chrome://tracing and load the trace file.

Reduce your memory footprint

When processing large files like compressed parquet files, use the Python yield keyword to process and store one item at a time, reducing the memory footprint of the entire program.

from pathlib import Path
import pyarrow.parquet as pq
from litdata import optimize
from tokenizer import Tokenizer
from functools import partial

# 1. Define a function to convert the text within the parquet files into tokens
def tokenize_fn(filepath, tokenizer=None):
    parquet_file = pq.ParquetFile(filepath)
    # Process per batch to reduce RAM usage
    for batch in parquet_file.iter_batches(batch_size=8192, columns=["content"]):
        for text in batch.to_pandas()["content"]:
            yield tokenizer.encode(text, bos=False, eos=True)

# 2. Generate the inputs
input_dir = "/teamspace/s3_connections/tinyllama-template"
inputs = [str(file) for file in Path(f"{input_dir}/starcoderdata").rglob("*.parquet")]

# 3. Store the optimized data wherever you want under "/teamspace/datasets" or "/teamspace/s3_connections"
outputs = optimize(
    fn=partial(tokenize_fn, tokenizer=Tokenizer(f"{input_dir}/checkpoints/Llama-2-7b-hf")), # Note: Use HF tokenizer or any others
    inputs=inputs,
    output_dir="/teamspace/datasets/starcoderdata",
    chunk_size=(2049 * 8012), # Number of tokens to store by chunks. This is roughly 64MB of tokens per chunk.
)

Configure Cache Size Limit

Adapt the local caching limit of the StreamingDataset. This is useful to make sure the downloaded data chunks are deleted once used, so disk usage stays low.

from litdata import StreamingDataset

dataset = StreamingDataset(..., max_cache_size="10GB")

On-Prem Optimizations

On-prem compute nodes can mount and use a network drive. A network drive is a shared storage device on a local area network. To reduce network load, the StreamingDataset supports caching the data chunks locally.

from litdata import StreamingDataset

dataset = StreamingDataset(input_dir="local:/data/shared-drive/some-data")

Benchmarks

To measure the effectiveness of LitData, we used a dataset commonly used for benchmarks: ImageNet-1.2M, whose training set contains 1,281,167 images.

To align with other benchmarks, we measured the streaming speed (images per second) loaded from AWS S3 for several frameworks.

Reproduce our benchmark by running this Studio.

Imagenet-1.2M Streaming from AWS S3

We observe that LitData is up to 85% faster than the second-best framework. Higher is better in the table below.

Framework   | Images / sec 1st Epoch (float32) | Images / sec 2nd Epoch (float32) | Images / sec 1st Epoch (torch16) | Images / sec 2nd Epoch (torch16)
PL Data     | 5800.34 | 6589.98 | 6282.17 | 7221.88
Web Dataset | 3134.42 | 3924.95 | 3343.40 | 4424.62
Mosaic ML   | 2898.61 | 5099.93 | 2809.69 | 5158.98

Imagenet-1.2M Conversion

We measured how fast the 1.2 million images can be converted into a streamable format. Faster is better in the table below.

Framework   | Train Conversion Time | Val Conversion Time | Dataset Size | # Files
PL Data     | 10:05 min | 00:30 min | 143.1 GB | 2,339
Web Dataset | 32:36 min | 01:22 min | 147.8 GB | 1,144
Mosaic ML   | 49:49 min | 01:04 min | 143.1 GB | 2,298

Runnable Templates

The fastest way to learn is with Studios.

Studios are reproducible cloud IDEs with data, code, and dependencies included, so you can redo everything yourself with ease!

We've published public templates that demonstrate how best to use the LitData framework at scale and with several data types.

Sign up here and run your first Studio for free.

Studio | Data type | Dataset
Use or explore LAION-400MILLION dataset | Image & Text | LAION-400M
Convert GeoSpatial data to Lightning Streaming | Image & Mask | Chesapeake Roads Spatial Context
Benchmark cloud data-loading libraries | Image & Label | Imagenet 1M
Prepare the TinyLlama 1T token dataset | Text | SlimPajama & StarCoder
Tokenize 2M Swedish Wikipedia Articles | Text | Swedish Wikipedia
Embed English Wikipedia under 5 dollars | Text | English Wikipedia
Convert parquets to Lightning Streaming | Parquet Files | Randomly Generated data

Infinite cloud data processing

If you want to scale data processing, you typically need more machines; setting this up yourself is tedious and can take a long time.

Instead, create a free account on the Lightning.ai platform and use as many machines as you need from code.

On the platform, simply specify the number of nodes and the machine type you need as follows:

from litdata import map, Machine

map(
  ...
  num_nodes=32,
  machine=Machine.DATA_PREP, # Select between dozens of optimized machines
)

Also, the optimize operator can do the same to make immense datasets streamable as follows:

from litdata import optimize, Machine

optimize(
  ...
  num_nodes=32,
  machine=Machine.DATA_PREP, # Select between dozens of optimized machines
)

Within the LAION 400 MILLION Studio, we utilized 32 machines, each equipped with 32 CPUs, to execute the optimize operator, enabling the download of 400 million images in just 2 hours. Below is a screenshot of that job within the Lightning.ai platform. You can execute it yourself here.

(screenshot of the optimize job on the Lightning.ai platform)

⚡ Contributors

We welcome any contributions, pull requests, or issues. If you use the Streaming Dataset for your own project, please reach out to us on Discord.


litdata's Issues

Bug in python packaging

🐛 Bug in python package

The litdata folder is not correctly placed under site-packages. Instead, subfolders such as streaming are placed at the level where litdata should have been.

To Reproduce

Steps to reproduce the behavior:

With pip:

python3.10 -m venv .venv
source .venv/bin/activate
pip install -U litdata
python -c "import litdata"

gives

Traceback (most recent call last):
  File "<string>", line 1, in <module>
ModuleNotFoundError: No module named 'litdata'

The resulting RECORD file as found in: .venv/lib/python3.10/site-packages/litdata-0.2.0rc1.dist-info/RECORD:

processing/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
processing/data_processor.py,sha256=d3C1DDGdhghIiF7Tx9Vp0wCQHLRoqhBfPq-bHz3s3rI,41215
processing/functions.py,sha256=HID7cg0WhvlR_l_oQpW4xU81BJ9ZCK1DBAbqxbpnU9o,15033
processing/readers.py,sha256=PorNKmYS1Uij3xLyN-E9e0eOMcWvsvIpp5z17kAMyRo,3051
processing/utilities.py,sha256=evwBcAnLlvD4bxRwEKs0MFY2FA1JtD6ZkVOyMcHlo6s,4994
streaming/__init__.py,sha256=zfSEEdb0D8nPqH2bOAp0gJm1Fq4afnFaujiyZd7lB6E,988
streaming/cache.py,sha256=30DM2RH_qGqTB6pon5XeQ1x58wle3qALVLAannoQ4Po,6150
streaming/client.py,sha256=0Yzqgu-DQwhfzGwvMZLZc04TwqZWpkcxj1l_txcnZYU,2010
streaming/combined.py,sha256=2ZXNtvc4P0Vx1-xR_hYaQa6MrCbaYIiNr3Z9ipLCFyE,7028
streaming/compression.py,sha256=MHtmAABz332-bWzGnUFPfqsTOwyYeJr4gmv5lWFbvrk,2112
streaming/config.py,sha256=_TiHlkmta-VeC8SyawJWzgJFr2q7r7S0OfVRD0P609Y,6598
streaming/dataloader.py,sha256=7q4HWCzOnXKsXjK7_iowLz7CThs2k-hasK01naNdaaI,25854
streaming/dataset.py,sha256=WhiT5OoBJtcm3IUyGsSdA7a6U361jh2tgEcVTUZH9G0,19224
streaming/downloader.py,sha256=ZJwWzeJrNobzAkV5OK-_zpTGw8a5ItfeRTNXCpy43vg,4220
streaming/item_loader.py,sha256=AOTiV2Dmn9QHv1wcspoRSGuCLLACINcrh7A5wbc6K6c,7985
streaming/reader.py,sha256=MGM3Cw8sBpp7sbR5D8-DzqpxsrV_zlLOgjHUG6KGJeE,13038
streaming/resolver.py,sha256=UWwelXMak5NQ0nDbfaefbAaa_NvuEodt33IyLpdMuA4,11709
streaming/sampler.py,sha256=yjOHTFReJgeWR1igLGaDaZ3K0c9DiF-3HNBDlvCHEsM,9691
streaming/serializers.py,sha256=qc6dvbI1pgeQZsJpRBgy3-h-qpz88o5nEV_aK2cu_s8,14205
streaming/shuffle.py,sha256=msEhNjTB5BcSCQ2_ADksb1qX7Gp_8VY3A0GeUFz_zcQ,6028
streaming/writer.py,sha256=CMJc52vHkCeNS2nQ8HLE1FMqNjL2-2MrOcxm1QCh71E,18061
utilities/__init__.py,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0
utilities/broadcast.py,sha256=y1Vpu0IWbTvUtSqtWHYcuRsjTEh624lMrloRS9nVyVQ,5998
utilities/env.py,sha256=udxhFcGE7kIf8N_RBRboTyqmPemO6z_--gkE81OWNNQ,6294
utilities/format.py,sha256=dO5zMmQRHWCiIZ3u7KM2UzR-Poju7YggOBLO_n_dxOE,993
utilities/packing.py,sha256=hW_Q9eHCckAoTQ5BbpMwJcz9bsIN3c--M-kUrl3PsdU,1030
utilities/shuffle.py,sha256=CLUZ8qqWr95DFbUUE3iJL-MfqVfvwOXth8VZDWhG_Xk,2956
litdata-0.2.0rc1.dist-info/LICENSE,sha256=egEaW31il0xXi3pa43tywG1TRF_haci5JUUAujkexLc,11352
litdata-0.2.0rc1.dist-info/METADATA,sha256=jS-LrMdptzgjIzO6T9803BOpGIfqpmUNmylxXe65ltc,17105
litdata-0.2.0rc1.dist-info/WHEEL,sha256=oiQVh_5PnQM0E3gPdiz09WCNmwiHDMaGer_elqB3coM,92
litdata-0.2.0rc1.dist-info/top_level.txt,sha256=OrhmFsXL300k2JPaaXFeMsSPacr2GejH4Mw50BPl4Tw,31
litdata-0.2.0rc1.dist-info/INSTALLER,sha256=M8DFwDPypomsjL1_VmfQ21jtCcESdiUOn1c55TnwmTw,12
litdata-0.2.0rc1.dist-info/RECORD,,

Expected behavior

litdata has a separate folder under site-packages

Environment

  • PyTorch Version (e.g., 1.0): torch 2.2.0
  • OS (e.g., Linux): linux
  • How you installed PyTorch (conda, pip, source):
  • Build command you used (if compiling from source):
  • Python version:
  • CUDA/cuDNN version:
  • GPU models and configuration:
  • Any other relevant information:

Additional context

KeyError: Caught KeyError in DataLoader worker process 0.

🐛 Bug

I've created a dataset following the approach on https://lightning.ai/lightning-ai/studios/convert-spatial-data-to-lightning-streaming

On using the dataset I get

KeyError: Caught KeyError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
    data = fetcher.fetch(index)
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/_utils/fetch.py", line 32, in fetch
    data.append(next(self.dataset_iter))
  File "/usr/local/lib/python3.10/dist-packages/lightning/data/streaming/dataset.py", line 274, in __next__
    data = self.__getitem__(
  File "/code/common/datasets/streaming.py", line 20, in __getitem__
    image_name, mask, image = super().__getitem__(index)
  File "/usr/local/lib/python3.10/dist-packages/lightning/data/streaming/dataset.py", line 244, in __getitem__
    return self.cache[index]
  File "/usr/local/lib/python3.10/dist-packages/lightning/data/streaming/cache.py", line 135, in __getitem__
    return self._reader.read(index)
  File "/usr/local/lib/python3.10/dist-packages/lightning/data/streaming/reader.py", line 252, in read
    item = self._item_loader.load_item_from_chunk(index.index, index.chunk_index, chunk_filepath, begin)
  File "/usr/local/lib/python3.10/dist-packages/lightning/data/streaming/item_loader.py", line 104, in load_item_from_chunk
    return self.deserialize(data)
  File "/usr/local/lib/python3.10/dist-packages/lightning/data/streaming/item_loader.py", line 112, in deserialize
    serializer = self._serializers[data_format]
KeyError: 'str'

To Reproduce

Steps to reproduce the behavior:

  1. Go to '...'
  2. Run '....'
  3. Scroll down to '....'
  4. See error

Code sample

Dataset creation script:

import os
from litdata import optimize
from glob import glob
from typing import Optional, Tuple

# 1. List the files and associate the images and their masks
root = "/Volumes/Toshiba4TB/EDA/cloud_detection"
input_dir = f"{root}/tuning_split"
output_root = f"{root}/tuning_split_optimized"

def get_info(img_filepath: str) -> Optional[Tuple]:
    mask_filepath = img_filepath.replace("S2L1C.tif", "labels/manual_hq.tif")
    if not os.path.exists(mask_filepath):
        return None
    splits = img_filepath.split("/")
    name = f"{splits[7]}_{splits[8]}"
    return name, img_filepath, mask_filepath

def compress(data):
    name, image_filepath, mask_filepath = data
    return name, image_filepath, mask_filepath

train_images = glob(f"{input_dir}/train/*/*/S2L1C.tif")
train_inputs = []
for img in train_images:
    info = get_info(img)
    if info is not None:
        train_inputs.append(info)

val_inputs = []
val_images = glob(f"{input_dir}/val/*/*/S2L1C.tif")
for img in val_images:
    info = get_info(img)
    if info is not None:
        val_inputs.append(info)

test_inputs = []
test_images = glob(f"{input_dir}/test/*/*/S2L1C.tif")
for img in test_images:
    info = get_info(img)
    if info is not None:
        test_inputs.append(info)

# 4. Run the optimization
if __name__ == "__main__":
    optimize(
        fn=compress,
        inputs=train_inputs,
        output_dir=f"{output_root}/train",
        chunk_bytes="128MB",
    )

    optimize(
        fn=compress,
        inputs=val_inputs,
        output_dir=f"{output_root}/val",
        chunk_bytes="128MB",
    )

    optimize(
        fn=compress,
        inputs=test_inputs,
        output_dir=f"{output_root}/test",
        chunk_bytes="128MB",
    )

And the dataset:

class SegmentationStreamingDataset(StreamingDataset):
    """
    Segmentation dataset with streaming

    Args:
        input_dir (str): Local directory or S3 location of the dataset
        transforms (Optional[Callable]): A transform that takes in an image and returns a transformed version.
    """

    def __init__(self, *args, transforms=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.transforms = transforms

    def __getitem__(self, index) -> dict:
        image_name, mask, image = super().__getitem__(index)

        with MemoryFile(image) as memfile:
            with memfile.open() as dataset:
                image = torch.from_numpy(dataset.read()).float()

        with MemoryFile(mask) as memfile:
            with memfile.open() as dataset:
                mask = torch.from_numpy(dataset.read()).long()

        sample = {"image": image, "mask": mask, "image_name": image_name}
        if self.transforms is not None:
            sample = self.transforms(sample)
        return sample

Expected behavior

No errors

Environment

lightning==2.2.1
lightning-cloud==0.5.64
lightning-utilities==0.10.0
lit==15.0.7
litdata==0.2.1

Additional context

Please add s3 path support to optimize (read and write to s3)

🚀 Feature

add s3 path support to optimize function

Motivation

To directly read and write to s3

Pitch

I want to be able to pass an s3 path to the optimize function and have the optimize function output directly to s3.
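
A hypothetical sketch of the requested usage (the bucket paths and my_fn are placeholders; this illustrates the behavior being asked for, not what currently works):

from litdata import optimize

def my_fn(filepath):
    # hypothetical per-file processing
    return {"path": filepath}

if __name__ == "__main__":
    optimize(
        fn=my_fn,
        inputs=["s3://my-bucket/raw/a.parquet", "s3://my-bucket/raw/b.parquet"],  # read directly from s3
        output_dir="s3://my-bucket/optimized/",  # write directly to s3
        chunk_bytes="64MB",
    )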

Alternatives

None

Additional context

None

ValueError: buffer size must be a multiple of element size

🐛 Bug

A very simple example of optimizing a tensor dataset inspired by the README does not work.

To Reproduce

Steps to reproduce the behavior:

Install litdata 0.2.3 or main branch. Run the code below.

Code sample

import torch
from litdata import optimize
from litdata import StreamingDataset
from torch.utils.data import DataLoader


def random_images(index):
    return torch.randint(0, 256, (32, 32, 3))


if __name__ == "__main__":
    optimize(
        fn=random_images,
        inputs=list(range(10)), 
        output_dir="my_optimized_dataset", 
        num_workers=2,
        chunk_bytes="5MB"
    )
    
    dataset = StreamingDataset('my_optimized_dataset')
    dataloader = DataLoader(dataset)

    iterator = iter(dataloader)
    next(iterator)

Error:

Traceback (most recent call last):
  File "/teamspace/studios/this_studio/main.py", line 25, in <module>
    next(iterator)
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/dataloader.py", line 598, in __iter__
    for batch in super().__iter__():
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 631, in __next__
    data = self._next_data()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 675, in _next_data
    data = self._dataset_fetcher.fetch(index)  # may raise StopIteration
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 32, in fetch
    data.append(next(self.dataset_iter))
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/dataset.py", line 298, in __next__
    data = self.__getitem__(
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/dataset.py", line 268, in __getitem__
    return self.cache[index]
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/cache.py", line 128, in __getitem__
    return self._reader.read(index)
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/reader.py", line 252, in read
    item = self._item_loader.load_item_from_chunk(
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/item_loader.py", line 126, in load_item_from_chunk
    return self.deserialize(data)
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/item_loader.py", line 136, in deserialize
    data.append(serializer.deserialize(data_bytes))
  File "/teamspace/studios/this_studio/litdata/src/litdata/streaming/serializers.py", line 185, in deserialize
    shape.append(np.frombuffer(data[8 + 4 * shape_idx : 8 + 4 * (shape_idx + 1)], np.uint32).item())
ValueError: buffer size must be a multiple of element size

Expected behavior

Iterator returns the tensor.

Environment

Fresh CPU Studio.

Dataloading is not working when used in litgpt's debug pretraining example

The pretraining example of litgpt with

litgpt pretrain \
   --model_name pythia-14m \
   --config https://raw.githubusercontent.com/Lightning-AI/litgpt/main/config_hub/pretrain/debug.yaml

is doing some data preprocessing which slows down from >100 it/s to about 14 it/s (the ~40 it/s shown below is computed over only the last few iterations). Overall, it takes about 1 hour, which seems >10x longer than it should be for this small dataset. At the end, it does not complete because some workers still have pending work that they never finish:

Worker 18 is terminating.
Worker 18 is done.████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉| 99996/100000 [57:22<00:00, 41.02it/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉| 99986/100000 [57:23<00:00, 40.94it/s]
Progress: 92%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▎ | 45/49 [57:38<05:07, 76.85s/it]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉| 99996/100000 [57:23<00:00, 41.58it/s]

99%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▌ | 99024/100000 [57:12<00:24, 40.35it/s]

99%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏| 99429/100000 [57:22<00:13, 42.13it/s]
99%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏| 99434/100000 [57:22<00:13, 42.12it/s]

99%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏| 99449/100000 [57:22<00:13, 42.32it/s]

99%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏| 99459/100000 [57:22<00:12, 42.21it/s]

99%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏| 99479/100000 [57:23<00:12, 40.69it/s]

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉| 99999/100000 [57:35<00:00, 42.37it/s]

When I relaunch that code, it restarts the whole data preprocessing from scratch. Could you please have a look at it?

Preprocessing fails when returning tensor instead of yielding

🐛 Bug

To Reproduce

  1. Clone this Studio template: https://lightning.ai/lightning-ai/studios/how-to-scrape-web-data-to-finetune-llms?view=public&section=featured
  2. pip install -U lightning-sdk lightning litdata
  3. Run main.py
  4. Run optimize.py (code sample below)

Code sample

import requests, os
from bs4 import BeautifulSoup
from litdata import optimize 
from requests.exceptions import SSLError, ConnectionError, ReadTimeout
from urllib3.exceptions import HeaderParsingError
import pandas as pd
from lightning_sdk import Machine
from lit_gpt import Tokenizer
from functools import partial

# 1. List of the text files 
input_dir = f"/teamspace/datasets/{os.getenv('LIGHTNING_USERNAME', 'undefined')}/website-data"
files = [os.path.join(input_dir, filepath) for filepath in os.listdir(input_dir)]

# 2. Define the tokenize function
def tokenize_fn(filepath, tokenizer=None):
    with open(filepath, "r") as f:
        text = f.read()

    encoded = tokenizer.encode(text, bos=False, eos=True)
    return encoded
    # yield encoded   # <------------ Works if you yield instead of return

# 3. Use the optimize operator to apply the `tokenize_fn` over all the files and write its return into chunks
optimize(
    fn=partial(tokenize_fn, tokenizer=Tokenizer("./checkpoints/Llama-2-7b-hf")),
    inputs=files,
    output_dir=f"/teamspace/datasets/{os.getenv('LIGHTNING_USERNAME', 'undefined')}/website-data-optimized2",
    num_workers=1,
    chunk_size=2049 * 1024,
    reorder_files=True,
)

Error:

Starting 1 workers with 8 items.
Workers are ready ! Starting data processing...
                                                                                                                                                                       Rank 0 inferred the following `['pickle']` data format.                                                                                          | 0/8 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 616, in _handle_data_chunk_recipe
    chunk_filepath = self.cache._add_item(self._index_counter, item_data)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/cache.py", line 129, in _add_item
    return self._writer.add_item(index, data)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/writer.py", line 286, in add_item
    data, dim = self.serialize(items)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/writer.py", line 170, in serialize
    return data[0], flattened[0].shape[0]
IndexError: tuple index out of range

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 409, in run
    self._loop()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 458, in _loop
    self._handle_data_chunk_recipe(index)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 624, in _handle_data_chunk_recipe
    raise RuntimeError(f"Failed processing {self.items[index]}") from e
RuntimeError: Failed processing /cache/data/website-data/7.txt

Worker 0 is done.
Traceback (most recent call last):
  File "/teamspace/studios/this_studio/optimize.py", line 25, in <module>
    optimize(
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/functions.py", line 355, in optimize
    data_processor.run(
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 960, in run
    self._exit_on_error(error)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 1020, in _exit_on_error
    raise RuntimeError(f"We found the following error {error}.")
RuntimeError: We found the following error Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 616, in _handle_data_chunk_recipe
    chunk_filepath = self.cache._add_item(self._index_counter, item_data)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/cache.py", line 129, in _add_item
    return self._writer.add_item(index, data)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/writer.py", line 286, in add_item
    data, dim = self.serialize(items)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/writer.py", line 170, in serialize
    return data[0], flattened[0].shape[0]
IndexError: tuple index out of range

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 409, in run
    self._loop()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 458, in _loop
    self._handle_data_chunk_recipe(index)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 624, in _handle_data_chunk_recipe
    raise RuntimeError(f"Failed processing {self.items[index]}") from e
RuntimeError: Failed processing /cache/data/website-data/7.txt

If you change the return statement to yield in the processing function, it works. I failed to make a minimal repro outside this Studio.

Expected behavior

Both return and yield should work depending on whether the user wants to return one or more examples from the preprocessing function.

Environment

  • PyTorch Version (e.g., 1.0): 2.2.1
  • OS (e.g., Linux): Linux
  • How you installed PyTorch (conda, pip, source): pip
  • Build command you used (if compiling from source):
  • Python version: 3.10
  • CUDA/cuDNN version:
  • GPU models and configuration:
  • Any other relevant information:

Additional context

litdata                   0.2.2
lightning                 2.2.1
lightning-cloud           0.5.64
lightning_sdk             0.1.2

Issue with StreamingDataset when not using all GPUs on host.

🐛 Bug

When initializing a StreamingDataset object, _DistributedEnv.detect() is called (code) and in the detect() function there is a world_size check (code). This check fails for my use case when I am not using all GPUs on a host: world_size is set to 6 but torch.cuda.device_count() returns 8, and 6 % 8 != 0, thus raising the error.

Perhaps this check failure is the intended behavior, but I do not know enough about the litdata repository to understand why the code should raise an error when world_size % device_count != 0. I would imagine that torch runs various checks when setting up a torch distributed environment, such that this check would not be needed unless it serves a particular purpose for StreamingDataset() or another object. If there is any insight here on (1) whether this check is needed and (2) why it is needed, that would be great!

To Reproduce

This issue came up when experimenting with LitGPT. I don't think this section or the following sections are necessary, so I will leave them blank for now. Please let me know if any of these sections would help and I can add more description.

Code sample

Not needed at the moment.

Expected behavior

Not needed at the moment.

Environment

Not needed at the moment.

Additional context

Assert when deserializing `no_header_numpy` or `no_header_tensor`.

🐛 Bug

To Reproduce

Steps to reproduce the behavior:

  1. Create/serialize a dataset with integer tensor or numpy.
  2. Read/deserialize the created dataset.

Code sample

from litdata import optimize
import numpy as np
from litdata.streaming import StreamingDataLoader, StreamingDataset


def random_images(index):
    data = {
        "index": index,  # int data type
        "class": np.arange(1, 100),  # numpy array data type
    }
    # The data is serialized into bytes and stored into data chunks by the optimize operator.
    return data  # The data is serialized into bytes and stored into data chunks by the optimize operator.


if __name__ == "__main__":
    optimize(
        fn=random_images,  # The function applied over each input.
        inputs=list(range(10)),  # Provide any inputs. The fn is applied on each item.
        output_dir="my_optimized_dataset",  # The directory where the optimized data are stored.
        num_workers=0,  # The number of workers. The inputs are distributed among them.
        chunk_bytes="64MB",  # The maximum number of bytes to write into a data chunk.
    )

    dataset = StreamingDataset("my_optimized_dataset", shuffle=False, drop_last=False)
    dataloader = StreamingDataLoader(
        dataset,
        num_workers=0,
        batch_size=1,
        drop_last=False,
        shuffle=False,
    )

    for data in dataloader:
        print(data)

Expected behavior

Read and print the batch data.

Environment

  • PyTorch Version (e.g., 1.0): 2.1.2
  • OS (e.g., Linux): MacOS and Linux
  • How you installed PyTorch (conda, pip, source): pip install
  • Build command you used (if compiling from source):
  • Python version: 3.11
  • CUDA/cuDNN version: N/A
  • GPU models and configuration: N/A
  • Any other relevant information:

Additional context

Assert stack

Traceback (most recent call last):
  File "/Users/jou2/work/./test_optimize.py", line 33, in <module>
    for data in dataloader:
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/dataloader.py", line 598, in __iter__
    for batch in super().__iter__():
  File "/opt/homebrew/lib/python3.11/site-packages/torch/utils/data/dataloader.py", line 630, in __next__
    data = self._next_data()
           ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/torch/utils/data/dataloader.py", line 674, in _next_data
    data = self._dataset_fetcher.fetch(index)  # may raise StopIteration
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/torch/utils/data/_utils/fetch.py", line 32, in fetch
    data.append(next(self.dataset_iter))
                ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/dataset.py", line 298, in __next__
    data = self.__getitem__(
           ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/dataset.py", line 268, in __getitem__
    return self.cache[index]
           ~~~~~~~~~~^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/cache.py", line 135, in __getitem__
    return self._reader.read(index)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/reader.py", line 252, in read
    item = self._item_loader.load_item_from_chunk(index.index, index.chunk_index, chunk_filepath, begin, chunk_bytes)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/item_loader.py", line 110, in load_item_from_chunk
    return self.deserialize(data)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/item_loader.py", line 129, in deserialize
    data.append(serializer.deserialize(data_bytes))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/lib/python3.11/site-packages/litdata/streaming/serializers.py", line 261, in deserialize
    assert self._dtype
AssertionError

Support `StreamingDataLoader` passed to `map`

🚀 Feature

It would be great to be able to pass a StreamingDataLoader to map. When experimenting with CLIP embeddings I've found that I needed to use StreamingDataLoader to be able to fully utilise the GPU - but it doesn't play nicely with map because we don't set the right env variables and things for it to work in a distributed setting.

Motivation

This would let us run distributed embedding of much larger data sets like LAION

Pitch

Allow providing a StreamingDataLoader to the map function, and then set correct envs etc. so that we still visit each sample just once.

GCSFuse mount + Vertex AI custom training jobs support

🚀 Feature

DDP on models in GCP with data stored in GCS

Question

Has litdata been tested in the case where you want to train on GCP (Vertex AI) rather than just storing and streaming the data from GCS?

Motivation

I've been trying to set up DDP on GCP (Vertex AI) using Lightning Fabric, with my data stored in GCS. When you use GCS with Vertex AI, you can "mount" a bucket to the instance(s)/containers running in their infrastructure using GCSFuse; in that case, the only thing you need to do is replace gs:// with /gcs/ and the bucket acts as a file system. Has litdata been tested under such a setting? Will it work?

I already tried mosaicml-streaming and ran into lots of throughput issues that led to data starvation in the multi-node setting.

I wrote this cluster environment to configure the cluster on Vertex AI + Fabric:

import os
import json

from lightning.fabric.plugins.environments.lightning import LightningEnvironment


class VertexAICluster(LightningEnvironment):
    """
    Configures distributed training on a vertex ai custom training job,
    ex:
        Consider a cluster with 3 nodes, each composed of 2 gpus

        The "cluster" key in CLUSTER_SPEC will be:
            {
                'workerpool0': ['cmle-training-workerpool0-d604929a6a-0:2222'],
                'workerpool1': [
                                'cmle-training-workerpool1-d604929a6a-0:2222',
                                'cmle-training-workerpool1-d604929a6a-1:2222'
                              ]
            }

        and each process scheduled will be under the "task" key, following the same example
        the three tasks will look like this:
            task0 ("chief" spawn process) -> node 0:
            {'type': 'workerpool0', 'index': 0}
            task 1 (on first node on workerpool1) -> node 1:
            {'type': 'workerpool1', 'index': 0}
            task 2 (on second node on workerpool1) -> node 2:
            {'type': 'workerpool1', 'index': 1}
    """

    def __init__(self):
        super().__init__()
        self.cluster_spec = json.loads(os.environ['CLUSTER_SPEC'])

    @property
    def main_address(self) -> str:
        return self.cluster_spec["cluster"]["workerpool0"][0].split(':')[0]

    @property
    def main_port(self) -> int:
        """Set common fixed MASTER_PORT port across processes
        """
        return int(self.cluster_spec["cluster"]["workerpool0"][0].split(':')[1])

    def node_rank(self) -> int:
        task = self.cluster_spec["task"]
        if task["type"] == "workerpool0":
            return 0
        else:
            return task["index"] + 1

Do I need to set up some other env variables if I want to test litdata? Are the ones defined in this cluster environment used by litdata? In mosaicml you had to configure these.

Dataset not created when using `map()` on data structure without file paths inside

🐛 Bug

Dataset does not get created when map is given a list of items without paths.

To Reproduce

  1. pip install litdata==0.2.3 lightning_sdk==0.1.3
  2. Run this script
  3. Wait for job to finish
  4. Restart studio
  5. ls /teamspace/datasets

Code sample

import requests, os
from litdata import map
from lightning_sdk import Machine


def create_files(idx, output_dir):
    with open(os.path.join(output_dir, f"{idx}.txt") , "w") as f:
        f.write(str(idx))


def main():
    # root_dir = "./data-processed"
    root_dir = "/teamspace/datasets/data-processed"
    # os.makedirs(root_dir, exist_ok=True)

    inputs = list(range(100))

    map(
        fn=create_files,
        inputs=inputs,
        output_dir=root_dir,
        num_workers=os.cpu_count(),
        num_nodes=2,
        machine=Machine.CPU,
    )


if __name__ == "__main__":
    main()

In #73, the condition `and self.input_dir.path` was added:
https://github.com/Lightning-AI/litdata/blame/58f7aeb5836383ff839b4d966462c568aa6e7435/src/litdata/processing/data_processor.py#L1022
The assumption there was that map gets a data structure of file paths, but that's not always true. For example, map could be called on a list of URLs to download and process.

ValueError with tree_unflatten when trying to read local cache

🐛 Bug

For some reason I get this bug trying to iterate over a local dataset. The dataset contains raw bytes (read with audio_path.read_bytes() where audio_path is a pathlib.Path object) and a filename (str). I optimize with

import torch
import torchaudio
from io import BytesIO
from litdata import optimize, StreamingDataset
from pathlib import Path
from torch.utils.data import DataLoader
from tqdm import tqdm

def fn(path: Path):
    return {"data": path.read_bytes(), "filename": path.stem}

output_dir = "~/my-special-place"
input_dir = Path("path/to/wav/files")
inputs = list(input_dir.rglob("*.wav"))

optimize(fn, inputs, output_dir, chunk_bytes="64MB", num_workers=4)

class Dataset(StreamingDataset):
    def __getitem__(self, idx: int) -> tuple[torch.Tensor, int, str]:
        obj = super().__getitem__(idx)
        x, sr = torchaudio.load(BytesIO(obj["data"]))
        return x, sr, obj["filename"]

dataset = Dataset("local:" + output_dir)
dataloader = DataLoader(
    dataset,
    batch_size=1,
    num_workers=4,
)
for x, sr, filename in tqdm(dataloader, total=len(inputs), desc="Verifying"):
    pass

but then it (always) fails with this ValueError:

ValueError: Caught ValueError in DataLoader worker process 3.
Original Traceback (most recent call last):
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
    data = fetcher.fetch(index)
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 32, in fetch
    data.append(next(self.dataset_iter))
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/litdata/streaming/dataset.py", line 298, in __next__
    data = self.__getitem__(
  File "/home/mads/Repos/dev-mads/my-repo/scripts/upload_data_for_streaming-litdata.py", line 20, in __getitem__
    obj = super().__getitem__(idx)
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/litdata/streaming/dataset.py", line 268, in __getitem__
    return self.cache[index]
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/litdata/streaming/cache.py", line 135, in __getitem__
    return self._reader.read(index)
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/litdata/streaming/reader.py", line 252, in read
    item = self._item_loader.load_item_from_chunk(index.index, index.chunk_index, chunk_filepath, begin)
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/litdata/streaming/item_loader.py", line 106, in load_item_from_chunk
    return self.deserialize(data)
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/litdata/streaming/item_loader.py", line 127, in deserialize
    return tree_unflatten(data, self._config["data_spec"])
  File "/home/mads/Repos/dev-mads/my-repo/.venv/lib/python3.10/site-packages/torch/utils/_pytree.py", line 552, in tree_unflatten
    raise ValueError(
ValueError: tree_unflatten(leaves, treespec): `leaves` has length 0 but the spec refers to a pytree that holds 2 items (TreeSpec(dict, ['data', 'filename'], [*,
  *])).

Code sample

Expected behavior

Environment

  • PyTorch Version (e.g., 1.0):
  • OS (e.g., Linux):
  • How you installed PyTorch (conda, pip, source):
  • Build command you used (if compiling from source):
  • Python version:
  • CUDA/cuDNN version:
  • GPU models and configuration:
  • Any other relevant information:

Additional context

ValueError: The provided None isn't supported.

Script:

import os
from litdata import optimize
from utils import compress
from typing import Tuple, Dict


if __name__ == "__main__":
    # 1. List the files and associate the images and their masks
    # split = "train"
    # split = "val"
    split = "test"
    input_dir = f"/teamspace/studios/this_studio/data/datasetv10/split_all_bands/{split}"
    output_dir = f"/teamspace/studios/this_studio/data/datasetv10/split_all_bands_stream_dict/{split}"

    filepath_pairs = dict()
    for f in sorted(os.listdir(input_dir)):
        name = f.split('.')[0].replace("_merged", "")
        if name not in filepath_pairs:
            filepath_pairs[name] = [name]
        filepath_pairs[name].append(os.path.join(input_dir, f))

    assert all(len(v) == 3 for v in filepath_pairs.values())

    # 3. Define the inputs
    inputs = list([v for v in filepath_pairs.values()])
    len(inputs)
    # 4. Run the optimization
    optimize(
        fn=compress,
        inputs=inputs,
        output_dir=output_dir,
        chunk_bytes="128MB",
        num_workers=2,
    )

Error:

Storing the files under /teamspace/studios/this_studio/data/datasetv10/split_all_bands_stream_dict/test
Setup started with fast_dev_run=False.
Worker 0 gets 1201.2 MB (88 files)
Worker 1 gets 1201.2 MB (88 files)
Setup finished in 0.138 seconds. Found 176 items to process.
Starting 2 workers with 176 items.
Workers are ready ! Starting data processing...
                                                                                                                                                       Process Process-1:1:                                                                                                           | 0/176 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 163, in _download_data_target
    raise ValueError(f"The provided {input_dir.url} isn't supported.")
ValueError: The provided None isn't supported.
Process Process-1:2:
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 163, in _download_data_target
    raise ValueError(f"The provided {input_dir.url} isn't supported.")
ValueError: The provided None isn't supported.
Process Process-2:1:
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 163, in _download_data_target
    raise ValueError(f"The provided {input_dir.url} isn't supported.")
ValueError: The provided None isn't supported.
Process Process-2:2:
Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 163, in _download_data_target
    raise ValueError(f"The provided {input_dir.url} isn't supported.")
ValueError: The provided None isn't supported.

Append data to pre-optimized dataset

Description & Motivation

It's not uncommon to have to update the data one is training on or computing embeddings on etc. We could support appending data to an optimized (chunked) dataset.
Appending alone is sufficient, removal is more specific and can be performed by rerunning through the samples with a map and creating a new dataset from it.

Pitch

When we map or optimize specifying a target location, add the ability to append to existing chunks (through a mode="append" argument or something along these lines).
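
A hypothetical sketch of what the proposed call could look like (the mode argument does not exist today; fn and new_inputs here are just illustrative placeholders):

from litdata import optimize

def fn(index):
    # the same recipe used to build the original dataset
    return {"index": index}

new_inputs = list(range(1000, 1100))  # only the new samples to add

optimize(
    fn=fn,
    inputs=new_inputs,
    output_dir="my_optimized_dataset",  # existing optimized dataset
    chunk_bytes="64MB",
    mode="append",  # proposed: append new chunks instead of starting over
)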

Alternatives

The alternative is creating a new dataset and composing it with the previous one during iteration. However, composition is not trivial, because we need to make sure we draw each sample once from each dataset and avoid bumping into StopIteration. We would need to add a specific mode to the composed dataset.

If the added data is misaligned with the chunk size and appending happens often, it would create a suboptimal dataset after a while, which would need to be compacted into a single one by iterating sequentially. This could be a further alternative: a utility where you pass a list of datasets and create a single dataset by iterating through all of them.

Additional context

No response

cc @Borda @tchaton

Moved from Lightning-AI/pytorch-lightning#19519, submitted by @lantiga

litdata with huggingface instead of S3

🚀 Feature

I want to use litdata to stream the huggingface dataset cerebras/SlimPajama-627B (not S3).

Motivation

How can I stream a huggingface dataset instead of S3?

Pitch

I want to stream a huggingface dataset, not S3.

Alternatives

Just stream a huggingface dataset instead of S3.

Additional context

I want to use a huggingface dataset, not S3.

`map` device management

🚀 Feature

Provide an easy or automated way to get batches + models on to the correct device with map.

Motivation

We often want to map over a bunch of GPU machines, maybe each with more than one GPU on board. Right now, deciding which device to use in each process is a little tricky: you have to take the rank modulo the number of CUDA devices.
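
For reference, a minimal sketch of the modulo trick mentioned above (how the per-process rank is obtained depends on the launcher, so WORKER_RANK below is a hypothetical placeholder):

import os
import torch

# Map the process rank to a local CUDA device index (assumes at least one visible GPU).
rank = int(os.environ.get("WORKER_RANK", "0"))  # hypothetical environment variable
device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")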

Pitch

Probably the cleanest thing would be to just automatically handle devices more like a LightningModule - maybe if you pass an nn.Module to map we could put it on a correct device for the process and wrangle the inputs / outputs.

Alternatives

Additional context

The tested speed is not as fast as expected.

🐛 Bug

The tested speed is not as fast as expected.

Code sample

import os
import torch
import numpy as np
from tqdm import tqdm
from torchvision.transforms import Compose, Lambda
from litdata import StreamingDataset, StreamingDataLoader

from torchvision.transforms._transforms_video import NormalizeVideo, RandomCropVideo, RandomHorizontalFlipVideo, CenterCropVideo

input_dir = 's3://extract_frames/'
OPENAI_DATASET_MEAN = (0.48145466, 0.4578275, 0.40821073)
OPENAI_DATASET_STD = (0.26862954, 0.26130258, 0.27577711)

class ImagenetStreamingDataset(StreamingDataset):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.transform = Compose(
            [
                Lambda(lambda x: x / 255.0),
                NormalizeVideo(mean=OPENAI_DATASET_MEAN, std=OPENAI_DATASET_STD),
                # ShortSideScale(size=224),
                CenterCropVideo(224),
            ]
        )
    
    def __getitem__(self, index):
        data = super().__getitem__(index)
        video_data = []
        for i in range(8):
            frame = np.array(data["image"][i])
            video_data.append(torch.from_numpy(frame).permute(2, 0, 1))
        video_data = torch.stack(video_data, dim=1)
        video_data = self.transform(video_data)
        return video_data

dataset = ImagenetStreamingDataset(input_dir, shuffle=True)
dataloader = StreamingDataLoader(dataset, batch_size=64, num_workers=8)
for batch in tqdm(dataloader, total=len(dataloader)):
    pass

Expected behavior

There are approximately 200,000 data points, each consisting of 8 extracted frames. Based on the reported speeds, iteration should be very fast, but in reality it is not.

[Screenshot 2024-03-07 at 20:42:20]

The tested speed is approximately as follows:
[Screenshot 2024-03-07 at 20:48:30]

Environment

  • PyTorch Version (e.g., 1.0): 2.2.1
  • OS (e.g., Linux): linux
  • How you installed PyTorch (conda, pip, source): pip
  • Python version: 3.9
  • CUDA/cuDNN version: 11.6

StreamingDataset support for older PyTorch versions

Currently, StreamingDataset is incompatible with PyTorch versions <2.1. Would it be possible to relax the package constraint to accommodate torch >= 2.0.0 or even PyTorch 1.0?

The following error message is logged with the current environment:
ModuleNotFoundError: PyTorch version 2.1 or higher is required to use the cache.

litdata==0.2.2
pytorch_lightning==2.2.2
torch==2.1.0

Compression using the optimize function from litdata

🐛 Bug

To Reproduce

Steps to reproduce the behavior:

  1. Use https://lightning.ai/lightning-ai/studios/convert-parquets-to-lightning-streaming.

  2. Modify it to use litdata instead of lightning.data.

  3. Add any compression method, for example "zstd", to the optimize function in convert.py.

Code sample

optimize(convert_parquet_to_lightning_data, parquet_files[:10], output_dir, num_workers=os.cpu_count(), chunk_bytes="64MB", compression="gzip")

Expected behavior

Just the creation of the compressed shards.
Instead, the following error is raised:

Traceback (most recent call last):
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 426, in run
    self._setup()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 436, in _setup
    self._create_cache()
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/processing/data_processor.py", line 511, in _create_cache
    self.cache = Cache(
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/cache.py", line 65, in __init__
    self._writer = BinaryWriter(
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/litdata/streaming/writer.py", line 85, in __init__
    raise ValueError("No compresion algorithms are installed.")
ValueError: No compresion algorithms are installed.
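The error indicates that no optional compression backend is available in the environment. A hedged sketch of a likely fix, assuming litdata's "zstd" compression relies on the Python zstd bindings (installable directly or via the litdata extras):

# Install a compression backend first, e.g.:
#   pip install zstd            # or: pip install 'litdata[extras]'
optimize(
    convert_parquet_to_lightning_data,
    parquet_files[:10],
    output_dir,
    num_workers=os.cpu_count(),
    chunk_bytes="64MB",
    compression="zstd",  # matches the compression named in the description above
)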

Environment

lightning-ai, with pip install litdata

Additional context

Check the size of the dataset (compressed and uncompressed). In my first implementation on AWS, I got the same size for the dataset.

`litdata.optimize` accidentally deletes files from the local filesystem

🐛 Bug

When filepaths are passed as inputs to litdata.optimize, it attempts to resolve input_dir. This input_dir is later used in DataWorker to cache these files and manage cleanup.

But _get_input_dir is very error-prone, as it only looks at the first element of inputs:

indexed_paths = _get_indexed_paths(inputs[0])

and assumes that input_dir is always three directories deep from the root:

return "/" + os.path.join(*str(absolute_path).split("/")[:4])

However, if our input files don't follow these assumptions, e.g. they come from different top-level directories, it can really mess things up. That's because when clearing the cache, filepaths are determined simply by replacing input_dir with cache_dir:

for path in paths:
    if input_dir:
        if not path.startswith(cache_dir) and input_dir.path is not None:
            path = path.replace(input_dir.path, cache_dir)
        if os.path.exists(path):
            os.remove(path)

But if input_dir.path is not in path, replace does nothing, and then it just proceeds to delete a valid file! Removing these paths should be done with much more caution.
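For illustration, a more defensive sketch of the cleanup (an assumption about one possible fix, not the library's actual code), which only deletes files that verifiably live inside the cache directory:

for path in paths:
    if input_dir and input_dir.path is not None and path.startswith(input_dir.path):
        path = path.replace(input_dir.path, cache_dir)
    # Never remove anything outside the cache directory.
    if path.startswith(cache_dir) and os.path.exists(path):
        os.remove(path)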

To Reproduce

Create a directory and ensure Python can save to it:

sudo mkdir /mnt/litdata-example
sudo chmod 777 /mnt/litdata-example/

Then run a simple python script:

import os
import uuid
from glob import glob

import litdata
import torch

base_dir = "/mnt/litdata-example"

for _ in range(4):  # create 4 random directories with pytorch tensors
    dir_name = os.path.join(base_dir, str(uuid.uuid4())[:8])
    os.makedirs(dir_name, exist_ok=True)
    torch.save(torch.randn(4, 4), os.path.join(dir_name, "tensor.pt"))  # Save a random pytorch tensor

files_before = glob(os.path.join(base_dir, "*/*.pt"))
print(files_before)  # print the paths of the saved tensors to confirm creation

litdata.optimize(fn=lambda x: x, inputs=files_before, output_dir="output_dir", num_workers=1, chunk_bytes="64MB")

files_after = glob(os.path.join(base_dir, "*/*.pt"))
print(files_after)  # some files are gone! 👋
assert len(files_before) == len(files_after)

And yes... this actually happened to me. I was quite astonished to see some of my files just deleted 🤯

Environment

  • litdata==0.2.3

Additional context

Is caching input files in litdata.optimize actually necessary? The most common use case is to retrieve a file only once during dataset preparation. If we simply set an empty input directory input_dir = Dir() in DataProcessor, we can avoid all of this.

Pytorch locked to version 2.2.0

🐛 Bug

To Reproduce

Steps to reproduce the behavior:

  1. Try to install the latest torchvision version 0.17.1 which requires torch version 2.2.1, but litdata is locked to torch version 2.2.0

Expected behavior

litdata should work with any torch version higher than 2.1.0, i.e. unlock the torch version by removing the upper limit.
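A sketch of the requested change to the dependency constraint (the exact current pin is an assumption):

- torch >=2.1.0, <=2.2.0
+ torch >=2.1.0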

Environment

  • PyTorch Version (e.g., 1.0): 2.2.1
  • OS (e.g., Linux): Linux
  • How you installed PyTorch (conda, pip, source): pip
  • Build command you used (if compiling from source): -
  • Python version: 3.10
  • CUDA/cuDNN version:
  • GPU models and configuration:
  • Any other relevant information:

Additional context

Allow a StreamingDataset to wrap around when running in a CombinedStreamingDataset

🚀 Feature

Consider adding the ability to wrap around a StreamingDataset without issuing a StopIteration when combining datasets.

This is something we haven't ported from PackedDataset https://github.com/Lightning-AI/lit-llama/blob/main/lit_llama/packed_dataset.py#L190

Motivation

This is useful to combine a smaller dataset with a very large one, so that we can make sure certain batches of data make it into the training process frequently enough, but without invalidating the epoch when the other datasets are multi-billion tokens in size.

Pitch

Add a wrap property to StreamingDataset so that it does not raise StopIteration but just keeps looping through the data (a hypothetical sketch follows).
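A hypothetical sketch of what the proposed API could look like (the wrap argument does not exist; names and paths are assumptions):

from litdata import StreamingDataset, CombinedStreamingDataset

# The small dataset keeps looping instead of raising StopIteration, so the epoch
# length is effectively defined by the large dataset alone.
small = StreamingDataset("s3://my-bucket/small_dataset", wrap=True)
large = StreamingDataset("s3://my-bucket/large_dataset")
combined = CombinedStreamingDataset(datasets=[small, large])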

Alternatives

Add handling of this at the CombinedStreamingDataset level, so that each dataset raises StopIteration when it has to, but we don't invalidate the others. In both cases we need to decide what happens to the epoch within the dataset.

optimize function on multiple machines writing to local paths

🚀 Feature

Support the optimize function on multiple machines writing to local paths; at the moment, using two machines fails.

Motivation

Parallelize the optimize function using the LitData app in Lightning Studios.

Pitch

I want litdata to support distributed processing with local paths.

Alternatives

Additional context

TPU support

🚀 Feature

TPU support

Motivation

Does litdata support TPU environments, specifically when using Lightning Fabric?

Additional context

I have >16M image-text pairs that I am writing in MosaicML streaming format to train contrastive models. I am working with Lightning Fabric to train using DDP on GCP, and I want to move to TPU training. The MosaicML streaming dataset doesn't support TPU (AFAIK). All of this brings me to the questions:

  • Does litdata work on TPU?
  • Does it require setting up anything in the code beyond what is provided in the available documentation? (e.g., is it necessary to provide a distributed sampler or set different env variables?)
  • Do you have an example of how to set up TPU training with litdata?

Fast random access for `StreamingDataset`

🚀 Feature

Support a way to request just a single sample from a StreamingDataset without internally pulling the whole chunk.

Motivation

Streaming chunks is great for cases where you want to visit the whole dataset, but it is suboptimal if you just want to view individual samples. Right now, if you index a StreamingDataset directly, the latency is very high. This is a bit of an issue if you want to explore the dataset (e.g. in a Streamlit or Gradio app).

Pitch

We could have a way to request a single sample from the dataset that would download only the bytes of that sample instead of the whole chunk. This would enable building visualizations etc. on top of streaming datasets (a hypothetical API sketch follows).
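A hypothetical API sketch (no such method exists today; the method name and argument are assumptions):

from litdata import StreamingDataset

dataset = StreamingDataset("s3://my-bucket/my_optimized_dataset")

# Today, indexing the dataset downloads the whole chunk that contains the sample.
# Hypothetical fast path: fetch only the byte range of a single sample.
sample = dataset.get_sample(1234, full_chunk=False)  # hypothetical method and argument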

Alternatives

Additional context

Resuming StreamingDataloader with num_workers=0 fails

Bug description

Using a StreamingDataloader with num_workers=0 works, but resuming the state does not. There is an explicit length check for the state that fails.

Using num_workers=0 is maybe not very meaningful for real applications, but it can be useful for debugging and testing. Alternatively, if that's difficult to support, StreamingDataloader could just force num_workers>=1. I think we should do something about it, since 0 is the default for the dataloader and users might forget to set it and then run into this error, which could confuse them.

What version are you seeing the problem on?

master

How to reproduce the bug

import torch


def run():
    checkpoint_path = "checkpoint.pt"

    # Save a checkpoint
    train_dataloader = create_dataloader()
    train_iterator = iter(train_dataloader)
    next(train_iterator)
    next(train_iterator)
    torch.save(train_dataloader.state_dict(), checkpoint_path)

    # Reset and attempt resume
    train_dataloader = create_dataloader()
    state = {"train_dataloader": train_dataloader}
    train_dataloader.load_state_dict(torch.load(checkpoint_path))
    train_iterator = iter(train_dataloader)
    next(train_iterator)
    next(train_iterator)


def create_dataloader():
    from lightning.data import StreamingDataset, CombinedStreamingDataset, StreamingDataLoader
    from lightning.data.streaming.item_loader import TokensLoader

    train_datasets = [
        StreamingDataset(
            input_dir="/teamspace/s3_connections/tinyllama-template/slimpajama/train",
            item_loader=TokensLoader(block_size=4),
        ),
        StreamingDataset(
            input_dir="/teamspace/s3_connections/tinyllama-template/starcoder",
            item_loader=TokensLoader(block_size=4),
        ),
    ]
    combined_dataset = CombinedStreamingDataset(datasets=train_datasets)
    train_dataloader = StreamingDataLoader(combined_dataset, batch_size=4, num_workers=0)  # <--- BUG WHEN NUM WORKERS=0
    return train_dataloader


if __name__ == "__main__":
    run()

Error messages and logs

Traceback (most recent call last):
  File "/teamspace/studios/this_studio/repro_worker.py", line 50, in <module>
    run()
  File "/teamspace/studios/this_studio/repro_worker.py", line 25, in run
    next(train_iterator)
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/dataloader.py", line 432, in __iter__
    for batch in super().__iter__():
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 438, in __iter__
    return self._get_iterator()
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/dataloader.py", line 504, in _get_iterator
    return _SingleProcessDataLoaderIter(self)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 669, in __init__
    self._dataset_fetcher = _DatasetKind.create_fetcher(
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 79, in create_fetcher
    return _utils.fetch._IterableDatasetFetcher(dataset, auto_collation, collate_fn, drop_last)
  File "/home/zeus/miniconda3/envs/cloudspace/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 21, in __init__
    self.dataset_iter = iter(dataset)
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/combined.py", line 83, in __iter__
    self._iterator = _CombinedDatasetIterator(
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/combined.py", line 126, in __init__
    self._dataset_iters = [iter(dataset) for dataset in datasets]
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/combined.py", line 126, in <listcomp>
    self._dataset_iters = [iter(dataset) for dataset in datasets]
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/dataset.py", line 146, in __iter__
    self._validate_state_dict()
  File "/teamspace/studios/this_studio/lightning/src/lightning/data/streaming/dataset.py", line 328, in _validate_state_dict
    raise ValueError(
ValueError: The provided `num_workers` state doesn't match the current one. Found `1` instead of `0`.

Environment

Current environment

  • Lightning Component (e.g. Trainer, LightningModule, LightningApp, LightningWork, LightningFlow):
  • PyTorch Lightning Version (e.g., 1.5.0): master (2.2dev)
  • Lightning App Version (e.g., 0.5.2):
  • PyTorch Version (e.g., 2.0):
  • Python version (e.g., 3.9):
  • OS (e.g., Linux):
  • CUDA/cuDNN version:
  • GPU models and configuration:
  • How you installed Lightning (conda, pip, source):
  • Running environment of LightningApp (e.g. local, cloud):

More info

No response

Moved from Lightning-AI/pytorch-lightning#19335, submitted by @awaelchli

`litdata.streaming.resolver::_execute` prints incorrect URL

🐛 Bug

This line in litdata.streaming.resolver::_execute causes the following to log to terminal:

Find your job at https://lightning.ai/Organization(name=some-name)/...

rather than

Find your job at https://lightning.ai/some-name/...

To Reproduce

Steps to reproduce the behavior:

Duplicate this Studio and run python main.py --name some-dataset-name

Code sample

The fix is:

- job_url = f"{cloud_url}/{studio.owner}/{studio._teamspace.name}"
+ job_url = f"{cloud_url}/{studio.owner.name}/{studio._teamspace.name}"

Expected behavior

The URL is the actual path to the Job.

Environment

  • Studio

Additional context

DM'd @tchaton for approval to submit PR; please assign this to me 😄

Prints inside the worker processes mess up the progress bar

🐛 Bug

In my code, I am enabling a tqdm bar per worker with:

    global_rank = int(os.environ["DATA_OPTIMIZER_GLOBAL_RANK"])
    num_workers = int(os.environ["DATA_OPTIMIZER_NUM_WORKERS"])
    local_rank = global_rank % num_workers
    for example in tqdm(data, position=local_rank):
        tokens = tokenizer.encode(example)
        yield tokens

But litdata prints this in each rank:

Rank 3 inferred the following `['no_header_tensor:16']` data format.

This breaks the tqdm bars at the beginning.

Since this print doesn't seem very useful for users, I would suggest removing it or putting it behind fast_dev_run or a similar verbose-like flag.
