Merge pull request #1379 from Barenboim/master

Support Kafka over TLS
This commit is contained in:
xiehan
2023-09-25 20:40:53 +08:00
committed by GitHub
11 changed files with 239 additions and 414 deletions

View File

@@ -29,8 +29,9 @@ The program exits automatically after all the tasks are completed, and all the
In the command, the broker_url may contain several urls separated by commas (,).
- For instance, kafka://host:port,kafka://host1:port...
- The default value is 9092;
- For instance, kafka://host:port,kafka://host1:port... or: **kafkas**://host:port,**kafkas**://host1:port for kafka over SSL;
- The default port is 9092 for TCP and 9093 for SSL;
- Do not mix 'kafkas://' with "kafka://", otherwise the init function will fail with errno EINVAL;
- If you want to use upstream policy at this layer, please refer to [upstream documents](/docs/en/about-upstream.md).
The following are several Kafka broker_url samples:
@@ -41,6 +42,12 @@ kafka://kafka.host:9090/
kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
kafkas://broker1.kafka.sogou,kafkas://broker2.kafka.sogou
Illegal broker_url sample (The first one is SSL, and the second one is not):
kafkas://broker1.kafka.sogou,broker2.kafka.sogou
# Principles and Features
Kafka client has no third-party dependencies internally except for the libraries used for compression. Thanks to the high performance of Workflow, when properly configured and in fair environments, tens of thousands of Kafka requests can be processed in one second.

View File

@@ -28,8 +28,10 @@ Bazel执行bazel build kafka 编译支持kafka协议的类库执行bazel b
其中broker_url可以有多个url组成多个url之间以,分割
- 形式如kafka://host:port,kafka://host1:port...
- port默认为9092;
- 形式如kafka://host:port,kafka://host1:port... 或:**kafkas**://host:port,**kafkas**://host1:port代表使用SSL通信。
- port默认值在普通TCP连接下是9092SSL下为9093。
- "kafka://"前缀可以缺省。这时候默认使用TCP通信。
- 多个url必须都采用TCP或都采用SSL。否则init函数返回-1错误码为EINVAL。
- 如果用户在这一层有upstream选取需求可以参考[upstream文档](../docs/about-upstream.md)。
Kafka broker_url示例
@@ -40,6 +42,12 @@ kafka://kafka.host:9090/
kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
kafkas://broker1.kafka.sogou,kafkas://broker2.kafka.sogou
错误的url示例第一个broker为SSL第二个broker非SSL
kafkas://broker1.kafka.sogou,broker2.kafka.sogou
# 实现原理和特性
kafka client内部实现上除了压缩功能外没有依赖第三方库同时利用了workflow的高性能在合理的配置和环境下每秒钟可以处理几万次Kafka请求。

View File

