kafka: refactor logic

This commit is contained in:
wzl12356
2022-04-14 15:22:47 +08:00
parent ccfd06d6a3
commit 35f2c2a4b3
11 changed files with 397 additions and 296 deletions

View File

@@ -40,7 +40,6 @@ using KafkaComplexTask = WFComplexClientTask<KafkaRequest, KafkaResponse,
enum MetaStatus
{
META_EMPTY = -1,
META_UNINIT,
META_DOING,
META_INITED,
@@ -279,6 +278,8 @@ protected:
virtual bool add_produce_record(const std::string& topic, int partition,
KafkaRecord record);
virtual bool add_offset_toppar(const KafkaToppar& toppar);
virtual void dispatch();
virtual void parse_query();
@@ -313,12 +314,16 @@ private:
void kafka_process_toppar_offset(KafkaToppar *task_toppar);
int arrange_toppar(int api_type);
int arrange_produce();
int arrange_fetch();
int arrange_commit();
int arrange_offset();
inline KafkaBroker *get_broker(int node_id)
{
return this->client_broker_map.find_item(node_id);
@@ -378,7 +383,7 @@ int ComplexKafkaTask::get_node_id(const KafkaToppar *toppar)
void ComplexKafkaTask::kafka_offsetcommit_callback(__WFKafkaTask *task)
{
ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data;
if (task->get_state() == 0)
if (task->get_state() == WFT_STATE_SUCCESS)
t->result.set_resp(std::move(*task->get_resp()), 0);
t->finish = true;
@@ -581,6 +586,8 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
{
ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data;
t->lock_status.get_mutex()->lock();
t->state = task->get_state();
t->error = task->get_error();
if (task->get_state() == WFT_STATE_SUCCESS)
{
kafka_merge_meta_list(&t->client_meta_list,
@@ -601,8 +608,6 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
while ((meta = t->meta_list.get_next()) != NULL)
(*t->client->member->meta_map)[meta->get_topic()] = META_UNINIT;
t->state = WFT_STATE_TASK_ERROR;
t->error = WFT_ERR_KAFKA_META_FAILED;
t->finish = true;
}
@@ -616,8 +621,21 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data;
t->lock_status.get_mutex()->lock();
t->state = task->get_state();
t->error = task->get_error();
if (task->get_state() == 0)
{
kafka_merge_meta_list(&t->client_meta_list,
task->get_resp()->get_meta_list());
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(*t->client->member->meta_map)[meta->get_topic()] = META_INITED;
kafka_merge_broker_list(&t->client_broker_map,
task->get_resp()->get_broker_list());
*t->lock_status.get_status() |= KAFKA_CGROUP_DONE;
*t->lock_status.get_status() &= (~(KAFKA_CGROUP_INIT|KAFKA_CGROUP_DOING));
@@ -652,17 +670,11 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
*t->lock_status.get_status() |= KAFKA_HEARTBEAT_DOING;
*t->lock_status.get_status() &= ~KAFKA_HEARTBEAT_INIT;
}
t->state = WFT_STATE_SUCCESS;
t->error = 0;
}
else
{
*t->lock_status.get_status() |= KAFKA_CGROUP_INIT;
*t->lock_status.get_status() &= (~(KAFKA_CGROUP_DONE|KAFKA_CGROUP_DOING));
t->state = WFT_STATE_TASK_ERROR;
t->error = WFT_ERR_KAFKA_CGROUP_FAILED;
t->finish = true;
}
@@ -719,16 +731,13 @@ void ComplexKafkaTask::kafka_process_toppar_offset(KafkaToppar *task_toppar)
if (strcmp(toppar->get_topic(), task_toppar->get_topic()) == 0 &&
toppar->get_partition() == task_toppar->get_partition())
{
if (task_toppar->get_error() == KAFKA_NONE &&
!task_toppar->reach_high_watermark())
if (task_toppar->get_error() == KAFKA_NONE)
toppar->set_offset(task_toppar->get_offset() + 1);
else
toppar->set_offset(task_toppar->get_offset());
toppar->set_low_watermark(task_toppar->get_low_watermark());
toppar->set_low_watermark(task_toppar->get_high_watermark());
break;
toppar->set_high_watermark(task_toppar->get_high_watermark());
}
}
}
@@ -766,7 +775,17 @@ void ComplexKafkaTask::generate_info()
this->userinfo += this->config.get_sasl_password();
this->userinfo += ":";
this->userinfo += this->config.get_sasl_mech();
this->userinfo += ":";
this->userinfo += std::to_string((intptr_t)this->client);
this->userinfo += "@";
this->url = "kafka://" + this->userinfo +
this->url.substr(this->url.find("kafka://") + 8);
}
else
{
char buf[64];
snprintf(buf, sizeof(buf), "user:pass:sasl:%p@", this->client);
this->userinfo = buf;
this->url = "kafka://" + this->userinfo +
this->url.substr(this->url.find("kafka://") + 8);
}
@@ -793,6 +812,8 @@ void ComplexKafkaTask::parse_query()
this->api_type = Kafka_Metadata;
else if (strcasecmp(v.c_str(), "leavegroup") == 0)
this->api_type = Kafka_LeaveGroup;
else if (strcasecmp(v.c_str(), "listoffsets") == 0)
this->api_type = Kafka_ListOffsets;
}
}
else if (strcasecmp(kv.first.c_str(), "topic") == 0)
@@ -807,9 +828,13 @@ enum MetaStatus ComplexKafkaTask::get_meta_status()
{
this->meta_list.rewind();
KafkaMeta *meta;
enum MetaStatus ret = META_EMPTY;
enum MetaStatus ret = META_INITED;
std::set<std::string> unique;
while ((meta = this->meta_list.get_next()) != NULL)
{
if (!unique.insert(meta->get_topic()).second)
continue;
switch((*this->client->member->meta_map)[meta->get_topic()])
{
case META_DOING:
@@ -818,16 +843,12 @@ enum MetaStatus ComplexKafkaTask::get_meta_status()
case META_INITED:
this->meta_list.del_cur();
delete meta;
ret = META_INITED;
break;
case META_UNINIT:
ret = META_UNINIT;
(*this->client->member->meta_map)[meta->get_topic()] = META_DOING;
break;
default:
break;
}
}
@@ -861,6 +882,36 @@ void ComplexKafkaTask::dispatch()
this->lock_status.get_mutex()->lock();
if (*this->lock_status.get_status() & KAFKA_CGROUP_DOING)
{
char name[64];
snprintf(name, 64, "%p.cgroup", this->client);
counter = WFTaskFactory::create_counter_task(name, 1, nullptr);
series_of(this)->push_front(this);
series_of(this)->push_front(counter);
this->lock_status.get_mutex()->unlock();
this->subtask_done();
return;
}
else if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) &&
(*this->lock_status.get_status() & KAFKA_CGROUP_INIT))
{
task = __WFKafkaTaskFactory::create_kafka_task(this->url,
this->retry_max,
kafka_cgroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_FindCoordinator);
task->get_req()->set_cgroup(this->cgroup);
task->get_req()->set_meta_list(this->meta_list);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
*this->lock_status.get_status() |= KAFKA_CGROUP_DOING;
this->lock_status.get_mutex()->unlock();
this->subtask_done();
return;
}
char name[64];
switch(this->get_meta_status())
{
@@ -889,74 +940,13 @@ void ComplexKafkaTask::dispatch()
case META_INITED:
break;
case META_EMPTY:
if (this->api_type != Kafka_OffsetCommit &&
this->api_type != Kafka_LeaveGroup)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_META_FAILED;
this->finish = true;
this->lock_status.get_mutex()->unlock();
this->subtask_done();
return;
}
}
if (*this->lock_status.get_status() & KAFKA_CGROUP_DOING)
if (arrange_toppar(this->api_type) < 0)
{
char name[64];
snprintf(name, 64, "%p.cgroup", this->client);
counter = WFTaskFactory::create_counter_task(name, 1, nullptr);
series_of(this)->push_front(this);
series_of(this)->push_front(counter);
this->lock_status.get_mutex()->unlock();
this->subtask_done();
return;
}
else if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) &&
(*this->lock_status.get_status() & KAFKA_CGROUP_INIT))
{
KafkaBroker *broker = this->client_broker_map.get_first_entry();
if (!broker)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->finish = true;
this->lock_status.get_mutex()->unlock();
this->subtask_done();
return;
}
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
this->get_userinfo(),
this->retry_max,
kafka_cgroup_callback);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
kafka_cgroup_callback);
}
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_FindCoordinator);
task->get_req()->set_broker(*broker);
task->get_req()->set_cgroup(this->cgroup);
task->get_req()->set_meta_list(this->client_meta_list);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
*this->lock_status.get_status() |= KAFKA_CGROUP_DOING;
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_ARRANGE_FAILED;
this->finish = true;
this->lock_status.get_mutex()->unlock();
this->subtask_done();
return;
@@ -966,7 +956,7 @@ void ComplexKafkaTask::dispatch()
switch(this->api_type)
{
case Kafka_Produce:
if (arrange_produce() < 0 || this->toppar_list_map.size() == 0)
if (this->toppar_list_map.size() == 0)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_PRODUCE_FAILED;
@@ -1016,8 +1006,7 @@ void ComplexKafkaTask::dispatch()
break;
case Kafka_Fetch:
if (arrange_fetch() < 0 ||
this->toppar_list_map.size() == 0)
if (this->toppar_list_map.size() == 0)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_FETCH_FAILED;
@@ -1073,7 +1062,7 @@ void ComplexKafkaTask::dispatch()
break;
case Kafka_OffsetCommit:
if (!this->cgroup.get_group() || arrange_commit() < 0)
if (!this->cgroup.get_group())
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_COMMIT_FAILED;
@@ -1138,6 +1127,56 @@ void ComplexKafkaTask::dispatch()
}
break;
case Kafka_ListOffsets:
if (this->toppar_list_map.size() == 0)
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_LIST_OFFSETS_FAILED;
this->finish = true;
break;
}
parallel = Workflow::create_parallel_work(kafka_parallel_callback);
this->result.create(this->toppar_list_map.size());
parallel->set_context(this);
for (auto &v : this->toppar_list_map)
{
auto cb = std::bind(&ComplexKafkaTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
task->get_req()->set_api_type(Kafka_ListOffsets);
task->user_data = (void *)parallel->size();
KafkaComplexTask *ctask = static_cast<KafkaComplexTask *>(task);
*ctask->get_mutable_ctx() = cb;
series = Workflow::create_series_work(task, nullptr);
parallel->add_series(series);
}
series_of(this)->push_front(this);
series_of(this)->push_front(parallel);
break;
default:
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_API_UNKNOWN;
@@ -1352,6 +1391,77 @@ static bool check_replace_toppar(KafkaTopparList *toppar_list, KafkaToppar *topp
return false;
}
int ComplexKafkaTask::arrange_toppar(int api_type)
{
switch(api_type)
{
case Kafka_Produce:
return this->arrange_produce();
case Kafka_Fetch:
return this->arrange_fetch();
case Kafka_ListOffsets:
return this->arrange_offset();
case Kafka_OffsetCommit:
return this->arrange_commit();
default:
return 0;
}
}
bool ComplexKafkaTask::add_offset_toppar(const protocol::KafkaToppar& toppar)
{
if (!add_topic(toppar.get_topic()))
return false;
KafkaToppar *exist;
bool found = false;
while ((exist = this->toppar_list.get_next()) != NULL)
{
if (strcmp(exist->get_topic(), toppar.get_topic()) == 0 &&
exist->get_partition() == toppar.get_partition())
{
found = true;
break;
}
}
if (!found)
{
KafkaToppar toppar_t;
toppar_t.set_topic_partition(toppar.get_topic(), toppar.get_partition());
this->toppar_list.add_item(std::move(toppar_t));
}
return true;
}
int ComplexKafkaTask::arrange_offset()
{
this->toppar_list.rewind();
KafkaToppar *toppar;
while ((toppar = this->toppar_list.get_next()) != NULL)
{
int node_id = get_node_id(toppar);
if (node_id < 0)
return -1;
if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end())
this->toppar_list_map[node_id] = (KafkaTopparList());
KafkaToppar new_toppar;
if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition()))
return -1;
this->toppar_list_map[node_id].add_item(std::move(new_toppar));
}
return 0;
}
int ComplexKafkaTask::arrange_commit()
{
this->toppar_list.rewind();
@@ -1381,23 +1491,18 @@ int ComplexKafkaTask::arrange_fetch()
{
int node_id = get_node_id(toppar);
if (node_id < 0)
{
this->lock_status.get_mutex()->unlock();
return -1;
}
if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end())
this->toppar_list_map[node_id] = (KafkaTopparList());
KafkaToppar new_toppar;
if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition()))
{
this->lock_status.get_mutex()->unlock();
return -1;
}
new_toppar.set_offset(toppar->get_offset());
new_toppar.set_low_watermark(toppar->get_low_watermark());
new_toppar.set_high_watermark(toppar->get_high_watermark());
this->toppar_list_map[node_id].add_item(std::move(new_toppar));
}
}
@@ -1412,20 +1517,14 @@ int ComplexKafkaTask::arrange_fetch()
{
int node_id = get_node_id(toppar);
if (node_id < 0)
{
this->lock_status.get_mutex()->unlock();
return -1;
}
if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end())
this->toppar_list_map[node_id] = KafkaTopparList();
KafkaToppar new_toppar;
if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition()))
{
this->lock_status.get_mutex()->unlock();
return -1;
}
new_toppar.set_offset(toppar->get_offset());
new_toppar.set_low_watermark(toppar->get_low_watermark());

