Skip to content

Commit

Permalink
Refactor jobs engine
Browse files Browse the repository at this point in the history
  • Loading branch information
herrcristi committed Dec 28, 2024
1 parent 74d65a2 commit ff6c886
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
5 changes: 3 additions & 2 deletions examples/examples_jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ namespace examples::jobs_engine {
// use default config and default function for job2
jobs.add_jobs_type(JobsType::kJobsType2);

jobs.start_threads(3); // manual start threads

JobsEng::JobsID jobs_id{};
std::vector<JobsEng::JobsID> jobs_ids;

Expand All @@ -90,11 +88,14 @@ namespace examples::jobs_engine {

std::vector<JobsEng::JobsItem> jobs_items = {{.type = JobsType::kJobsType1, .request = {7, "g"}}};
jobs.push_back(small::EnumPriorities::kHighest, jobs_items, &jobs_ids);
jobs.push_back(small::EnumPriorities::kHighest, {{.type = JobsType::kJobsType1, .request = {8, "h"}}}, &jobs_ids);

jobs.push_back_delay_for(std::chrono::milliseconds(300), small::EnumPriorities::kNormal, JobsType::kJobsType1, {100, "x"}, &jobs_id);
jobs.push_back_delay_until(small::timeNow() + std::chrono::milliseconds(350), small::EnumPriorities::kNormal, JobsType::kJobsType1, {101, "y"}, &jobs_id);
jobs.push_back_delay_for(std::chrono::milliseconds(400), small::EnumPriorities::kNormal, JobsType::kJobsType1, {102, "z"}, &jobs_id);

jobs.start_threads(3); // manual start threads

small::sleep(50);
// jobs.signal_exit_force();
auto ret = jobs.wait_for(std::chrono::milliseconds(100)); // wait to finished
Expand Down
6 changes: 3 additions & 3 deletions include/jobs_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <unordered_map>

#include "jobs_engine_scheduler.h"
#include "jobs_engine_thread_pool.h"

// // example
// using qc = std::pair<int, std::string>;
Expand Down Expand Up @@ -527,7 +527,7 @@ namespace small {
// inner thread function for executing items (should return if there are more items)
//

friend small::jobs_engine_scheduler<JobsGroupT, ThisJobsEngine>;
friend small::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine>;

inline EnumLock do_action(const JobsGroupT &jobs_group, bool *has_items)
{
Expand Down Expand Up @@ -692,6 +692,6 @@ namespace small {

JobQueueDelayedT m_delayed_items{*this}; // queue of delayed items

small::jobs_engine_scheduler<JobsGroupT, ThisJobsEngine> m_thread_pool{*this}; // scheduler for processing items (by group) using a pool of threads
small::jobs_engine_thread_pool<JobsGroupT, ThisJobsEngine> m_thread_pool{*this}; // scheduler for processing items (by group) using a pool of threads
};
} // namespace small
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ namespace small {
// helper class for jobs_engine to execute group of jobs (parent caller must implement 'do_action')
//
template <typename JobGroupT, typename ParentCallerT>
class jobs_engine_scheduler
class jobs_engine_thread_pool
{
public:
//
// jobs_engine_scheduler
// jobs_engine_thread_pool
//
explicit jobs_engine_scheduler(ParentCallerT &parent_caller)
explicit jobs_engine_thread_pool(ParentCallerT &parent_caller)
: m_parent_caller(parent_caller)
{
}

~jobs_engine_scheduler()
~jobs_engine_thread_pool()
{
wait();
}
Expand All @@ -36,7 +36,7 @@ namespace small {
// clang-format on

// clang-format off
// use it as locker (std::unique_lock<small:jobs_engine_scheduler<T>> m...)
// use it as locker (std::unique_lock<small:jobs_engine_thread_pool<T>> m...)
inline void lock () { m_workers.lock(); }
inline void unlock () { m_workers.unlock(); }
inline bool try_lock () { return m_workers.try_lock(); }
Expand All @@ -54,7 +54,7 @@ namespace small {
// config processing by job group type
// this should be done in the initial setup phase once
//
inline void add_job_group(const JobGroupT job_group, const int &threads_count)
inline void add_job_group(const JobGroupT &job_group, const int &threads_count)
{
m_scheduler[job_group].m_threads_count = threads_count;
}
Expand All @@ -63,7 +63,7 @@ namespace small {
// when items are added to be processed in parent class the start scheduler should be called
// to trigger action (if needed for the new job group)
//
inline void job_start(const JobGroupT job_group)
inline void job_start(const JobGroupT &job_group)
{
auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking
if (it == m_scheduler.end()) {
Expand Down Expand Up @@ -114,10 +114,10 @@ namespace small {

private:
// some prevention
jobs_engine_scheduler(const jobs_engine_scheduler &) = delete;
jobs_engine_scheduler(jobs_engine_scheduler &&) = delete;
jobs_engine_scheduler &operator=(const jobs_engine_scheduler &) = delete;
jobs_engine_scheduler &operator=(jobs_engine_scheduler &&__t) = delete;
jobs_engine_thread_pool(const jobs_engine_thread_pool &) = delete;
jobs_engine_thread_pool(jobs_engine_thread_pool &&) = delete;
jobs_engine_thread_pool &operator=(const jobs_engine_thread_pool &) = delete;
jobs_engine_thread_pool &operator=(jobs_engine_thread_pool &&__t) = delete;

private:
struct JobGroupStats
Expand All @@ -129,7 +129,7 @@ namespace small {
//
// to trigger action (if needed for the new job group)
//
inline void job_action_start(const JobGroupT job_group, const bool has_items, JobGroupStats &stats)
inline void job_action_start(const JobGroupT &job_group, const bool has_items, JobGroupStats &stats)
{
if (!has_items) {
return;
Expand All @@ -148,7 +148,7 @@ namespace small {
//
// job action ended
//
inline void job_action_end(const JobGroupT job_group, const bool has_items)
inline void job_action_end(const JobGroupT &job_group, const bool has_items)
{
auto it = m_scheduler.find(job_group); // map is not changed, so can be access without locking
if (it == m_scheduler.end()) {
Expand Down Expand Up @@ -188,7 +188,7 @@ namespace small {
//
struct JobWorkerThreadFunction
{
void operator()(small::worker_thread<JobGroupT> &, const std::vector<JobGroupT> &items, small::jobs_engine_scheduler<JobGroupT, ParentCallerT> *pThis) const
void operator()(small::worker_thread<JobGroupT> &, const std::vector<JobGroupT> &items, small::jobs_engine_thread_pool<JobGroupT, ParentCallerT> *pThis) const
{
pThis->thread_function(items);
}
Expand Down

0 comments on commit ff6c886

Please sign in to comment.