diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 217f0cd2..8339aed9 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -77,6 +77,7 @@ public: enum TransportType transport_type; std::string scheme; std::vector broker_hosts; + SSL_CTX *ssl_ctx; KafkaCgroup cgroup; KafkaMetaList meta_list; KafkaBrokerMap broker_map; @@ -192,7 +193,7 @@ private: int dispatch_locked(); - inline KafkaBroker *get_broker(int node_id) + KafkaBroker *get_broker(int node_id) { return this->member->broker_map.find_item(node_id); } @@ -294,7 +295,7 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, coordinator->get_host(), coordinator->get_port(), - "", 0, + member->ssl_ctx, "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; kafka_task->get_req()->set_api_type(Kafka_Heartbeat); @@ -327,7 +328,7 @@ void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *seri task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, coordinator->get_host(), coordinator->get_port(), - "", 0, + member->ssl_ctx, "", 0, kafka_rebalance_callback); task->user_data = member; task->get_req()->set_config(member->config); @@ -392,7 +393,7 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task) kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, coordinator->get_host(), coordinator->get_port(), - "", 0, + member->ssl_ctx, "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; @@ -496,69 +497,70 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task) void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task) { KafkaClientTask *t = (KafkaClientTask *)task->user_data; + KafkaMember *member = t->member; SeriesWork *heartbeat_series = NULL; void *msg = NULL; size_t max; - t->member->mutex.lock(); + member->mutex.lock(); t->state = task->get_state(); t->error = task->get_error(); t->kafka_error = *static_cast(task)->get_mutable_ctx(); if (t->state == WFT_STATE_SUCCESS) { - t->member->cgroup = std::move(*(task->get_resp()->get_cgroup())); + member->cgroup = std::move(*(task->get_resp()->get_cgroup())); - kafka_merge_meta_list(&t->member->meta_list, + kafka_merge_meta_list(&member->meta_list, task->get_resp()->get_meta_list()); t->meta_list.rewind(); KafkaMeta *meta; while ((meta = t->meta_list.get_next()) != NULL) - (t->member->meta_status)[meta->get_topic()] = true; + (member->meta_status)[meta->get_topic()] = true; - kafka_merge_broker_list(t->member->scheme, - &t->member->broker_hosts, - &t->member->broker_map, + kafka_merge_broker_list(member->scheme, + &member->broker_hosts, + &member->broker_map, task->get_resp()->get_broker_list()); - t->member->cgroup_status = KAFKA_CGROUP_DONE; + member->cgroup_status = KAFKA_CGROUP_DONE; - if (t->member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT) + if (member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT) { __WFKafkaTask *kafka_task; - KafkaBroker *coordinator = t->member->cgroup.get_coordinator(); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type, + KafkaBroker *coordinator = member->cgroup.get_coordinator(); + kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, coordinator->get_host(), coordinator->get_port(), - "", 0, + member->ssl_ctx, "", 0, kafka_heartbeat_callback); - kafka_task->user_data = t->member; - t->member->incref(); + kafka_task->user_data = member; + member->incref(); - kafka_task->get_req()->set_config(t->member->config); + kafka_task->get_req()->set_config(member->config); kafka_task->get_req()->set_api_type(Kafka_Heartbeat); - kafka_task->get_req()->set_cgroup(t->member->cgroup); + kafka_task->get_req()->set_cgroup(member->cgroup); kafka_task->get_req()->set_broker(*coordinator); heartbeat_series = Workflow::create_series_work(kafka_task, nullptr); - t->member->heartbeat_status = KAFKA_HEARTBEAT_DOING; - t->member->heartbeat_series = heartbeat_series; + member->heartbeat_status = KAFKA_HEARTBEAT_DOING; + member->heartbeat_series = heartbeat_series; } } else { - t->member->cgroup_status = KAFKA_CGROUP_UNINIT; - t->member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; - t->member->heartbeat_series = NULL; + member->cgroup_status = KAFKA_CGROUP_UNINIT; + member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; + member->heartbeat_series = NULL; t->finish = true; msg = t; } - max = t->member->cgroup_wait_cnt; + max = member->cgroup_wait_cnt; char name[64]; - snprintf(name, 64, "%p.cgroup", t->member); - t->member->mutex.unlock(); + snprintf(name, 64, "%p.cgroup", member); + member->mutex.unlock(); WFTaskFactory::signal_by_name(name, msg, max); @@ -789,16 +791,17 @@ bool KafkaClientTask::compare_topics(KafkaClientTask *task) bool KafkaClientTask::check_cgroup() { - if (this->member->cgroup_outdated && - this->member->cgroup_status != KAFKA_CGROUP_DOING) + KafkaMember *member = this->member; + + if (member->cgroup_outdated && member->cgroup_status != KAFKA_CGROUP_DOING) { - this->member->cgroup_outdated = false; - this->member->cgroup_status = KAFKA_CGROUP_UNINIT; - this->member->heartbeat_series = NULL; - this->member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; + member->cgroup_outdated = false; + member->cgroup_status = KAFKA_CGROUP_UNINIT; + member->heartbeat_series = NULL; + member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; } - if (this->member->cgroup_status == KAFKA_CGROUP_DOING) + if (member->cgroup_status == KAFKA_CGROUP_DOING) { WFConditional *cond; char name[64]; @@ -806,27 +809,27 @@ bool KafkaClientTask::check_cgroup() this->wait_cgroup = true; cond = WFTaskFactory::create_conditional(name, this, &this->msg); series_of(this)->push_front(cond); - this->member->cgroup_wait_cnt++; + member->cgroup_wait_cnt++; return false; } if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) && - (this->member->cgroup_status == KAFKA_CGROUP_UNINIT)) + (member->cgroup_status == KAFKA_CGROUP_UNINIT)) { __WFKafkaTask *task; - task = __WFKafkaTaskFactory::create_kafka_task(this->url, + task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx, this->retry_max, kafka_cgroup_callback); task->user_data = this; task->get_req()->set_config(this->config); task->get_req()->set_api_type(Kafka_FindCoordinator); - task->get_req()->set_cgroup(this->member->cgroup); - task->get_req()->set_meta_list(this->member->meta_list); + task->get_req()->set_cgroup(member->cgroup); + task->get_req()->set_meta_list(member->meta_list); series_of(this)->push_front(this); series_of(this)->push_front(task); - this->member->cgroup_status = KAFKA_CGROUP_DOING; - this->member->cgroup_wait_cnt = 0; + member->cgroup_status = KAFKA_CGROUP_DOING; + member->cgroup_wait_cnt = 0; return false; } @@ -835,12 +838,13 @@ bool KafkaClientTask::check_cgroup() bool KafkaClientTask::check_meta() { + KafkaMember *member = this->member; KafkaMetaList *uninit_meta_list; if (this->get_meta_status(&uninit_meta_list)) return true; - if (this->member->meta_doing) + if (member->meta_doing) { WFConditional *cond; char name[64]; @@ -848,13 +852,13 @@ bool KafkaClientTask::check_meta() this->wait_cgroup = false; cond = WFTaskFactory::create_conditional(name, this, &this->msg); series_of(this)->push_front(cond); - this->member->meta_wait_cnt++; + member->meta_wait_cnt++; } else { __WFKafkaTask *task; - task = __WFKafkaTaskFactory::create_kafka_task(this->url, + task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx, this->retry_max, kafka_meta_callback); task->user_data = this; @@ -863,8 +867,8 @@ bool KafkaClientTask::check_meta() task->get_req()->set_meta_list(*uninit_meta_list); series_of(this)->push_front(this); series_of(this)->push_front(task); - this->member->meta_wait_cnt = 0; - this->member->meta_doing = true; + member->meta_wait_cnt = 0; + member->meta_doing = true; } delete uninit_meta_list; @@ -921,6 +925,7 @@ int KafkaClientTask::dispatch_locked() task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, broker->get_host(), broker->get_port(), + member->ssl_ctx, this->get_userinfo(), this->retry_max, std::move(cb)); @@ -956,6 +961,7 @@ int KafkaClientTask::dispatch_locked() task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, broker->get_host(), broker->get_port(), + member->ssl_ctx, this->get_userinfo(), this->retry_max, std::move(cb)); @@ -991,6 +997,7 @@ int KafkaClientTask::dispatch_locked() task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, coordinator->get_host(), coordinator->get_port(), + member->ssl_ctx, this->get_userinfo(), this->retry_max, kafka_offsetcommit_callback); @@ -1020,6 +1027,7 @@ int KafkaClientTask::dispatch_locked() task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, coordinator->get_host(), coordinator->get_port(), + member->ssl_ctx, this->get_userinfo(), 0, kafka_leavegroup_callback); task->user_data = this; @@ -1051,6 +1059,7 @@ int KafkaClientTask::dispatch_locked() task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, broker->get_host(), broker->get_port(), + member->ssl_ctx, this->get_userinfo(), this->retry_max, std::move(cb)); @@ -1580,7 +1589,7 @@ SubTask *WFKafkaTask::done() return series->pop(); } -int WFKafkaClient::init(const std::string& broker) +int WFKafkaClient::init(const std::string& broker, SSL_CTX *ssl_ctx) { std::vector broker_hosts; std::string::size_type ppos = 0; @@ -1620,6 +1629,7 @@ int WFKafkaClient::init(const std::string& broker) this->member = new KafkaMember; this->member->broker_hosts = std::move(broker_hosts); + this->member->ssl_ctx = ssl_ctx; if (use_ssl) { this->member->transport_type = TT_TCP_SSL; @@ -1629,9 +1639,10 @@ int WFKafkaClient::init(const std::string& broker) return 0; } -int WFKafkaClient::init(const std::string& broker, const std::string& group) +int WFKafkaClient::init(const std::string& broker, const std::string& group, + SSL_CTX *ssl_ctx) { - if (this->init(broker) < 0) + if (this->init(broker, ssl_ctx) < 0) return -1; this->member->cgroup.set_group(group); @@ -1652,8 +1663,7 @@ WFKafkaTask *WFKafkaClient::create_kafka_task(const std::string& query, int retry_max, kafka_callback_t cb) { - WFKafkaTask *task = new KafkaClientTask(query, retry_max, std::move(cb), - this); + WFKafkaTask *task = new KafkaClientTask(query, retry_max, std::move(cb), this); return task; } diff --git a/src/client/WFKafkaClient.h b/src/client/WFKafkaClient.h index b7777f14..61ec25e3 100644 --- a/src/client/WFKafkaClient.h +++ b/src/client/WFKafkaClient.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "WFTask.h" #include "KafkaMessage.h" #include "KafkaResult.h" @@ -145,9 +146,22 @@ public: // example: kafka://kafka.sogou // example: kafka.sogou:9090 // example: kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou - int init(const std::string& broker_url); + // example: kafkas://kafka.sogou -> kafka over TLS + int init(const std::string& broker_url) + { + return this->init(broker_url, NULL); + } - int init(const std::string& broker_url, const std::string& group); + int init(const std::string& broker_url, const std::string& group) + { + return this->init(broker_url, group, NULL); + } + + // With a specific SSL_CTX. Effective only on brokers over TLS. + int init(const std::string& broker_url, SSL_CTX *ssl_ctx); + + int init(const std::string& broker_url, const std::string& group, + SSL_CTX *ssl_ctx); int deinit(); diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index 09616e5c..587d189a 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -714,12 +714,14 @@ bool __ComplexKafkaTask::finish_once() /**********Factory**********/ // kafka://user:password:sasl@host:port/api=type&topic=name __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const std::string& url, + SSL_CTX *ssl_ctx, int retry_max, __kafka_callback_t callback) { auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); - ParsedURI uri; + task->set_ssl_ctx(ssl_ctx); + ParsedURI uri; URIParser::parse(url, uri); task->init(std::move(uri)); task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT); @@ -727,10 +729,12 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const std::string& url, } __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri, + SSL_CTX *ssl_ctx, int retry_max, __kafka_callback_t callback) { auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); + task->set_ssl_ctx(ssl_ctx); task->init(uri); task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT); @@ -740,11 +744,13 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri, __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(enum TransportType type, const char *host, unsigned short port, + SSL_CTX *ssl_ctx, const std::string& info, int retry_max, __kafka_callback_t callback) { auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); + task->set_ssl_ctx(ssl_ctx); std::string url = (type == TT_TCP_SSL ? "kafkas://" : "kafka://"); diff --git a/src/factory/KafkaTaskImpl.inl b/src/factory/KafkaTaskImpl.inl index 9b600444..83ec4c78 100644 --- a/src/factory/KafkaTaskImpl.inl +++ b/src/factory/KafkaTaskImpl.inl @@ -16,6 +16,7 @@ Authors: Wang Zhulei (wangzhulei@sogou-inc.com) */ +#include #include "WFTaskFactory.h" #include "KafkaMessage.h" @@ -32,16 +33,19 @@ public: * user task. */ static __WFKafkaTask *create_kafka_task(const ParsedURI& uri, + SSL_CTX *ssl_ctx, int retry_max, __kafka_callback_t callback); static __WFKafkaTask *create_kafka_task(const std::string& url, + SSL_CTX *ssl_ctx, int retry_max, __kafka_callback_t callback); static __WFKafkaTask *create_kafka_task(enum TransportType type, const char *host, unsigned short port, + SSL_CTX *ssl_ctx, const std::string& info, int retry_max, __kafka_callback_t callback);