mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
Merge branch 'master' of https://github.com/sogou/workflow into nossl
This commit is contained in:
6
BUILD
6
BUILD
@@ -375,3 +375,9 @@ cc_binary(
|
||||
srcs = ['tutorial/tutorial-14-consul_cli.cc'],
|
||||
deps = [':consul'],
|
||||
)
|
||||
|
||||
cc_binary(
|
||||
name = 'redis_subscriber',
|
||||
srcs = ['tutorial/tutorial-18-redis_subscriber.cc'],
|
||||
deps = [':redis'],
|
||||
)
|
||||
|
||||
@@ -127,6 +127,7 @@ sudo dnf install workflow
|
||||
* [异步MySQL客户端:mysql_cli](docs/tutorial-12-mysql_cli.md)
|
||||
* [异步kafka客户端:kafka_cli](docs/tutorial-13-kafka_cli.md)
|
||||
* [异步DNS客户端:dns_cli](docs/tutorial-17-dns_cli.md)
|
||||
* [Redis订阅客户端:redis_subscriber](docs/tutorial-18-redis_subscriber.md)
|
||||
|
||||
#### 编程范式
|
||||
|
||||
|
||||
@@ -18,7 +18,11 @@ mysql://username:password@host:port/dbname?character\_set=charset&character\_set
|
||||
|
||||
- set scheme to be **mysqls://** for accessing MySQL with SSL connnection (MySQL server 5.7 or above is required).
|
||||
|
||||
- fill in the username and the password for the MySQL database;
|
||||
- fill in the username and the password for the MySQL database; Special characters in password need to be escaped.
|
||||
~~~cpp
|
||||
// Password: @@@@####
|
||||
std::string url = "mysql://root:" + StringUtil::url_encode_component("@@@@####") + "@127.0.0.1";
|
||||
~~~
|
||||
|
||||
- the default port number is 3306;
|
||||
|
||||
|
||||
@@ -17,8 +17,11 @@ mysql://username:password@host:port/dbname?character_set=charset&character_set_r
|
||||
|
||||
- 如果以SSL连接访问MySQL,则scheme设为**mysqls://**。MySQL server 5.7及以上支持;
|
||||
|
||||
- username和password按需填写;
|
||||
|
||||
- username和password按需填写,如果密码里包含特殊字符,需要转义后再拼接URL;
|
||||
~~~cpp
|
||||
// 密码为:@@@@####
|
||||
std::string url = "mysql://root:" + StringUtil::url_encode_component("@@@@####") + "@127.0.0.1";
|
||||
~~~
|
||||
- port默认为3306;
|
||||
|
||||
- dbname为要用的数据库名,一般如果SQL语句只操作一个db的话建议填写;
|
||||
|
||||
116
docs/tutorial-18-redis_subscriber.md
Normal file
116
docs/tutorial-18-redis_subscriber.md
Normal file
@@ -0,0 +1,116 @@
|
||||
# Redis订阅模式
|
||||
|
||||
## 示例代码
|
||||
[tutorial-18-redis_subscriber.cc](/tutorial/tutorial-18-redis_subscriber.cc)
|
||||
|
||||
## 创建订阅客户端和任务
|
||||
在Workflow中,一个客户端网络任务通常是向服务端发出一个请求并接收一个回复,而Redis订阅任务不同,它会先发出一个订阅请求,然后源源不断地接收服务端推送过来的消息,在这个过程中,客户端还可以新增或取消channels、patterns。
|
||||
|
||||
用于实现Redis订阅功能的任务是`WFRedisSubscribeTask`,与普通的Redis任务不同,它不从任务工厂产生,而是需要使用`WFRedisSubscriber`来创建。例如
|
||||
|
||||
```cpp
|
||||
WFRedisSubscriber suber;
|
||||
|
||||
if (suber.init(url) != 0)
|
||||
{
|
||||
std::cerr << "Subscriber init failed " << strerror(errno) << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// ...
|
||||
|
||||
WFRedisSubscribeTask *task;
|
||||
task = suber.create_subscribe_task(channels, extract, callback);
|
||||
|
||||
task->set_watch_timeout(1000000); // 1000秒
|
||||
task->start();
|
||||
|
||||
// 这里可以使用task的相关接口改变订阅内容
|
||||
// ...
|
||||
|
||||
task->release();
|
||||
suber.deinit();
|
||||
```
|
||||
|
||||
初始化`WFRedisSubscriber`需要使用`Redis URL`,这与普通Redis任务相同,不再赘述。创建订阅任务时,需要提供三个参数
|
||||
|
||||
- channels/patterns: 一个或多个被订阅的channel(subscribe)或pattern(psubscribe)
|
||||
- extract: 收到服务端推送消息时的处理函数
|
||||
- callback: 任务结束后的回调函数
|
||||
|
||||
这个例子中为`watch_timeout`设置了一个很长的时间,若这个时间较短,且服务端长时间未推送消息,则连接会因为超时而断开,订阅任务也会直接失败,请根据实际情况合理设置。
|
||||
|
||||
当任务处理完成后,需要通过`task->release()`来释放这个任务,这也是与其他任务的一个不同之处。
|
||||
|
||||
## 处理订阅消息
|
||||
服务端推送的消息由创建任务时指定的`extract`函数处理。后续描述中,subscribe对应channel,psubscribe对应pattern。
|
||||
|
||||
1. 服务端推送的消息格式是具有三个元素的数组,第一个元素是字符串"message"或"pmessage",第二个元素是该消息的channel或pattern的名称,第三个元素是消息的内容。
|
||||
2. subscribe或psubscribe请求的回复是具有三个元素的数组,第一个元素是字符串"subscribe"或"psubscribe",第二个元素是channel或pattern的名称,第三个元素是当前通过subscribe或psubscribe命令已经订阅了多少个channel或pattern,是一个整数。如果一个请求订阅了多个channel或pattern,会有多个回复。
|
||||
3. unsubscribe或punsubscribe请求的回复是具有三个元素的数组,格式与订阅命令相似。当取消订阅但不指定channel或pattern时,表示取消所有该类型的订阅,对于所有已经订阅的channel或pattern,返回一个回复消息。若当前类型未订阅任何channel或pattern,则返回一个消息,其中名称部分为nil。
|
||||
|
||||
更多详情可参阅redis文档。
|
||||
|
||||
处理消息的一个示例如下,简单地将内容打印到标准输出
|
||||
|
||||
```cpp
|
||||
void extract(WFRedisSubscribeTask *task)
|
||||
{
|
||||
auto *resp = task->get_resp();
|
||||
protocol::RedisValue value;
|
||||
|
||||
resp->get_result(value);
|
||||
|
||||
if (value.is_array())
|
||||
{
|
||||
for (size_t i = 0; i < value.arr_size(); i++)
|
||||
{
|
||||
if (value[i].is_string())
|
||||
std::cout << value[i].string_value();
|
||||
else if (value[i].is_int())
|
||||
std::cout << value[i].int_value();
|
||||
else if (value[i].is_nil())
|
||||
std::cout << "nil";
|
||||
else
|
||||
std::cout << "Unexpected value in array!";
|
||||
|
||||
std::cout << "\n";
|
||||
}
|
||||
}
|
||||
else
|
||||
std::cout << "Unexpected value!\n";
|
||||
}
|
||||
```
|
||||
|
||||
## 改变订阅内容
|
||||
在任务过程中,可以通过下述接口新增或取消订阅,注意在带有channels或patterns参数的接口中,请勿传入空数组。
|
||||
|
||||
```cpp
|
||||
// ...
|
||||
|
||||
task->start();
|
||||
|
||||
// 新增订阅一组channels
|
||||
task->subscribe(channels);
|
||||
|
||||
// 取消订阅一组channels
|
||||
task->unsubscribe(channels);
|
||||
|
||||
// 取消订阅所有channels
|
||||
task->unsubscribe();
|
||||
|
||||
// 新增订阅一组patterns
|
||||
task->psubscribe(patterns);
|
||||
|
||||
// 取消订阅一组patterns
|
||||
task->punsubscribe(patterns);
|
||||
|
||||
// 取消订阅所有patterns
|
||||
task->punsubscribe();
|
||||
|
||||
task->release();
|
||||
```
|
||||
|
||||
当所有channels和patterns都被取消订阅后,任务会直接结束,此后不能再新增订阅,请注意该细节。也可以直接通过`task->quit()`来主动结束任务。
|
||||
|
||||
此外,订阅模式下可以通过`task->ping()`或`task->ping(message)`向Redis服务器发起`ping`请求。当任务设置了较小的`watch_timeout`,但服务端可能长时间没有消息推送时,通过定时发出`ping`请求可以令服务端推送`pong`响应,此时任务便不会因为超时而失败。
|
||||
@@ -67,6 +67,7 @@ if (NOT REDIS STREQUAL "n")
|
||||
set(TUTORIAL_LIST
|
||||
tutorial-02-redis_cli
|
||||
tutorial-03-wget_to_redis
|
||||
tutorial-18-redis_subscriber
|
||||
)
|
||||
foreach(src ${TUTORIAL_LIST})
|
||||
string(REPLACE "-" ";" arr ${src})
|
||||
|
||||
150
tutorial/tutorial-18-redis_subscriber.cc
Normal file
150
tutorial/tutorial-18-redis_subscriber.cc
Normal file
@@ -0,0 +1,150 @@
|
||||
#include <cerrno>
|
||||
#include <cctype>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
|
||||
#include "workflow/WFRedisSubscriber.h"
|
||||
#include "workflow/WFFacilities.h"
|
||||
#include "workflow/StringUtil.h"
|
||||
|
||||
void extract(WFRedisSubscribeTask *task)
|
||||
{
|
||||
auto *resp = task->get_resp();
|
||||
protocol::RedisValue value;
|
||||
|
||||
resp->get_result(value);
|
||||
|
||||
if (value.is_array())
|
||||
{
|
||||
for (size_t i = 0; i < value.arr_size(); i++)
|
||||
{
|
||||
if (value[i].is_string())
|
||||
std::cout << value[i].string_value();
|
||||
else if (value[i].is_int())
|
||||
std::cout << value[i].int_value();
|
||||
else if (value[i].is_nil())
|
||||
std::cout << "nil";
|
||||
else
|
||||
std::cout << "Unexpected value in array!";
|
||||
|
||||
std::cout << "\n";
|
||||
}
|
||||
}
|
||||
else
|
||||
std::cout << "Unexpected value!\n";
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
if (argc < 3)
|
||||
{
|
||||
std::cerr << argv[0] << " <URL> <Channel> [<Channel>]..." << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
std::string url = argv[1];
|
||||
if (strncasecmp(argv[1], "redis://", 8) != 0 &&
|
||||
strncasecmp(argv[1], "rediss://", 9) != 0)
|
||||
{
|
||||
url = "redis://" + url;
|
||||
}
|
||||
|
||||
WFRedisSubscriber suber;
|
||||
|
||||
if (suber.init(url) != 0)
|
||||
{
|
||||
std::cerr << "Subscriber init failed " << strerror(errno) << std::endl;
|
||||
exit(1);
|
||||
}
|
||||
|
||||
std::vector<std::string> channels;
|
||||
for (int i = 2; i < argc; i++)
|
||||
channels.push_back(argv[i]);
|
||||
|
||||
WFFacilities::WaitGroup wg(1);
|
||||
bool finished = false;
|
||||
|
||||
auto callback = [&](WFRedisSubscribeTask *task)
|
||||
{
|
||||
std::cout << "state = " << task->get_state()
|
||||
<< ", error = " << task->get_error() << std::endl;
|
||||
|
||||
finished = true;
|
||||
wg.done();
|
||||
};
|
||||
|
||||
WFRedisSubscribeTask *task;
|
||||
task = suber.create_subscribe_task(channels, extract, callback);
|
||||
|
||||
task->set_watch_timeout(1000000);
|
||||
task->start();
|
||||
|
||||
std::string line;
|
||||
|
||||
while (!finished)
|
||||
{
|
||||
std::string cmd;
|
||||
std::vector<std::string> params;
|
||||
|
||||
if (std::getline(std::cin, line))
|
||||
{
|
||||
if (line.empty())
|
||||
continue;
|
||||
|
||||
params = StringUtil::split_filter_empty(line, ' ');
|
||||
}
|
||||
|
||||
if (finished)
|
||||
break;
|
||||
|
||||
if (params.empty())
|
||||
{
|
||||
task->unsubscribe();
|
||||
task->punsubscribe();
|
||||
break;
|
||||
}
|
||||
|
||||
cmd = params[0];
|
||||
params.erase(params.begin());
|
||||
|
||||
for (char &c : cmd)
|
||||
c = std::toupper(c);
|
||||
|
||||
int ret;
|
||||
if (cmd == "SUBSCRIBE")
|
||||
ret = task->subscribe(params);
|
||||
else if (cmd == "UNSUBSCRIBE")
|
||||
ret = task->unsubscribe(params);
|
||||
else if (cmd == "PSUBSCRIBE")
|
||||
ret = task->psubscribe(params);
|
||||
else if (cmd == "PUNSUBSCRIBE")
|
||||
ret = task->punsubscribe(params);
|
||||
else if (cmd == "PING")
|
||||
{
|
||||
if (params.empty())
|
||||
ret = task->ping();
|
||||
else
|
||||
ret = task->ping(params[0]);
|
||||
}
|
||||
else if (cmd == "QUIT")
|
||||
ret = task->quit();
|
||||
else
|
||||
{
|
||||
std::cerr << "Invalid command " << cmd << std::endl;
|
||||
ret = 0;
|
||||
}
|
||||
|
||||
if (ret < 0)
|
||||
{
|
||||
std::cerr << "Send command failed " << strerror(errno) << std::endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
task->release();
|
||||
wg.wait();
|
||||
suber.deinit();
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -11,7 +11,7 @@ function all_examples()
|
||||
local item = {}
|
||||
local s = path.filename(x)
|
||||
if ((s == "upstream_unittest.cc" and not has_config("upstream")) or
|
||||
((s == "tutorial-02-redis_cli.cc" or s == "tutorial-03-wget_to_redis.cc") and not has_config("redis")) or
|
||||
((s == "tutorial-02-redis_cli.cc" or s == "tutorial-03-wget_to_redis.cc" or s == "tutorial-18-redis_subscriber.cc") and not has_config("redis")) or
|
||||
(s == "tutorial-12-mysql_cli.cc" and not has_config("mysql")) or
|
||||
(s == "tutorial-14-consul_cli.cc" and not has_config("consul")) or
|
||||
(s == "tutorial-13-kafka_cli.cc")) then
|
||||
|
||||
Reference in New Issue
Block a user