Files
workflow/docs/tutorial-13-kafka_cli.md
wzlsuccess ab059d3663 update docs
2021-06-22 17:11:23 +08:00

327 lines
11 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 异步Kafka客户端kafka_cli
# 示例代码
[tutorial-13-kafka_cli.cc](/tutorial/tutorial-13-kafka_cli.cc)
# 关于编译选项
在workflow中对kafka协议的支持是独立的。因此可以通过命令make KAFKA=y 编译独立的类库支持kafka协议系统需要预先安装[zlib](https://github.com/madler/zlib.git),[snappy](https://github.com/google/snappy.git),[lz4(>=1.7.5)](https://github.com/lz4/lz4.git),[zstd](https://github.com/facebook/zstd.git)等第三方库用来支持kafka协议中的压缩算法。
# 关于kafka_cli
这是一个kafka client可以完成kafka的消息生产(produce)和消息消费(fetch)。
编译时需要在tutorial目录中执行编译命令make KAFKA=y或者在项目根目录执行make KAFKA=y tutorial。
该程序从命令行读取一个kafka broker服务器地址和本次任务的类型(produce/fetch)
./kafka_cli \<broker_url\> [p/c]
程序会在执行完任务后自动退出,一切资源完全回收。
其中broker_url可以有多个url组成多个url之间以,分割
- 形式如kafka://host:port,kafka://host1:port...
- port默认为9092;
- 如果用户在这一层有upstream选取需求可以参考[upstream文档](../docs/about-upstream.md)。
Kafka broker_url示例
kafka://127.0.0.1/
kafka://kafka.host:9090/
kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
# 实现原理和特性
kafka client内部实现上除了压缩功能外没有依赖第三方库同时利用了workflow的高性能在合理的配置和环境下每秒钟可以处理几万次Kafka请求。
在内部实现上kafka client会把一次请求按照内部使用到的broker分拆成并行parallel任务每个broker地址对应parallel任务中的一个子任务
这样可以最大限度的提升效率同时利用workflow内部对连接的复用机制使得整体的连接数控制在一个合理的范围。
如果一个broker地址下有多个topic partition为了提高吞吐应该创建多个client然后按照topic partition分别创建任务独立启动。
# 创建并启动Kafka任务
首先需要创建一个WFKafkaClient对象然后调用init函数初始化WFKafkaClient对象
~~~cpp
int init(const std::string& broker_url);
int init(const std::string& broker_url, const std::string& group);
~~~
其中broker_url是kafka broker集群的地址格式可以参考上面的broker_url
group是消费者组的group_name用在基于消费者组的fetch任务中如果是produce任务或者没有使用消费者组的fetch任务则不需要使用此接口
用消费者组的时候可以设置heartbeat的间隔时间时间单位是毫秒用于维持心跳
~~~cpp
void set_heartbeat_interval(size_t interval_ms);
~~~
后面再通过WFKafkaClient对象创建kafka任务
~~~cpp
using kafka_callback_t = std::function<void (WFKafkaTask *)>;
WFKafkaTask *create_kafka_task(const std::string& query, int retry_max, kafka_callback_t cb);
WFKafkaTask *create_kafka_task(int retry_max, kafka_callback_t cb);
~~~
其中query中包含此次任务的类型以及topic等属性retry_max表示最大重试次数cb为用户自定义的callback函数当task执行完毕后会被调用
接着还可以修改task的默认配置以满足实际需要详细接口可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看
~~~cpp
KafkaConfig config;
config.set_client_id("workflow");
task->set_config(std::move(config));
~~~
支持的配置选项描述如下:
配置名 | 类型 | 默认值 | 含义
------ | ---- | -------| -------
produce_timeout | int | 100ms | produce的超时时间
produce_msg_max_bytes | int | 1000000 bytes | 单个消息的最大长度限制
produce_msgset_cnt | int | int | 10000 | 一次通信消息集合的最大条数
produce_msgset_max_bytes | int | 1000000 bytes | 一次通信消息集合的最大长度限制
fetch_timeout | int | 100ms | fetch的超时时间
fetch_min_bytes | int | 1 byte | 一次fetch通信最小消息的长度
fetch_max_bytes | int | 50M bytes | 一次fetch通信最大消息的长度
fetch_msg_max_bytes | int | 1M bytes | 一次fetch通信单个消息的最大长度
offset_timestamp | long long int | -2 | 消费者组模式下没有找到历史offset时初始化的offset-2表示最久-1表示最新
session_timeout | int | 10s | 加入消费者组初始化时的超时时间
rebalance_timeout | int | 10s | 加入消费者组同步信息阶段的超时时间
produce_acks | int | -1 | produce任务在返回之前应确保消息成功复制的broker节点数-1表示所有的复制broker节点
allow_auto_topic_creation | bool | true | produce时topic不存在时是否自动创建topic
broker_version | char * | NULL | 指定broker的版本号<0.10时需要手动指定
compress_type | int | NoCompress | produce消息的压缩类型
client_id | char * | NULL | 表示client的id
check_crcs | bool | false | fetch任务中是否校验消息的crc32
offset_store | int | 0 | 加入消费者组时是否使用上次提交offset1表示使用指定的offset0表示优先使用上次提交
sasl_mechanisms | char * | NULL | sasl认证类型目前支持plain和scram
sasl_username | char * | NULL | sasl认证所需的username
sasl_password | char * | NULL | sasl认证所需的password
最后就可以调用start接口启动kafka任务。
# produce任务
1、在创建并初始化WFKafkaClient之后可以在query中直接指定topic等信息创建WFKafkaTask任务
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
~~~
2、在创建完WFKafkaTask之后先通过调用set_key, set_value, add_header_pair等方法构建KafkaRecord
关于KafkaRecord的更多接口可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看
然后应该通过调用add_produce_record添加KafkaRecord关于更多接口的详细定义可以在[WFKafkaClient.h](../src/client/WFKafkaClient.h)中查看
需要注意的是add_produce_record的第二个参数partition当>=0是表示指定的partition-1表示随机指定partition或者调用自定义的kafka_partitioner_t
kafka_partitioner_t可以通过set_partitioner接口设置自定义规则。
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
task->set_partitioner(partitioner);
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
~~~
3、produce还可以使用kafka支持的4种压缩协议通过设置配置项来实现
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
KafkaConfig config;
config.set_compress_type(Kafka_Zstd);
task->set_config(std::move(config));
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
~~~
# fetch任务
fetch任务支持消费者组模式和手动模式
1、手动模式
无需指定消费者组同时需要用户指定topic、partition和offset
使用示例如下:
~~~cpp
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
toppar.set_topic_partition("workflow_test1", 0);
toppar.set_offset(0);
task->add_toppar(toppar);
~~~
2、消费者组模式
在初始化client的时候需要指定消费者组的名称
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url, cgroup_name);
task = client_fetch->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
~~~
3、offset的提交
在消费者组模式下用户消费消息后可以在callback函数中通过创建commit任务来自动提交消费的记录使用示例如下
~~~cpp
void kafka_callback(WFKafkaTask *task)
{
...
commit_task = client.create_kafka_task("api=commit", 3, kafka_callback);
...
commit_task->start();
...
}
~~~
# 关于client的关闭
在消费者组模式下client在关闭之前需要调用create_leavegroup_task创建leavegroup_task
它会发送leavegroup协议包如果没有启动leavegroup_task会导致消费者组没有正确退出触发这个组的rebalance。
# 处理kafka结果
消息的结果集的数据结构是KafkaResult可以通过调用WFKafkaTask的get_result()接口获得,
然后调用KafkaResult的fetch_record接口可以将本次task相关的record取出来它是一个KafkaRecord的二维vector
第一维是topic partition第二维是某个topic partition下对应的KafkaRecord
在[KafkaResult.h](../src/protocol/KafkaResult.h)中可以看到KafkaResult的定义
~~~cpp
void kafka_callback(WFKafkaTask *task)
{
int state = task->get_state();
int error = task->get_error();
// handle error states
...
protocol::KafkaResult *result = task->get_result();
result->fetch_records(records);
for (auto &v : records)
{
for (auto &w: v)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
printf("produce\ttopic: %s, partition: %d, status: %d, offset: %lld, val_len: %zu\n",
w->get_topic(), w->get_partition(), w->get_status(), w->get_offset(), value_len);
}
}
...
protocol::KafkaResult new_result = std::move(*task->get_result());
if (new_result.fetch_records(records))
{
for (auto &v : records)
{
if (v.empty())
continue;
for (auto &w: v)
{
if (fp)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
fwrite(w->get_value(), w->get_value_len(), 1, fp);
}
}
}
}
...
}
~~~
# 认证
认证信息需要在配置中设置以sasl为例:
~~~cpp
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
config.set_sasl_username("fetch");
config.set_sasl_password("fetch-secret");
config.set_sasl_mech("SCRAM-SHA-256");
task->set_config(std::move(config));
...
task->start();
...
}
~~~