View File

@@ -47,11 +47,14 @@ public:
virtual bool add_produce_record(const std::string& topic, int partition,
protocol::KafkaRecord record) = 0;
virtual bool add_offset_toppar(const protocol::KafkaToppar& toppar) = 0;
void add_commit_record(const protocol::KafkaRecord& record)
{
protocol::KafkaToppar toppar;
toppar.set_topic_partition(record.get_topic(), record.get_partition());
toppar.set_offset(record.get_offset());
toppar.set_error(KAFKA_NONE);
this->toppar_list.add_item(std::move(toppar));
}
@@ -60,6 +63,7 @@ public:
protocol::KafkaToppar toppar_t;
toppar_t.set_topic_partition(toppar.get_topic(), toppar.get_partition());
toppar_t.set_offset(toppar.get_offset());
toppar_t.set_error(KAFKA_NONE);
this->toppar_list.add_item(std::move(toppar_t));
}

View File

@@ -38,10 +38,8 @@ public:
__ComplexKafkaTask(int retry_max, __kafka_callback_t&& callback) :
WFComplexClientTask(retry_max, std::move(callback))
{
update_metadata_ = false;
is_user_request_ = true;
is_redirect_ = false;
need_retry_ = false;
}
protected:
@@ -107,16 +105,16 @@ private:
bool process_produce();
bool process_fetch();
bool process_metadata();
bool process_list_offsets();
bool process_find_coordinator();
bool process_join_group();
bool process_sync_group();
bool process_sasl_authenticate();
bool process_sasl_handshake();
bool update_metadata_;
bool is_user_request_;
bool is_redirect_;
bool need_retry_;
std::string user_info_;
};
CommMessageOut *__ComplexKafkaTask::message_out()
@@ -194,7 +192,8 @@ CommMessageOut *__ComplexKafkaTask::message_out()
(KafkaConnectionInfo *)this->get_connection()->get_context();
this->get_req()->set_api(&conn_info->api);
if (this->get_req()->get_api_type() == Kafka_Fetch)
if (this->get_req()->get_api_type() == Kafka_Fetch ||
this->get_req()->get_api_type() == Kafka_ListOffsets)
{
KafkaRequest *req = this->get_req();
req->get_toppar_list()->rewind();
@@ -294,7 +293,7 @@ bool __ComplexKafkaTask::init_success()
}
}
std::string username, password, sasl;
std::string username, password, sasl, client;
if (uri_.userinfo)
{
const char *pos = strchr(uri_.userinfo, ':');
@@ -307,28 +306,36 @@ bool __ComplexKafkaTask::init_success()
{
password = std::string(pos + 1, pos1 - pos - 1);
StringUtil::url_decode(password);
sasl = std::string(pos1 + 1);
const char *pos2 = strchr(pos1 + 1, ':');
if (pos2)
{
sasl = std::string(pos1 + 1, pos2 - pos1 - 1);
client = std::string(pos1 + 1);
}
}
}
if (username.empty() || password.empty() || sasl.empty())
if (username.empty() || password.empty() || sasl.empty() || client.empty())
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_URI_SCHEME_INVALID;
return false;
}
user_info_ = uri_.userinfo;
size_t info_len = username.size() + password.size() + sasl.size() +
client.size() + 50;
char *info = new char[info_len];
snprintf(info, info_len, "%s|user:%s|pass:%s|sasl:%s|client:%s|", "kafka",
username.c_str(), password.c_str(), sasl.c_str(), client.c_str());
this->WFComplexClientTask::set_info(info);
delete []info;
}
size_t info_len = username.size() + password.size() + sasl.size() + 50;
char *info = new char[info_len];
snprintf(info, info_len, "%s|user:%s|pass:%s|sasl:%s|", "kafka",
username.c_str(), password.c_str(), sasl.c_str());
this->WFComplexClientTask::set_info(info);
this->WFComplexClientTask::set_transport_type(type);
delete []info;
return true;
}
@@ -379,11 +386,13 @@ bool __ComplexKafkaTask::check_redirect()
socklen_t addrlen_coord;
coordinator->get_broker_addr(&addr_coord, &addrlen_coord);
set_redirect(TT_TCP, addr_coord, addrlen_coord, "");
set_redirect(TT_TCP, addr_coord, addrlen_coord,
this->WFComplexClientTask::info_);
}
else
{
std::string url = "kafka://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());
@@ -396,7 +405,7 @@ bool __ComplexKafkaTask::check_redirect()
}
else
{
this->init(TT_TCP, paddr, addrlen, "");
this->init(TT_TCP, paddr, addrlen, this->WFComplexClientTask::info_);
return false;
}
}
@@ -430,31 +439,27 @@ bool __ComplexKafkaTask::process_join_group()
msg->get_cgroup()->get_coordinator()->set_to_addr(1);
}
if (msg->get_cgroup()->get_error() == KAFKA_MISSING_TOPIC)
{
this->get_req()->set_api_type(Kafka_Metadata);
this->get_req()->set_alien();
update_metadata_ = true;
}
else if (msg->get_cgroup()->get_error() == KAFKA_MEMBER_ID_REQUIRED)
switch(msg->get_cgroup()->get_error())
{
case KAFKA_MEMBER_ID_REQUIRED:
this->get_req()->set_api_type(Kafka_JoinGroup);
}
else if (msg->get_cgroup()->get_error() == KAFKA_UNKNOWN_MEMBER_ID)
{
break;
case KAFKA_UNKNOWN_MEMBER_ID:
msg->get_cgroup()->set_member_id("");
this->get_req()->set_api_type(Kafka_JoinGroup);
}
else if (msg->get_cgroup()->get_error())
{
break;
case KAFKA_NONE:
this->get_req()->set_api_type(Kafka_Metadata);
break;
default:
this->error = msg->get_cgroup()->get_error();
this->state = WFT_STATE_TASK_ERROR;
return false;
}
else
{
this->get_req()->set_api_type(Kafka_SyncGroup);
}
return true;
}
@@ -476,36 +481,43 @@ bool __ComplexKafkaTask::process_sync_group()
bool __ComplexKafkaTask::process_metadata()
{
KafkaResponse *msg = this->get_resp();
if (update_metadata_)
msg->get_meta_list()->rewind();
KafkaMeta *meta;
while ((meta = msg->get_meta_list()->get_next()) != NULL)
{
KafkaCgroup *cgroup = msg->get_cgroup();
if (cgroup->run_assignor(msg->get_meta_list(),
msg->get_alien_meta_list(),
cgroup->get_protocol_name()) < 0)
switch (meta->get_error())
{
this->error = errno;
case KAFKA_LEADER_NOT_AVAILABLE:
this->get_req()->set_api_type(Kafka_Metadata);
return true;
case KAFKA_NONE:
break;
default:
this->error = meta->get_error();
this->state = WFT_STATE_TASK_ERROR;
return false;
}
else
{
this->get_req()->set_api_type(Kafka_SyncGroup);
}
return true;
}
else
if (msg->get_cgroup()->get_group())
{
msg->get_meta_list()->rewind();
KafkaMeta *meta;
while ((meta = msg->get_meta_list()->get_next()) != NULL)
if (msg->get_cgroup()->is_leader())
{
if (meta->get_error() == KAFKA_LEADER_NOT_AVAILABLE)
KafkaCgroup *cgroup = msg->get_cgroup();
if (cgroup->run_assignor(msg->get_meta_list(),
cgroup->get_protocol_name()) < 0)
{
this->get_req()->set_api_type(Kafka_Metadata);
return true;
this->error = WFT_ERR_KAFKA_CGROUP_ASSIGN_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
}
}
return false;
this->get_req()->set_api_type(Kafka_SyncGroup);
return true;
}
return false;
}
bool __ComplexKafkaTask::process_fetch()
@@ -520,13 +532,48 @@ bool __ComplexKafkaTask::process_fetch()
{
toppar->set_offset(KAFKA_OFFSET_OVERFLOW);
toppar->set_low_watermark(KAFKA_OFFSET_UNINIT);
need_retry_ = true;
toppar->set_high_watermark(KAFKA_OFFSET_UNINIT);
ret = true;
}
switch (toppar->get_error())
{
case KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
case KAFKA_LEADER_NOT_AVAILABLE:
case KAFKA_NOT_LEADER_FOR_PARTITION:
case KAFKA_BROKER_NOT_AVAILABLE:
case KAFKA_REPLICA_NOT_AVAILABLE:
case KAFKA_KAFKA_STORAGE_ERROR:
case KAFKA_FENCED_LEADER_EPOCH:
this->get_req()->set_api_type(Kafka_Metadata);
return true;
case KAFKA_NONE:
case KAFKA_OFFSET_OUT_OF_RANGE:
break;
default:
this->error = toppar->get_error();
this->state = WFT_STATE_TASK_ERROR;
return false;
}
}
return ret;
}
bool __ComplexKafkaTask::process_list_offsets()
{
KafkaToppar *toppar;
this->get_resp()->get_toppar_list()->rewind();
while ((toppar = this->get_resp()->get_toppar_list()->get_next()) != NULL)
{
if (toppar->get_error())
{
this->error = toppar->get_error();
this->state = WFT_STATE_TASK_ERROR;
}
}
return false;
}
bool __ComplexKafkaTask::process_produce()
{
KafkaToppar *toppar;
@@ -550,6 +597,12 @@ bool __ComplexKafkaTask::process_produce()
case KAFKA_FENCED_LEADER_EPOCH:
this->get_req()->set_api_type(Kafka_Metadata);
return true;
case KAFKA_NONE:
break;
default:
this->error = toppar->get_error();
this->state = WFT_STATE_TASK_ERROR;
return false;
}
}
return false;
@@ -575,6 +628,7 @@ bool __ComplexKafkaTask::process_sasl_authenticate()
}
return false;
}
bool __ComplexKafkaTask::has_next()
{
struct sockaddr_storage addr;
@@ -607,73 +661,75 @@ bool __ComplexKafkaTask::has_next()
return this->process_sasl_handshake();
case Kafka_SaslAuthenticate:
return this->process_sasl_authenticate();
case Kafka_ListOffsets:
return this->process_list_offsets();
case Kafka_OffsetCommit:
case Kafka_OffsetFetch:
case Kafka_ListOffsets:
case Kafka_Heartbeat:
case Kafka_LeaveGroup:
case Kafka_DescribeGroups:
case Kafka_Heartbeat:
this->error = this->get_resp()->get_cgroup()->get_error();
if (this->error)
this->state = WFT_STATE_TASK_ERROR;
break;
case Kafka_ApiVersions:
return false;
this->error = errno;
if (this->error)
this->state = WFT_STATE_TASK_ERROR;
break;
default:
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_API_UNKNOWN;
return false;
break;
}
return false;
}
bool __ComplexKafkaTask::finish_once()
{
bool finish = true;
if (this->state == WFT_STATE_SUCCESS)
finish = !has_next();
if (!is_user_request_)
{
delete this->get_message_out();
this->get_resp()->clear_buf();
}
if (is_redirect_ && this->state == WFT_STATE_UNDEFINED)
{
this->get_req()->clear_buf();
is_redirect_ = false;
return true;
}
if (this->state == WFT_STATE_SUCCESS)
{
if (has_next())
{
if (!is_user_request_)
{
delete this->get_message_out();
this->get_resp()->clear_buf();
return false;
}
this->get_req()->clear_buf();
if (is_redirect_)
{
is_redirect_ = false;
return true;
}
if (need_retry_)
{
is_user_request_ = false;
need_retry_ = false;
}
this->clear_resp();
return false;
}
if (!is_user_request_)
{
is_user_request_ = true;
delete this->get_message_out();
this->get_resp()->clear_buf();
return false;
}
if (*get_mutable_ctx())
(*get_mutable_ctx())(this);
if (!finish)
{
this->get_req()->clear_buf();
this->get_resp()->clear_buf();
return false;
}
}
else
{
this->disable_retry();
this->get_resp()->set_api_type(this->get_req()->get_api_type());
this->get_resp()->set_api_version(this->get_req()->get_api_version());
this->get_resp()->duplicate(*this->get_req());
if (*get_mutable_ctx())
(*get_mutable_ctx())(this);
}
if (*get_mutable_ctx())
(*get_mutable_ctx())(this);
return true;
}

