update sasl for scram

This commit is contained in:
wzlsuccess
2021-06-22 11:12:48 +08:00
parent 3ea97df047
commit 9bb1174554
8 changed files with 184 additions and 428 deletions

View File

@@ -124,8 +124,7 @@ public:
this->meta_list = new KafkaMetaList;
this->broker_list = new KafkaBrokerList;
this->lock_status = new KafkaLockStatus;
this->broker_map_by_id = new KafkaBrokerMap;
this->broker_map_by_uri = new KafkaBrokerMap;
this->broker_map = new KafkaBrokerMap;
this->meta_map = new std::map<std::string, MetaStatus>;
}
@@ -138,8 +137,7 @@ public:
delete this->cgroup;
delete this->meta_list;
delete this->broker_list;
delete this->broker_map_by_id;
delete this->broker_map_by_uri;
delete this->broker_map;
delete this->lock_status;
delete this->meta_map;
}
@@ -149,8 +147,7 @@ public:
KafkaCgroup *cgroup;
KafkaMetaList *meta_list;
KafkaBrokerList *broker_list;
KafkaBrokerMap *broker_map_by_id;
KafkaBrokerMap *broker_map_by_uri;
KafkaBrokerMap *broker_map;
KafkaLockStatus *lock_status;
std::map<std::string, MetaStatus> *meta_map;
@@ -256,8 +253,7 @@ public:
this->cgroup = *client->member->cgroup;
this->client_meta_list = *client->member->meta_list;
this->client_broker_list = *client->member->broker_list;
this->client_broker_map_by_id = *client->member->broker_map_by_id;
this->client_broker_map_by_uri = *client->member->broker_map_by_uri;
this->client_broker_map = *client->member->broker_map;
this->query = query;
if (!client->member->broker_hosts->empty())
@@ -288,16 +284,13 @@ protected:
virtual void generate_info();
private:
static void kafka_meta_callback(__WFKafkaTask *task);
static void kafka_merge_meta_list(KafkaMetaList* dst,
KafkaMetaList* src);
static void kafka_meta_callback(__WFKafkaTask *task);
static void kafka_process_broker_api(ComplexKafkaTask *t, __WFKafkaTask *task);
void kafka_broker_api_callback(__WFKafkaTask *task);
static void kafka_broker_callback(const ParallelWork *pwork);
static void kafka_merge_broker_list(KafkaBrokerMap *dst,
KafkaBrokerList *src);
static void kafka_cgroup_callback(__WFKafkaTask *task);
@@ -327,13 +320,7 @@ private:
inline KafkaBroker *get_broker(int node_id)
{
return this->client_broker_map_by_id.find_item(node_id);
}
inline KafkaBroker *find_broker(const KafkaBrokerMap& map,
const KafkaBroker& broker)
{
return map.find_item(broker);
return this->client_broker_map.find_item(node_id);
}
int get_node_id(const KafkaToppar *toppar);
@@ -347,8 +334,7 @@ private:
KafkaLockStatus lock_status;
KafkaMetaList client_meta_list;
KafkaBrokerList client_broker_list;
KafkaBrokerMap client_broker_map_by_id;
KafkaBrokerMap client_broker_map_by_uri;
KafkaBrokerMap client_broker_map;
KafkaCgroup cgroup;
std::map<int, KafkaTopparList> toppar_list_map;
std::string url;
@@ -419,7 +405,6 @@ void ComplexKafkaTask::kafka_rebalance_callback(__WFKafkaTask *task)
{
*t->get_lock_status()->get_status() |= KAFKA_CGROUP_DONE;
*t->get_lock_status()->get_status() &= (~(KAFKA_CGROUP_INIT|KAFKA_CGROUP_DOING));
t->get_cgroup()->set_coordinator(task->get_resp()->get_broker());
if (*t->get_lock_status()->get_status() & KAFKA_HEARTBEAT_INIT)
{
@@ -436,7 +421,7 @@ void ComplexKafkaTask::kafka_rebalance_callback(__WFKafkaTask *task)
t->get_userinfo(),
kafka_heartbeat_callback);
kafka_task->user_data = t;
kafka_task->get_req()->set_api(Kafka_Heartbeat);
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
kafka_task->get_req()->set_cgroup(*t->get_cgroup());
kafka_task->get_req()->set_broker(*coordinator);
kafka_task->start();
@@ -468,7 +453,7 @@ void ComplexKafkaTask::kafka_rebalance_proc(KafkaHeartbeat *t)
kafka_rebalance_callback);
task->user_data = t;
task->get_req()->set_config(*t->get_config());
task->get_req()->set_api(Kafka_FindCoordinator);
task->get_req()->set_api_type(Kafka_FindCoordinator);
task->get_req()->set_cgroup(*t->get_cgroup());
task->get_req()->set_meta_list(*t->get_meta_list());
@@ -542,7 +527,7 @@ void ComplexKafkaTask::kafka_timer_callback(WFTimerTask *task)
kafka_task->user_data = t;
kafka_task->get_req()->set_config(*t->get_config());
kafka_task->get_req()->set_api(Kafka_Heartbeat);
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
kafka_task->get_req()->set_cgroup(*t->get_cgroup());
kafka_task->get_req()->set_broker(*coordinator);
kafka_task->start();
@@ -574,131 +559,15 @@ void ComplexKafkaTask::kafka_merge_meta_list(KafkaMetaList *dst,
}
}
void ComplexKafkaTask::kafka_process_broker_api(ComplexKafkaTask *t, __WFKafkaTask *task)
void ComplexKafkaTask::kafka_merge_broker_list(KafkaBrokerMap *dst,
KafkaBrokerList *src)
{
if (t->config.get_broker_version())
src->rewind();
KafkaBroker *src_broker;
while ((src_broker = src->get_next()) != NULL)
{
KafkaBroker *broker, *exist;
task->get_resp()->get_broker_list()->rewind();
while ((broker = task->get_resp()->get_broker_list()->get_next()) != NULL)
{
exist = t->get_broker(broker->get_node_id());
if (exist && exist->get_raw_ptr()->status != KAFKA_BROKER_UNINIT)
continue;
kafka_api_version_t *api;
size_t api_cnt;
const char *brk_ver = t->config.get_broker_version();
int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt);
if (ret == 0)
{
if (!broker->allocate_api_version(api_cnt))
{
t->state = WFT_STATE_TASK_ERROR;
t->error = errno;
t->lock_status.get_mutex()->unlock();
return;
}
memcpy(broker->get_api(), api,
sizeof(kafka_api_version_t) * api_cnt);
broker->get_raw_ptr()->status = KAFKA_BROKER_INITED;
t->client_broker_map_by_id.add_item(*broker, broker->get_node_id());
}
else
{
t->state = WFT_STATE_TASK_ERROR;
t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED;
t->lock_status.get_mutex()->unlock();
return;
}
}
t->state = WFT_STATE_SUCCESS;
t->error = 0;
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(*t->client->member->meta_map)[meta->get_topic()] = META_INITED;
char name[64];
snprintf(name, 64, "%p.meta", t->client);
t->lock_status.get_mutex()->unlock();
WFTaskFactory::count_by_name(name, (unsigned int)-1);
}
else
{
SeriesWork *series;
ParallelWork *parallel = Workflow::create_parallel_work(kafka_broker_callback);
parallel->set_context(t);
KafkaBroker *broker, *exist;
task->get_resp()->get_broker_list()->rewind();
while ((broker = task->get_resp()->get_broker_list()->get_next()) != NULL)
{
exist = t->find_broker(t->client_broker_map_by_uri, *broker);
if (exist && exist->get_raw_ptr()->status == KAFKA_BROKER_INITED)
{
if (exist->get_node_id() != broker->get_node_id())
{
broker->copy_from(*exist);
broker->get_raw_ptr()->status = KAFKA_BROKER_INITED;
t->client_broker_map_by_id.add_item(*broker,
broker->get_node_id());
}
continue;
}
else if (exist && exist->get_raw_ptr()->status == KAFKA_BROKER_DOING)
{
char name[64];
snprintf(name, 64, "%p.meta", t->client);
auto counter = WFTaskFactory::create_counter_task(name, 1, nullptr);
series = Workflow::create_series_work(counter, nullptr);
parallel->add_series(series);
continue;
}
broker->get_raw_ptr()->status = KAFKA_BROKER_DOING;
t->client_broker_map_by_uri.add_item(*broker);
t->client_broker_map_by_id.add_item(*broker, broker->get_node_id());
auto cb = std::bind(&ComplexKafkaTask::kafka_broker_api_callback, t,
std::placeholders::_1);
__WFKafkaTask *ntask;
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
ntask = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
t->retry_max,
t->get_userinfo(),
nullptr);
}
else
{
ntask = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
broker->get_port(),
t->retry_max,
t->get_userinfo(),
nullptr);
}
ntask->get_req()->set_config(t->config);
ntask->get_req()->set_broker(*broker);
ntask->get_req()->set_api(Kafka_ApiVersions);
ntask->user_data = broker->get_raw_ptr();
KafkaComplexTask *ctask = static_cast<KafkaComplexTask *>(ntask);
*ctask->get_mutable_ctx() = cb;
series = Workflow::create_series_work(ntask, nullptr);
parallel->add_series(series);
}
series_of(task)->push_front(parallel);
t->lock_status.get_mutex()->unlock();
if (!dst->find_item(src_broker->get_node_id()))
dst->add_item(*src_broker);
}
}
@@ -710,7 +579,19 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
{
kafka_merge_meta_list(&t->client_meta_list,
task->get_resp()->get_meta_list());
kafka_process_broker_api(t, task);
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(*t->client->member->meta_map)[meta->get_topic()] = META_INITED;
kafka_merge_broker_list(&t->client_broker_map,
task->get_resp()->get_broker_list());
char name[64];
snprintf(name, 64, "%p.meta", t->client);
t->lock_status.get_mutex()->unlock();
WFTaskFactory::count_by_name(name, (unsigned int)-1);
}
else
{
@@ -725,70 +606,6 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task)
}
}
struct __broker_status
{
kafka_broker_t *broker;
int state;
int error;
};
void ComplexKafkaTask::kafka_broker_api_callback(__WFKafkaTask *task)
{
struct __broker_status *status = new struct __broker_status;
status->broker = (kafka_broker_t *)task->user_data;
status->state = task->get_state();
status->error = task->get_error();
series_of(task)->set_context(status);
}
void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork)
{
ComplexKafkaTask *t = (ComplexKafkaTask *)pwork->get_context();
t->state = WFT_STATE_SUCCESS;
t->error = 0;
t->lock_status.get_mutex()->lock();
struct __broker_status *status;
for (size_t i = 0; i < pwork->size(); i++)
{
status = (struct __broker_status *)pwork->series_at(i)->get_context();
if (!status)
continue;
if (status->state != WFT_STATE_SUCCESS)
{
t->state = status->state;
t->error = status->error;
}
else
status->broker->status = KAFKA_BROKER_INITED;
delete status;
}
if (t->state == WFT_STATE_SUCCESS)
{
t->state = WFT_STATE_SUCCESS;
t->error = 0;
}
else
{
t->state = WFT_STATE_TASK_ERROR;
t->error = WFT_ERR_KAFKA_META_FAILED;
t->finish = true;
}
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(*t->client->member->meta_map)[meta->get_topic()] = META_INITED;
char name[64];
snprintf(name, 64, "%p.meta", t->client);
t->lock_status.get_mutex()->unlock();
WFTaskFactory::count_by_name(name, (unsigned int)-1);
}
void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data;
@@ -797,7 +614,6 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
*t->lock_status.get_status() |= KAFKA_CGROUP_DONE;
*t->lock_status.get_status() &= (~(KAFKA_CGROUP_INIT|KAFKA_CGROUP_DOING));
t->cgroup.set_coordinator(task->get_resp()->get_broker());
if (*t->lock_status.get_status() & KAFKA_HEARTBEAT_INIT)
{
@@ -823,7 +639,7 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task)
kafka_heartbeat_callback);
kafka_task->user_data = hb;
kafka_task->get_req()->set_config(t->config);
kafka_task->get_req()->set_api(Kafka_Heartbeat);
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
kafka_task->get_req()->set_cgroup(t->cgroup);
kafka_task->get_req()->set_broker(*coordinator);
kafka_task->start();
@@ -913,7 +729,7 @@ void ComplexKafkaTask::kafka_move_task_callback(__WFKafkaTask *task)
KafkaTopparList *toppar_list = task->get_resp()->get_toppar_list();
if (task->get_resp()->get_api() == Kafka_Fetch)
if (task->get_resp()->get_api_type() == Kafka_Fetch)
{
toppar_list->rewind();
KafkaToppar *task_toppar;
@@ -928,22 +744,13 @@ void ComplexKafkaTask::kafka_move_task_callback(__WFKafkaTask *task)
void ComplexKafkaTask::generate_info()
{
if (this->config.get_sasl_mechanisms())
if (this->config.get_sasl_mech())
{
this->userinfo = this->config.get_sasl_username();
this->userinfo += ":";
this->userinfo += this->config.get_sasl_password();
this->userinfo += ":";
this->userinfo += this->config.get_sasl_mechanisms();
this->meta_list.rewind();
KafkaMeta *meta;
while ((meta = this->meta_list.get_next()) != NULL)
{
this->userinfo += "%";
this->userinfo += meta->get_topic();
}
this->userinfo += this->config.get_sasl_mech();
this->userinfo += "@";
this->url = "kafka://" + this->userinfo +
this->url.substr(this->url.find("kafka://") + 8);
@@ -1035,7 +842,7 @@ void ComplexKafkaTask::dispatch()
kafka_meta_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api(Kafka_Metadata);
task->get_req()->set_api_type(Kafka_Metadata);
task->get_req()->set_meta_list(this->meta_list);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
@@ -1070,7 +877,7 @@ void ComplexKafkaTask::dispatch()
else if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) &&
(*this->lock_status.get_status() & KAFKA_CGROUP_INIT))
{
KafkaBroker *broker = this->client_broker_map_by_id.get_first_entry();
KafkaBroker *broker = this->client_broker_map.get_first_entry();
if (!broker)
{
this->state = WFT_STATE_TASK_ERROR;
@@ -1101,7 +908,7 @@ void ComplexKafkaTask::dispatch()
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api(Kafka_FindCoordinator);
task->get_req()->set_api_type(Kafka_FindCoordinator);
task->get_req()->set_broker(*broker);
task->get_req()->set_cgroup(this->cgroup);
task->get_req()->set_meta_list(this->client_meta_list);
@@ -1155,7 +962,7 @@ void ComplexKafkaTask::dispatch()
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
task->get_req()->set_api(Kafka_Produce);
task->get_req()->set_api_type(Kafka_Produce);
task->user_data = (void *)parallel->size();
KafkaComplexTask *ctask = static_cast<KafkaComplexTask *>(task);
*ctask->get_mutable_ctx() = cb;
@@ -1207,7 +1014,7 @@ void ComplexKafkaTask::dispatch()
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
task->get_req()->set_api(Kafka_Fetch);
task->get_req()->set_api_type(Kafka_Fetch);
task->user_data = (void *)parallel->size();
KafkaComplexTask *ctask = static_cast<KafkaComplexTask *>(task);
*ctask->get_mutable_ctx() = cb;
@@ -1249,7 +1056,7 @@ void ComplexKafkaTask::dispatch()
task->get_req()->set_cgroup(this->cgroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_toppar_list(this->toppar_list);
task->get_req()->set_api(this->api_type);
task->get_req()->set_api_type(this->api_type);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
@@ -1280,7 +1087,7 @@ void ComplexKafkaTask::dispatch()
kafka_leavegroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api(Kafka_LeaveGroup);
task->get_req()->set_api_type(Kafka_LeaveGroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_cgroup(this->cgroup);
series_of(this)->push_front(this);

View File

@@ -50,16 +50,19 @@ protected:
private:
struct KafkaConnectionInfo
{
kafka_api_t api;
kafka_sasl_t sasl;
std::string mechanisms;
KafkaConnectionInfo()
{
kafka_api_init(&this->api);
kafka_sasl_init(&this->sasl);
}
~KafkaConnectionInfo()
{
kafka_api_deinit(&this->api);
kafka_sasl_deinit(&this->sasl);
}
@@ -107,81 +110,82 @@ private:
CommMessageOut *__ComplexKafkaTask::message_out()
{
long long seqid = this->get_seq();
KafkaBroker *broker = this->get_req()->get_broker();
if (seqid == 0)
{
KafkaConnectionInfo *conn_info = new KafkaConnectionInfo;
auto&& deleter = [] (void *ctx)
{
KafkaConnectionInfo *conn_info = (KafkaConnectionInfo *)ctx;
delete conn_info;
};
this->get_connection()->set_context(conn_info, std::move(deleter));
this->get_req()->set_api(&conn_info->api);
if (!this->get_req()->get_config()->get_broker_version())
{
if (!broker->get_api())
{
KafkaRequest *req = new KafkaRequest;
req->duplicate(*this->get_req());
req->set_api(Kafka_ApiVersions);
if (this->get_req()->get_api() != Kafka_ApiVersions)
is_user_request_ = false;
return req;
}
else
seqid++;
KafkaRequest *req = new KafkaRequest;
req->duplicate(*this->get_req());
req->set_api_type(Kafka_ApiVersions);
is_user_request_ = false;
return req;
}
else
{
kafka_api_version_t *api;
size_t api_cnt;
const char *brk_ver = this->get_req()->get_config()->get_broker_version();
int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt);
const char *v = this->get_req()->get_config()->get_broker_version();
int ret = kafka_api_version_is_queryable(v, &api, &api_cnt);
kafka_api_version_t *p = NULL;
if (ret == 0)
{
broker->allocate_api_version(api_cnt);
memcpy(broker->get_api(), api,
sizeof(kafka_api_version_t) * api_cnt);
p = (kafka_api_version_t *)malloc(api_cnt * sizeof(*p));
if (p)
{
memcpy(p, api, sizeof(kafka_api_version_t) * api_cnt);
conn_info->api.api = p;
conn_info->api.elements = api_cnt;
conn_info->api.features = kafka_get_features(p, api_cnt);
}
}
else
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_VERSION_DISALLOWED;
if (!p)
return NULL;
}
seqid++;
}
}
if (seqid == 1 && !this->get_connection()->get_context())
if (seqid == 1)
{
const char *sasl_mechanisms = this->get_req()->get_config()->get_sasl_mechanisms();
if (sasl_mechanisms)
const char *sasl_mech = this->get_req()->get_config()->get_sasl_mech();
KafkaConnectionInfo *conn_info =
(KafkaConnectionInfo *)this->get_connection()->get_context();
if (sasl_mech && conn_info->sasl.status == 0)
{
KafkaConnectionInfo *conn_info = new KafkaConnectionInfo;
if (!conn_info->init(sasl_mechanisms))
if (!conn_info->init(sasl_mech))
return NULL;
auto&& deleter = [] (void *ctx)
{
KafkaConnectionInfo *conn_info = (KafkaConnectionInfo *)ctx;
delete conn_info;
};
this->get_connection()->set_context(conn_info, std::move(deleter));
this->get_req()->set_api(&conn_info->api);
this->get_req()->set_sasl(&conn_info->sasl);
KafkaRequest *req = new KafkaRequest;
req->duplicate(*this->get_req());
if (broker->get_features() & KAFKA_FEATURE_SASL_HANDSHAKE)
req->set_api(Kafka_SaslHandshake);
if (conn_info->api.features & KAFKA_FEATURE_SASL_HANDSHAKE)
req->set_api_type(Kafka_SaslHandshake);
else
req->set_api(Kafka_SaslAuthenticate);
req->set_api_type(Kafka_SaslAuthenticate);
is_user_request_ = false;
return req;
}
}
if (this->get_req()->get_api() == Kafka_Fetch)
KafkaConnectionInfo *conn_info =
(KafkaConnectionInfo *)this->get_connection()->get_context();
this->get_req()->set_api(&conn_info->api);
if (this->get_req()->get_api_type() == Kafka_Fetch)
{
KafkaRequest *req = this->get_req();
req->get_toppar_list()->rewind();
@@ -205,11 +209,11 @@ CommMessageOut *__ComplexKafkaTask::message_out()
if (flag)
{
KafkaRequest *new_req = new KafkaRequest;
new_req->set_api(&conn_info->api);
new_req->set_broker(*req->get_broker());
new_req->set_toppar_list(toppar_list);
new_req->set_config(*req->get_config());
new_req->set_api(Kafka_ListOffsets);
new_req->set_api_type(Kafka_ListOffsets);
is_user_request_ = false;
return new_req;
}
@@ -223,7 +227,7 @@ CommMessageIn *__ComplexKafkaTask::message_in()
KafkaRequest *req = static_cast<KafkaRequest *>(this->get_message_out());
KafkaResponse *resp = this->get_resp();
resp->set_api(req->get_api());
resp->set_api_type(req->get_api_type());
resp->set_api_version(req->get_api_version());
resp->duplicate(*req);
@@ -290,7 +294,7 @@ int __ComplexKafkaTask::first_timeout()
KafkaRequest *client_req = this->get_req();
int ret = 0;
switch(client_req->get_api())
switch(client_req->get_api_type())
{
case Kafka_Fetch:
ret = client_req->get_config()->get_fetch_timeout();
@@ -372,7 +376,7 @@ bool __ComplexKafkaTask::has_next()
msg->get_broker()->set_to_addr(1);
}
switch (msg->get_api())
switch (msg->get_api_type())
{
case Kafka_FindCoordinator:
if (msg->get_cgroup()->get_error())
@@ -384,7 +388,7 @@ bool __ComplexKafkaTask::has_next()
else
{
is_redirect_ = check_redirect();
this->get_req()->set_api(Kafka_JoinGroup);
this->get_req()->set_api_type(Kafka_JoinGroup);
}
break;
@@ -398,17 +402,17 @@ bool __ComplexKafkaTask::has_next()
if (msg->get_cgroup()->get_error() == KAFKA_MISSING_TOPIC)
{
this->get_req()->set_api(Kafka_Metadata);
this->get_req()->set_api_type(Kafka_Metadata);
update_metadata_ = true;
}
else if (msg->get_cgroup()->get_error() == KAFKA_MEMBER_ID_REQUIRED)
{
this->get_req()->set_api(Kafka_JoinGroup);
this->get_req()->set_api_type(Kafka_JoinGroup);
}
else if (msg->get_cgroup()->get_error() == KAFKA_UNKNOWN_MEMBER_ID)
{
msg->get_cgroup()->set_member_id("");
this->get_req()->set_api(Kafka_JoinGroup);
this->get_req()->set_api_type(Kafka_JoinGroup);
}
else if (msg->get_cgroup()->get_error())
{
@@ -417,7 +421,7 @@ bool __ComplexKafkaTask::has_next()
ret = false;
}
else
this->get_req()->set_api(Kafka_SyncGroup);
this->get_req()->set_api_type(Kafka_SyncGroup);
break;
@@ -429,7 +433,7 @@ bool __ComplexKafkaTask::has_next()
ret = false;
}
else
this->get_req()->set_api(Kafka_OffsetFetch);
this->get_req()->set_api_type(Kafka_OffsetFetch);
break;
@@ -444,7 +448,7 @@ bool __ComplexKafkaTask::has_next()
this->state = WFT_STATE_TASK_ERROR;
}
else
this->get_req()->set_api(Kafka_SyncGroup);
this->get_req()->set_api_type(Kafka_SyncGroup);
}
else
{
@@ -457,7 +461,7 @@ bool __ComplexKafkaTask::has_next()
if (meta->get_error() == KAFKA_LEADER_NOT_AVAILABLE)
{
ret = true;
this->get_req()->set_api(Kafka_Metadata);
this->get_req()->set_api_type(Kafka_Metadata);
break;
}
}
@@ -482,7 +486,7 @@ bool __ComplexKafkaTask::has_next()
{
if (!toppar->record_reach_end())
{
this->get_req()->set_api(Kafka_Produce);
this->get_req()->set_api_type(Kafka_Produce);
return true;
}
}
@@ -549,9 +553,9 @@ bool __ComplexKafkaTask::finish_once()
return false;
}
if (this->get_resp()->get_api() == Kafka_Fetch ||
this->get_resp()->get_api() == Kafka_Produce ||
this->get_resp()->get_api() == Kafka_ApiVersions)
if (this->get_resp()->get_api_type() == Kafka_Fetch ||
this->get_resp()->get_api_type() == Kafka_Produce ||
this->get_resp()->get_api_type() == Kafka_ApiVersions)
{
if (*get_mutable_ctx())
(*get_mutable_ctx())(this);
@@ -561,7 +565,7 @@ bool __ComplexKafkaTask::finish_once()
{
this->disable_retry();
this->get_resp()->set_api(this->get_req()->get_api());
this->get_resp()->set_api_type(this->get_req()->get_api_type());
this->get_resp()->set_api_version(this->get_req()->get_api_version());
this->get_resp()->duplicate(*this->get_req());

View File

@@ -495,11 +495,11 @@ public:
this->ptr->offset_store = offset_store;
}
const char *get_sasl_mechanisms() const
const char *get_sasl_mech() const
{
return this->ptr->mechanisms;
}
bool set_sasl_mechanisms(const char *mechanisms)
bool set_sasl_mech(const char *mechanisms)
{
char *p = strdup(mechanisms);
@@ -1036,39 +1036,12 @@ public:
return this->get_uri() > broker.get_uri();
}
bool copy_from(const KafkaBroker& broker)
{
const struct sockaddr *addr;
socklen_t socklen;
broker.get_broker_addr(&addr, &socklen);
memcpy(&this->ptr->addr, addr, socklen);
this->ptr->addrlen = socklen;
this->ptr->to_addr = 1;
size_t size = (broker.get_raw_ptr()->api_elements) *
sizeof(kafka_api_version_t);
void *p = malloc(size);
if (!p)
return false;
memcpy(p, broker.get_raw_ptr()->api, size);
free(this->ptr->api);
this->ptr->api = (kafka_api_version_t *)p;
this->ptr->api_elements = broker.get_raw_ptr()->api_elements;
this->ptr->features = broker.get_raw_ptr()->features;
return true;
}
kafka_broker_t *get_raw_ptr() const { return this->ptr; }
struct list_head *get_list() { return &this->list; }
struct rb_node *get_rb() { return &this->rb; }
void set_feature(unsigned features) { this->ptr->features = features; }
bool is_equal(const struct sockaddr *addr, socklen_t socklen) const
{
if (this->ptr->addrlen == socklen && socklen)
@@ -1121,33 +1094,6 @@ public:
int get_id () const { return this->ptr->node_id; }
bool allocate_api_version(size_t len)
{
void *p = malloc(len * sizeof(kafka_api_version_t));
if (!p)
return false;
free(this->ptr->api);
this->ptr->api = (kafka_api_version_t *)p;
this->ptr->api_elements = len;
return true;
}
kafka_api_version_t *get_api()
{
return this->ptr->api;
}
void set_features(unsigned features)
{
this->ptr->features = features;
}
unsigned get_features()
{
return this->ptr->features;
}
private:
struct list_head list;
struct rb_node rb;
@@ -1384,22 +1330,6 @@ public:
return this->coordinator;
}
bool set_coordinator(KafkaBroker *coord)
{
size_t size = (coord->get_raw_ptr()->api_elements) * sizeof(kafka_api_version_t);
void *p = malloc(size);
if (!p)
return false;
memcpy(p, coord->get_raw_ptr()->api, size);
free(this->ptr->coordinator.api);
this->ptr->coordinator.api = (kafka_api_version_t *)p;
this->ptr->coordinator.api_elements = coord->get_raw_ptr()->api_elements;
this->ptr->coordinator.features = coord->get_raw_ptr()->features;
return true;
}
int run_assignor(KafkaMetaList *meta_list, const char *protocol_name);
static int kafka_range_assignor(kafka_member_t **members,

View File

@@ -1693,7 +1693,7 @@ int KafkaMessage::encode_message(int api_type, struct iovec vectors[], int max)
return it->second(vectors, max);
}
static int kafka_get_api_version(KafkaBroker& broker, const KafkaConfig& conf,
static int kafka_get_api_version(const kafka_api_t *api, const KafkaConfig& conf,
int api_type, int mvers, int message_version)
{
int min_vers = 0;
@@ -1709,8 +1709,7 @@ static int kafka_get_api_version(KafkaBroker& broker, const KafkaConfig& conf,
min_vers = 7;
}
return kafka_broker_get_api_version(broker.get_raw_ptr(),
api_type, min_vers, mvers);
return kafka_broker_get_api_version(api, api_type, min_vers, mvers);
}
int KafkaMessage::encode_head()
@@ -1719,28 +1718,28 @@ int KafkaMessage::encode_head()
this->api_version = 0;
else
{
if (this->broker.get_features() & KAFKA_FEATURE_MSGVER2)
if (this->api->features & KAFKA_FEATURE_MSGVER2)
this->message_version = 2;
else if (this->broker.get_features() & KAFKA_FEATURE_MSGVER1)
else if (this->api->features & KAFKA_FEATURE_MSGVER1)
this->message_version = 1;
else
this->message_version = 0;
if (this->config.get_compress_type() == Kafka_Lz4 &&
!(this->broker.get_features() & KAFKA_FEATURE_LZ4))
!(this->api->features & KAFKA_FEATURE_LZ4))
{
this->config.set_compress_type(Kafka_NoCompress);
}
if (this->config.get_compress_type() == Kafka_Zstd &&
!(this->broker.get_features() & KAFKA_FEATURE_ZSTD))
!(this->api->features & KAFKA_FEATURE_ZSTD))
{
this->config.set_compress_type(Kafka_NoCompress);
}
int mver = this->api_mver_map[this->api_type];
this->api_version = kafka_get_api_version(this->broker, this->config,
this->api_version = kafka_get_api_version(this->api, this->config,
this->api_type, mver,
this->message_version);
}
@@ -2732,7 +2731,7 @@ int KafkaRequest::encode_apiversions(struct iovec vectors[], int max)
int KafkaRequest::encode_saslhandshake(struct iovec vectors[], int max)
{
append_string(this->msgbuf, this->config.get_sasl_mechanisms());
append_string(this->msgbuf, this->config.get_sasl_mech());
this->cur_size = this->msgbuf.size();
@@ -2870,31 +2869,20 @@ static bool kafka_broker_get_leader(int leader_id, KafkaBrokerList *broker_list,
char *host = strdup(broker->host);
if (host)
{
size_t api_elem_size = sizeof(kafka_api_version_t) * broker->api_elements;
kafka_api_version_t *api = (kafka_api_version_t *)malloc(api_elem_size);
if (api)
char *rack;
if (broker->rack)
rack = strdup(broker->rack);
if (!broker->rack || rack)
{
char *rack;
if (broker->rack)
rack = strdup(broker->rack);
leader->rack = rack;
if (!broker->rack || rack)
{
if (broker->rack)
leader->rack = rack;
leader->to_addr = broker->to_addr;
memcpy(&leader->addr, &broker->addr, sizeof(struct sockaddr_storage));
leader->addrlen = broker->addrlen;
leader->features = broker->features;
memcpy(api, broker->api, api_elem_size);
leader->api_elements = broker->api_elements;
leader->host = host;
leader->api = api;
return true;
}
free(api);
leader->to_addr = broker->to_addr;
memcpy(&leader->addr, &broker->addr, sizeof(struct sockaddr_storage));
leader->addrlen = broker->addrlen;
leader->host = host;
return true;
}
free(host);
@@ -3683,7 +3671,6 @@ static bool kafka_api_version_cmp(const kafka_api_version_t& api_ver1,
int KafkaResponse::parse_apiversions(void **buf, size_t *size)
{
kafka_broker_t *ptr = this->broker.get_raw_ptr();
short error;
int api_cnt;
int throttle_time;
@@ -3697,21 +3684,25 @@ int KafkaResponse::parse_apiversions(void **buf, size_t *size)
return -1;
}
if (!this->broker.allocate_api_version(api_cnt))
void *p = malloc(api_cnt * sizeof(kafka_api_version_t));
if (!p)
return -1;
this->api->api = (kafka_api_version_t *)p;
this->api->elements = api_cnt;
for (int i = 0; i < api_cnt; ++i)
{
CHECK_RET(parse_i16(buf, size, &ptr->api[i].api_key));
CHECK_RET(parse_i16(buf, size, &ptr->api[i].min_ver));
CHECK_RET(parse_i16(buf, size, &ptr->api[i].max_ver));
CHECK_RET(parse_i16(buf, size, &this->api->api[i].api_key));
CHECK_RET(parse_i16(buf, size, &this->api->api[i].min_ver));
CHECK_RET(parse_i16(buf, size, &this->api->api[i].max_ver));
}
if (this->api_version >= 1)
CHECK_RET(parse_i32(buf, size, &throttle_time));
std::sort(ptr->api, ptr->api + api_cnt, kafka_api_version_cmp);
this->broker.set_feature(kafka_get_features(ptr->api, ptr->api_elements));
std::sort(this->api->api, this->api->api + api_cnt, kafka_api_version_cmp);
this->api->features = kafka_get_features(this->api->api, api_cnt);
return 0;
}
@@ -3734,7 +3725,7 @@ int KafkaResponse::parse_saslhandshake(void **buf, size_t *size)
{
CHECK_RET(parse_string(buf, size, mechanism));
if (strcasecmp(mechanism.c_str(), this->config.get_sasl_mechanisms()) == 0)
if (strcasecmp(mechanism.c_str(), this->config.get_sasl_mech()) == 0)
break;
}
@@ -3825,11 +3816,13 @@ int KafkaResponse::append(const void *buf, size_t *size)
}
else if (this->api_type == Kafka_SaslAuthenticate)
{
if (strncasecmp(this->config.get_sasl_mechanisms(), "SCRAM", 5) == 0)
if (strncasecmp(this->config.get_sasl_mech(), "SCRAM", 5) == 0)
{
this->clear_buf();
if (this->sasl->scram.state != -1)
ret = this->handle_sasl_continue();
else
this->sasl->status = 1;
}
}
}

View File

@@ -55,8 +55,8 @@ public:
public:
int encode_message(int api_type, struct iovec vectors[], int max);
void set_api(int api_type) { this->api_type = api_type; }
int get_api() const { return this->api_type; }
void set_api_type(int api_type) { this->api_type = api_type; }
int get_api_type() const { return this->api_type; }
void set_api_version(int ver) { this->api_version = ver; }
int get_api_version() const { return this->api_version; }
@@ -117,6 +117,11 @@ public:
this->sasl = sasl;
}
void set_api(kafka_api_t *api)
{
this->api = api;
}
void duplicate(KafkaMessage& msg)
{
this->config = msg.config;
@@ -126,6 +131,7 @@ public:
this->broker_list = msg.broker_list;
this->toppar_list = msg.toppar_list;
this->sasl = msg.sasl;
this->api = msg.api;
}
void clear_buf()
@@ -201,6 +207,7 @@ protected:
size_t cur_size;
kafka_sasl_t *sasl;
kafka_api_t *api;
};
class KafkaRequest : public KafkaMessage

View File

@@ -78,7 +78,7 @@ void KafkaResult::fetch_toppars(std::vector<KafkaToppar *>& toppars)
KafkaToppar *toppar = NULL;
for (size_t i = 0; i < this->resp_num; ++i)
{
if (this->resp_vec[i].get_api() != Kafka_OffsetCommit)
if (this->resp_vec[i].get_api_type() != Kafka_OffsetCommit)
continue;
this->resp_vec[i].get_toppar_list()->rewind();
@@ -97,8 +97,8 @@ void KafkaResult::fetch_records(std::vector<std::vector<KafkaRecord *>>& records
for (size_t i = 0; i < this->resp_num; ++i)
{
if (this->resp_vec[i].get_api() != Kafka_Produce &&
this->resp_vec[i].get_api() != Kafka_Fetch)
if (this->resp_vec[i].get_api_type() != Kafka_Produce &&
this->resp_vec[i].get_api_type() != Kafka_Fetch)
continue;
this->resp_vec[i].get_toppar_list()->rewind();

View File

@@ -280,15 +280,14 @@ unsigned kafka_get_features(kafka_api_version_t *api, size_t api_cnt)
return features;
}
int kafka_broker_get_api_version(const kafka_broker_t *broker,
int api_key,
int kafka_broker_get_api_version(const kafka_api_t *api, int api_key,
int min_ver, int max_ver)
{
kafka_api_version_t sk = { .api_key = api_key };
kafka_api_version_t *retp;
retp = bsearch(&sk, broker->api, broker->api_elements,
sizeof(*broker->api), kafka_api_version_key_cmp);
retp = bsearch(&sk, api->api, api->elements,
sizeof(*api->api), kafka_api_version_key_cmp);
if (!retp)
return -1;
@@ -379,6 +378,18 @@ void kafka_partition_deinit(kafka_partition_t *partition)
free(partition->isr_nodes);
}
void kafka_api_init(kafka_api_t *api)
{
api->features = 0;
api->api = NULL;
api->elements = 0;
}
void kafka_api_deinit(kafka_api_t *api)
{
free(api->api);
}
void kafka_broker_init(kafka_broker_t *broker)
{
broker->node_id = -1;
@@ -388,9 +399,6 @@ void kafka_broker_init(kafka_broker_t *broker)
broker->to_addr = 0;
memset(&broker->addr, 0, sizeof(broker->addr));
broker->addrlen = 0;
broker->features = 0;
broker->api = NULL;
broker->api_elements = 0;
broker->error = 0;
broker->status = KAFKA_BROKER_UNINIT;
}
@@ -399,7 +407,6 @@ void kafka_broker_deinit(kafka_broker_t *broker)
{
free(broker->host);
free(broker->rack);
free(broker->api);
}
void kafka_meta_init(kafka_meta_t *meta)
@@ -1195,6 +1202,7 @@ void kafka_sasl_init(kafka_sasl_t *sasl)
sasl->scram.first_msg.iov_len = 0;
sasl->scram.server_signature_b64.iov_base = NULL;
sasl->scram.server_signature_b64.iov_len = 0;
sasl->status = 0;
}
void kafka_sasl_deinit(kafka_sasl_t *sasl)

View File

@@ -209,6 +209,13 @@ typedef struct __kafka_api_version
short max_ver;
} kafka_api_version_t;
typedef struct __kafka_api_t
{
unsigned features;
kafka_api_version_t *api;
int elements;
} kafka_api_t;
typedef struct __kafka_parser
{
int complete;
@@ -241,6 +248,7 @@ typedef struct __kafka_sasl
kafka_scram_t scram;
char *buf;
size_t bsize;
int status;
} kafka_sasl_t;
typedef struct __kafka_config
@@ -285,9 +293,6 @@ typedef struct __kafka_broker
int to_addr;
struct sockaddr_storage addr;
socklen_t addrlen;
unsigned features;
kafka_api_version_t *api;
int api_elements;
short error;
int status;
} kafka_broker_t;
@@ -441,6 +446,9 @@ void kafka_record_deinit(kafka_record_t *record);
void kafka_record_header_init(kafka_record_header_t *header);
void kafka_record_header_deinit(kafka_record_header_t *header);
void kafka_api_init(kafka_api_t *api);
void kafka_api_deinit(kafka_api_t *api);
void kafka_sasl_init(kafka_sasl_t *sasl);
void kafka_sasl_deinit(kafka_sasl_t *sasl);
@@ -461,8 +469,7 @@ int kafka_meta_set_topic(const char *topic_name, kafka_meta_t *meta);
int kafka_cgroup_set_group(const char *group_name, kafka_cgroup_t *cgroup);
int kafka_broker_get_api_version(const kafka_broker_t *broker,
int api_key,
int kafka_broker_get_api_version(const kafka_api_t *broker, int api_key,
int min_ver, int max_ver);
unsigned kafka_get_features(kafka_api_version_t *api, size_t api_cnt);