Skip to content

Commit

Permalink
fixup! fixup! ...
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Nov 19, 2022
1 parent 81f1f92 commit 357a86b
Showing 1 changed file with 59 additions and 18 deletions.
77 changes: 59 additions & 18 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,8 @@ class DataQueueEntry : public EntryBase {

// Essentially an entry that exists to give the Javascript side
// control of what happens when data is read. Always non-idempotent.
class StreamEntry final : public BaseObject, public EntryBase {
class StreamEntry final : public BaseObject,
public EntryBase {
public:
static void New(const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall());
Expand Down Expand Up @@ -911,8 +912,8 @@ class StreamEntry final : public BaseObject, public EntryBase {
// is provided by an underlying StreamBase implementation. Data is never
// buffered and the size is never known in advance.
class StreamBaseEntry final : public BaseObject,
public EntryBase{
public:
public EntryBase {
public:
static void New(const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall());
Environment* env = Environment::GetCurrent(args);
Expand Down Expand Up @@ -1081,17 +1082,19 @@ class StreamBaseEntry final : public BaseObject,

// ============================================================================

class FdEntry : public EntryBase {
class FdEntry final : public EntryBase {
public:
FdEntry(int fd) : fd_(fd), start_(0) {
FdEntry(int fd, size_t start, size_t end = 0)
: fd_(fd), start_(0), end_(end) {
CHECK(fd);
uv_fs_t req;
uv_fs_fstat(nullptr, &req, fd, nullptr);

stat_ = req.statbuf;
end_ = stat_.st_size;
err_ = UpdateStat();
if (err_ == 0 && (end_ == 0 || end_ > stat_.st_size))
end_ = stat_.st_size;
}

FdEntry(int fd, uv_stat_t stat, size_t start, size_t end)
: fd_(fd), start_(start), end_(end), stat_(stat) {}

std::unique_ptr<DataQueue::Reader> getReader() override {
// TODO(@flakey5): streambase reader w/ validation
return nullptr;
Expand All @@ -1100,30 +1103,68 @@ class FdEntry : public EntryBase {
std::unique_ptr<Entry> slice(
size_t start,
Maybe<size_t> end = Nothing<size_t>()) override {
size_t newSize = end.IsJust() ? end.ToChecked() : end_;
size_t new_start = start_ + start;
size_t new_end = end_;
if (end.IsJust()) {
new_end = std::min(end.FromJust() + start, new_end);
}

CHECK(start >= start_);
CHECK(newSize <= end_);
CHECK(new_start >= start_);
CHECK(new_end <= end_);

return std::make_unique<FdEntry>(fd_, stat_, start, newSize);
return std::make_unique<FdEntry>(fd_, stat_, new_start, new_end);
}

Maybe<size_t> size() const override {
return Just(end_ - start_);
}

bool isIdempotent() const override final {
bool isIdempotent() const override {
return true;
}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(FdEntry)
SET_SELF_SIZE(FdEntry)

class Reader : public DataQueue::Reader {
public:
Reader(Environment* env, int fd) : env_(env), fd_(fd) {}

int Pull(
Next next,
int options,
DataQueue::Vec* data,
size_t count,
size_t max_count_hint = bob::kMaxCountHint) override {
uv_fs_fstat(env_->event_loop(), &req_, fd_, &OnStat);
}

private:
Environment* env_;
int fd_;
uv_fs_t req_;

static void OnStat(uv_fs_t* req) {
uv_stat_t current_stat = req->statbuf;
// Compare the current stat to make sure it has not changed...
// Then read.
// call uv_fs_read...
}
};

private:
FdEntry(int fd, uv_stat_t stat, size_t start, size_t end)
: fd_(fd), stat_(stat), start_(start), end_(end) {}
int UpdateStat() {
uv_fs_t req;
uv_fs_fstat(nullptr, &req, fd_, nullptr);
stat_ = req.statbuf;
}

int fd_;
uv_stat_t stat_;
int err_ = 0;
size_t start_;
size_t end_;
uv_stat_t stat_;
};

// ============================================================================
Expand Down

0 comments on commit 357a86b

Please sign in to comment.