Merge pull request #28 from sogou/dev
merge from sogou/workflow dev
holmes1412 authored Jan 25, 2021
2 parents 5381c12 + 36807b5 commit fa03e01
Showing 27 changed files with 466 additions and 196 deletions.
21 changes: 21 additions & 0 deletions .travis.yml
@@ -0,0 +1,21 @@
language: cpp
dist: trusty
os: linux
compiler:
  - gcc

jobs:
  include:
    - env: COMPILER=g++-8 BUILD=Release STANDARD=11
      compiler: gcc
      addons:
        apt:
          update: true
          sources:
            - ubuntu-toolchain-r-test
          packages:
            - g++-8
            - cmake

script:
  - make
2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -5,7 +5,7 @@ set(CMAKE_SKIP_RPATH TRUE)

 project(
     workflow
-    VERSION 0.9.3
+    VERSION 0.9.4
     LANGUAGES C CXX
 )

20 changes: 8 additions & 12 deletions README.md
@@ -4,6 +4,7 @@
[![License](https://img.shields.io/badge/License-Apache%202.0-green.svg)](https://github.com/sogou/workflow/blob/master/LICENSE)
[![Language](https://img.shields.io/badge/language-c++-red.svg)](https://en.cppreference.com/)
[![Platform](https://img.shields.io/badge/platform-linux%20%7C%20macos-lightgrey.svg)](#%E9%A1%B9%E7%9B%AE%E7%9A%84%E4%B8%80%E4%BA%9B%E8%AE%BE%E8%AE%A1%E7%89%B9%E7%82%B9)
[![Build Status](https://travis-ci.org/sogou/workflow.svg?branch=master)](https://travis-ci.org/sogou/workflow)

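Sogou's C++ server engine. It powers almost all of Sogou's back-end C++ online services, including all search services, the cloud input method, and online advertising, and handles more than 10 billion requests per day. It is a lightweight, elegantly designed, enterprise-grade programming engine that can meet most C++ back-end development needs.
#### You can use it:
@@ -31,10 +32,10 @@ int main()
* To implement a client/server on a user-defined protocol and build your own RPC system.
* [srpc](https://github.com/sogou/srpc) is built on it and released as an independent open-source project. It supports the ``srpc``, ``brpc``, and ``thrift`` protocols.
* To build asynchronous task flows. Common series and parallel structures are supported, as are more complex DAG structures.
* As a parallel programming tool. In addition to network tasks, we also include the scheduling of computing tasks. All types of tasks can be put into the same flow.
* As a parallel computing tool. In addition to network tasks, we also include the scheduling of computing tasks. All types of tasks can be put into the same flow.
* As an asynchronous file IO tool on ``Linux``, with performance exceeding any standard call. Disk IO is also a kind of task.
* To implement any high-performance, high-concurrency back-end service with very complex relationships between computing and communication.
* To build a service mesh system.
* To build a microservice system.
* The project has built-in service governance and load balancing features.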

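#### Compiling and running environment
@@ -120,15 +121,10 @@ int main()
* Every task is automatically reclaimed after its callback. If a task is created but you do not want to run it, release it with the dismiss method.
* Data in a task, such as the resp of a network request, is reclaimed along with the task. Before that happens, you can use ``std::move()`` to move out the data you need.
* SeriesWork and ParallelWork are two kinds of framework objects, which are likewise reclaimed after their callbacks.
* If a series is a branch of a parallel, it is reclaimed after the callback of the parallel it belongs to.
* The project does not use ``std::shared_ptr`` to manage memory.

#### More design documents
To be continued...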


#### Authors

* **Xie Han** - *[[email protected]](mailto:[email protected])*
* **Wu Jiaxu** - *[[email protected]](mailto:[email protected])*
* **Wang Zhulei** - *[[email protected]](mailto:[email protected])* - Kafka Protocol Implementation
* **Li Yingxin** - *[[email protected]](mailto:[email protected])*
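# Questions about usage?
You can first check the [FAQ](https://github.com/sogou/workflow/issues/170) and the [issues](https://github.com/sogou/workflow/issues) list to see whether your question has already been answered.
You are very welcome to report the problems you run into in [issues](https://github.com/sogou/workflow/issues); we will answer them as soon as possible. More issues also help new users.
You can also contact us through the QQ group ``618773193``.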
11 changes: 4 additions & 7 deletions README_en.md
@@ -35,10 +35,10 @@ int main()
* To implement **client/server on user-defined protocol** and build your own **RPC system**.
* [srpc](https://github.com/sogou/srpc) is built on it and released as an independent open-source project. It supports the srpc, brpc and thrift protocols.
* To build **asynchronous workflow**; support common **series** and **parallel** structures, and also support any **DAG** structures.
* As a **parallel programming tool**. In addition to **networking tasks**, Sogou C++ Workflow also includes **the scheduling of computing tasks**. All types of tasks can be put into **the same** flow.
* As a **parallel computing tool**. In addition to **networking tasks**, Sogou C++ Workflow also includes **the scheduling of computing tasks**. All types of tasks can be put into **the same** flow.
* As an **asynchronous file IO tool** on `Linux`, with performance exceeding any system call. Disk file IO is also a task.
* To realize any **high-performance** and **high-concurrency** back-end service with a very complex relationship between computing and networking.
* To build a **service mesh** system.
* To build a **micro service** system.
* This project has built-in **service governance** and **load balancing** features.

#### Compiling and running environment
@@ -85,6 +85,7 @@ int main()
* [About connection context](docs/en/about-connection-context.md)
* Built-in protocols
* [Asynchronous MySQL client: mysql\_cli](docs/en/tutorial-12-mysql_cli.md)
* [Asynchronous Kafka client: kafka\_cli](docs/en/tutorial-13-kafka_cli.md)

#### System design features

@@ -128,14 +129,10 @@ Memory reclamation mechanism
* Every task is automatically reclaimed after its callback. If a task is created but you do not want to run it, release it through the dismiss method.
* Any data in the task, such as the response of a network request, is reclaimed together with the task. Before that happens, you can use `std::move()` to move out the data you need.
* SeriesWork and ParallelWork are two kinds of framework objects, which are also reclaimed after their callbacks.
* When a series is a branch of a parallel, it will be recycled after the callback of the parallel that it belongs to.
* This project doesn’t use `std::shared_ptr` to manage memory.
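
The point about moving data out of a task can be illustrated with a minimal, self-contained sketch. It does not use the real Workflow classes: `FakeTask`, `finish_task` and `fetch_and_keep` are hypothetical stand-ins that only mimic a task being reclaimed right after its callback, assuming the callback is the last place where the task's data is reachable.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a framework task; NOT the real Workflow class.
struct FakeTask
{
    std::string resp;                          // data owned by the task
    std::function<void (FakeTask *)> callback; // user callback
};

// Mimics the framework: run the callback, then reclaim the task.
void finish_task(FakeTask *task)
{
    if (task->callback)
        task->callback(task);
    delete task;                               // the task and its data are gone here
}

// Creates a task, lets it "complete", and returns what the callback kept.
std::string fetch_and_keep()
{
    std::string kept;
    FakeTask *task = new FakeTask;

    task->resp = "HTTP/1.1 200 OK";
    task->callback = [&kept](FakeTask *t) {
        kept = std::move(t->resp);             // move the data out before reclamation
    };
    finish_task(task);
    return kept;                               // still valid: now owned by the caller
}
```

Calling `fetch_and_keep()` returns the response text even though the task that produced it has already been deleted, because ownership of the string was transferred inside the callback.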

#### More design documents

To be continued...

## Authors

* **Xie Han** - *[[email protected]](mailto:[email protected])*
* **Wu Jiaxu** - *[[email protected]](mailto:[email protected])*
* **Li Yingxin** - *[[email protected]](mailto:[email protected])*
53 changes: 2 additions & 51 deletions docs/about-exit.md
@@ -12,57 +12,8 @@
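* All of our examples satisfy this assumption: they wake up the main function in the callback. This is safe; there is no need to worry that the callback has not finished when main returns.
* ParallelWork is a kind of task and also needs to run to its callback.
* This rule can be violated under certain circumstances, and the program's behavior is then still strictly defined. However, users who do not understand the core principles should follow this rule; otherwise the program cannot exit normally.
* All servers must finish stopping; otherwise the behavior is undefined. Because every user calls the stop operation, an ordinary server program will not have exit problems.

As long as the above three conditions are met, the program can exit normally without any memory leak. Although the definition is strict, there is one caveat concerning the condition for a server stop to complete.
* A call to stop() on a server waits for the callbacks of all server tasks to finish (this callback is empty by default), and no new server task will be processed.
* However, if the user starts a new task in process that is not in the series of the server task, the framework cannot prevent this, and the server stop cannot wait for that task to complete.
* Similarly, if the user adds a new task (for example, writing a log) to the task's series in the server task's callback, this new task is not controlled by the server either.
* In both cases, if the main function exits immediately after server.stop(), the second rule above may be violated, because some tasks have not yet run to their callbacks.

For this situation, the user needs to make sure that the started tasks have reached their callbacks. One way is to use a counter to record how many tasks are running and wait for it to reach 0 before main returns.
In the following example, the callback of a server task appends a file-writing task for logging to the current series (assume that file writing is very slow and an asynchronous IO has to be started):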
~~~cpp
std::mutex mutex;
std::condition_variable cond;
int log_task_cnt = 0;   // number of log tasks that have not reached their callback

void log_callback(WFFileIOTask *log_task)
{
    mutex.lock();
    if (--log_task_cnt == 0)
        cond.notify_one();   // last log task finished: wake up main
    mutex.unlock();
}

void reply_callback(WFHttpTask *server_task)
{
    WFFileIOTask *log_task = WFTaskFactory::create_pwrite_task(..., log_callback);

    mutex.lock();
    log_task_cnt++;
    mutex.unlock();
    *series_of(server_task) << log_task;   // append the log task to the current series
}

int main(void)
{
    WFHttpServer server;

    server.start();
    pause();
    ...

    server.stop();

    // Wait until every log task has run to its callback before returning.
    std::unique_lock<std::mutex> lock(mutex);
    while (log_task_cnt != 0)
        cond.wait(lock);
    lock.unlock();
    return 0;
}
~~~
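Although this method works, it does increase the program's complexity and the chance of errors, and should be avoided where possible. For example, you can write the log directly in the reply callback.
* All servers must finish stopping; otherwise the behavior is undefined. Because every user calls the stop operation, an ordinary server program will not have exit problems.
* A server's stop waits for the series of all server tasks to end. However, if the user starts a new task directly in process, the user has to take care of that task's completion.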

# About the memory leak of OpenSSL 1.1 on exit

54 changes: 1 addition & 53 deletions docs/en/about-exit.md
@@ -14,59 +14,7 @@ Generally, as long as you write the program normally and follow the methods in
* ParallelWork is a kind of task, which also needs to run to its callback.
* This rule can be violated under certain circumstances, in which the program's behavior is still strictly defined. However, if you don't understand the core principles, you should abide by this rule; otherwise the program can't exit normally.
* All servers must stop; otherwise the behavior is undefined. Because all users know how to call the stop operation, a server program generally will not have any exit problems.

As long as the above three conditions are met, the program can exit normally without any memory leak. Despite the strict definition, please note the conditions under which a server stop completes.

* A call to **stop()** on a server waits for the callbacks of all server tasks to finish (the callback is empty by default), and no new server tasks are processed.
* However, the framework can't stop you from starting a new task in process that is not added to the series of the server task. The server's **stop()** can't wait for the completion of this new task.
* Similarly, if you add a new task (such as logging) to the series of the server task in its callback, the new task is not controlled by the server.
* In both cases, if the main function exits immediately after **server.stop()**, it may violate the second rule above, because there may still be tasks that have not run to their callbacks.

In the above situation, you need to ensure that the started tasks have run to their callbacks. You can use a counter to record the number of running tasks, and wait for the count to reach 0 before the main function returns.
In the following example, the callback of a server task adds a log file writing task to the current series (assuming that file writing is very slow and an asynchronous IO needs to be started).

~~~cpp
std::mutex mutex;
std::condition_variable cond;
int log_task_cnt = 0;   // number of log tasks that have not reached their callback

void log_callback(WFFileIOTask *log_task)
{
    mutex.lock();
    if (--log_task_cnt == 0)
        cond.notify_one();   // last log task finished: wake up main
    mutex.unlock();
}

void reply_callback(WFHttpTask *server_task)
{
    WFFileIOTask *log_task = WFTaskFactory::create_pwrite_task(..., log_callback);

    mutex.lock();
    log_task_cnt++;
    mutex.unlock();
    *series_of(server_task) << log_task;   // append the log task to the current series
}

int main(void)
{
    WFHttpServer server;

    server.start();
    pause();
    ...

    server.stop();

    // Wait until every log task has run to its callback before returning.
    std::unique_lock<std::mutex> lock(mutex);
    while (log_task_cnt != 0)
        cond.wait(lock);
    lock.unlock();
    return 0;
}
~~~
Although the above method is feasible, it increases the complexity of the program and the chance of errors, so it should be avoided where possible. For example, you can write the log directly in the reply callback.
* A server's stop() method blocks until the series of all server tasks have ended. But if you start a task directly in the process function, you have to take care of the end of that task yourself.
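
One way to take care of such a task is to count the tasks that are still running and wait for the count to reach zero before main returns. The sketch below is self-contained and only illustrates the idea under stated assumptions: `PendingCounter` and `run_and_wait` are hypothetical names, and plain `std::thread` workers stand in for framework tasks and their callbacks.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Counts tasks that were started but have not reached their callback yet.
// Hypothetical helper, not part of the Workflow API.
class PendingCounter
{
public:
    // Register a task before starting it.
    void add()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        ++count_;
    }

    // Call this at the end of the task's callback.
    void done()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (--count_ == 0)
            cond_.notify_all();
    }

    // Blocks until every registered task has called done().
    void wait()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_ = 0;
};

// Starts n simulated background tasks, waits for all of them,
// and returns how many "callbacks" actually ran.
int run_and_wait(int n)
{
    PendingCounter counter;
    std::mutex finished_mutex;
    int finished = 0;
    std::vector<std::thread> workers;

    for (int i = 0; i < n; i++)
    {
        counter.add();                         // register before starting
        workers.emplace_back([&] {
            std::this_thread::sleep_for(std::chrono::milliseconds(5));
            {
                std::lock_guard<std::mutex> lock(finished_mutex);
                ++finished;                    // the simulated callback work
            }
            counter.done();
        });
    }

    counter.wait();                            // what main would do after server.stop()
    for (std::thread& t : workers)
        t.join();
    return finished;
}
```

In a real program, `add()` would be called where the extra task is started and `done()` in that task's callback, with `wait()` placed between `server.stop()` and the return from main.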

# About the memory leak of OpenSSL 1.1 on exit
