From 49d37439323d69cd49e201b8cd423979f16305c7 Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Sat, 23 Sep 2023 02:48:19 +0800 Subject: [PATCH 1/3] Add support for kafka client over SSL. --- docs/en/tutorial-13-kafka_cli.md | 11 +- docs/tutorial-13-kafka_cli.md | 12 +- src/client/WFKafkaClient.cc | 204 +++++++++++++++++++------------ src/factory/KafkaTaskImpl.cc | 44 +++---- src/factory/KafkaTaskImpl.inl | 6 +- src/manager/WFGlobal.cc | 5 + src/protocol/KafkaDataTypes.h | 16 ++- src/protocol/kafka_parser.c | 3 + 8 files changed, 184 insertions(+), 117 deletions(-) diff --git a/docs/en/tutorial-13-kafka_cli.md b/docs/en/tutorial-13-kafka_cli.md index 70a16b26..cc08cda7 100644 --- a/docs/en/tutorial-13-kafka_cli.md +++ b/docs/en/tutorial-13-kafka_cli.md @@ -29,8 +29,9 @@ The program exists automatically after all the tasks are completed, and all the In the command, the broker_url may contain several urls seperated by comma(,). -- For instance, kafka://host:port,kafka://host1:port... -- The default value is 9092; +- For instance, kafka://host:port,kafka://host1:port... or: **kafkas**://host:port,**kafkas**://host1:port for kafka over SSL; +- The default port is 9092 for TCP and 9093 for SSL; +- Do not mix 'kafkas://' with "kafka://", otherwise the init function will fail with errno EINVAL; - If you want to use upstream policy at this layer, please refer to [upstream documents](/docs/en/about-upstream.md). The following are several Kafka broker_url samples: @@ -41,6 +42,12 @@ kafka://kafka.host:9090/ kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou +kafkas://broker1.kafka.sogou,kafkas://broker2.kafka.sogou + +Illegal broker_url sample (The first one is SSL, and the second one is not): + +kafkas://broker1.kafka.sogou,broker2.kafka.sogou + # Principles and Features Kafka client has no third-party dependencies internally except for the libraries used in the compression. With the high performance of Workflow, When properly configured and in fair environments, tens of thousands of Kafka requests can be processed in one second. diff --git a/docs/tutorial-13-kafka_cli.md b/docs/tutorial-13-kafka_cli.md index a3fcf532..714846e9 100644 --- a/docs/tutorial-13-kafka_cli.md +++ b/docs/tutorial-13-kafka_cli.md @@ -28,8 +28,10 @@ Bazel:执行bazel build kafka 编译支持kafka协议的类库;执行bazel b 其中broker_url可以有多个url组成,多个url之间以,分割 -- 形式如:kafka://host:port,kafka://host1:port... -- port默认为9092; +- 形式如:kafka://host:port,kafka://host1:port... 或:**kafkas**://host:port,**kafkas**://host1:port代表使用SSL通信。 +- port的默认值在普通TCP连接下是9092,SSL下为9093。 +- "kafka://"前缀可以缺省。这时候使用默认使用TCL通信。 +- 多个url,必须都采用TCP或都采用SSL。否则init函数返回-1,错误码为EINVAL。 - 如果用户在这一层有upstream选取需求,可以参考[upstream文档](../docs/about-upstream.md)。 Kafka broker_url示例: @@ -40,6 +42,12 @@ kafka://kafka.host:9090/ kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou +kafkas://broker1.kafka.sogou,kafkas://broker2.kafka.sogou + +错误的url示例(第一个broker为SSL,第二个broker非SSL): + +kafkas://broker1.kafka.sogou,broker2.kafka.sogou + # 实现原理和特性 kafka client内部实现上除了压缩功能外没有依赖第三方库,同时利用了workflow的高性能,在合理的配置和环境下,每秒钟可以处理几万次Kafka请求。 diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 88666e2e..9a84e004 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -52,14 +52,15 @@ using ComplexKafkaTask = WFComplexClientTasktransport_type = TT_TCP; + this->cgroup_status = KAFKA_CGROUP_NONE; + this->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; + this->meta_doing = false; + this->cgroup_outdated = false; + this->client_deinit = false; + this->heartbeat_series = NULL; } void incref() @@ -73,6 +74,8 @@ public: delete this; } + TransportType transport_type; + std::string scheme; std::vector broker_hosts; KafkaCgroup cgroup; KafkaMetaList meta_list; @@ -146,7 +149,8 @@ private: static void kafka_merge_meta_list(KafkaMetaList *dst, KafkaMetaList *src); - static void kafka_merge_broker_list(std::vector *hosts, + static void kafka_merge_broker_list(const std::string& scheme, + std::vector *hosts, KafkaBrokerMap *dst, KafkaBrokerList *src); @@ -292,10 +296,8 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) socklen_t socklen; coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(addr, - socklen, - "", - 0, + kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; kafka_task->get_req()->set_api_type(Kafka_Heartbeat); @@ -327,7 +329,8 @@ void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *seri coordinator->get_broker_addr(&addr, &socklen); __WFKafkaTask *task; - task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, "", 0, + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, "", 0, kafka_rebalance_callback); task->user_data = member; task->get_req()->set_config(member->config); @@ -399,7 +402,8 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task) socklen_t socklen; coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, "", 0, + kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; @@ -436,7 +440,8 @@ void KafkaClientTask::kafka_merge_meta_list(KafkaMetaList *dst, } } -void KafkaClientTask::kafka_merge_broker_list(std::vector *hosts, +void KafkaClientTask::kafka_merge_broker_list(const std::string& scheme, + std::vector *hosts, KafkaBrokerMap *dst, KafkaBrokerList *src) { @@ -445,9 +450,8 @@ void KafkaClientTask::kafka_merge_broker_list(std::vector *hosts, KafkaBroker *src_broker; while ((src_broker = src->get_next()) != NULL) { - std::string host = "kafka://"; - host = host + src_broker->get_host() + ":" + - std::to_string(src_broker->get_port()); + std::string host = scheme + src_broker->get_host() + ":" + + std::to_string(src_broker->get_port()); hosts->emplace_back(std::move(host)); if (!dst->find_item(src_broker->get_node_id())) @@ -475,7 +479,8 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task) while ((meta = t->meta_list.get_next()) != NULL) (t->member->meta_status)[meta->get_topic()] = true; - kafka_merge_broker_list(&t->member->broker_hosts, + kafka_merge_broker_list(t->member->scheme, + &t->member->broker_hosts, &t->member->broker_map, task->get_resp()->get_broker_list()); } @@ -523,7 +528,8 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task) while ((meta = t->meta_list.get_next()) != NULL) (t->member->meta_status)[meta->get_topic()] = true; - kafka_merge_broker_list(&t->member->broker_hosts, + kafka_merge_broker_list(t->member->scheme, + &t->member->broker_hosts, &t->member->broker_map, task->get_resp()->get_broker_list()); @@ -538,8 +544,8 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task) socklen_t socklen; coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, - "", 0, + kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type, + addr, socklen, "", 0, kafka_heartbeat_callback); kafka_task->user_data = t->member; t->member->incref(); @@ -581,14 +587,14 @@ void KafkaClientTask::kafka_parallel_callback(const ParallelWork *pwork) t->state = WFT_STATE_TASK_ERROR; t->error = 0; - std::pair *state_error; + std::pair *state_error; bool flag = false; - int state = WFT_STATE_SUCCESS; - int error = 0; + int16_t state = WFT_STATE_SUCCESS; + int16_t error = 0; int kafka_error = 0; for (size_t i = 0; i < pwork->size(); i++) { - state_error = (std::pair *)pwork->series_at(i)->get_context(); + state_error = (std::pair *)pwork->series_at(i)->get_context(); if ((state_error->first >> 16) != WFT_STATE_SUCCESS) { if (!flag) @@ -642,9 +648,20 @@ void KafkaClientTask::kafka_process_toppar_offset(KafkaToppar *task_toppar) void KafkaClientTask::kafka_move_task_callback(__WFKafkaTask *task) { - std::pair *state_error = new std::pair; + auto *state_error = new std::pair; + int16_t state = task->get_state(); + int16_t error = task->get_error(); - state_error->first = (task->get_state() << 16) + task->get_error(); + /* This function is called before WFClientTask::done. Need to transfer + the state and error. */ + if (state == WFT_STATE_SYS_ERROR && error < 0) + { + state = WFT_STATE_SSL_ERROR; + error = -error; + } + + /* 'state' is always positive. */ + state_error->first = (state << 16) | error; state_error->second = static_cast(task)->get_mutable_ctx()->kafka_error; series_of(task)->set_context(state_error); @@ -674,27 +691,29 @@ void KafkaClientTask::generate_info() if (this->config.get_sasl_mech()) { - this->userinfo = this->config.get_sasl_username(); + const char *username = this->config.get_sasl_username(); + const char *password = this->config.get_sasl_password(); + + this->userinfo.clear(); + if (username) + this->userinfo += StringUtil::url_encode_component(username); this->userinfo += ":"; - this->userinfo += - StringUtil::url_encode_component(this->config.get_sasl_password()); + if (password) + this->userinfo += StringUtil::url_encode_component(password); this->userinfo += ":"; this->userinfo += this->config.get_sasl_mech(); this->userinfo += ":"; this->userinfo += std::to_string((intptr_t)this->member); - this->userinfo += "@"; - this->url = "kafka://" + this->userinfo + - this->url.substr(this->url.find("kafka://") + 8); } else { char buf[64]; - snprintf(buf, sizeof(buf), "user:pass:sasl:%p@", this->member); + snprintf(buf, 64, "user:pass:sasl:%p", this->member); this->userinfo = buf; - this->url = "kafka://" + this->userinfo + - this->url.substr(this->url.find("kafka://") + 8); } + const char *hostport = this->url.c_str() + this->member->scheme.size(); + this->url = this->member->scheme + this->userinfo + "@" + hostport; this->info_generated = true; } @@ -876,15 +895,16 @@ bool KafkaClientTask::check_meta() int KafkaClientTask::dispatch_locked() { + KafkaMember *member = this->member; __WFKafkaTask *task; ParallelWork *parallel; SeriesWork *series; if (this->check_cgroup() == false) - return this->member->cgroup_wait_cnt > 0; + return member->cgroup_wait_cnt > 0; if (this->check_meta() == false) - return this->member->meta_wait_cnt > 0; + return member->meta_wait_cnt > 0; if (arrange_toppar(this->api_type) < 0) { @@ -925,14 +945,16 @@ int KafkaClientTask::dispatch_locked() socklen_t socklen; broker->get_broker_addr(&addr, &socklen); - task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, this->get_userinfo(), this->retry_max, nullptr); } else { - task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), broker->get_port(), this->get_userinfo(), this->retry_max, @@ -976,14 +998,16 @@ int KafkaClientTask::dispatch_locked() socklen_t socklen; broker->get_broker_addr(&addr, &socklen); - task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, this->get_userinfo(), this->retry_max, nullptr); } else { - task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), broker->get_port(), this->get_userinfo(), this->retry_max, @@ -1010,7 +1034,7 @@ int KafkaClientTask::dispatch_locked() break; case Kafka_OffsetCommit: - if (!this->member->cgroup.get_group()) + if (!member->cgroup.get_group()) { this->state = WFT_STATE_TASK_ERROR; this->error = WFT_ERR_KAFKA_COMMIT_FAILED; @@ -1020,19 +1044,20 @@ int KafkaClientTask::dispatch_locked() else { this->result.create(1); - KafkaBroker *coordinator = this->member->cgroup.get_coordinator(); + KafkaBroker *coordinator = member->cgroup.get_coordinator(); const struct sockaddr *addr; socklen_t socklen; coordinator->get_broker_addr(&addr, &socklen); - task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, this->get_userinfo(), this->retry_max, kafka_offsetcommit_callback); task->user_data = this; task->get_req()->set_config(this->config); - task->get_req()->set_cgroup(this->member->cgroup); + task->get_req()->set_cgroup(member->cgroup); task->get_req()->set_broker(*coordinator); task->get_req()->set_toppar_list(this->toppar_list); task->get_req()->set_api_type(this->api_type); @@ -1042,7 +1067,7 @@ int KafkaClientTask::dispatch_locked() } case Kafka_LeaveGroup: - if (!this->member->cgroup.get_group()) + if (!member->cgroup.get_group()) { this->state = WFT_STATE_TASK_ERROR; this->error = WFT_ERR_KAFKA_LEAVEGROUP_FAILED; @@ -1051,7 +1076,7 @@ int KafkaClientTask::dispatch_locked() } else { - KafkaBroker *coordinator = this->member->cgroup.get_coordinator(); + KafkaBroker *coordinator = member->cgroup.get_coordinator(); const struct sockaddr *addr; socklen_t socklen; @@ -1059,16 +1084,15 @@ int KafkaClientTask::dispatch_locked() if (coordinator->is_to_addr()) { - task = __WFKafkaTaskFactory::create_kafka_task(addr, - socklen, - this->get_userinfo(), - 0, + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, + this->get_userinfo(), 0, kafka_leavegroup_callback); task->user_data = this; task->get_req()->set_config(this->config); task->get_req()->set_api_type(Kafka_LeaveGroup); task->get_req()->set_broker(*coordinator); - task->get_req()->set_cgroup(this->member->cgroup); + task->get_req()->set_cgroup(member->cgroup); series_of(this)->push_front(this); series_of(this)->push_front(task); } @@ -1098,14 +1122,16 @@ int KafkaClientTask::dispatch_locked() socklen_t socklen; broker->get_broker_addr(&addr, &socklen); - task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + addr, socklen, this->get_userinfo(), this->retry_max, nullptr); } else { - task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), broker->get_port(), this->get_userinfo(), this->retry_max, @@ -1635,46 +1661,62 @@ SubTask *WFKafkaTask::done() int WFKafkaClient::init(const std::string& broker) { std::vector broker_hosts; - std::string::size_type pos = broker.find(','); std::string::size_type ppos = 0; + std::string::size_type pos; + bool use_ssl; - if (pos == std::string::npos) + use_ssl = (strncasecmp(broker.c_str(), "kafkas://", 9) == 0); + while (1) { - std::string host = broker; - if (strncasecmp(host.c_str(), "kafka://", 8) != 0) - host = "kafka://" + host; - broker_hosts.emplace_back(host); - } - else - { - do - { - std::string host = broker.substr(ppos, pos - ppos); - if (strncasecmp(host.c_str(), "kafka://", 8) != 0) - host = "kafka://" + host; - broker_hosts.emplace_back(host); - - ppos = pos + 1; - pos = broker.find(',', ppos); - } while (pos != std::string::npos); - + pos = broker.find(',', ppos); std::string host = broker.substr(ppos, pos - ppos); - if (strncasecmp(host.c_str(), "kafka://", 8) != 0) + if (use_ssl) + { + if (strncasecmp(host.c_str(), "kafkas://", 9) != 0) + { + errno = EINVAL; + return -1; + } + } + else if (strncasecmp(host.c_str(), "kafka://", 8) != 0) + { + if (strncasecmp(host.c_str(), "kafkas://", 9) == 0) + { + errno = EINVAL; + return -1; + } + host = "kafka://" + host; + } + broker_hosts.emplace_back(host); + if (pos == std::string::npos) + break; + + ppos = pos + 1; } this->member = new KafkaMember; this->member->broker_hosts = std::move(broker_hosts); + if (use_ssl) + { + this->member->transport_type = TT_TCP_SSL; + this->member->scheme = "kafkas://"; + } + return 0; } int WFKafkaClient::init(const std::string& broker, const std::string& group) { - this->init(broker); - this->member->cgroup.set_group(group); - this->member->cgroup_status = KAFKA_CGROUP_UNINIT; - return 0; + if (this->init(broker) == 0) + { + this->member->cgroup.set_group(group); + this->member->cgroup_status = KAFKA_CGROUP_UNINIT; + return 0; + } + else + return -1; } int WFKafkaClient::deinit() diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index 4bc69196..dcddbc05 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -14,6 +14,7 @@ limitations under the License. Authors: Wang Zhulei (wangzhulei@sogou-inc.com) + Xie Han (xiehan@sogou-inc.com) */ #include @@ -310,19 +311,17 @@ CommMessageIn *__ComplexKafkaTask::message_in() bool __ComplexKafkaTask::init_success() { - TransportType type = TT_TCP; - if (uri_.scheme) + TransportType type; + + if (uri_.scheme && strcasecmp(uri_.scheme, "kafka") == 0) + type = TT_TCP; + else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0) + type = TT_TCP_SSL; + else { - if (strcasecmp(uri_.scheme, "kafka") == 0) - type = TT_TCP; - //else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0) - // type = TT_TCP_SSL; - else - { - this->state = WFT_STATE_TASK_ERROR; - this->error = WFT_ERR_URI_SCHEME_INVALID; - return false; - } + this->state = WFT_STATE_TASK_ERROR; + this->error = WFT_ERR_URI_SCHEME_INVALID; + return false; } std::string username, password, sasl, client; @@ -367,7 +366,6 @@ bool __ComplexKafkaTask::init_success() } this->WFComplexClientTask::set_transport_type(type); - return true; } @@ -426,12 +424,13 @@ bool __ComplexKafkaTask::check_redirect() socklen_t addrlen_coord; coordinator->get_broker_addr(&addr_coord, &addrlen_coord); - set_redirect(TT_TCP, addr_coord, addrlen_coord, + set_redirect(this->get_transport_type(), addr_coord, addrlen_coord, this->WFComplexClientTask::info_); } else { - std::string url = "kafka://"; + std::string url(uri_.scheme); + url += "://"; url += user_info_ + "@"; url += coordinator->get_host(); url += ":" + std::to_string(coordinator->get_port()); @@ -445,7 +444,8 @@ bool __ComplexKafkaTask::check_redirect() } else { - this->init(TT_TCP, paddr, addrlen, this->WFComplexClientTask::info_); + this->init(this->get_transport_type(), paddr, addrlen, + this->WFComplexClientTask::info_); return false; } } @@ -809,7 +809,8 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri, return task; } -__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const struct sockaddr *addr, +__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type, + const struct sockaddr *addr, socklen_t addrlen, const std::string& info, int retry_max, @@ -817,12 +818,13 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const struct sockaddr *ad { auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); - task->init(TT_TCP, addr, addrlen, info); + task->init(type, addr, addrlen, info); task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT); return task; } -__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const char *host, +__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type, + const char *host, unsigned short port, const std::string& info, int retry_max, @@ -830,10 +832,10 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const char *host, { auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); - std::string url = "kafka://"; + std::string url = (type == TT_TCP_SSL ? "kafkas://" : "kafka://"); if (!info.empty()) - url += info; + url += info + "@"; url += host; url += ":" + std::to_string(port); diff --git a/src/factory/KafkaTaskImpl.inl b/src/factory/KafkaTaskImpl.inl index cb18c9a5..fd40f4a3 100644 --- a/src/factory/KafkaTaskImpl.inl +++ b/src/factory/KafkaTaskImpl.inl @@ -40,13 +40,15 @@ public: int retry_max, __kafka_callback_t callback); - static __WFKafkaTask *create_kafka_task(const struct sockaddr *addr, + static __WFKafkaTask *create_kafka_task(TransportType type, + const struct sockaddr *addr, socklen_t addrlen, const std::string& info, int retry_max, __kafka_callback_t callback); - static __WFKafkaTask *create_kafka_task(const char *host, + static __WFKafkaTask *create_kafka_task(TransportType type, + const char *host, unsigned short port, const std::string& info, int retry_max, diff --git a/src/manager/WFGlobal.cc b/src/manager/WFGlobal.cc index 0d9b2d43..8d4e20ff 100644 --- a/src/manager/WFGlobal.cc +++ b/src/manager/WFGlobal.cc @@ -154,6 +154,11 @@ __WFGlobal::__WFGlobal() static_scheme_port_["Kafka"] = "9092"; static_scheme_port_["KAFKA"] = "9092"; + static_scheme_port_["kafkas"] = "9093"; + static_scheme_port_["Kafkas"] = "9093"; + static_scheme_port_["KAFKAs"] = "9093"; + static_scheme_port_["KAFKAS"] = "9093"; + sync_count_ = 0; sync_max_ = 0; } diff --git a/src/protocol/KafkaDataTypes.h b/src/protocol/KafkaDataTypes.h index 2d3d546d..1912574d 100644 --- a/src/protocol/KafkaDataTypes.h +++ b/src/protocol/KafkaDataTypes.h @@ -1016,14 +1016,12 @@ public: return this->ptr->port; } - std::string get_uri() const + std::string get_host_port() const { - std::string uri = "kafka://"; - - uri += this->ptr->host; - uri += ":"; - uri += std::to_string(this->ptr->port); - return uri; + std::string host_port(this->ptr->host); + host_port += ":"; + host_port += std::to_string(this->ptr->port); + return host_port; } int get_error() @@ -1103,12 +1101,12 @@ public: bool operator< (const KafkaBroker& broker) const { - return this->get_uri() < broker.get_uri(); + return this->get_host_port() < broker.get_host_port(); } bool operator> (const KafkaBroker& broker) const { - return this->get_uri() > broker.get_uri(); + return this->get_host_port() > broker.get_host_port(); } kafka_broker_t *get_raw_ptr() const { return this->ptr; } diff --git a/src/protocol/kafka_parser.c b/src/protocol/kafka_parser.c index 26e735b6..4f852437 100644 --- a/src/protocol/kafka_parser.c +++ b/src/protocol/kafka_parser.c @@ -1209,6 +1209,8 @@ void kafka_sasl_init(kafka_sasl_t *sasl) sasl->scram.first_msg.iov_len = 0; sasl->scram.server_signature_b64.iov_base = NULL; sasl->scram.server_signature_b64.iov_len = 0; + sasl->buf = NULL; + sasl->bsize = 0; sasl->status = 0; } @@ -1216,6 +1218,7 @@ void kafka_sasl_deinit(kafka_sasl_t *sasl) { free(sasl->scram.cnonce.iov_base); free(sasl->scram.server_signature_b64.iov_base); + free(sasl->buf); } int kafka_sasl_set_username(const char *username, kafka_config_t *conf) From 609d00ebec84e6c9c4e68317bc1d0c7c212f8e17 Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Mon, 25 Sep 2023 19:43:28 +0800 Subject: [PATCH 2/3] Update kafka tutorial. --- tutorial/tutorial-13-kafka_cli.cc | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tutorial/tutorial-13-kafka_cli.cc b/tutorial/tutorial-13-kafka_cli.cc index befbe840..0ba446d7 100644 --- a/tutorial/tutorial-13-kafka_cli.cc +++ b/tutorial/tutorial-13-kafka_cli.cc @@ -190,8 +190,11 @@ int main(int argc, char *argv[]) signal(SIGINT, sig_handler); url = argv[1]; - if (strncmp(argv[1], "kafka://", 8) != 0) + if (strncmp(argv[1], "kafka://", 8) != 0 && + strncmp(argv[1], "kafkas://", 9) != 0) + { url = "kafka://" + url; + } char buf[512 * 1024]; WFKafkaTask *task; @@ -206,7 +209,12 @@ int main(int argc, char *argv[]) if (compress_type > Kafka_Zstd) exit(1); - client.init(url); + if (client.init(url) < 0) + { + perror("client.init"); + exit(1); + } + task = client.create_kafka_task("api=produce", 3, kafka_callback); KafkaConfig config; KafkaRecord record; @@ -232,7 +240,12 @@ int main(int argc, char *argv[]) { if (argc > 3 && argv[3][0] == 'd') { - client.init(url); + if (client.init(url) < 0) + { + perror("client.init"); + exit(1); + } + task = client.create_kafka_task("api=fetch", 3, kafka_callback); KafkaToppar toppar; @@ -248,7 +261,12 @@ int main(int argc, char *argv[]) } else { - client.init(url, "workflow_group"); + if (client.init(url, "workflow_group") < 0) + { + perror("client.init"); + exit(1); + } + task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch", 3, kafka_callback); } From db4ef8357cdb7303b5df77799fa8f7f092ae7b8a Mon Sep 17 00:00:00 2001 From: Xie Han <63350856@qq.com> Date: Mon, 25 Sep 2023 20:11:37 +0800 Subject: [PATCH 3/3] Do not transfer broker's address to sockaddr. --- src/client/WFKafkaClient.cc | 208 +++++++++++----------------------- src/factory/KafkaTaskImpl.cc | 92 ++------------- src/factory/KafkaTaskImpl.inl | 7 -- src/protocol/KafkaDataTypes.h | 56 --------- src/protocol/KafkaMessage.cc | 49 -------- src/protocol/kafka_parser.c | 3 - src/protocol/kafka_parser.h | 3 - 7 files changed, 79 insertions(+), 339 deletions(-) diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 9a84e004..44146ce4 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -291,13 +291,10 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) { __WFKafkaTask *kafka_task; KafkaBroker *coordinator = member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; kafka_task->get_req()->set_api_type(Kafka_Heartbeat); @@ -323,14 +320,11 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *series) { KafkaBroker *coordinator = member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - __WFKafkaTask *task; task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_rebalance_callback); task->user_data = member; task->get_req()->set_config(member->config); @@ -397,13 +391,10 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task) __WFKafkaTask *kafka_task; KafkaBroker *coordinator = member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; @@ -539,13 +530,10 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task) { __WFKafkaTask *kafka_task; KafkaBroker *coordinator = t->member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_heartbeat_callback); kafka_task->user_data = t->member; t->member->incref(); @@ -896,6 +884,7 @@ bool KafkaClientTask::check_meta() int KafkaClientTask::dispatch_locked() { KafkaMember *member = this->member; + KafkaBroker *coordinator; __WFKafkaTask *task; ParallelWork *parallel; SeriesWork *series; @@ -939,28 +928,12 @@ int KafkaClientTask::dispatch_locked() auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, std::placeholders::_1); KafkaBroker *broker = get_broker(v.first); - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - nullptr); - } - else - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - this->get_userinfo(), - this->retry_max, - nullptr); - } - + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + this->get_userinfo(), + this->retry_max, + nullptr); task->get_req()->set_config(this->config); task->get_req()->set_toppar_list(v.second); task->get_req()->set_broker(*broker); @@ -992,27 +965,12 @@ int KafkaClientTask::dispatch_locked() auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, std::placeholders::_1); KafkaBroker *broker = get_broker(v.first); - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - nullptr); - } - else - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - this->get_userinfo(), - this->retry_max, - nullptr); - } + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + this->get_userinfo(), + this->retry_max, + nullptr); task->get_req()->set_config(this->config); task->get_req()->set_toppar_list(v.second); @@ -1041,30 +999,24 @@ int KafkaClientTask::dispatch_locked() this->finish = true; break; } - else - { - this->result.create(1); - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - kafka_offsetcommit_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_cgroup(member->cgroup); - task->get_req()->set_broker(*coordinator); - task->get_req()->set_toppar_list(this->toppar_list); - task->get_req()->set_api_type(this->api_type); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - break; - } + this->result.create(1); + coordinator = member->cgroup.get_coordinator(); + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + this->get_userinfo(), + this->retry_max, + kafka_offsetcommit_callback); + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_cgroup(member->cgroup); + task->get_req()->set_broker(*coordinator); + task->get_req()->set_toppar_list(this->toppar_list); + task->get_req()->set_api_type(this->api_type); + series_of(this)->push_front(this); + series_of(this)->push_front(task); + break; case Kafka_LeaveGroup: if (!member->cgroup.get_group()) @@ -1074,29 +1026,23 @@ int KafkaClientTask::dispatch_locked() this->finish = true; break; } - else - { - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); + coordinator = member->cgroup.get_coordinator(); + if (!coordinator->get_host()) + break; - if (coordinator->is_to_addr()) - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), 0, - kafka_leavegroup_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_api_type(Kafka_LeaveGroup); - task->get_req()->set_broker(*coordinator); - task->get_req()->set_cgroup(member->cgroup); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - } - } + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + this->get_userinfo(), 0, + kafka_leavegroup_callback); + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_api_type(Kafka_LeaveGroup); + task->get_req()->set_broker(*coordinator); + task->get_req()->set_cgroup(member->cgroup); + series_of(this)->push_front(this); + series_of(this)->push_front(task); break; case Kafka_ListOffsets: @@ -1116,28 +1062,12 @@ int KafkaClientTask::dispatch_locked() auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, std::placeholders::_1); KafkaBroker *broker = get_broker(v.first); - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - nullptr); - } - else - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - this->get_userinfo(), - this->retry_max, - nullptr); - } - + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + this->get_userinfo(), + this->retry_max, + nullptr); task->get_req()->set_config(this->config); task->get_req()->set_toppar_list(v.second); task->get_req()->set_broker(*broker); @@ -1709,14 +1639,12 @@ int WFKafkaClient::init(const std::string& broker) int WFKafkaClient::init(const std::string& broker, const std::string& group) { - if (this->init(broker) == 0) - { - this->member->cgroup.set_group(group); - this->member->cgroup_status = KAFKA_CGROUP_UNINIT; - return 0; - } - else + if (this->init(broker) < 0) return -1; + + this->member->cgroup.set_group(group); + this->member->cgroup_status = KAFKA_CGROUP_UNINIT; + return 0; } int WFKafkaClient::deinit() diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index dcddbc05..7da90453 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -118,7 +118,6 @@ private: virtual int keep_alive_timeout(); virtual int first_timeout(); bool has_next(); - bool check_redirect(); bool process_produce(); bool process_fetch(); bool process_metadata(); @@ -407,49 +406,6 @@ int __ComplexKafkaTask::first_timeout() return ret + KAFKA_ROUNDTRIP_TIMEOUT; } -bool __ComplexKafkaTask::check_redirect() -{ - struct sockaddr_storage addr; - socklen_t addrlen = sizeof addr; - const struct sockaddr *paddr = (const struct sockaddr *)&addr; - KafkaBroker *coordinator = this->get_req()->get_cgroup()->get_coordinator(); - - //always success - this->get_peer_addr((struct sockaddr *)&addr, &addrlen); - if (!coordinator->is_equal(paddr, addrlen)) - { - if (coordinator->is_to_addr()) - { - const struct sockaddr *addr_coord; - socklen_t addrlen_coord; - - coordinator->get_broker_addr(&addr_coord, &addrlen_coord); - set_redirect(this->get_transport_type(), addr_coord, addrlen_coord, - this->WFComplexClientTask::info_); - } - else - { - std::string url(uri_.scheme); - url += "://"; - url += user_info_ + "@"; - url += coordinator->get_host(); - url += ":" + std::to_string(coordinator->get_port()); - - ParsedURI uri; - URIParser::parse(url, uri); - set_redirect(std::move(uri)); - } - - return true; - } - else - { - this->init(this->get_transport_type(), paddr, addrlen, - this->WFComplexClientTask::info_); - return false; - } -} - bool __ComplexKafkaTask::process_find_coordinator() { KafkaCgroup *cgroup = this->get_resp()->get_cgroup(); @@ -463,8 +419,18 @@ bool __ComplexKafkaTask::process_find_coordinator() else { this->get_req()->set_cgroup(*cgroup); - is_redirect_ = check_redirect(); + KafkaBroker *coordinator = cgroup->get_coordinator(); + std::string url(uri_.scheme); + url += "://"; + url += user_info_ + "@"; + url += coordinator->get_host(); + url += ":" + std::to_string(coordinator->get_port()); + + ParsedURI uri; + URIParser::parse(url, uri); + set_redirect(std::move(uri)); this->get_req()->set_api_type(Kafka_JoinGroup); + is_redirect_ = true; return true; } } @@ -472,16 +438,6 @@ bool __ComplexKafkaTask::process_find_coordinator() bool __ComplexKafkaTask::process_join_group() { KafkaResponse *msg = this->get_resp(); - if (!msg->get_cgroup()->get_coordinator()->is_to_addr()) - { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof addr; - const struct sockaddr *paddr = (const struct sockaddr *)&addr; - this->get_peer_addr((struct sockaddr *)&addr, &addrlen); - msg->get_cgroup()->get_coordinator()->set_broker_addr(paddr, addrlen); - msg->get_cgroup()->get_coordinator()->set_to_addr(1); - } - switch(msg->get_cgroup()->get_error()) { case KAFKA_MEMBER_ID_REQUIRED: @@ -681,18 +637,6 @@ bool __ComplexKafkaTask::process_sasl_authenticate() bool __ComplexKafkaTask::has_next() { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof addr; - //always success - this->get_peer_addr((struct sockaddr *)&addr, &addrlen); - - const struct sockaddr *paddr = (const struct sockaddr *)&addr; - if (!this->get_resp()->get_broker()->is_to_addr()) - { - this->get_resp()->get_broker()->set_broker_addr(paddr, addrlen); - this->get_resp()->get_broker()->set_to_addr(1); - } - switch (this->get_resp()->get_api_type()) { case Kafka_Produce: @@ -809,20 +753,6 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri, return task; } -__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type, - const struct sockaddr *addr, - socklen_t addrlen, - const std::string& info, - int retry_max, - __kafka_callback_t callback) -{ - auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); - - task->init(type, addr, addrlen, info); - task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT); - return task; -} - __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type, const char *host, unsigned short port, diff --git a/src/factory/KafkaTaskImpl.inl b/src/factory/KafkaTaskImpl.inl index fd40f4a3..5b7f9c9b 100644 --- a/src/factory/KafkaTaskImpl.inl +++ b/src/factory/KafkaTaskImpl.inl @@ -40,13 +40,6 @@ public: int retry_max, __kafka_callback_t callback); - static __WFKafkaTask *create_kafka_task(TransportType type, - const struct sockaddr *addr, - socklen_t addrlen, - const std::string& info, - int retry_max, - __kafka_callback_t callback); - static __WFKafkaTask *create_kafka_task(TransportType type, const char *host, unsigned short port, diff --git a/src/protocol/KafkaDataTypes.h b/src/protocol/KafkaDataTypes.h index 1912574d..2c0d9329 100644 --- a/src/protocol/KafkaDataTypes.h +++ b/src/protocol/KafkaDataTypes.h @@ -1115,54 +1115,6 @@ public: struct rb_node *get_rb() { return &this->rb; } - bool is_equal(const struct sockaddr *addr, socklen_t socklen) const - { - if (this->ptr->addrlen == socklen && socklen) - return memcmp(addr, &this->ptr->addr, this->ptr->addrlen) == 0; - - return false; - } - - bool is_equal(const char *host, int port) const - { - if (port == this->ptr->port) - return strcmp(host, this->ptr->host) == 0; - return false; - } - - bool is_equal(int node_id) const - { - return this->ptr->node_id == node_id; - } - - bool is_equal(const KafkaBroker& broker) const - { - return is_equal(broker.ptr->host, broker.ptr->port); - } - - void get_broker_addr(const struct sockaddr **addr, socklen_t *socklen) const - { - if (this->ptr->addrlen) - { - *addr = (const struct sockaddr *)&this->ptr->addr; - *socklen = this->ptr->addrlen; - } - else - { - *addr = NULL; - *socklen = 0; - } - } - - void set_broker_addr(const struct sockaddr *addr, socklen_t socklen) - { - memcpy(&this->ptr->addr, addr, socklen); - this->ptr->addrlen = socklen; - } - - bool is_to_addr() const { return this->ptr->to_addr == 1; } - void set_to_addr(int to_addr) { this->ptr->to_addr = to_addr; } - int get_node_id() const { return this->ptr->node_id; } int get_id () const { return this->ptr->node_id; } @@ -1392,14 +1344,6 @@ public: return strcmp(this->ptr->leader_id, this->ptr->member_id) == 0; } - bool is_equal_coordinator(const struct sockaddr *addr, socklen_t addrlen) const - { - if (addrlen == this->ptr->coordinator.addrlen) - return memcmp(addr, &this->ptr->coordinator.addr, addrlen) == 0; - else - return false; - } - struct list_head *get_group_protocol() { return &this->ptr->group_protocol_list; diff --git a/src/protocol/KafkaMessage.cc b/src/protocol/KafkaMessage.cc index 660f3845..af06e7ec 100644 --- a/src/protocol/KafkaMessage.cc +++ b/src/protocol/KafkaMessage.cc @@ -1562,44 +1562,6 @@ int KafkaMessage::parse_records(void **buf, size_t *size, bool check_crcs, return 0; } -static bool __to_addr(const char *host, int port, struct sockaddr *sockaddr, - socklen_t *addrlen) -{ - size_t len = strlen(host); - struct sockaddr_in *addr; - struct sockaddr_in6 *addr6; - bool ret = true; - - if (!host) - ret = false; - else if (isdigit(host[0]) && isdigit(host[len - 1])) - { - addr = (struct sockaddr_in *)sockaddr; - if (inet_pton(AF_INET, host, &addr->sin_addr) == 1) - { - addr->sin_family = AF_INET; - *addrlen = sizeof(struct sockaddr_in); - addr->sin_port = htons(port); - } - else - ret = false; - } - else - { - addr6 = (struct sockaddr_in6 *)sockaddr; - if (inet_pton(AF_INET6, host, &addr6->sin6_addr) == 1) - { - addr6->sin6_family = AF_INET6; - *addrlen = sizeof(struct sockaddr_in6); - addr6->sin6_port = htons(port); - } - else - ret = false; - } - - return ret; -} - KafkaMessage::KafkaMessage() { static struct Crc32cInitializer @@ -2867,11 +2829,6 @@ static int kafka_meta_parse_broker(void **buf, size_t *size, if (api_version >= 1) CHECK_RET(parse_string(buf, size, &ptr->rack)); - if (__to_addr(ptr->host, ptr->port, (struct sockaddr *)&ptr->addr, &ptr->addrlen)) - { - ptr->to_addr = 1; - } - broker_list->rewind(); KafkaBroker *last; @@ -3364,12 +3321,6 @@ int KafkaResponse::parse_findcoordinator(void **buf, size_t *size) CHECK_RET(parse_string(buf, size, &cgroup->coordinator.host)); CHECK_RET(parse_i32(buf, size, &cgroup->coordinator.port)); - if (__to_addr(cgroup->coordinator.host, cgroup->coordinator.port, - (struct sockaddr *)&cgroup->coordinator.addr, &cgroup->coordinator.addrlen)) - { - cgroup->coordinator.to_addr = 1; - } - return 0; } diff --git a/src/protocol/kafka_parser.c b/src/protocol/kafka_parser.c index 4f852437..3822baeb 100644 --- a/src/protocol/kafka_parser.c +++ b/src/protocol/kafka_parser.c @@ -397,9 +397,6 @@ void kafka_broker_init(kafka_broker_t *broker) broker->port = 0; broker->host = NULL; broker->rack = NULL; - broker->to_addr = 0; - memset(&broker->addr, 0, sizeof(broker->addr)); - broker->addrlen = 0; broker->error = 0; broker->status = KAFKA_BROKER_UNINIT; } diff --git a/src/protocol/kafka_parser.h b/src/protocol/kafka_parser.h index f5eb9788..df613d4c 100644 --- a/src/protocol/kafka_parser.h +++ b/src/protocol/kafka_parser.h @@ -305,9 +305,6 @@ typedef struct __kafka_broker int port; char *host; char *rack; - int to_addr; - struct sockaddr_storage addr; - socklen_t addrlen; short error; int status; } kafka_broker_t;