A high-performance data processing pipeline for large-scale text datasets.
DataMap is a Rust-based toolkit designed for efficient processing, filtering, and resharding of large text datasets, primarily in JSONL format. It provides a flexible pipeline architecture for text data transformations with various filters and modifiers.
Key features:
- Multi-threaded processing with Rayon
- Configurable processing pipeline via JSON/YAML configuration
- Comprehensive set of text filters and modifiers
- Data resharding capabilities
- Utilities for S3/GCP/WEKA integration
The core functionality is implemented in Rust for high performance:
- Main Module (`src/main.rs`):
  - Command-line interface with subcommands
  - Pipeline execution logic
  - I/O and file operations
- Data Processors (`src/map_fxn.rs`):
  - Pipeline processor architecture (sketched below)
  - Text filters (length, language, URL, etc.)
  - Content modifiers (newline removal, ID generation, etc.)
  - Analytics processors (FastText annotation, etc.)
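The processor architecture in `src/map_fxn.rs` treats each pipeline step as a unit that either drops a document or passes it along, possibly modified. Purely as an illustration — the trait name, method signature, and error handling below are assumptions, not the actual API — a filter step in the spirit of `text_len_filter` might look roughly like this:

```rust
use serde_json::Value;

/// Hypothetical processor trait: each step takes one JSONL document and
/// returns Some(doc) to keep it (possibly modified) or None to drop it.
trait PipelineProcessor: Send + Sync {
    fn process(&self, doc: Value) -> anyhow::Result<Option<Value>>;
}

/// Hypothetical filter in the spirit of `text_len_filter`.
struct TextLenFilter {
    text_field: String,
    lower_bound: usize,
    upper_bound: usize,
}

impl PipelineProcessor for TextLenFilter {
    fn process(&self, doc: Value) -> anyhow::Result<Option<Value>> {
        // Length of the configured text field, or 0 if it is missing.
        let len = doc
            .get(self.text_field.as_str())
            .and_then(Value::as_str)
            .map_or(0, str::len);
        if (self.lower_bound..=self.upper_bound).contains(&len) {
            Ok(Some(doc))
        } else {
            Ok(None)
        }
    }
}
```

An ordered list of such steps, applied to every JSONL line, fits naturally with the Rayon-based multi-threading described above.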
Python utilities for cloud storage operations:
- S3/GCP/WEKA integration via s5cmd
- Parallel file download/upload capabilities
- Progress tracking
Process data through a filtering/modification pipeline:
datamap map --input_dir ./data/input --output_dir ./data/output --config pipeline_config.yaml [--err_dir ./data/errors] [--threads 16]
Reshard files into chunks capped by line count and/or byte size:
datamap reshard --input_dir ./data/input --output_dir ./data/output --max_lines 10000 --max_size 100000000 [--subsample 0.1] [--threads 16]
Upload files to or download them from cloud storage:
python utils/s5cmd_wrapper.py download --src s3://bucket/path --dst ./local/path [--part 0 --num-parts 4]
python utils/s5cmd_wrapper.py upload --src ./local/path --dst s3://bucket/path
Pipelines are defined using YAML or JSON configuration files. Example config:
text_field: "text"
pipeline:
  - name: "text_len_filter"
    kwargs:
      lower_bound: 100
      upper_bound: 100000
  - name: "subsample"
    kwargs:
      subsample_rate: 0.8
  - name: "stop_word_filter"
    kwargs:
      min_stop_word: 3
  - name: "word_count_adder"
    kwargs:
      word_count_field: "word_count"
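Config parsing is handled with `serde_json`/`serde_yaml` (listed under dependencies below). As a minimal sketch only — the struct and field layout here is an assumption, not the crate's actual types — such a file could be deserialized along these lines:

```rust
use serde::Deserialize;

/// Hypothetical mirror of the config file shown above.
#[derive(Debug, Deserialize)]
struct PipelineConfig {
    text_field: String,
    pipeline: Vec<ProcessorSpec>,
}

#[derive(Debug, Deserialize)]
struct ProcessorSpec {
    name: String,
    /// Free-form kwargs; each processor interprets its own keys.
    #[serde(default)]
    kwargs: Option<serde_yaml::Value>,
}

fn load_config(path: &std::path::Path) -> anyhow::Result<PipelineConfig> {
    let raw = std::fs::read_to_string(path)?;
    Ok(serde_yaml::from_str(&raw)?)
}
```

Processors are then applied to each document in the order they appear in the `pipeline` list.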
The toolkit includes many processors for various text transformation and filtering needs:
- `text_len_filter`: Filter by text length
- `page_len_filter`: Filter by length of words, sentences, etc.
- `word_len_filter`: Filter by average word length
- `subsample`: Randomly subsample documents
- `url_substring_filter`: Filter URLs by domain, subdomain, etc.
- `float_filter`: Filter by float field values
- `symbol_ratio_filter`: Filter by symbol density
- `bullet_filter`: Filter by bullet point density
- `ellipsis_line_ratio_filter`: Filter by ellipsis usage
- `alphabetic_word_ratio_filter`: Filter by non-alphabetic word ratio
- `stop_word_filter`: Filter by presence of stop words
- `massive_web_repetition_filter`: Filter by content repetition patterns
- `word_removal_ratio_filter`: Filter by word removal ratio
- `madlad400_sentence_filter`: Multi-criteria sentence filter from Madlad400
- `add_id`: Add UUID to documents
- `newline_removal_modifier`: Control consecutive newlines
- `ratio_line_modifier`: Filter lines by uppercase or digit ratio
- `regex_line_modifier`: Filter lines using regex
- `line_len_modifier`: Filter lines by word count
- `substring_line_modifier`: Filter or modify lines with banned substrings
- `word_count_adder`: Add word count field
- `fasttext_annotator`: Add language classification with FastText
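Modifier-style processors such as `word_count_adder` rewrite the document rather than dropping it. A hypothetical sketch of that behavior (not the real implementation; field names are taken from the config example above):

```rust
use serde_json::{json, Value};

/// Hypothetical modifier in the spirit of `word_count_adder`: counts
/// whitespace-separated tokens in `text_field` and writes the result
/// into `word_count_field` on the same document.
fn add_word_count(mut doc: Value, text_field: &str, word_count_field: &str) -> Value {
    let count = doc
        .get(text_field)
        .and_then(Value::as_str)
        .map_or(0, |t| t.split_whitespace().count());
    if let Some(obj) = doc.as_object_mut() {
        obj.insert(word_count_field.to_string(), json!(count));
    }
    doc
}
```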
- rayon (parallel processing)
- clap (command-line parsing)
- serde_json/serde_yaml (config parsing)
- anyhow (error handling)
- dashmap (concurrent hashmap)
- zstd (compression)
- boto3
- click
- tqdm
- Install Rust: https://www.rust-lang.org/tools/install
- Clone the repository
- Build the project:
cargo build --release
- Install Python dependencies:
pip install boto3 click tqdm
- Install s5cmd if using cloud storage utilities:
# Instructions vary by platform
[Insert your license information here]