Update kafka tutorial.

This commit is contained in:
Xie Han
2023-09-25 19:43:28 +08:00
parent 49d3743932
commit 609d00ebec

View File

@@ -190,8 +190,11 @@ int main(int argc, char *argv[])
signal(SIGINT, sig_handler);
url = argv[1];
if (strncmp(argv[1], "kafka://", 8) != 0)
if (strncmp(argv[1], "kafka://", 8) != 0 &&
strncmp(argv[1], "kafkas://", 9) != 0)
{
url = "kafka://" + url;
}
char buf[512 * 1024];
WFKafkaTask *task;
@@ -206,7 +209,12 @@ int main(int argc, char *argv[])
if (compress_type > Kafka_Zstd)
exit(1);
client.init(url);
if (client.init(url) < 0)
{
perror("client.init");
exit(1);
}
task = client.create_kafka_task("api=produce", 3, kafka_callback);
KafkaConfig config;
KafkaRecord record;
@@ -232,7 +240,12 @@ int main(int argc, char *argv[])
{
if (argc > 3 && argv[3][0] == 'd')
{
client.init(url);
if (client.init(url) < 0)
{
perror("client.init");
exit(1);
}
task = client.create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
@@ -248,7 +261,12 @@ int main(int argc, char *argv[])
}
else
{
client.init(url, "workflow_group");
if (client.init(url, "workflow_group") < 0)
{
perror("client.init");
exit(1);
}
task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch",
3, kafka_callback);
}