Skip to content

Commit

Permalink
Chunks include their spec as an attribute
Browse files Browse the repository at this point in the history
Fixes #1143
  • Loading branch information
jimhester committed May 26, 2021
1 parent c88303c commit cfe4c86
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 11 deletions.
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Previously versions of readr were licensed as GPL-3.

### Other second edition changes

* `read_*_chunked()` functions now include their specification as an attribute (#1143)

* All `read_*()` functions gain a `col_select` argument to more easily choose which columns to select.

* All `read_*()` functions gain a `id` argument to optionally store the file paths when reading multiple files.
Expand Down
4 changes: 2 additions & 2 deletions R/cpp11.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ read_tokens_ <- function(sourceSpec, tokenizerSpec, colSpecs, colNames, locale_,
.Call(`_readr_read_tokens_`, sourceSpec, tokenizerSpec, colSpecs, colNames, locale_, n_max, progress)
}

read_tokens_chunked_ <- function(sourceSpec, callback, chunkSize, tokenizerSpec, colSpecs, colNames, locale_, progress) {
invisible(.Call(`_readr_read_tokens_chunked_`, sourceSpec, callback, chunkSize, tokenizerSpec, colSpecs, colNames, locale_, progress))
read_tokens_chunked_ <- function(sourceSpec, callback, chunkSize, tokenizerSpec, colSpecs, colNames, locale_, spec, progress) {
invisible(.Call(`_readr_read_tokens_chunked_`, sourceSpec, callback, chunkSize, tokenizerSpec, colSpecs, colNames, locale_, spec, progress))
}

melt_tokens_ <- function(sourceSpec, tokenizerSpec, colSpecs, locale_, n_max, progress) {
Expand Down
6 changes: 3 additions & 3 deletions R/read_delim_chunked.R
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ generate_read_delimited_chunked <- function(x) {
chunked_call <- chunked_call[!names(chunked_call) == "n_max"]

# Add the callback and chunk_size arguments
b[[i]] <- as.call(append(chunked_call, alist(callback = callback, chunk_size = chunk_size), 2))
b[[i]] <- as.call(append(chunked_call, alist(callback = callback, chunk_size = chunk_size, spec = spec), 2))

# Remove additional calls
b <- b[-seq(i + 1, length(b))]
Expand All @@ -34,11 +34,11 @@ generate_read_delimited_chunked <- function(x) {
x
}

read_tokens_chunked <- function(data, callback, chunk_size, tokenizer, col_specs, col_names, locale_, progress) {
read_tokens_chunked <- function(data, callback, chunk_size, tokenizer, col_specs, col_names, locale_, spec, progress) {
callback <- as_chunk_callback(callback)
on.exit(callback$finally(), add = TRUE)

read_tokens_chunked_(data, callback, chunk_size, tokenizer, col_specs, col_names, locale_, progress)
read_tokens_chunked_(data, callback, chunk_size, tokenizer, col_specs, col_names, locale_, spec, progress)

return(callback$result())
}
Expand Down
10 changes: 5 additions & 5 deletions src/cpp11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ extern "C" SEXP _readr_read_tokens_(SEXP sourceSpec, SEXP tokenizerSpec, SEXP co
END_CPP11
}
// read.cpp
void read_tokens_chunked_(const cpp11::list& sourceSpec, const cpp11::environment& callback, int chunkSize, const cpp11::list& tokenizerSpec, const cpp11::list& colSpecs, const cpp11::strings& colNames, const cpp11::list& locale_, bool progress);
extern "C" SEXP _readr_read_tokens_chunked_(SEXP sourceSpec, SEXP callback, SEXP chunkSize, SEXP tokenizerSpec, SEXP colSpecs, SEXP colNames, SEXP locale_, SEXP progress) {
void read_tokens_chunked_(const cpp11::list& sourceSpec, const cpp11::environment& callback, int chunkSize, const cpp11::list& tokenizerSpec, const cpp11::list& colSpecs, const cpp11::strings& colNames, const cpp11::list& locale_, const cpp11::sexp& spec, bool progress);
extern "C" SEXP _readr_read_tokens_chunked_(SEXP sourceSpec, SEXP callback, SEXP chunkSize, SEXP tokenizerSpec, SEXP colSpecs, SEXP colNames, SEXP locale_, SEXP spec, SEXP progress) {
BEGIN_CPP11
read_tokens_chunked_(cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(sourceSpec), cpp11::as_cpp<cpp11::decay_t<const cpp11::environment&>>(callback), cpp11::as_cpp<cpp11::decay_t<int>>(chunkSize), cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(tokenizerSpec), cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(colSpecs), cpp11::as_cpp<cpp11::decay_t<const cpp11::strings&>>(colNames), cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(locale_), cpp11::as_cpp<cpp11::decay_t<bool>>(progress));
read_tokens_chunked_(cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(sourceSpec), cpp11::as_cpp<cpp11::decay_t<const cpp11::environment&>>(callback), cpp11::as_cpp<cpp11::decay_t<int>>(chunkSize), cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(tokenizerSpec), cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(colSpecs), cpp11::as_cpp<cpp11::decay_t<const cpp11::strings&>>(colNames), cpp11::as_cpp<cpp11::decay_t<const cpp11::list&>>(locale_), cpp11::as_cpp<cpp11::decay_t<const cpp11::sexp&>>(spec), cpp11::as_cpp<cpp11::decay_t<bool>>(progress));
return R_NilValue;
END_CPP11
}
Expand Down Expand Up @@ -214,7 +214,7 @@ extern SEXP _readr_read_lines_chunked_(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP)
extern SEXP _readr_read_lines_raw_(SEXP, SEXP, SEXP);
extern SEXP _readr_read_lines_raw_chunked_(SEXP, SEXP, SEXP, SEXP);
extern SEXP _readr_read_tokens_(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP _readr_read_tokens_chunked_(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP _readr_read_tokens_chunked_(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP _readr_stream_delim_(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP _readr_tokenize_(SEXP, SEXP, SEXP);
extern SEXP _readr_type_convert_col(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
Expand Down Expand Up @@ -242,7 +242,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_readr_read_lines_raw_", (DL_FUNC) &_readr_read_lines_raw_, 3},
{"_readr_read_lines_raw_chunked_", (DL_FUNC) &_readr_read_lines_raw_chunked_, 4},
{"_readr_read_tokens_", (DL_FUNC) &_readr_read_tokens_, 7},
{"_readr_read_tokens_chunked_", (DL_FUNC) &_readr_read_tokens_chunked_, 8},
{"_readr_read_tokens_chunked_", (DL_FUNC) &_readr_read_tokens_chunked_, 9},
{"_readr_stream_delim_", (DL_FUNC) &_readr_stream_delim_, 8},
{"_readr_tokenize_", (DL_FUNC) &_readr_tokenize_, 3},
{"_readr_type_convert_col", (DL_FUNC) &_readr_type_convert_col, 6},
Expand Down
8 changes: 7 additions & 1 deletion src/read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ typedef std::vector<CollectorPtr>::iterator CollectorItr;
const cpp11::list& colSpecs,
const cpp11::strings& colNames,
const cpp11::list& locale_,
const cpp11::sexp& spec,
bool progress) {

LocaleInfo l(locale_);
Expand All @@ -173,6 +174,11 @@ typedef std::vector<CollectorPtr>::iterator CollectorItr;
if (out.nrow() == 0) {
return;
}

// We use the C API directly, as we are modifying the read-only data_frame
// here.
Rf_setAttrib(out, Rf_install("spec"), spec);

R6method(callback, "receive")(out, pos);
pos += out.nrow();
}
Expand Down Expand Up @@ -260,7 +266,7 @@ typedef std::vector<CollectorPtr>::iterator CollectorItr;
}

std::vector<std::string> out;
for (auto & collector : collectors) {
for (auto& collector : collectors) {
cpp11::strings col(collector->vector());
out.push_back(collectorGuess(SEXP(col), cpp11::list(locale_)));
}
Expand Down
8 changes: 8 additions & 0 deletions tests/testthat/test-read-chunked.R
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,11 @@ test_that("AccumulateCallback works as intended", {
"`callback` must have three or more arguments"
)
})

test_that("Chunks include their spec (#1143)", {
res <- read_csv_chunked(readr_example("mtcars.csv"),
callback = ListCallback$new(function(x, pos) spec(x)),
chunk_size = 20)

expect_equal(res[[1]]$cols, spec_csv(readr_example("mtcars.csv"))$cols)
})

0 comments on commit cfe4c86

Please sign in to comment.