View File

@@ -60,9 +60,12 @@ enum
WFT_ERR_KAFKA_COMMIT_FAILED = 5005,
WFT_ERR_KAFKA_META_FAILED = 5006,
WFT_ERR_KAFKA_LEAVEGROUP_FAILED = 5007,
WFT_ERR_KAFKA_API_UNKNOWN = 5008, ///< api type not supported
WFT_ERR_KAFKA_VERSION_DISALLOWED = 5009, ///< broker version not supported
WFT_ERR_KAFKA_SASL_DISALLOWED = 5010, ///< sasl not supported
WFT_ERR_KAFKA_API_UNKNOWN = 5008, ///< api type not supported
WFT_ERR_KAFKA_VERSION_DISALLOWED = 5009, ///< broker version not supported
WFT_ERR_KAFKA_SASL_DISALLOWED = 5010, ///< sasl not supported
WFT_ERR_KAFKA_ARRANGE_FAILED = 5011, ///< arrange toppar failed
WFT_ERR_KAFKA_LIST_OFFSETS_FAILED = 5012,
WFT_ERR_KAFKA_CGROUP_ASSIGN_FAILED = 5013,
//CONSUL
WFT_ERR_CONSUL_API_UNKNOWN = 6001, ///< api type not supported

View File

@@ -245,12 +245,10 @@ void KafkaCgroup::add_subscriber(KafkaMetaList *meta_list,
}
int KafkaCgroup::run_assignor(KafkaMetaList *meta_list,
KafkaMetaList *alien_meta_list,
const char *protocol_name)
{
std::vector<KafkaMetaSubscriber> subscribers;
this->add_subscriber(meta_list, &subscribers);
this->add_subscriber(alien_meta_list, &subscribers);
struct list_head *pos;
kafka_group_protocol_t *protocol;

View File

@@ -809,8 +809,6 @@ public:
long long get_low_watermark() const { return this->ptr->low_watermark; }
void set_low_watermark(long long offset) { this->ptr->low_watermark = offset; }
bool reach_high_watermark() const { return this->ptr->offset == this->ptr->high_watermark; }
public:
KafkaToppar()
{
@@ -1411,8 +1409,7 @@ public:
return this->coordinator;
}
int run_assignor(KafkaMetaList *meta_list, KafkaMetaList *alien_meta_list,
const char *protocol_name);
int run_assignor(KafkaMetaList *meta_list, const char *protocol_name);
void add_subscriber(KafkaMetaList *meta_list,
std::vector<KafkaMetaSubscriber> *subscribers);

View File

@@ -1618,7 +1618,6 @@ KafkaMessage::KafkaMessage()
this->stream = new EncodeStream;
this->api_type = Kafka_Unknown;
this->cur_size = 0;
this->alien = false;
}
KafkaMessage::~KafkaMessage()
@@ -2086,6 +2085,7 @@ KafkaRequest::KafkaRequest()
this->api_mver_map[Kafka_ApiVersions] = 0;
this->api_mver_map[Kafka_SaslHandshake] = 1;
this->api_mver_map[Kafka_SaslAuthenticate] = 0;
this->api_mver_map[Kafka_DescribeGroups] = 0;
}
int KafkaRequest::encode_produce(struct iovec vectors[], int max)
@@ -2436,17 +2436,11 @@ int KafkaRequest::encode_metadata(struct iovec vectors[], int max)
else
append_i32(this->msgbuf, 0);
KafkaMetaList *p_meta_list;
if (this->alien)
p_meta_list = &this->alien_meta_list;
else
p_meta_list = &this->meta_list;
p_meta_list->rewind();
this->meta_list.rewind();
KafkaMeta *meta;
int topic_cnt = 0;
while ((meta = p_meta_list->get_next()) != NULL)
while ((meta = this->meta_list.get_next()) != NULL)
{
append_string(this->msgbuf, meta->get_topic());
++topic_cnt;
@@ -2587,10 +2581,9 @@ int KafkaRequest::encode_syncgroup(struct iovec vectors[], int max)
if (this->api_version >= 3)
append_nullable_string(this->msgbuf, "", 0);
append_i32(this->msgbuf, this->cgroup.get_member_elements());
if (this->cgroup.is_leader())
{
append_i32(this->msgbuf, this->cgroup.get_member_elements());
for (int i = 0; i < this->cgroup.get_member_elements(); ++i)
{
kafka_member_t *member = this->cgroup.get_members()[i];
@@ -2599,7 +2592,10 @@ int KafkaRequest::encode_syncgroup(struct iovec vectors[], int max)
}
}
else
{
append_i32(this->msgbuf, 0);
append_bytes(this->msgbuf, "");
}
this->cur_size = this->msgbuf.size();
@@ -3064,14 +3060,8 @@ int KafkaResponse::parse_metadata(void **buf, size_t *size)
if (this->api_version >= 1)
CHECK_RET(parse_i32(buf, size, &controller_id));
KafkaMetaList *p_meta_list;
if (this->alien)
p_meta_list = &this->alien_meta_list;
else
p_meta_list = &this->meta_list;
CHECK_RET(kafka_meta_parse_topic(buf, size, this->api_version,
p_meta_list, &this->broker_list));
&this->meta_list, &this->broker_list));
return 0;
}
@@ -3242,9 +3232,6 @@ int KafkaResponse::parse_fetch(void **buf, size_t *size)
CHECK_RET(parse_i16(buf, size, &ptr->error));
CHECK_RET(parse_i64(buf, size, &high_watermark));
if (ptr->error == KAFKA_NONE)
ptr->high_watermark = high_watermark;
if (this->api_version >= 4)
{
CHECK_RET(parse_i64(buf, size, (int64_t *)&ptr->last_stable_offset));
@@ -3380,29 +3367,37 @@ int KafkaResponse::parse_findcoordinator(void **buf, size_t *size)
return 0;
}
static bool kafka_meta_find_topic(const std::string& topic_name,
KafkaMetaList *meta_list)
static bool kafka_meta_find_or_add_topic(const std::string& topic_name,
KafkaMetaList *meta_list)
{
meta_list->rewind();
bool ret = false;
bool find = false;
KafkaMeta *meta;
while ((meta = meta_list->get_next()) != NULL)
{
if (topic_name == meta->get_topic())
{
ret = true;
find = true;
break;
}
}
return ret;
if (!find)
{
KafkaMeta tmp;
if (!tmp.set_topic(topic_name))
return false;
meta_list->add_item(tmp);
}
return true;
}
static int kafka_cgroup_parse_member(void **buf, size_t *size,
KafkaCgroup *cgroup,
KafkaMetaList *meta_list,
KafkaMetaList *alien_meta_list,
int api_version)
{
int member_cnt = 0;
@@ -3418,7 +3413,6 @@ static int kafka_cgroup_parse_member(void **buf, size_t *size,
return -1;
kafka_member_t **member = cgroup->get_members();
std::set<std::string> alien_topic;
int i;
for (i = 0; i < member_cnt; ++i)
@@ -3463,8 +3457,8 @@ static int kafka_cgroup_parse_member(void **buf, size_t *size,
list_add_tail(toppar->get_list(), &member[i]->toppar_list);
if (!kafka_meta_find_topic(topic_name, meta_list))
alien_topic.insert(topic_name);
if (!kafka_meta_find_or_add_topic(topic_name, meta_list))
return -1;
}
if (j != topic_cnt)
@@ -3474,21 +3468,6 @@ static int kafka_cgroup_parse_member(void **buf, size_t *size,
if (i != member_cnt)
return -1;
for (const auto& v : alien_topic)
{
KafkaMeta *meta = new KafkaMeta;
if (!meta->set_topic(v))
{
delete meta;
return -1;
}
alien_meta_list->add_item(std::move(*meta));
}
if (!alien_topic.empty())
cgroup->set_error(KAFKA_MISSING_TOPIC);
return 0;
}
@@ -3508,16 +3487,8 @@ int KafkaResponse::parse_joingroup(void **buf, size_t *size)
CHECK_RET(parse_string(buf, size, &cgroup->member_id));
CHECK_RET(kafka_cgroup_parse_member(buf, size, &this->cgroup,
&this->meta_list,
&this->alien_meta_list,
this->api_version));
if (cgroup->error != KAFKA_MISSING_TOPIC && this->cgroup.is_leader())
{
CHECK_RET(this->cgroup.run_assignor(&this->meta_list,
&this->alien_meta_list,
cgroup->protocol_name));
}
return 0;
}
@@ -3583,9 +3554,12 @@ int KafkaResponse::parse_syncgroup(void **buf, size_t *size)
this->cgroup.set_error(error);
CHECK_RET(parse_bytes(buf, size, member_assignment));
CHECK_RET(kafka_parse_member_assignment(member_assignment.c_str(),
member_assignment.size(),
&this->cgroup));
if (!member_assignment.empty())
{
CHECK_RET(kafka_parse_member_assignment(member_assignment.c_str(),
member_assignment.size(),
&this->cgroup));
}
return 0;
}
@@ -3674,17 +3648,7 @@ int KafkaResponse::parse_offsetcommit(void **buf, size_t *size)
for (int i = 0 ; i < partition_cnt; ++i)
{
CHECK_RET(parse_i32(buf, size, &partition));
KafkaToppar *toppar = find_toppar_by_name(topic_name, partition,
this->cgroup.get_assigned_toppar_list());
if (!toppar)
return -1;
kafka_topic_partition_t *ptr = toppar->get_raw_ptr();
CHECK_RET(parse_i16(buf, size, &ptr->error));
if (ptr->error)
this->cgroup.set_error(ptr->error);
CHECK_RET(parse_i16(buf, size, &this->cgroup.get_raw_ptr()->error));
}
}
@@ -3719,7 +3683,6 @@ int KafkaResponse::parse_apiversions(void **buf, size_t *size)
CHECK_RET(parse_i16(buf, size, &error));
CHECK_RET(parse_i32(buf, size, &api_cnt));
if (api_cnt < 0)
{
errno = EBADMSG;

View File

@@ -93,16 +93,6 @@ public:
return &this->meta_list;
}
KafkaMetaList *get_alien_meta_list()
{
return &this->alien_meta_list;
}
void set_alien()
{
this->alien = true;
}
void set_toppar_list(const KafkaTopparList& toppar_list)
{
this->toppar_list = toppar_list;
@@ -137,12 +127,10 @@ public:
this->cgroup = msg.cgroup;
this->broker = msg.broker;
this->meta_list = msg.meta_list;
this->alien_meta_list = msg.alien_meta_list;
this->broker_list = msg.broker_list;
this->toppar_list = msg.toppar_list;
this->sasl = msg.sasl;
this->api = msg.api;
this->alien = msg.alien;
}
void clear_buf()
@@ -201,7 +189,6 @@ protected:
KafkaCgroup cgroup;
KafkaBroker broker;
KafkaMetaList meta_list;
KafkaMetaList alien_meta_list;
KafkaBrokerList broker_list;
KafkaTopparList toppar_list;
KafkaBuffer serialized;
@@ -218,8 +205,6 @@ protected:
kafka_sasl_t *sasl;
kafka_api_t *api;
bool alien;
};
class KafkaRequest : public KafkaMessage

