diff --git a/docs/tutorial-13-kafka_cli.md b/docs/tutorial-13-kafka_cli.md index b4845d30..ef14eda0 100644 --- a/docs/tutorial-13-kafka_cli.md +++ b/docs/tutorial-13-kafka_cli.md @@ -5,19 +5,20 @@ # 关于编译选项 -在workflow中,你可以使用第三方库比如librdkafka,也可使用自带的kafka client,因此它对kafka协议的支持是独立的。 +在workflow中,关于kafka协议的支持,你可以使用第三方库比如[librdkafka](https://github.com/edenhill/librdkafka.git),也可使用自带的kafka client,因此对kafka协议的支持是独立的。 通过命令make KAFKA=y 编译独立的类库支持kafka协议,系统需要预先安装[zlib](https://github.com/madler/zlib.git),[snappy](https://github.com/google/snappy.git),[lz4(>=1.7.5)](https://github.com/lz4/lz4.git),[zstd](https://github.com/facebook/zstd.git)等第三方库。 + # 关于kafka_cli -这是一个kafka client,根据不同的输入参数,完成kafka的消息生产(produce)、消息消费(fetch)、元数据获取(meta)等。 +这是一个kafka client,可以完成kafka的消息生产(produce)和消息消费(fetch)。 -编译时需要在tutorial目录中执行编译命令make KAFKA=y。 +编译时需要在tutorial目录中执行编译命令make KAFKA=y或者在项目根目录执行make KAFKA=y tutorial。 -该程序从命令行读取一个kafka broker服务器地址和本次任务的类型(produce/fetch/meta): +该程序从命令行读取一个kafka broker服务器地址和本次任务的类型(produce/fetch): -./kafka_cli \ [p/c/m] +./kafka_cli \ [p/c] 程序会在执行完任务后自动退出,一切资源完全回收。 @@ -35,9 +36,35 @@ kafka://kafka.host:9090/ kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou +# 实现原理和特性 + +kafka client内部实现上除了压缩功能外没有依赖第三方库,同时利用了workflow的高性能,在合理的配置和环境下,每秒钟可以处理几万次Kafka请求。 + +在内部实现上,kafka client会把一次请求按照内部使用到的broker分拆成并行parallel任务,每个broker地址对应parallel任务中的一个子任务, + +这样可以最大限度的提升效率,同时利用workflow内部对连接的复用机制使得整体的连接数控制在一个合理的范围。 + +如果一个broker地址下有多个topic partition,为了提高吞吐,应该创建多个client,然后按照topic partition分别创建任务独立启动。 + + # 创建并启动Kafka任务 -由于Kafka需要保存broker、meta和group之类的全局信息,因此建议用户使用WFKafkaClient这个二级工厂来创建kafka任务 +首先需要创建一个WFKafkaClient对象,然后调用init函数初始化WFKafkaClient对象, +~~~cpp +int init(const std::string& broker_url); + +int init(const std::string& broker_url, const std::string& group); +~~~ +其中broker_url是kafka broker集群的地址,格式可以参考上面的broker_url, + +group是消费者组的group_name,用在基于消费者组的fetch任务中,如果是produce任务或者没有使用消费者组的fetch任务,则不需要使用此接口; + +用消费者组的时候,可以设置heartbeat的间隔时间,时间单位是毫秒,用于维持心跳: +~~~cpp +void set_heartbeat_interval(size_t interval_ms); +~~~ + +后面再通过WFKafkaClient对象创建kafka任务 ~~~cpp using kafka_callback_t = std::function; @@ -45,10 +72,41 @@ WFKafkaTask *create_kafka_task(const std::string& query, int retry_max, kafka_ca WFKafkaTask *create_kafka_task(int retry_max, kafka_callback_t cb); ~~~ +其中query中包含此次任务的类型以及topic等属性,retry_max表示最大重试次数,cb为用户自定义的callback函数,当task执行完毕后会被调用, -用户有两种方式设置任务的详细信息: +接着还可以修改task的默认配置以满足实际需要,详细接口可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看 +~~~cpp +KafkaConfig config; +config.set_client_id("workflow"); +task->set_config(std::move(config)); +~~~ +支持的配置选项描述如下: +配置名 | 类型 | 默认值 | 含义 +------ | ---- | -------| ------- +produce_timeout | int | 100ms | produce的超时时间 +produce_msg_max_bytes | int | 1000000 bytes | 单个消息的最大长度限制 +produce_msgset_cnt | int | int | 10000 | 一次通信消息集合的最大条数 +produce_msgset_max_bytes | int | 1000000 bytes | 一次通信消息集合的最大长度限制 +fetch_timeout | int | 100ms | fetch的超时时间 +fetch_min_bytes | int | 1 byte | 一次fetch通信最小消息的长度 +fetch_max_bytes | int | 50M bytes | 一次fetch通信最大消息的长度 +fetch_msg_max_bytes | int | 1M bytes | 一次fetch通信单个消息的最大长度 +offset_timestamp | long long int | -2 | 消费者组模式下,没有找到历史offset时,初始化的offset,-2表示最久,-1表示最新 +session_timeout | int | 10s | 加入消费者组初始化时的超时时间 +rebalance_timeout | int | 10s | 加入消费者组同步信息阶段的超时时间 +produce_acks | int | -1 | produce任务在返回之前应确保消息成功复制的broker节点数,-1表示所有的复制broker节点 +allow_auto_topic_creation | bool | true | produce时topic不存在时,是否自动创建topic +broker_version | char * | NULL | 指定broker的版本号,<0.10时需要手动指定 +compress_type | int | NoCompress | produce消息的压缩类型 +client_id | char * | NULL | 表示client的id +check_crcs | bool | false | fetch任务中是否校验消息的crc32 -1、在query中直接指定任务类型、topic等信息 + +最后就可以调用start接口启动kafka任务。 + +# produce任务 + +1、在创建并初始化WFKafkaClient之后,可以在query中直接指定topic等信息创建WFKafkaTask任务 使用示例如下: ~~~cpp @@ -65,17 +123,15 @@ int main(int argc, char *argv[]) } ~~~ -2、在创建完WFKafkaTask之后,根据任务的类型先调用set_api_type设置,然后调用add接口准备输入, +2、在创建完WFKafkaTask之后,先通过调用set_key, set_value, add_header_pair等方法构建KafkaRecord, -关于二级工厂的更多接口,可以在[WFKafkaClient.h](../src/client/WFKafkaClient.h)中查看 +关于KafkaRecord的更多接口,可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看 -比如针对produce任务,先创建KafkaRecord,然后调用set_key, set_value, add_header_pair等方法构建KafkaRecord, +然后应该通过调用add_produce_record添加KafkaRecord,关于更多接口的详细定义,可以在[WFKafkaClient.h](../src/client/WFKafkaClient.h)中查看 -接着调用add_produce_record添加record,关于KafkaRecord的更多接口,可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看 +需要注意的是,add_produce_record的第二个参数partition,当>=0是表示指定的partition,-1表示随机指定partition或者调用自定义的kafka_partitioner_t -针对fetch和meta任务,需要调用add_topic指定topic - -其他包括callback、series、user_data等与workflow其他task用法类似。 +kafka_partitioner_t可以通过set_partitioner接口设置自定义规则。 使用示例如下: ~~~cpp @@ -85,6 +141,7 @@ int main(int argc, char *argv[]) WFKafkaClient *client_fetch = new WFKafkaClient(); client_fetch->init(url); task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback); + task->set_partitioner(partitioner); KafkaRecord record; record.set_key("key1", strlen("key1")); @@ -98,11 +155,55 @@ int main(int argc, char *argv[]) } ~~~ +3、produce还可以使用kafka支持的4种压缩协议,通过设置配置项来实现 + +使用示例如下: +~~~cpp +int main(int argc, char *argv[]) +{ + ... + WFKafkaClient *client_fetch = new WFKafkaClient(); + client_fetch->init(url); + task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback); + + KafkaConfig config; + config.set_compress_type(Kafka_Zstd); + task->set_config(std::move(config)); + + KafkaRecord record; + record.set_key("key1", strlen("key1")); + record.set_value(buf, sizeof(buf)); + record.add_header_pair("hk1", 3, "hv1", 3); + task->add_produce_record("workflow_test1", -1, std::move(record)); + + ... + task->start(); + ... +} +~~~ + + # fetch任务 fetch任务支持消费者组模式和手动模式 -1、消费者组模式 +1、手动模式 + +无需指定消费者组,同时需要用户指定topic、partition和offset + +使用示例如下: +~~~cpp + client = new WFKafkaClient(); + client->init(url); + task = client->create_kafka_task("api=fetch", 3, kafka_callback); + + KafkaToppar toppar; + toppar.set_topic_partition("workflow_test1", 0); + toppar.set_offset(0); + task->add_toppar(toppar); +~~~ + +2、消费者组模式 在初始化client的时候需要指定消费者组的名称 @@ -121,31 +222,38 @@ int main(int argc, char *argv[]) } ~~~ -2、手动模型 +3、offset的提交 -无需指定消费者组,同时需要用户指定topic、partition和offset - -使用示例如下: +在消费者组模式下,用户消费消息后,可以在callback函数中,通过创建commit任务来自动提交消费的记录,使用示例如下: ~~~cpp - client = new WFKafkaClient(); - client->init(url); - task = client->create_kafka_task("api=fetch", 3, kafka_callback); +void kafka_callback(WFKafkaTask *task) +{ + ... + commit_task = client.create_kafka_task("api=commit", 3, kafka_callback); - KafkaToppar toppar; - toppar.set_topic_partition("workflow_test1", 0); - toppar.set_offset(0); - task->add_toppar(toppar); + ... + commit_task->start(); + ... +} ~~~ + # 关于client的关闭 在消费者组模式下,client在关闭之前需要调用create_leavegroup_task创建leavegroup_task, -它会发送leavegroup协议包,否则会导致消费者组没有正确退出 +它会发送leavegroup协议包,如果没有启动leavegroup_task,会导致消费者组没有正确退出,触发这个组的rebalance。 + # 处理kafka结果 -处理结果的函数和其他的示例一样,既可以使用普通函数也可以使用std::function来处理结果 +消息的结果集的数据结构是KafkaResult,可以通过调用WFKafkaTask的get_result()接口获得, + +然后调用KafkaResult的fetch_record接口可以将本次task相关的record取出来,它是一个KafkaRecord的二维vector, + +第一维是topic partition,第二维是某个topic partition下对应的KafkaRecord, + +在[KafkaResult.h](../src/protocol/KafkaResult.h)中可以看到KafkaResult的定义 ~~~cpp void kafka_callback(WFKafkaTask *task) { @@ -194,9 +302,3 @@ void kafka_callback(WFKafkaTask *task) ... } ~~~ - -在这个callback中,task就是二级工厂产生的task,任务的结果集类型是protocol::KafkaResult。 - -结果集对象可以通过task->get_result()直接得到,获得结果。 - -在[KafkaResult.h](../src/protocol/KafkaResult.h)中可以看到KafkaResult的定义。 diff --git a/tutorial/tutorial-13-kafka_cli.cc b/tutorial/tutorial-13-kafka_cli.cc index 053d45bc..83a5bf99 100644 --- a/tutorial/tutorial-13-kafka_cli.cc +++ b/tutorial/tutorial-13-kafka_cli.cc @@ -48,6 +48,7 @@ void kafka_callback(WFKafkaTask *task) fprintf(stderr, "error msg: %s\n", WFGlobal::get_error_string(state, error)); fprintf(stderr, "Failed. Press Ctrl-C to exit.\n"); + client.deinit(); wait_group.done(); return; } @@ -141,20 +142,6 @@ void kafka_callback(WFKafkaTask *task) break; - case Kafka_Metadata: - { - protocol::KafkaMetaList *meta_list = client.get_meta_list(); - KafkaMeta *meta; - - while ((meta = meta_list->get_next()) != NULL) - { - printf("meta\ttopic: %s, partition_num: %d\n", - meta->get_topic(), meta->get_partition_elements()); - } - - break; - } - case Kafka_OffsetCommit: task->get_result()->fetch_toppars(toppars); @@ -225,6 +212,7 @@ int main(int argc, char *argv[]) KafkaRecord record; config.set_compress_type(compress_type); + config.set_client_id("workflow"); task->set_config(std::move(config)); for (size_t i = 0; i < sizeof (buf); ++i) @@ -240,14 +228,6 @@ int main(int argc, char *argv[]) record.add_header_pair("hk2", 3, "hv2", 3); task->add_produce_record("workflow_test2", -1, std::move(record)); } - else if (argv[2][0] == 'm') - { - client.init(url); - task = client.create_kafka_task(url, 3, kafka_callback); - task->add_topic("workflow_test1"); - task->add_topic("workflow_test2"); - task->set_api_type(Kafka_Metadata); - } else if (argv[2][0] == 'c') { if (argc > 3 && argv[3][0] == 'd') @@ -271,10 +251,11 @@ int main(int argc, char *argv[]) client.init(url, "workflow_group"); task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch", 3, kafka_callback); - KafkaConfig config; - config.set_client_id("workflow"); - task->set_config(std::move(config)); } + + KafkaConfig config; + config.set_client_id("workflow"); + task->set_config(std::move(config)); } else {