@@ -52,14 +52,15 @@ using ComplexKafkaTask = WFComplexClientTask<KafkaRequest, KafkaResponse,
class KafkaMember
{
public:
KafkaMember() : ref(1)
KafkaMember() : scheme("kafka://"), ref(1)
{
cgroup_status = KAFKA_CGROUP_NONE;
heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
meta_doing = false;
cgroup_outdated = false;
client_deinit = false;
heartbeat_series = NULL;
this->transport_type = TT_TCP;
this->cgroup_status = KAFKA_CGROUP_NONE;
this->heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
this->meta_doing = false;
this->cgroup_outdated = false;
this->client_deinit = false;
this->heartbeat_series = NULL;
}
void incref()
@@ -73,6 +74,8 @@ public:
delete this;
}
TransportType transport_type;
std::string scheme;
std::vector<std::string> broker_hosts;
KafkaCgroup cgroup;
KafkaMetaList meta_list;
@@ -146,7 +149,8 @@ private:
static void kafka_merge_meta_list(KafkaMetaList *dst,
KafkaMetaList *src);
static void kafka_merge_broker_list(std::vector<std::string> *hosts,
static void kafka_merge_broker_list(const std::string& scheme,
std::vector<std::string> *hosts,
KafkaBrokerMap *dst,
KafkaBrokerList *src);
@@ -287,15 +291,10 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
{
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
kafka_task = __WFKafkaTaskFactory::create_kafka_task(addr,
socklen,
"",
0,
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_heartbeat_callback);
kafka_task->user_data = member;
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
@@ -321,13 +320,11 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *series)
{
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
__WFKafkaTask *task;
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, "", 0,
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_rebalance_callback);
task->user_data = member;
task->get_req()->set_config(member->config);
@@ -394,12 +391,10 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task)
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
kafka_task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, "", 0,
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_heartbeat_callback);
kafka_task->user_data = member;
@@ -436,7 +431,8 @@ void KafkaClientTask::kafka_merge_meta_list(KafkaMetaList *dst,
}
}
void KafkaClientTask::kafka_merge_broker_list(std::vector<std::string> *hosts,
void KafkaClientTask::kafka_merge_broker_list(const std::string& scheme,
std::vector<std::string> *hosts,
KafkaBrokerMap *dst,
KafkaBrokerList *src)
{
@@ -445,9 +441,8 @@ void KafkaClientTask::kafka_merge_broker_list(std::vector<std::string> *hosts,
KafkaBroker *src_broker;
while ((src_broker = src->get_next()) != NULL)
{
std::string host = "kafka://";
host = host + src_broker->get_host() + ":" +
std::to_string(src_broker->get_port());
std::string host = scheme + src_broker->get_host() + ":" +
std::to_string(src_broker->get_port());
hosts->emplace_back(std::move(host));
if (!dst->find_item(src_broker->get_node_id()))
@@ -475,7 +470,8 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task)
while ((meta = t->meta_list.get_next()) != NULL)
(t->member->meta_status)[meta->get_topic()] = true;
kafka_merge_broker_list(&t->member->broker_hosts,
kafka_merge_broker_list(t->member->scheme,
&t->member->broker_hosts,
&t->member->broker_map,
task->get_resp()->get_broker_list());
}
@@ -523,7 +519,8 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
while ((meta = t->meta_list.get_next()) != NULL)
(t->member->meta_status)[meta->get_topic()] = true;
kafka_merge_broker_list(&t->member->broker_hosts,
kafka_merge_broker_list(t->member->scheme,
&t->member->broker_hosts,
&t->member->broker_map,
task->get_resp()->get_broker_list());
@@ -533,12 +530,9 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = t->member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
kafka_task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_heartbeat_callback);
kafka_task->user_data = t->member;
@@ -581,14 +575,14 @@ void KafkaClientTask::kafka_parallel_callback(const ParallelWork *pwork)
t->state = WFT_STATE_TASK_ERROR;
t->error = 0;
std::pair<int, int> *state_error;
std::pair<int32_t, int32_t> *state_error;
bool flag = false;
int state = WFT_STATE_SUCCESS;
int error = 0;
int16_t state = WFT_STATE_SUCCESS;
int16_t error = 0;
int kafka_error = 0;
for (size_t i = 0; i < pwork->size(); i++)
{
state_error = (std::pair<int, int> *)pwork->series_at(i)->get_context();
state_error = (std::pair<int32_t, int32_t> *)pwork->series_at(i)->get_context();
if ((state_error->first >> 16) != WFT_STATE_SUCCESS)
{
if (!flag)
@@ -642,9 +636,20 @@ void KafkaClientTask::kafka_process_toppar_offset(KafkaToppar *task_toppar)
void KafkaClientTask::kafka_move_task_callback(__WFKafkaTask *task)
{
std::pair<int, int> *state_error = new std::pair<int, int>;
auto *state_error = new std::pair<int32_t, int32_t>;
int16_t state = task->get_state();
int16_t error = task->get_error();
state_error->first = (task->get_state() << 16) + task->get_error();
/* This function is called before WFClientTask::done. Need to transfer
the state and error. */
if (state == WFT_STATE_SYS_ERROR && error < 0)
{
state = WFT_STATE_SSL_ERROR;
error = -error;
}
/* 'state' is always positive. */
state_error->first = (state << 16) | error;
state_error->second = static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx()->kafka_error;
series_of(task)->set_context(state_error);
@@ -674,27 +679,29 @@ void KafkaClientTask::generate_info()
if (this->config.get_sasl_mech())
{
this->userinfo = this->config.get_sasl_username();
const char *username = this->config.get_sasl_username();
const char *password = this->config.get_sasl_password();
this->userinfo.clear();
if (username)
this->userinfo += StringUtil::url_encode_component(username);
this->userinfo += ":";
this->userinfo +=
StringUtil::url_encode_component(this->config.get_sasl_password());
if (password)
this->userinfo += StringUtil::url_encode_component(password);
this->userinfo += ":";
this->userinfo += this->config.get_sasl_mech();
this->userinfo += ":";
this->userinfo += std::to_string((intptr_t)this->member);
this->userinfo += "@";
this->url = "kafka://" + this->userinfo +
this->url.substr(this->url.find("kafka://") + 8);
}
else
{
char buf[64];
snprintf(buf, sizeof(buf), "user:pass:sasl:%p@", this->member);
snprintf(buf, 64, "user:pass:sasl:%p", this->member);
this->userinfo = buf;
this->url = "kafka://" + this->userinfo +
this->url.substr(this->url.find("kafka://") + 8);
}
const char *hostport = this->url.c_str() + this->member->scheme.size();
this->url = this->member->scheme + this->userinfo + "@" + hostport;
this->info_generated = true;
}
@@ -876,15 +883,17 @@ bool KafkaClientTask::check_meta()
int KafkaClientTask::dispatch_locked()
{
KafkaMember *member = this->member;
KafkaBroker *coordinator;
__WFKafkaTask *task;
ParallelWork *parallel;
SeriesWork *series;
if (this->check_cgroup() == false)
return this->member->cgroup_wait_cnt > 0;
return member->cgroup_wait_cnt > 0;
if (this->check_meta() == false)
return this->member->meta_wait_cnt > 0;
return member->meta_wait_cnt > 0;
if (arrange_toppar(this->api_type) < 0)
{
@@ -919,26 +928,12 @@ int KafkaClientTask::dispatch_locked()
auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
@@ -970,25 +965,12 @@ int KafkaClientTask::dispatch_locked()
auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
@@ -1010,69 +992,57 @@ int KafkaClientTask::dispatch_locked()
break;
case Kafka_OffsetCommit:
if (!this->member->cgroup.get_group())
if (!member->cgroup.get_group())
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_COMMIT_FAILED;
this->finish = true;
break;
}
else
{
this->result.create(1);
KafkaBroker *coordinator = this->member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
this->get_userinfo(),
this->retry_max,
kafka_offsetcommit_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_cgroup(this->member->cgroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_toppar_list(this->toppar_list);
task->get_req()->set_api_type(this->api_type);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
}
this->result.create(1);
coordinator = member->cgroup.get_coordinator();
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
this->get_userinfo(),
this->retry_max,
kafka_offsetcommit_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_cgroup(member->cgroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_toppar_list(this->toppar_list);
task->get_req()->set_api_type(this->api_type);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
case Kafka_LeaveGroup:
if (!this->member->cgroup.get_group())
if (!member->cgroup.get_group())
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_KAFKA_LEAVEGROUP_FAILED;
this->finish = true;
break;
}
else
{
KafkaBroker *coordinator = this->member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
coordinator = member->cgroup.get_coordinator();
if (!coordinator->get_host())
break;
if (coordinator->is_to_addr())
{
task = __WFKafkaTaskFactory::create_kafka_task(addr,
socklen,
this->get_userinfo(),
0,
kafka_leavegroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_LeaveGroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_cgroup(this->member->cgroup);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
}
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
this->get_userinfo(), 0,
kafka_leavegroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_LeaveGroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_cgroup(member->cgroup);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
case Kafka_ListOffsets:
@@ -1092,26 +1062,12 @@ int KafkaClientTask::dispatch_locked()
auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
@@ -1635,43 +1591,57 @@ SubTask *WFKafkaTask::done()
int WFKafkaClient::init(const std::string& broker)
{
std::vector<std::string> broker_hosts;
std::string::size_type pos = broker.find(',');
std::string::size_type ppos = 0;
std::string::size_type pos;
bool use_ssl;
if (pos == std::string::npos)
use_ssl = (strncasecmp(broker.c_str(), "kafkas://", 9) == 0);
while (1)
{
std::string host = broker;
if (strncasecmp(host.c_str(), "kafka://", 8) != 0)
host = "kafka://" + host;
broker_hosts.emplace_back(host);
}
else
{
do
{
std::string host = broker.substr(ppos, pos - ppos);
if (strncasecmp(host.c_str(), "kafka://", 8) != 0)
host = "kafka://" + host;
broker_hosts.emplace_back(host);
ppos = pos + 1;
pos = broker.find(',', ppos);
} while (pos != std::string::npos);
pos = broker.find(',', ppos);
std::string host = broker.substr(ppos, pos - ppos);
if (strncasecmp(host.c_str(), "kafka://", 8) != 0)
if (use_ssl)
{
if (strncasecmp(host.c_str(), "kafkas://", 9) != 0)
{
errno = EINVAL;
return -1;
}
}
else if (strncasecmp(host.c_str(), "kafka://", 8) != 0)
{
if (strncasecmp(host.c_str(), "kafkas://", 9) == 0)
{
errno = EINVAL;
return -1;
}
host = "kafka://" + host;
}
broker_hosts.emplace_back(host);
if (pos == std::string::npos)
break;
ppos = pos + 1;
}
this->member = new KafkaMember;
this->member->broker_hosts = std::move(broker_hosts);
if (use_ssl)
{
this->member->transport_type = TT_TCP_SSL;
this->member->scheme = "kafkas://";
}
return 0;
}
int WFKafkaClient::init(const std::string& broker, const std::string& group)
{
this->init(broker);
if (this->init(broker) < 0)
return -1;
this->member->cgroup.set_group(group);
this->member->cgroup_status = KAFKA_CGROUP_UNINIT;
return 0;

View File

@@ -14,6 +14,7 @@
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
Xie Han (xiehan@sogou-inc.com)
*/
#include <assert.h>
@@ -117,7 +118,6 @@ private:
virtual int keep_alive_timeout();
virtual int first_timeout();
bool has_next();
bool check_redirect();
bool process_produce();
bool process_fetch();
bool process_metadata();
@@ -310,19 +310,17 @@ CommMessageIn *__ComplexKafkaTask::message_in()
bool __ComplexKafkaTask::init_success()
{
TransportType type = TT_TCP;
if (uri_.scheme)
TransportType type;
if (uri_.scheme && strcasecmp(uri_.scheme, "kafka") == 0)
type = TT_TCP;
else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0)
type = TT_TCP_SSL;
else
{
if (strcasecmp(uri_.scheme, "kafka") == 0)
type = TT_TCP;
//else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0)
// type = TT_TCP_SSL;
else
{
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_URI_SCHEME_INVALID;
return false;
}
this->state = WFT_STATE_TASK_ERROR;
this->error = WFT_ERR_URI_SCHEME_INVALID;
return false;
}
std::string username, password, sasl, client;
@@ -367,7 +365,6 @@ bool __ComplexKafkaTask::init_success()
}
this->WFComplexClientTask::set_transport_type(type);
return true;
}
@@ -409,47 +406,6 @@ int __ComplexKafkaTask::first_timeout()
return ret + KAFKA_ROUNDTRIP_TIMEOUT;
}
bool __ComplexKafkaTask::check_redirect()
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
KafkaBroker *coordinator = this->get_req()->get_cgroup()->get_coordinator();
//always success
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
if (!coordinator->is_equal(paddr, addrlen))
{
if (coordinator->is_to_addr())
{
const struct sockaddr *addr_coord;
socklen_t addrlen_coord;
coordinator->get_broker_addr(&addr_coord, &addrlen_coord);
set_redirect(TT_TCP, addr_coord, addrlen_coord,
this->WFComplexClientTask::info_);
}
else
{
std::string url = "kafka://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());
ParsedURI uri;
URIParser::parse(url, uri);
set_redirect(std::move(uri));
}
return true;
}
else
{
this->init(TT_TCP, paddr, addrlen, this->WFComplexClientTask::info_);
return false;
}
}
bool __ComplexKafkaTask::process_find_coordinator()
{
KafkaCgroup *cgroup = this->get_resp()->get_cgroup();
@@ -463,8 +419,18 @@ bool __ComplexKafkaTask::process_find_coordinator()
else
{
this->get_req()->set_cgroup(*cgroup);
is_redirect_ = check_redirect();
KafkaBroker *coordinator = cgroup->get_coordinator();
std::string url(uri_.scheme);
url += "://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());
ParsedURI uri;
URIParser::parse(url, uri);
set_redirect(std::move(uri));
this->get_req()->set_api_type(Kafka_JoinGroup);
is_redirect_ = true;
return true;
}
}
@@ -472,16 +438,6 @@ bool __ComplexKafkaTask::process_find_coordinator()
bool __ComplexKafkaTask::process_join_group()
{
KafkaResponse *msg = this->get_resp();
if (!msg->get_cgroup()->get_coordinator()->is_to_addr())
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
msg->get_cgroup()->get_coordinator()->set_broker_addr(paddr, addrlen);
msg->get_cgroup()->get_coordinator()->set_to_addr(1);
}
switch(msg->get_cgroup()->get_error())
{
case KAFKA_MEMBER_ID_REQUIRED:
@@ -681,18 +637,6 @@ bool __ComplexKafkaTask::process_sasl_authenticate()
bool __ComplexKafkaTask::has_next()
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
//always success
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
if (!this->get_resp()->get_broker()->is_to_addr())
{
this->get_resp()->get_broker()->set_broker_addr(paddr, addrlen);
this->get_resp()->get_broker()->set_to_addr(1);
}
switch (this->get_resp()->get_api_type())
{
case Kafka_Produce:
@@ -809,20 +753,8 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri,
return task;
}
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const struct sockaddr *addr,
socklen_t addrlen,
const std::string& info,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));
task->init(TT_TCP, addr, addrlen, info);
task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
return task;
}
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const char *host,
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type,
const char *host,
unsigned short port,
const std::string& info,
int retry_max,
@@ -830,10 +762,10 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const char *host,
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));
std::string url = "kafka://";
std::string url = (type == TT_TCP_SSL ? "kafkas://" : "kafka://");
if (!info.empty())
url += info;
url += info + "@";
url += host;
url += ":" + std::to_string(port);

View File

@@ -40,13 +40,8 @@ public:
int retry_max,
__kafka_callback_t callback);
static __WFKafkaTask *create_kafka_task(const struct sockaddr *addr,
socklen_t addrlen,
const std::string& info,
int retry_max,
__kafka_callback_t callback);
static __WFKafkaTask *create_kafka_task(const char *host,
static __WFKafkaTask *create_kafka_task(TransportType type,
const char *host,
unsigned short port,
const std::string& info,
int retry_max,

View File

@@ -154,6 +154,11 @@ __WFGlobal::__WFGlobal()
static_scheme_port_["Kafka"] = "9092";
static_scheme_port_["KAFKA"] = "9092";
static_scheme_port_["kafkas"] = "9093";
static_scheme_port_["Kafkas"] = "9093";
static_scheme_port_["KAFKAs"] = "9093";
static_scheme_port_["KAFKAS"] = "9093";
sync_count_ = 0;
sync_max_ = 0;
}

View File

@@ -1016,14 +1016,12 @@ public:
return this->ptr->port;
}
std::string get_uri() const
std::string get_host_port() const
{
std::string uri = "kafka://";
uri += this->ptr->host;
uri += ":";
uri += std::to_string(this->ptr->port);
return uri;
std::string host_port(this->ptr->host);
host_port += ":";
host_port += std::to_string(this->ptr->port);
return host_port;
}
int get_error()
@@ -1103,12 +1101,12 @@ public:
bool operator< (const KafkaBroker& broker) const
{
return this->get_uri() < broker.get_uri();
return this->get_host_port() < broker.get_host_port();
}
bool operator> (const KafkaBroker& broker) const
{
return this->get_uri() > broker.get_uri();
return this->get_host_port() > broker.get_host_port();
}
kafka_broker_t *get_raw_ptr() const { return this->ptr; }
@@ -1117,54 +1115,6 @@ public:
struct rb_node *get_rb() { return &this->rb; }
bool is_equal(const struct sockaddr *addr, socklen_t socklen) const
{
if (this->ptr->addrlen == socklen && socklen)
return memcmp(addr, &this->ptr->addr, this->ptr->addrlen) == 0;
return false;
}
bool is_equal(const char *host, int port) const
{
if (port == this->ptr->port)
return strcmp(host, this->ptr->host) == 0;
return false;
}
bool is_equal(int node_id) const
{
return this->ptr->node_id == node_id;
}
bool is_equal(const KafkaBroker& broker) const
{
return is_equal(broker.ptr->host, broker.ptr->port);
}
void get_broker_addr(const struct sockaddr **addr, socklen_t *socklen) const
{
if (this->ptr->addrlen)
{
*addr = (const struct sockaddr *)&this->ptr->addr;
*socklen = this->ptr->addrlen;
}
else
{
*addr = NULL;
*socklen = 0;
}
}
void set_broker_addr(const struct sockaddr *addr, socklen_t socklen)
{
memcpy(&this->ptr->addr, addr, socklen);
this->ptr->addrlen = socklen;
}
bool is_to_addr() const { return this->ptr->to_addr == 1; }
void set_to_addr(int to_addr) { this->ptr->to_addr = to_addr; }
int get_node_id() const { return this->ptr->node_id; }
int get_id () const { return this->ptr->node_id; }
@@ -1394,14 +1344,6 @@ public:
return strcmp(this->ptr->leader_id, this->ptr->member_id) == 0;
}
bool is_equal_coordinator(const struct sockaddr *addr, socklen_t addrlen) const
{
if (addrlen == this->ptr->coordinator.addrlen)
return memcmp(addr, &this->ptr->coordinator.addr, addrlen) == 0;
else
return false;
}
struct list_head *get_group_protocol()
{
return &this->ptr->group_protocol_list;

View File

@@ -1562,44 +1562,6 @@ int KafkaMessage::parse_records(void **buf, size_t *size, bool check_crcs,
return 0;
}
static bool __to_addr(const char *host, int port, struct sockaddr *sockaddr,
socklen_t *addrlen)
{
size_t len = strlen(host);
struct sockaddr_in *addr;
struct sockaddr_in6 *addr6;
bool ret = true;
if (!host)
ret = false;
else if (isdigit(host[0]) && isdigit(host[len - 1]))
{
addr = (struct sockaddr_in *)sockaddr;
if (inet_pton(AF_INET, host, &addr->sin_addr) == 1)
{
addr->sin_family = AF_INET;
*addrlen = sizeof(struct sockaddr_in);
addr->sin_port = htons(port);
}
else
ret = false;
}
else
{
addr6 = (struct sockaddr_in6 *)sockaddr;
if (inet_pton(AF_INET6, host, &addr6->sin6_addr) == 1)
{
addr6->sin6_family = AF_INET6;
*addrlen = sizeof(struct sockaddr_in6);
addr6->sin6_port = htons(port);
}
else
ret = false;
}
return ret;
}
KafkaMessage::KafkaMessage()
{
static struct Crc32cInitializer
@@ -2867,11 +2829,6 @@ static int kafka_meta_parse_broker(void **buf, size_t *size,
if (api_version >= 1)
CHECK_RET(parse_string(buf, size, &ptr->rack));
if (__to_addr(ptr->host, ptr->port, (struct sockaddr *)&ptr->addr, &ptr->addrlen))
{
ptr->to_addr = 1;
}
broker_list->rewind();
KafkaBroker *last;
@@ -3364,12 +3321,6 @@ int KafkaResponse::parse_findcoordinator(void **buf, size_t *size)
CHECK_RET(parse_string(buf, size, &cgroup->coordinator.host));
CHECK_RET(parse_i32(buf, size, &cgroup->coordinator.port));
if (__to_addr(cgroup->coordinator.host, cgroup->coordinator.port,
(struct sockaddr *)&cgroup->coordinator.addr, &cgroup->coordinator.addrlen))
{
cgroup->coordinator.to_addr = 1;
}
return 0;
}

View File

@@ -397,9 +397,6 @@ void kafka_broker_init(kafka_broker_t *broker)
broker->port = 0;
broker->host = NULL;
broker->rack = NULL;
broker->to_addr = 0;
memset(&broker->addr, 0, sizeof(broker->addr));
broker->addrlen = 0;
broker->error = 0;
broker->status = KAFKA_BROKER_UNINIT;
}
@@ -1209,6 +1206,8 @@ void kafka_sasl_init(kafka_sasl_t *sasl)
sasl->scram.first_msg.iov_len = 0;
sasl->scram.server_signature_b64.iov_base = NULL;
sasl->scram.server_signature_b64.iov_len = 0;
sasl->buf = NULL;
sasl->bsize = 0;
sasl->status = 0;
}
@@ -1216,6 +1215,7 @@ void kafka_sasl_deinit(kafka_sasl_t *sasl)
{
free(sasl->scram.cnonce.iov_base);
free(sasl->scram.server_signature_b64.iov_base);
free(sasl->buf);
}
int kafka_sasl_set_username(const char *username, kafka_config_t *conf)

