Skip to content

Commit

Permalink
0.3.10: Merge pull request #13 from CODIANZ/development
Browse files Browse the repository at this point in the history
adopted `stream_controller` for `inflow_restriction`.
  • Loading branch information
terukazu-inoue authored Jul 4, 2023
2 parents 71caa0d + 79700b6 commit 286ae1c
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 80 deletions.
25 changes: 17 additions & 8 deletions include/another-rxcpp/utils/inflow_restriction.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define __another_rxcpp_h_inflow_restriction__

#include "../observable.h"
#include "../internal/tools/stream_controller.h"
#include "sem.h"

namespace another_rxcpp {
Expand All @@ -24,20 +25,28 @@ class inflow_restriction {
{
auto sem = sem_;
return observable<>::create<T>([sem, o](subscriber<T> s){
auto sctl = internal::stream_controller<T>(s);
sctl.set_on_finalize([sem](){
sem->unlock();
});
sem->lock();
o.subscribe({
[s](const T& v){
s.on_next(v);
if(!sctl.is_subscribed()) {
sem->unlock();
return;
}
o.subscribe(sctl.template new_observer<T>(
[sctl](auto, const T& v){
sctl.sink_next(v);
},
[s, sem](std::exception_ptr e){
s.on_error(e);
[sctl, sem](auto, std::exception_ptr e){
sctl.sink_error(e);
sem->unlock();
},
[s, sem](){
s.on_completed();
[sctl, sem](auto serial){
sctl.sink_completed(serial);
sem->unlock();
}
});
));
});
}
};
Expand Down
171 changes: 118 additions & 53 deletions test/case_7.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,81 +8,146 @@
using namespace another_rxcpp;
using namespace another_rxcpp::operators;

void test_case_7() {
log() << "test_case_7 -- begin" << std::endl;
struct watier : public std::enable_shared_from_this<watier> {
subjects::subject<int> breaker_subject;

{
log() << "#1 normal observer -- begin" << std::endl;
auto sbsc = observable<>::create<int>([](subscriber<int> s){
subjects::subject<int> sbj;
observables::range(0, 10)
.observe_on(schedulers::observe_on_new_thread())
.take_until(sbj.as_observable())
virtual ~watier() {
log() << "~watier" << std::endl;
}

auto wait_subscribe(std::string memo){
auto THIS = shared_from_this();
return observable<>::create<int>([THIS, memo](subscriber<int> s){
observables::interval(std::chrono::milliseconds(700), schedulers::new_thread_scheduler())
.take_until(THIS->breaker_subject.as_observable())
.subscribe(
[s, sbj](auto x) {
log() << "#1 inner next: " << x << std::endl;
if(x == 5){
sbj.as_subscriber().on_next(1);
sbj.as_subscriber().on_completed();
}
[s, memo](auto x) {
log() << memo << " next: " << x << std::endl;
},
[s](auto err) {
log() << "#1 inner error" << std::endl;
[s, memo](auto err) {
log() << memo << " error" << std::endl;
s.on_error(err);
},
[s]{
log() << "#1 inner completed" << std::endl;
[s, memo]{
log() << memo << " completed" << std::endl;
s.on_next(999);
s.on_completed();
}
);
})
.subscribe(
[](auto x) { log() << "#1 outer next: " << x << std::endl; },
[](auto) { log() << "#1 outer error" << std::endl; },
[] { log() << "#1 outer completed" << std::endl; }
);
while(sbsc.is_subscribed()) {}
log() << "#1 normal observer -- end" << std::endl;
});
}

{
log() << "#2 stream_controller -- begin" << std::endl;

auto sbsc = observable<>::create<int>([](subscriber<int> s){
auto wait_stream_controller(std::string memo){
auto THIS = shared_from_this();
return observable<>::create<int>([THIS, memo](subscriber<int> s){
internal::stream_controller<int> sctl(s);
subjects::subject<int> sbj;
observables::range(0, 10)
.observe_on(schedulers::observe_on_new_thread())
.take_until(sbj.as_observable())
observables::interval(std::chrono::milliseconds(700), schedulers::new_thread_scheduler())
.take_until(THIS->breaker_subject.as_observable())
.subscribe(sctl.new_observer<int>(
[sctl, sbj](auto, auto x) {
log() << "#2 inner next: " << x << std::endl;
if(x == 5){
sbj.as_subscriber().on_next(1);
sbj.as_subscriber().on_completed();
}
[sctl, memo](auto, auto x) {
log() << memo << " next: " << x << std::endl;
},
[sctl](auto, auto err) {
log() << "#2 inner error" << std::endl;
[sctl, memo](auto, auto err) {
log() << memo << " error" << std::endl;
sctl.sink_error(err);
},
[sctl](auto serial){
log() << "#2 inner completed" << std::endl;
[sctl, memo](auto serial){
log() << memo << " completed" << std::endl;
sctl.sink_next(999);
sctl.sink_completed(serial);
}
));
})
.subscribe(
[](auto x) { log() << "#2 outer next: " << x << std::endl; },
[](auto) { log() << "#2 outer error" << std::endl; },
[] { log() << "#2 outer completed" << std::endl; }
);
while(sbsc.is_subscribed()) {}
});
}
};

auto emit_error(int ms){
return observables::just(1)
.observe_on(schedulers::new_thread_scheduler())
.flat_map([ms](auto){
return observable<>::create<int>([ms](subscriber<int> s){
s.on_next(1);
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
log() << "emit_error" << std::endl;
s.on_error(std::make_exception_ptr(std::runtime_error("error")));
});
});
}

void test_case_7() {
log() << "test_case_7 -- begin" << std::endl;

{
log() << "#1 -- begin" << std::endl;
{
auto w = std::make_shared<watier>();
auto sbsc = emit_error(1000)
.flat_map([w](auto){
return w->wait_subscribe("#1 watier1")
.amb(schedulers::new_thread_scheduler(), w->wait_subscribe("#1 watier2"));
})
.on_error_resume_next([w](auto){
log() << "#1 on_error_resume_next" << std::endl;
std::thread([w]{
wait(100);
log() << "#1 break!" << std::endl;
w->breaker_subject.as_subscriber().on_next(1);
}).detach();
return observables::just(123456);
})
.subscribe(
[](auto x) {
log() << "#1 next: " << x << std::endl;
},
[](auto err) {
log() << "#1 error" << std::endl;
},
[]{
log() << "#1 completed" << std::endl;
}
);
while(sbsc.is_subscribed()) {}
}
log() << "#1-- end" << std::endl;
}

wait(1000);

log() << "#2 stream_controller -- end" << std::endl;
{
log() << "#2 -- begin" << std::endl;
{
auto w = std::make_shared<watier>();
auto sbsc = emit_error(1000)
.flat_map([w](auto){
return w->wait_stream_controller("#2 watier1")
.amb(schedulers::new_thread_scheduler(), w->wait_stream_controller("#2 watier2"));
})
.on_error_resume_next([w](auto){
log() << "#2 on_error_resume_next" << std::endl;
std::thread([w]{
wait(100);
log() << "#2 break!" << std::endl;
w->breaker_subject.as_subscriber().on_next(1);
}).detach();
return observables::just(123456);
})
.subscribe(
[](auto x) {
log() << "#2 next: " << x << std::endl;
},
[](auto err) {
log() << "#2 error" << std::endl;
},
[]{
log() << "#2 completed" << std::endl;
}
);
while(sbsc.is_subscribed()) {}
}
log() << "#2-- end" << std::endl;
}

wait(1000);

log() << "test_case_7 -- end" << std::endl << std::endl;
}
79 changes: 60 additions & 19 deletions test/inflow_restriction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ void test_inflow_restriction() {

enum class result { success, failure };
struct long_api {
std::mutex mtx_;
int count_ = 0;
std::shared_ptr<std::mutex> mtx_ = std::make_shared<std::mutex>();
std::shared_ptr<int> count_ = std::make_shared<int>(0);
observable<result> call() {
auto mtx = mtx_;
auto count = count_;
return observables::just(unit{}, new_thread_scheduler())
| tap([=](unit){
std::lock_guard<std::mutex> lock(mtx_);
const int x = count_++;
| tap([mtx, count](unit){
std::lock_guard<std::mutex> lock(*mtx);
(*count)++;
const int x = (*count)++;
std::cout << std::this_thread::get_id() << " : enter #" << x << std::endl;
})
| delay(std::chrono::seconds(1), new_thread_scheduler())
| map([=](unit){
| map([](unit){
return result::success;
})
| tap([=](result){
std::lock_guard<std::mutex> lock(mtx_);
const int x = count_--;
| tap([mtx, count](result){
std::lock_guard<std::mutex> lock(*mtx);
const int x = (*count)--;
std::cout << std::this_thread::get_id() << " : leave #" << x << std::endl;
});
}
Expand All @@ -50,23 +53,23 @@ void test_inflow_restriction() {

auto sbsc = (
observables::iterate(list)
| flat_map([=](int n){
| flat_map([api](int n){
return api->call()
| map([=](result){
| map([n](result){
return n;
});
})
)
.subscribe({
[=](const int& x){
[mtx](const int& x){
std::lock_guard<std::mutex> lock(*mtx);
log() << "next " << x << std::endl;
},
[=](std::exception_ptr){
[mtx](std::exception_ptr){
std::lock_guard<std::mutex> lock(*mtx);
log() << "error " << std::endl;
},
[=](){
[mtx](){
std::lock_guard<std::mutex> lock(*mtx);
log() << "completed " << std::endl;
}
Expand All @@ -84,23 +87,61 @@ void test_inflow_restriction() {

auto sbsc = (
observables::iterate(list)
| flat_map([=](int n){
| flat_map([ifr, api](int n){
return ifr->enter(api->call())
| map([n](result){
return n;
});
})
)
.subscribe({
[mtx](const int& x){
std::lock_guard<std::mutex> lock(*mtx);
log() << "next " << x << std::endl;
},
[mtx](std::exception_ptr){
std::lock_guard<std::mutex> lock(*mtx);
log() << "error " << std::endl;
},
[mtx](){
std::lock_guard<std::mutex> lock(*mtx);
log() << "completed " << std::endl;
}
});

while(sbsc.is_subscribed()) {}
}

{
log() << "use inflow_restriction + unsubscribe" << std::endl;
auto mtx = std::make_shared<std::mutex>();
auto list = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15};
auto api = std::make_shared<long_api>();
auto ifr = std::make_shared<inflow_restriction<4>>();

auto sbsc = (
observables::iterate(list)
| flat_map([ifr, api](int n){
return ifr->enter(api->call())
| map([=](result){
| tap([n](auto){
log() << "tap: " << n << std::endl;
})
| map([n](result){
return n;
});
})
)
.take(5)
.subscribe({
[=](const int& x){
[mtx](const int& x){
std::lock_guard<std::mutex> lock(*mtx);
log() << "next " << x << std::endl;
},
[=](std::exception_ptr){
[mtx](std::exception_ptr){
std::lock_guard<std::mutex> lock(*mtx);
log() << "error " << std::endl;
},
[=](){
[mtx](){
std::lock_guard<std::mutex> lock(*mtx);
log() << "completed " << std::endl;
}
Expand Down

0 comments on commit 286ae1c

Please sign in to comment.