Optimize Kafka client to make tasks fail fast when servers down.

This commit is contained in:
Xie Han
2023-06-27 22:11:26 +08:00
parent 8a29f9e902
commit 86ff9fbc1b

View File

@@ -14,14 +14,20 @@
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
Xie Han (xiehan@sogou-inc.com)
Liu Kai (liukaidx@sogou-inc.com)
*/
#ifndef _WIN32
#include <unistd.h>
#endif
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <utility>
#include <vector>
#include <set>
#include <map>
#include <atomic>
#include <mutex>
#include "WFTaskError.h"
#include "WFKafkaClient.h"
#define KAFKA_HEARTBEAT_INTERVAL (3 * 1000 * 1000)
@@ -42,13 +48,6 @@ using namespace protocol;
using ComplexKafkaTask = WFComplexClientTask<KafkaRequest, KafkaResponse,
struct __ComplexKafkaTaskCtx>;
enum MetaStatus
{
META_UNINIT,
META_DOING,
META_INITED,
};
class KafkaMember
{
public:
@@ -56,9 +55,10 @@ public:
{
cgroup_status = KAFKA_CGROUP_NONE;
heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
heartbeat_series = NULL;
meta_doing = false;
cgroup_outdated = false;
client_deinit = false;
heartbeat_series = NULL;
}
void incref()
@@ -77,13 +77,16 @@ public:
KafkaMetaList meta_list;
KafkaBrokerMap broker_map;
KafkaConfig config;
std::map<std::string, enum MetaStatus> meta_map;
std::map<std::string, bool> meta_status;
std::mutex mutex;
int cgroup_status;
int heartbeat_status;
void *heartbeat_series;
char cgroup_status;
char heartbeat_status;
bool meta_doing;
bool cgroup_outdated;
bool client_deinit;
void *heartbeat_series;
size_t cgroup_wait_cnt;
size_t meta_wait_cnt;
std::atomic<int> ref;
};
@@ -111,6 +114,7 @@ public:
this->member->mutex.unlock();
this->info_generated = false;
this->msg = NULL;
}
virtual ~KafkaClientTask()
@@ -165,6 +169,12 @@ private:
void kafka_process_toppar_offset(KafkaToppar *task_toppar);
bool compare_topics(KafkaClientTask *task);
bool check_cgroup();
bool check_meta();
int arrange_toppar(int api_type);
int arrange_produce();
@@ -175,6 +185,8 @@ private:
int arrange_offset();
int dispatch_locked();
inline KafkaBroker *get_broker(int node_id)
{
return this->member->broker_map.find_item(node_id);
@@ -182,8 +194,8 @@ private:
int get_node_id(const KafkaToppar *toppar);
enum MetaStatus get_meta_status(KafkaMetaList **uninit_meta_list);
void set_meta_status(enum MetaStatus status);
bool get_meta_status(KafkaMetaList **uninit_meta_list);
void set_meta_status(bool status);
std::string get_userinfo() { return this->userinfo; }
@@ -196,6 +208,8 @@ private:
std::set<std::string> topic_set;
std::string userinfo;
bool info_generated;
bool wait_cgroup;
void *msg;
friend class WFKafkaClient;
};
@@ -253,6 +267,7 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
{
KafkaMember *member = (KafkaMember *)task->user_data;
SeriesWork *series = series_of(task);
size_t max;
member->mutex.lock();
if (member->client_deinit)
@@ -291,11 +306,12 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
member->heartbeat_series = series;
}
member->mutex.unlock();
max = member->cgroup_wait_cnt;
char name[64];
snprintf(name, 64, "%p.cgroup", member);
WFTaskFactory::count_by_name(name, (unsigned int)-1);
member->mutex.unlock();
WFTaskFactory::signal_by_name(name, NULL, max);
}
else
kafka_rebalance_proc(member, series);
@@ -441,6 +457,9 @@ void KafkaClientTask::kafka_merge_broker_list(std::vector<std::string> *hosts,
void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task)
{
KafkaClientTask *t = (KafkaClientTask *)task->user_data;
void *msg = NULL;
size_t max;
t->member->mutex.lock();
t->state = task->get_state();
t->error = task->get_error();
@@ -453,7 +472,7 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task)
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(t->member->meta_map)[meta->get_topic()] = META_INITED;
(t->member->meta_status)[meta->get_topic()] = true;
kafka_merge_broker_list(&t->member->broker_hosts,
&t->member->broker_map,
@@ -464,21 +483,27 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task)
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(t->member->meta_map)[meta->get_topic()] = META_UNINIT;
(t->member->meta_status)[meta->get_topic()] = false;
t->finish = true;
msg = t;
}
t->member->meta_doing = false;
max = t->member->meta_wait_cnt;
char name[64];
snprintf(name, 64, "%p.meta", t->member);
t->member->mutex.unlock();
WFTaskFactory::count_by_name(name, (unsigned int)-1);
WFTaskFactory::signal_by_name(name, msg, max);
}
void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
KafkaClientTask *t = (KafkaClientTask *)task->user_data;
SeriesWork *heartbeat_series = NULL;
void *msg = NULL;
size_t max;
t->member->mutex.lock();
t->state = task->get_state();
@@ -495,7 +520,7 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(t->member->meta_map)[meta->get_topic()] = META_INITED;
(t->member->meta_status)[meta->get_topic()] = true;
kafka_merge_broker_list(&t->member->broker_hosts,
&t->member->broker_map,
@@ -534,13 +559,15 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
t->member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
t->member->heartbeat_series = NULL;
t->finish = true;
msg = t;
}
max = t->member->cgroup_wait_cnt;
char name[64];
snprintf(name, 64, "%p.cgroup", t->member);
t->member->mutex.unlock();
WFTaskFactory::count_by_name(name, (unsigned int)-1);
WFTaskFactory::signal_by_name(name, msg, max);
if (heartbeat_series)
heartbeat_series->start();
@@ -567,7 +594,7 @@ void KafkaClientTask::kafka_parallel_callback(const ParallelWork *pwork)
{
flag = true;
t->member->mutex.lock();
t->set_meta_status(META_UNINIT);
t->set_meta_status(false);
t->member->mutex.unlock();
}
state = state_error->first >> 16;
@@ -702,68 +729,66 @@ void KafkaClientTask::parse_query()
}
}
enum MetaStatus KafkaClientTask::get_meta_status(KafkaMetaList **uninit_meta_list)
bool KafkaClientTask::get_meta_status(KafkaMetaList **uninit_meta_list)
{
this->meta_list.rewind();
KafkaMeta *meta;
enum MetaStatus ret = META_INITED;
std::set<std::string> unique;
bool status = true;
while ((meta = this->meta_list.get_next()) != NULL)
{
if (!unique.insert(meta->get_topic()).second)
continue;
switch(this->member->meta_map[meta->get_topic()])
if (!this->member->meta_status[meta->get_topic()])
{
case META_UNINIT:
this->member->meta_map[meta->get_topic()] = META_DOING;
if (ret != META_UNINIT)
if (status)
{
*uninit_meta_list = new KafkaMetaList;
status = false;
}
(*uninit_meta_list)->add_item(*meta);
ret = META_UNINIT;
break;
case META_DOING:
if (ret == META_INITED)
ret = META_DOING;
break;
case META_INITED:
break;
}
}
return ret;
return status;
}
void KafkaClientTask::set_meta_status(enum MetaStatus status)
void KafkaClientTask::set_meta_status(bool status)
{
this->member->meta_list.rewind();
KafkaMeta *meta;
while ((meta = this->member->meta_list.get_next()) != NULL)
this->member->meta_map[meta->get_topic()] = status;
this->member->meta_status[meta->get_topic()] = false;
}
void KafkaClientTask::dispatch()
bool KafkaClientTask::compare_topics(KafkaClientTask *task)
{
__WFKafkaTask *task;
WFCounterTask *counter;
ParallelWork *parallel;
protocol::KafkaMetaList *meta_list1 = &this->meta_list;
protocol::KafkaMetaList *meta_list2 = &task->meta_list;
KafkaMeta *meta1, *meta2;
if (this->finish)
meta_list1->rewind();
meta_list2->rewind();
while (1)
{
this->subtask_done();
return;
meta1 = meta_list1->get_next();
meta2 = meta_list2->get_next();
if (!meta1 && !meta2)
return true;
if (!meta1 || !meta2)
return false;
if (strcmp(meta1->get_topic(), meta2->get_topic()))
return false;
}
}
if (!this->query.empty())
this->parse_query();
this->generate_info();
this->member->mutex.lock();
bool KafkaClientTask::check_cgroup()
{
if (this->member->cgroup_outdated &&
this->member->cgroup_status != KAFKA_CGROUP_DOING)
{
@@ -775,18 +800,21 @@ void KafkaClientTask::dispatch()
if (this->member->cgroup_status == KAFKA_CGROUP_DOING)
{
WFConditional *cond;
char name[64];
snprintf(name, 64, "%p.cgroup", this->member);
counter = WFTaskFactory::create_counter_task(name, 1, nullptr);
series_of(this)->push_front(this);
series_of(this)->push_front(counter);
this->member->mutex.unlock();
this->subtask_done();
return;
this->wait_cgroup = true;
cond = WFTaskFactory::create_conditional(name, this, &this->msg);
series_of(this)->push_front(cond);
this->member->cgroup_wait_cnt++;
return false;
}
else if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) &&
(this->member->cgroup_status == KAFKA_CGROUP_UNINIT))
if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) &&
(this->member->cgroup_status == KAFKA_CGROUP_UNINIT))
{
__WFKafkaTask *task;
task = __WFKafkaTaskFactory::create_kafka_task(this->url,
this->retry_max,
kafka_cgroup_callback);
@@ -798,17 +826,34 @@ void KafkaClientTask::dispatch()
series_of(this)->push_front(this);
series_of(this)->push_front(task);
this->member->cgroup_status = KAFKA_CGROUP_DOING;
this->member->mutex.unlock();
this->subtask_done();
return;
this->member->cgroup_wait_cnt = 0;
return false;
}
KafkaMetaList *uninit_meta_list;
char name[64];
return true;
}
switch(this->get_meta_status(&uninit_meta_list))
bool KafkaClientTask::check_meta()
{
KafkaMetaList *uninit_meta_list;
if (this->get_meta_status(&uninit_meta_list))
return true;
if (this->member->meta_doing)
{
case META_UNINIT:
WFConditional *cond;
char name[64];
snprintf(name, 64, "%p.meta", this->member);
this->wait_cgroup = false;
cond = WFTaskFactory::create_conditional(name, this, &this->msg);
series_of(this)->push_front(cond);
this->member->meta_wait_cnt++;
}
else
{
__WFKafkaTask *task;
task = __WFKafkaTaskFactory::create_kafka_task(this->url,
this->retry_max,
kafka_meta_callback);
@@ -816,45 +861,42 @@ void KafkaClientTask::dispatch()
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_Metadata);
task->get_req()->set_meta_list(*uninit_meta_list);
delete uninit_meta_list;
series_of(this)->push_front(this);
series_of(this)->push_front(task);
this->member->mutex.unlock();
this->subtask_done();
return;
case META_DOING:
snprintf(name, 64, "%p.meta", this->member);
counter = WFTaskFactory::create_counter_task(name, 1, nullptr);
series_of(this)->push_front(this);
series_of(this)->push_front(counter);
this->member->mutex.unlock();
this->subtask_done();
return;
case META_INITED:
break;
this->member->meta_wait_cnt = 0;
this->member->meta_doing = true;
}
delete uninit_meta_list;
return false;
}
int KafkaClientTask::dispatch_locked()
{
__WFKafkaTask *task;
ParallelWork *parallel;
SeriesWork *series;
if (this->check_cgroup() == false)
return this->member->cgroup_wait_cnt > 0;
if (this->check_meta() == false)
return this->member->meta_wait_cnt > 0;
if (arrange_toppar(this->api_type) < 0)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_ARRANGE_FAILED;
this->finish = true;
this->member->mutex.unlock();
this->subtask_done();
return;
return 0;
}
if (this->member->cgroup_outdated)
{
series_of(this)->push_front(this);
this->member->mutex.unlock();
this->subtask_done();
return;
return 0;
}
SeriesWork *series;
switch(this->api_type)
{
case Kafka_Produce:
@@ -1088,8 +1130,47 @@ void KafkaClientTask::dispatch()
break;
}
return 0;
}
void KafkaClientTask::dispatch()
{
if (this->finish)
{
this->subtask_done();
return;
}
if (this->msg)
{
KafkaClientTask *task = static_cast<KafkaClientTask *>(this->msg);
if (this->wait_cgroup || this->compare_topics(task) == true)
{
this->state = task->get_state();
this->error = task->get_error();
this->kafka_error = get_kafka_error();
this->finish = true;
this->subtask_done();
return;
}
this->msg = NULL;
}
if (!this->query.empty())
this->parse_query();
this->generate_info();
int flag;
this->member->mutex.lock();
flag = this->dispatch_locked();
if (flag)
this->subtask_done();
this->member->mutex.unlock();
this->subtask_done();
if (!flag)
this->subtask_done();
}
bool KafkaClientTask::add_topic(const std::string& topic)
@@ -1111,7 +1192,7 @@ bool KafkaClientTask::add_topic(const std::string& topic)
if (!flag)
{
this->member->meta_map[topic] = META_UNINIT;
this->member->meta_status[topic] = false;
KafkaMeta tmp;
if (!tmp.set_topic(topic))