View File

@@ -305,9 +305,6 @@ typedef struct __kafka_broker
int port;
char *host;
char *rack;
int to_addr;
struct sockaddr_storage addr;
socklen_t addrlen;
short error;
int status;
} kafka_broker_t;

View File

@@ -190,8 +190,11 @@ int main(int argc, char *argv[])
signal(SIGINT, sig_handler);
url = argv[1];
if (strncmp(argv[1], "kafka://", 8) != 0)
if (strncmp(argv[1], "kafka://", 8) != 0 &&
strncmp(argv[1], "kafkas://", 9) != 0)
{
url = "kafka://" + url;
}
char buf[512 * 1024];
WFKafkaTask *task;
@@ -206,7 +209,12 @@ int main(int argc, char *argv[])
if (compress_type > Kafka_Zstd)
exit(1);
client.init(url);
if (client.init(url) < 0)
{
perror("client.init");
exit(1);
}
task = client.create_kafka_task("api=produce", 3, kafka_callback);
KafkaConfig config;
KafkaRecord record;
@@ -232,7 +240,12 @@ int main(int argc, char *argv[])
{
if (argc > 3 && argv[3][0] == 'd')
{
client.init(url);
if (client.init(url) < 0)
{
perror("client.init");
exit(1);
}
task = client.create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
@@ -248,7 +261,12 @@ int main(int argc, char *argv[])
}
else
{
client.init(url, "workflow_group");
if (client.init(url, "workflow_group") < 0)
{
perror("client.init");
exit(1);
}
task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch",
3, kafka_callback);
}