From 609d00ebec84e6c9c4e68317bc1d0c7c212f8e17 Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Mon, 25 Sep 2023 19:43:28 +0800 Subject: [PATCH] Update kafka tutorial. --- tutorial/tutorial-13-kafka_cli.cc | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tutorial/tutorial-13-kafka_cli.cc b/tutorial/tutorial-13-kafka_cli.cc index befbe840..0ba446d7 100644 --- a/tutorial/tutorial-13-kafka_cli.cc +++ b/tutorial/tutorial-13-kafka_cli.cc @@ -190,8 +190,11 @@ int main(int argc, char *argv[]) signal(SIGINT, sig_handler); url = argv[1]; - if (strncmp(argv[1], "kafka://", 8) != 0) + if (strncmp(argv[1], "kafka://", 8) != 0 && + strncmp(argv[1], "kafkas://", 9) != 0) + { url = "kafka://" + url; + } char buf[512 * 1024]; WFKafkaTask *task; @@ -206,7 +209,12 @@ int main(int argc, char *argv[]) if (compress_type > Kafka_Zstd) exit(1); - client.init(url); + if (client.init(url) < 0) + { + perror("client.init"); + exit(1); + } + task = client.create_kafka_task("api=produce", 3, kafka_callback); KafkaConfig config; KafkaRecord record; @@ -232,7 +240,12 @@ int main(int argc, char *argv[]) { if (argc > 3 && argv[3][0] == 'd') { - client.init(url); + if (client.init(url) < 0) + { + perror("client.init"); + exit(1); + } + task = client.create_kafka_task("api=fetch", 3, kafka_callback); KafkaToppar toppar; @@ -248,7 +261,12 @@ int main(int argc, char *argv[]) } else { - client.init(url, "workflow_group"); + if (client.init(url, "workflow_group") < 0) + { + perror("client.init"); + exit(1); + } + task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch", 3, kafka_callback); }