Skip to content

Commit

Permalink
change stop_calculate to has_emit and modify Initial contact point
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Feb 13, 2025
1 parent 494ec66 commit c25d785
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
16 changes: 9 additions & 7 deletions src/jogasaki/plan/compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ std::shared_ptr<mirror_container> preprocess_mirror(
unsafe_downcast<takatori::statement::execute>(*statement).execution_plan(),
[&container](takatori::plan::step const& s) {
if (s.kind() == takatori::plan::step_kind::process) {
container->set_partitions(impl::calculate_partition(s));
if (impl::has_emit_operator(s)) {
container->set_partitions(impl::calculate_partition(s));
}
} else {
VLOG_LP(log_error) << "The bottom of graph_type must be process.";
}
Expand Down Expand Up @@ -1145,14 +1147,14 @@ status create_executable_statement(compiler_context& ctx, parameter_set const* p
return status::ok;
}

bool stop_calculate_partition(takatori::plan::step const& s) noexcept {
bool is_stop = true;
bool has_emit_operator(takatori::plan::step const& s) noexcept {
bool has_emit = false;
auto& process = unsafe_downcast<takatori::plan::process const>(s);
takatori::relation::sort_from_upstream(
process.operators(), [&is_stop](takatori::relation::expression const& op) {
if (op.kind() == takatori::relation::expression_kind::emit) { is_stop = false; }
process.operators(), [&has_emit](takatori::relation::expression const& op) {
if (op.kind() == takatori::relation::expression_kind::emit) { has_emit = true; }
});
return is_stop;
return has_emit;
}

size_t terminal_calculate_partition(takatori::plan::step const& s) noexcept {
Expand Down Expand Up @@ -1209,7 +1211,7 @@ size_t calculate_partition(takatori::plan::step const& s) noexcept {
if (unsafe_downcast<takatori::plan::process>(s).upstreams().empty()) {
return terminal_calculate_partition(s);
}
if (stop_calculate_partition(s)){
if (!has_emit_operator(s)){
return global::config_pool()->default_partitions();
}
return intermediate_calculate_partition(s);
Expand Down
2 changes: 1 addition & 1 deletion src/jogasaki/plan/compiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ std::shared_ptr<executor::process::impl::variable_table> create_host_variables(
* @param s the plan of step
* @return true if stop calculate partition
*/
[[nodiscard]] bool stop_calculate_partition(takatori::plan::step const& s) noexcept;
[[nodiscard]] bool has_emit_operator(takatori::plan::step const& s) noexcept;

} // namespace impl

Expand Down

0 comments on commit c25d785

Please sign in to comment.