Skip to content

Commit

Permalink
refactor more
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Mar 13, 2024
1 parent 0fe146c commit ee6f1a9
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 138 deletions.
2 changes: 1 addition & 1 deletion src/Common/serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ namespace DB
/// are concerns
#define SERDE
#define NO_SERDE
}
}
22 changes: 21 additions & 1 deletion src/Processors/Streaming/ISource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,30 @@ void ISource::checkpoint(CheckpointContextPtr ckpt_ctx_)
ckpt_request.setCheckpointRequestCtx(std::move(ckpt_ctx_));
}

void ISource::recover(CheckpointContextPtr ckpt_ctx_)
{
doRecover(std::move(ckpt_ctx_));
last_checkpointed_sn = lastSN();

/// Reset consume offset started from the next of last checkpointed sn (if not manually reset before recovery)
if (!reseted_start_sn.has_value())
doResetStartSN(last_checkpointed_sn + 1);
}

void ISource::resetStartSN(Int64 sn)
{
reseted_start_sn = sn;
doResetStartSN(sn);
}

std::optional<Chunk> ISource::tryGenerate()
{
if (auto current_ckpt_ctx = ckpt_request.poll(); current_ckpt_ctx)
return doCheckpoint(std::move(current_ckpt_ctx));
{
auto chunk = doCheckpoint(std::move(current_ckpt_ctx));
last_checkpointed_sn = lastSN();
return std::move(chunk);
}

auto chunk = generate();
if (!chunk)
Expand Down
28 changes: 24 additions & 4 deletions src/Processors/Streaming/ISource.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <Checkpoint/CheckpointRequest.h>
#include <Processors/ISource.h>
#include <Common/serde.h>

namespace DB
{
Expand All @@ -20,26 +21,45 @@ class ISource : public DB::ISource
is_streaming = true;
}

virtual String description() const { return ""; }

/// \brief Get the last progressed sequence number of the source, it shouldn't be called during pipeline execution (thread-unsafe)
virtual Int64 lastSN() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for lastSN of {}", getName()); }
virtual Int64 lastSN() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for lastSN() of {}", getName()); }

/// \brief Reset the start sequence number of the source, it must be called before the pipeline execution (thread-unsafe)
void resetStartSN(Int64 sn);

/// \brief Reset the sequence number of the source, it should be called before the pipeline execution (thread-unsafe)
virtual void resetSN(Int64 /*sn*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for resetSN of {}", getName()); }
Int64 lastCheckpointedSN() const noexcept { return last_checkpointed_sn; }

void checkpoint(CheckpointContextPtr ckpt_ctx_) override final;
void recover(CheckpointContextPtr ckpt_ctx_) override final;

private:
std::optional<Chunk> tryGenerate() override final;

/// \brief Checkpointing the source state
/// \brief Checkpointing the source state (include lastSN())
virtual Chunk doCheckpoint(CheckpointContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for checkpoting of {}", getName());
}

/// \brief Recovering the source state (include lastSN())
virtual void doRecover(CheckpointContextPtr)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for recovering of {}", getName());
}

/// \brief Reset current consume sequence number, it must be called before the pipeline execution (thread-unsafe)
virtual void doResetStartSN(Int64 /*sn*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for doResetStartSN() of {}", getName());
}

private:
/// For checkpoint
CheckpointRequest ckpt_request;
NO_SERDE std::optional<Int64> reseted_start_sn;
NO_SERDE Int64 last_checkpointed_sn = -1;
};
}
}
21 changes: 2 additions & 19 deletions src/QueryPipeline/QueryPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,32 +586,15 @@ std::unique_ptr<ReadProgressCallback> QueryPipeline::getReadProgressCallback() c
}

/// proton: starts.
std::vector<Int64> QueryPipeline::getLastSNsOfStreamingSources() const
{
std::vector<Int64> sns;
for (const auto & processor : processors)
{
if (processor->isSource() && processor->isStreaming())
sns.emplace_back(std::static_pointer_cast<const Streaming::ISource>(processor)->lastSN());
}
return sns;
}

void QueryPipeline::resetSNsOfStreamingSources(const std::vector<Int64> & sns)
std::vector<std::shared_ptr<Streaming::ISource>> QueryPipeline::getStreamingSources() const
{
std::vector<std::shared_ptr<Streaming::ISource>> streaming_sources;
streaming_sources.reserve(sns.size());
for (const auto & processor : processors)
{
if (processor->isSource() && processor->isStreaming())
streaming_sources.emplace_back(std::static_pointer_cast<Streaming::ISource>(processor));
}

if (sns.size() != streaming_sources.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Number of sequence numbers doesn't match number of streaming sources");

for (auto [streaming_source, sn] : std::views::zip(streaming_sources, sns))
streaming_source->resetSN(sn);
return streaming_sources;
}
/// proton: ends.
}
9 changes: 6 additions & 3 deletions src/QueryPipeline/QueryPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class ReadProgressCallback;
struct ColumnWithTypeAndName;
using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>;

namespace Streaming
{
class ISource;
}

class QueryPipeline
{
public:
Expand Down Expand Up @@ -134,9 +139,7 @@ class QueryPipeline
/// proton : starts
void setExecuteMode(ExecuteMode exec_mode_) { exec_mode = exec_mode_; }

std::vector<Int64> getLastSNsOfStreamingSources() const;

void resetSNsOfStreamingSources(const std::vector<Int64> & sources_sns);
std::vector<std::shared_ptr<Streaming::ISource>> getStreamingSources() const;
/// proton : ends

private:
Expand Down
13 changes: 2 additions & 11 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,27 +345,18 @@ Chunk KafkaSource::doCheckpoint(CheckpointContextPtr ckpt_ctx_)
return result;
}

void KafkaSource::recover(CheckpointContextPtr ckpt_ctx_)
void KafkaSource::doRecover(CheckpointContextPtr ckpt_ctx_)
{
ckpt_ctx_->coordinator->recover(
getLogicID(), ckpt_ctx_, [&](VersionType version, ReadBuffer & rb) { ckpt_data.deserialize(version, rb); });

LOG_INFO(log, "Recovered last_sn={}", ckpt_data.last_sn);

/// Reset consume offset started from the next of last sn (if not manually reset before recovery)
resetSN(ckpt_data.last_sn + 1);
}

void KafkaSource::resetSN(Int64 sn)
void KafkaSource::doResetStartSN(Int64 sn)
{
if (sn_reseted.test_and_set())
return;

if (sn >= 0)
{
ckpt_data.last_sn = sn - 1;
consume_ctx.offset = sn;
}
}

void KafkaSource::State::serialize(WriteBuffer & wb) const
Expand Down
9 changes: 4 additions & 5 deletions src/Storages/ExternalStream/Kafka/KafkaSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ class KafkaSource final : public Streaming::ISource

String getName() const override { return "KafkaSource"; }

String description() const override { return fmt::format("topic={}, partition={}", consume_ctx.topic, consume_ctx.partition); }

Chunk generate() override;

Int64 lastSN() const override { return ckpt_data.last_sn; }
void resetSN(Int64 sn) override;

void recover(CheckpointContextPtr ckpt_ctx_) override;

private:
void calculateColumnPositions();
Expand All @@ -58,6 +57,8 @@ class KafkaSource final : public Streaming::ISource
inline void readAndProcess();

Chunk doCheckpoint(CheckpointContextPtr ckpt_ctx_) override;
void doRecover(CheckpointContextPtr ckpt_ctx_) override;
void doResetStartSN(Int64 sn) override;

private:
StorageSnapshotPtr storage_snapshot;
Expand Down Expand Up @@ -108,8 +109,6 @@ class KafkaSource final : public Streaming::ISource
explicit State(const klog::KafkaWALContext & consume_ctx_) : topic(consume_ctx_.topic), partition(consume_ctx_.partition) { }
} ckpt_data;

std::atomic_flag sn_reseted;

ExternalStreamCounterPtr external_stream_counter;
};

Expand Down
Loading

0 comments on commit ee6f1a9

Please sign in to comment.