View File

@@ -78,9 +78,6 @@ void KafkaResult::fetch_toppars(std::vector<KafkaToppar *>& toppars)
KafkaToppar *toppar = NULL;
for (size_t i = 0; i < this->resp_num; ++i)
{
if (this->resp_vec[i].get_api_type() != Kafka_OffsetCommit)
continue;
this->resp_vec[i].get_toppar_list()->rewind();
while ((toppar = this->resp_vec[i].get_toppar_list()->get_next()) != NULL)

View File

@@ -328,7 +328,7 @@ void kafka_config_init(kafka_config_t *conf)
conf->fetch_timeout = 100;
conf->fetch_min_bytes = 1;
conf->fetch_max_bytes = 50 * 1024 * 1024;
conf->fetch_msg_max_bytes = 1024 * 1024;
conf->fetch_msg_max_bytes = 10 * 1024 * 1024;
conf->offset_timestamp = KAFKA_TIMESTAMP_LATEST;
conf->commit_timestamp = 0;
conf->session_timeout = 10*1000;
@@ -364,7 +364,7 @@ void kafka_config_deinit(kafka_config_t *conf)
void kafka_partition_init(kafka_partition_t *partition)
{
partition->error = KAFKA_NONE;
partition->error = KAFKA_UNKNOWN_SERVER_ERROR;
partition->partition_index = -1;
kafka_broker_init(&partition->leader);
partition->replica_nodes = NULL;
@@ -401,7 +401,7 @@ void kafka_broker_init(kafka_broker_t *broker)
broker->to_addr = 0;
memset(&broker->addr, 0, sizeof(broker->addr));
broker->addrlen = 0;
broker->error = 0;
broker->error = KAFKA_UNKNOWN_SERVER_ERROR;
broker->status = KAFKA_BROKER_UNINIT;
}
@@ -413,7 +413,7 @@ void kafka_broker_deinit(kafka_broker_t *broker)
void kafka_meta_init(kafka_meta_t *meta)
{
meta->error = KAFKA_NONE;
meta->error = KAFKA_UNKNOWN_SERVER_ERROR;
meta->topic_name = NULL;
meta->error_message = NULL;
meta->is_internal = 0;
@@ -438,7 +438,7 @@ void kafka_meta_deinit(kafka_meta_t *meta)
void kafka_topic_partition_init(kafka_topic_partition_t *toppar)
{
toppar->error = KAFKA_NONE;
toppar->error = KAFKA_UNKNOWN_SERVER_ERROR;
toppar->topic_name = NULL;
toppar->partition = -1;
toppar->preferred_read_replica = -1;
@@ -533,7 +533,7 @@ void kafka_member_deinit(kafka_member_t *member)
void kafka_cgroup_init(kafka_cgroup_t *cgroup)
{
INIT_LIST_HEAD(&cgroup->assigned_toppar_list);
cgroup->error = KAFKA_NONE;
cgroup->error = KAFKA_UNKNOWN_SERVER_ERROR;
cgroup->error_msg = NULL;
kafka_broker_init(&cgroup->coordinator);
cgroup->leader_id = NULL;

View File

@@ -110,7 +110,6 @@ enum
KAFKA_PREFERRED_LEADER_NOT_AVAILABLE = 80,
KAFKA_GROUP_MAX_SIZE_REACHED = 81,
KAFKA_FENCED_INSTANCE_ID = 82,
KAFKA_MISSING_TOPIC = 256 + 1,
};
enum