Enable setting specific SSL_CTX for kafka client. (#1539)

This commit is contained in:
xiehan
2024-04-26 21:05:19 +08:00
committed by GitHub
parent 0e8b158975
commit df12798e84
4 changed files with 89 additions and 55 deletions

View File

@@ -77,6 +77,7 @@ public:
enum TransportType transport_type;
std::string scheme;
std::vector<std::string> broker_hosts;
SSL_CTX *ssl_ctx;
KafkaCgroup cgroup;
KafkaMetaList meta_list;
KafkaBrokerMap broker_map;
@@ -192,7 +193,7 @@ private:
int dispatch_locked();
inline KafkaBroker *get_broker(int node_id)
KafkaBroker *get_broker(int node_id)
{
return this->member->broker_map.find_item(node_id);
}
@@ -294,7 +295,7 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
member->ssl_ctx, "", 0,
kafka_heartbeat_callback);
kafka_task->user_data = member;
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
@@ -327,7 +328,7 @@ void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *seri
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
member->ssl_ctx, "", 0,
kafka_rebalance_callback);
task->user_data = member;
task->get_req()->set_config(member->config);
@@ -392,7 +393,7 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task)
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
member->ssl_ctx, "", 0,
kafka_heartbeat_callback);
kafka_task->user_data = member;
@@ -496,69 +497,70 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task)
void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
KafkaClientTask *t = (KafkaClientTask *)task->user_data;
KafkaMember *member = t->member;
SeriesWork *heartbeat_series = NULL;
void *msg = NULL;
size_t max;
t->member->mutex.lock();
member->mutex.lock();
t->state = task->get_state();
t->error = task->get_error();
t->kafka_error = *static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx();
if (t->state == WFT_STATE_SUCCESS)
{
t->member->cgroup = std::move(*(task->get_resp()->get_cgroup()));
member->cgroup = std::move(*(task->get_resp()->get_cgroup()));
kafka_merge_meta_list(&t->member->meta_list,
kafka_merge_meta_list(&member->meta_list,
task->get_resp()->get_meta_list());
t->meta_list.rewind();
KafkaMeta *meta;
while ((meta = t->meta_list.get_next()) != NULL)
(t->member->meta_status)[meta->get_topic()] = true;
(member->meta_status)[meta->get_topic()] = true;
kafka_merge_broker_list(t->member->scheme,
&t->member->broker_hosts,
&t->member->broker_map,
kafka_merge_broker_list(member->scheme,
&member->broker_hosts,
&member->broker_map,
task->get_resp()->get_broker_list());
t->member->cgroup_status = KAFKA_CGROUP_DONE;
member->cgroup_status = KAFKA_CGROUP_DONE;
if (t->member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT)
if (member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT)
{
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = t->member->cgroup.get_coordinator();
kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type,
KafkaBroker *coordinator = member->cgroup.get_coordinator();
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
member->ssl_ctx, "", 0,
kafka_heartbeat_callback);
kafka_task->user_data = t->member;
t->member->incref();
kafka_task->user_data = member;
member->incref();
kafka_task->get_req()->set_config(t->member->config);
kafka_task->get_req()->set_config(member->config);
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
kafka_task->get_req()->set_cgroup(t->member->cgroup);
kafka_task->get_req()->set_cgroup(member->cgroup);
kafka_task->get_req()->set_broker(*coordinator);
heartbeat_series = Workflow::create_series_work(kafka_task, nullptr);
t->member->heartbeat_status = KAFKA_HEARTBEAT_DOING;
t->member->heartbeat_series = heartbeat_series;
member->heartbeat_status = KAFKA_HEARTBEAT_DOING;
member->heartbeat_series = heartbeat_series;
}
}
else
{
t->member->cgroup_status = KAFKA_CGROUP_UNINIT;
t->member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
t->member->heartbeat_series = NULL;
member->cgroup_status = KAFKA_CGROUP_UNINIT;
member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
member->heartbeat_series = NULL;
t->finish = true;
msg = t;
}
max = t->member->cgroup_wait_cnt;
max = member->cgroup_wait_cnt;
char name[64];
snprintf(name, 64, "%p.cgroup", t->member);
t->member->mutex.unlock();
snprintf(name, 64, "%p.cgroup", member);
member->mutex.unlock();
WFTaskFactory::signal_by_name(name, msg, max);
@@ -789,16 +791,17 @@ bool KafkaClientTask::compare_topics(KafkaClientTask *task)
bool KafkaClientTask::check_cgroup()
{
if (this->member->cgroup_outdated &&
this->member->cgroup_status != KAFKA_CGROUP_DOING)
KafkaMember *member = this->member;
if (member->cgroup_outdated && member->cgroup_status != KAFKA_CGROUP_DOING)
{
this->member->cgroup_outdated = false;
this->member->cgroup_status = KAFKA_CGROUP_UNINIT;
this->member->heartbeat_series = NULL;
this->member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
member->cgroup_outdated = false;
member->cgroup_status = KAFKA_CGROUP_UNINIT;
member->heartbeat_series = NULL;
member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
}
if (this->member->cgroup_status == KAFKA_CGROUP_DOING)
if (member->cgroup_status == KAFKA_CGROUP_DOING)
{
WFConditional *cond;
char name[64];
@@ -806,27 +809,27 @@ bool KafkaClientTask::check_cgroup()
this->wait_cgroup = true;
cond = WFTaskFactory::create_conditional(name, this, &this->msg);
series_of(this)->push_front(cond);
this->member->cgroup_wait_cnt++;
member->cgroup_wait_cnt++;
return false;
}
if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) &&
(this->member->cgroup_status == KAFKA_CGROUP_UNINIT))
(member->cgroup_status == KAFKA_CGROUP_UNINIT))
{
__WFKafkaTask *task;
task = __WFKafkaTaskFactory::create_kafka_task(this->url,
task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx,
this->retry_max,
kafka_cgroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_FindCoordinator);
task->get_req()->set_cgroup(this->member->cgroup);
task->get_req()->set_meta_list(this->member->meta_list);
task->get_req()->set_cgroup(member->cgroup);
task->get_req()->set_meta_list(member->meta_list);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
this->member->cgroup_status = KAFKA_CGROUP_DOING;
this->member->cgroup_wait_cnt = 0;
member->cgroup_status = KAFKA_CGROUP_DOING;
member->cgroup_wait_cnt = 0;
return false;
}
@@ -835,12 +838,13 @@ bool KafkaClientTask::check_cgroup()
bool KafkaClientTask::check_meta()
{
KafkaMember *member = this->member;
KafkaMetaList *uninit_meta_list;
if (this->get_meta_status(&uninit_meta_list))
return true;
if (this->member->meta_doing)
if (member->meta_doing)
{
WFConditional *cond;
char name[64];
@@ -848,13 +852,13 @@ bool KafkaClientTask::check_meta()
this->wait_cgroup = false;
cond = WFTaskFactory::create_conditional(name, this, &this->msg);
series_of(this)->push_front(cond);
this->member->meta_wait_cnt++;
member->meta_wait_cnt++;
}
else
{
__WFKafkaTask *task;
task = __WFKafkaTaskFactory::create_kafka_task(this->url,
task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx,
this->retry_max,
kafka_meta_callback);
task->user_data = this;
@@ -863,8 +867,8 @@ bool KafkaClientTask::check_meta()
task->get_req()->set_meta_list(*uninit_meta_list);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
this->member->meta_wait_cnt = 0;
this->member->meta_doing = true;
member->meta_wait_cnt = 0;
member->meta_doing = true;
}
delete uninit_meta_list;
@@ -921,6 +925,7 @@ int KafkaClientTask::dispatch_locked()
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
member->ssl_ctx,
this->get_userinfo(),
this->retry_max,
std::move(cb));
@@ -956,6 +961,7 @@ int KafkaClientTask::dispatch_locked()
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
member->ssl_ctx,
this->get_userinfo(),
this->retry_max,
std::move(cb));
@@ -991,6 +997,7 @@ int KafkaClientTask::dispatch_locked()
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
member->ssl_ctx,
this->get_userinfo(),
this->retry_max,
kafka_offsetcommit_callback);
@@ -1020,6 +1027,7 @@ int KafkaClientTask::dispatch_locked()
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
member->ssl_ctx,
this->get_userinfo(), 0,
kafka_leavegroup_callback);
task->user_data = this;
@@ -1051,6 +1059,7 @@ int KafkaClientTask::dispatch_locked()
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
member->ssl_ctx,
this->get_userinfo(),
this->retry_max,
std::move(cb));
@@ -1580,7 +1589,7 @@ SubTask *WFKafkaTask::done()
return series->pop();
}
int WFKafkaClient::init(const std::string& broker)
int WFKafkaClient::init(const std::string& broker, SSL_CTX *ssl_ctx)
{
std::vector<std::string> broker_hosts;
std::string::size_type ppos = 0;
@@ -1620,6 +1629,7 @@ int WFKafkaClient::init(const std::string& broker)
this->member = new KafkaMember;
this->member->broker_hosts = std::move(broker_hosts);
this->member->ssl_ctx = ssl_ctx;
if (use_ssl)
{
this->member->transport_type = TT_TCP_SSL;
@@ -1629,9 +1639,10 @@ int WFKafkaClient::init(const std::string& broker)
return 0;
}
int WFKafkaClient::init(const std::string& broker, const std::string& group)
int WFKafkaClient::init(const std::string& broker, const std::string& group,
SSL_CTX *ssl_ctx)
{
if (this->init(broker) < 0)
if (this->init(broker, ssl_ctx) < 0)
return -1;
this->member->cgroup.set_group(group);
@@ -1652,8 +1663,7 @@ WFKafkaTask *WFKafkaClient::create_kafka_task(const std::string& query,
int retry_max,
kafka_callback_t cb)
{
WFKafkaTask *task = new KafkaClientTask(query, retry_max, std::move(cb),
this);
WFKafkaTask *task = new KafkaClientTask(query, retry_max, std::move(cb), this);
return task;
}

View File

@@ -22,6 +22,7 @@
#include <string>
#include <vector>
#include <functional>
#include <openssl/ssl.h>
#include "WFTask.h"
#include "KafkaMessage.h"
#include "KafkaResult.h"
@@ -145,9 +146,22 @@ public:
// example: kafka://kafka.sogou
// example: kafka.sogou:9090
// example: kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
int init(const std::string& broker_url);
// example: kafkas://kafka.sogou -> kafka over TLS
int init(const std::string& broker_url)
{
return this->init(broker_url, NULL);
}
int init(const std::string& broker_url, const std::string& group);
int init(const std::string& broker_url, const std::string& group)
{
return this->init(broker_url, group, NULL);
}
// With a specific SSL_CTX. Effective only on brokers over TLS.
int init(const std::string& broker_url, SSL_CTX *ssl_ctx);
int init(const std::string& broker_url, const std::string& group,
SSL_CTX *ssl_ctx);
int deinit();

View File

@@ -714,12 +714,14 @@ bool __ComplexKafkaTask::finish_once()
/**********Factory**********/
// kafka://user:password:sasl@host:port/api=type&topic=name
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const std::string& url,
SSL_CTX *ssl_ctx,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));
ParsedURI uri;
task->set_ssl_ctx(ssl_ctx);
ParsedURI uri;
URIParser::parse(url, uri);
task->init(std::move(uri));
task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
@@ -727,10 +729,12 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const std::string& url,
}
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri,
SSL_CTX *ssl_ctx,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));
task->set_ssl_ctx(ssl_ctx);
task->init(uri);
task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
@@ -740,11 +744,13 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri,
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(enum TransportType type,
const char *host,
unsigned short port,
SSL_CTX *ssl_ctx,
const std::string& info,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));
task->set_ssl_ctx(ssl_ctx);
std::string url = (type == TT_TCP_SSL ? "kafkas://" : "kafka://");

View File

@@ -16,6 +16,7 @@
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <openssl/ssl.h>
#include "WFTaskFactory.h"
#include "KafkaMessage.h"
@@ -32,16 +33,19 @@ public:
* user task.
*/
static __WFKafkaTask *create_kafka_task(const ParsedURI& uri,
SSL_CTX *ssl_ctx,
int retry_max,
__kafka_callback_t callback);
static __WFKafkaTask *create_kafka_task(const std::string& url,
SSL_CTX *ssl_ctx,
int retry_max,
__kafka_callback_t callback);
static __WFKafkaTask *create_kafka_task(enum TransportType type,
const char *host,
unsigned short port,
SSL_CTX *ssl_ctx,
const std::string& info,
int retry_max,
__kafka_callback_t callback);