modify kafka tutorial and doc

This commit is contained in:
wangzhulei
2020-12-06 20:54:25 +08:00
parent 2636513569
commit 00e0d353ab
2 changed files with 143 additions and 60 deletions

View File

@@ -48,6 +48,7 @@ void kafka_callback(WFKafkaTask *task)
fprintf(stderr, "error msg: %s\n",
WFGlobal::get_error_string(state, error));
fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
client.deinit();
wait_group.done();
return;
}
@@ -141,20 +142,6 @@ void kafka_callback(WFKafkaTask *task)
break;
case Kafka_Metadata:
{
protocol::KafkaMetaList *meta_list = client.get_meta_list();
KafkaMeta *meta;
while ((meta = meta_list->get_next()) != NULL)
{
printf("meta\ttopic: %s, partition_num: %d\n",
meta->get_topic(), meta->get_partition_elements());
}
break;
}
case Kafka_OffsetCommit:
task->get_result()->fetch_toppars(toppars);
@@ -225,6 +212,7 @@ int main(int argc, char *argv[])
KafkaRecord record;
config.set_compress_type(compress_type);
config.set_client_id("workflow");
task->set_config(std::move(config));
for (size_t i = 0; i < sizeof (buf); ++i)
@@ -240,14 +228,6 @@ int main(int argc, char *argv[])
record.add_header_pair("hk2", 3, "hv2", 3);
task->add_produce_record("workflow_test2", -1, std::move(record));
}
else if (argv[2][0] == 'm')
{
client.init(url);
task = client.create_kafka_task(url, 3, kafka_callback);
task->add_topic("workflow_test1");
task->add_topic("workflow_test2");
task->set_api_type(Kafka_Metadata);
}
else if (argv[2][0] == 'c')
{
if (argc > 3 && argv[3][0] == 'd')
@@ -271,10 +251,11 @@ int main(int argc, char *argv[])
client.init(url, "workflow_group");
task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch",
3, kafka_callback);
KafkaConfig config;
config.set_client_id("workflow");
task->set_config(std::move(config));
}
KafkaConfig config;
config.set_client_id("workflow");
task->set_config(std::move(config));
}
else
{