Skip to content

Commit

Permalink
Refactor jobs engine into multiple files
Browse files Browse the repository at this point in the history
  • Loading branch information
herrcristi committed Jan 3, 2025
1 parent 9d0b439 commit d2663bf
Show file tree
Hide file tree
Showing 5 changed files with 780 additions and 526 deletions.
118 changes: 64 additions & 54 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,72 +27,80 @@ namespace examples::jobs_engine {
};

using Request = std::pair<int, std::string>;
using JobsEng = small::jobs_engine<JobsType, Request, int, JobsGroupType>;

JobsEng jobs(
{.threads_count = 0 /*dont start any thread yet*/}, // overall config with default priorities
{.threads_count = 1, .bulk_count = 1}, // default jobs group config
{.group = JobsGroupType::kJobsGroup1}, // default jobs type config
[](auto &j /*this*/, const auto &items) {
for (auto &item : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " DEFAULT processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30);
});

jobs.add_jobs_group(JobsGroupType::kJobsGroup1, {.threads_count = 1});

// add specific function for job1
jobs.add_jobs_type(JobsType::kJobsType1, {.group = JobsGroupType::kJobsGroup1}, [](auto &j /*this*/, const auto &items, auto b /*extra param b*/) {
// process item using the jobs lock (not recommended)
{
std::unique_lock mlock( j );
for (auto &item : items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
}
small::sleep(30); }, 5 /*param b*/);
using JobsEng = small::jobs_engine<JobsType, Request, int /*response*/, JobsGroupType>;

// use default config and default function for job2
jobs.add_jobs_type(JobsType::kJobsType2);
auto jobs_processing_function = [](const std::vector<JobsEng::JobsItem *> &items) {
// this functions is defined without the engine params (it is here just for the example)
std::cout << "this function is defined without the engine params\n";
};

JobsEng::JobsConfig config{
.m_engine = {.m_threads_count = 0 /*dont start any thread yet*/,
.m_config_prio = {.priorities = {{small::EnumPriorities::kHighest, 2},
{small::EnumPriorities::kHigh, 2},
{small::EnumPriorities::kNormal, 2},
{small::EnumPriorities::kLow, 1}}}}, // overall config with default priorities
.m_default_processing_function = jobs_processing_function, // default processing function, better use jobs.add_default_processing_function to set it
.m_groups = {{JobsGroupType::kJobsGroup1, {.m_threads_count = 1}}}, // config by jobs group
.m_types = {
{JobsType::kJobsType1, {.m_group = JobsGroupType::kJobsGroup1}},
{JobsType::kJobsType2, {.m_group = JobsGroupType::kJobsGroup1}},
}};

// create jobs engine
JobsEng jobs(config);

jobs.add_default_processing_function([](auto &j /*this jobs engine*/, const auto &jobs_items) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " DEFAULT processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30);
});

// add specific function for job1 (calling the function from jobs intead of config allows to pass the engine and extra param)
jobs.add_job_processing_function(JobsType::kJobsType1, [](auto &j /*this jobs engine*/, const auto &jobs_items, auto b /*extra param b*/) {
for (auto &item : jobs_items) {
std::cout << "thread " << std::this_thread::get_id()
<< " JOB1 processing "
<< "{"
<< " type=" << (int)item->type
<< " req.int=" << item->request.first << ","
<< " req.str=\"" << item->request.second << "\""
<< "}"
<< " time " << small::toISOString(small::timeNow())
<< "\n";
}
small::sleep(30); }, 5 /*param b*/);

JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;

// push
jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, {1, "normal"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, JobsType::kJobsType2, {2, "high"}, &jobs_id);

jobs.push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id);
jobs.push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kNormal, JobsType::kJobsType1, std::make_pair(3, "normal"), &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kHigh, {.type = JobsType::kJobsType1, .request = {4, "high"}}, &jobs_id);
jobs.queue().push_back(small::EnumPriorities::kLow, JobsType::kJobsType1, {5, "low"}, &jobs_id);

Request req = {6, "normal"};
jobs.push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr);
jobs.queue().push_back(small::EnumPriorities::kNormal, {.type = JobsType::kJobsType1, .request = req}, nullptr);

std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "highest"}}};
jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids);
jobs.queue().push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.queue().push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "highest"}}}, &jobs_ids);

jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
jobs.push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "delay normal"}, &jobs_id);
jobs.queue().push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "delay normal"}, &jobs_id);

jobs.start_threads(3); // manual start threads

Expand All @@ -102,6 +110,8 @@ namespace examples::jobs_engine {
std::cout << "wait for with timeout, ret = " << static_cast<int>(ret) << " as timeout\n";
jobs.wait(); // wait here for jobs to finish due to exit flag

std::cout << "size = " << jobs.size() << "\n";

std::cout << "Jobs Engine example 1 finish\n\n";

return 0;
Expand Down
73 changes: 73 additions & 0 deletions include/jobs_config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <unordered_map>

#include "jobs_item.h"
#include "prio_queue.h"

namespace small {
//
// small class for jobs config
// - setup how many threads to use overall, and the priorities
// - for each job group how many threads to use
// - for each job type how to process and to which group it belongs
//
template <typename JobsTypeT, typename JobsRequestT, typename JobsResponseT, typename JobsGroupT = JobsTypeT, typename JobsPrioT = EnumPriorities>
struct jobs_config
{
using JobsItem = typename small::jobs_item<JobsTypeT, JobsRequestT, JobsResponseT>;
using ProcessingFunction = std::function<void(const std::vector<JobsItem *> &)>;

// config the entire jobs engine
struct ConfigJobsEngine
{
int m_threads_count{8}; // how many total threads for processing
small::config_prio_queue<JobsPrioT> m_config_prio{};
};

// config an individual job type
struct ConfigJobsType
{
JobsGroupT m_group{}; // job type group (multiple job types can be configured to same group)
ProcessingFunction m_processing_function{}; // processing Function
};

// config the job group (where job types can be grouped)
struct ConfigJobsGroup
{
int m_threads_count{1}; // how many threads for processing (out of the global threads)
int m_bulk_count{1}; // how many objects are processed at once
};

ConfigJobsEngine m_engine{}; // config for entire engine (threads, priorities, etc)
ProcessingFunction m_default_processing_function{}; // default processing function
std::unordered_map<JobsGroupT, ConfigJobsGroup> m_groups; // config by jobs group
std::unordered_map<JobsTypeT, ConfigJobsType> m_types; // config by jobs type

// processing function
inline void add_default_processing_function(ProcessingFunction processing_function)
{
ProcessingFunction previous_processing_function = m_default_processing_function;
m_default_processing_function = processing_function;
apply_default_processing_function(previous_processing_function);
}

inline void add_job_processing_function(const JobsTypeT &jobs_type, ProcessingFunction processing_function)
{
auto it_f = m_types.find(jobs_type);
if (it_f == m_types.end()) {
return;
}
it_f->second.m_processing_function = processing_function;
}

inline void apply_default_processing_function(ProcessingFunction previous_processing_function = nullptr)
{
for (auto &[type, jobs_type_config] : m_types) {
if (jobs_type_config.m_processing_function == previous_processing_function) {
jobs_type_config.m_processing_function = m_default_processing_function;
}
}
}
};
} // namespace small
Loading

0 comments on commit d2663bf

Please sign in to comment.