Do not transfer broker's address to sockaddr.

This commit is contained in:
Xie Han
2023-09-25 20:11:37 +08:00
committed by xiehan
parent 609d00ebec
commit db4ef8357c
7 changed files with 79 additions and 339 deletions

View File

@@ -291,13 +291,10 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
{
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen, "", 0,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_heartbeat_callback);
kafka_task->user_data = member;
kafka_task->get_req()->set_api_type(Kafka_Heartbeat);
@@ -323,14 +320,11 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *series)
{
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
__WFKafkaTask *task;
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen, "", 0,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_rebalance_callback);
task->user_data = member;
task->get_req()->set_config(member->config);
@@ -397,13 +391,10 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task)
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen, "", 0,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_heartbeat_callback);
kafka_task->user_data = member;
@@ -539,13 +530,10 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
{
__WFKafkaTask *kafka_task;
KafkaBroker *coordinator = t->member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type,
addr, socklen, "", 0,
coordinator->get_host(),
coordinator->get_port(),
"", 0,
kafka_heartbeat_callback);
kafka_task->user_data = t->member;
t->member->incref();
@@ -896,6 +884,7 @@ bool KafkaClientTask::check_meta()
int KafkaClientTask::dispatch_locked()
{
KafkaMember *member = this->member;
KafkaBroker *coordinator;
__WFKafkaTask *task;
ParallelWork *parallel;
SeriesWork *series;
@@ -939,28 +928,12 @@ int KafkaClientTask::dispatch_locked()
auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
@@ -992,27 +965,12 @@ int KafkaClientTask::dispatch_locked()
auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
@@ -1041,30 +999,24 @@ int KafkaClientTask::dispatch_locked()
this->finish = true;
break;
}
else
{
this->result.create(1);
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen,
this->get_userinfo(),
this->retry_max,
kafka_offsetcommit_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_cgroup(member->cgroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_toppar_list(this->toppar_list);
task->get_req()->set_api_type(this->api_type);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
}
this->result.create(1);
coordinator = member->cgroup.get_coordinator();
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
this->get_userinfo(),
this->retry_max,
kafka_offsetcommit_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_cgroup(member->cgroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_toppar_list(this->toppar_list);
task->get_req()->set_api_type(this->api_type);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
case Kafka_LeaveGroup:
if (!member->cgroup.get_group())
@@ -1074,29 +1026,23 @@ int KafkaClientTask::dispatch_locked()
this->finish = true;
break;
}
else
{
KafkaBroker *coordinator = member->cgroup.get_coordinator();
const struct sockaddr *addr;
socklen_t socklen;
coordinator->get_broker_addr(&addr, &socklen);
coordinator = member->cgroup.get_coordinator();
if (!coordinator->get_host())
break;
if (coordinator->is_to_addr())
{
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen,
this->get_userinfo(), 0,
kafka_leavegroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_LeaveGroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_cgroup(member->cgroup);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
}
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
coordinator->get_host(),
coordinator->get_port(),
this->get_userinfo(), 0,
kafka_leavegroup_callback);
task->user_data = this;
task->get_req()->set_config(this->config);
task->get_req()->set_api_type(Kafka_LeaveGroup);
task->get_req()->set_broker(*coordinator);
task->get_req()->set_cgroup(member->cgroup);
series_of(this)->push_front(this);
series_of(this)->push_front(task);
break;
case Kafka_ListOffsets:
@@ -1116,28 +1062,12 @@ int KafkaClientTask::dispatch_locked()
auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
std::placeholders::_1);
KafkaBroker *broker = get_broker(v.first);
if (broker->is_to_addr())
{
const struct sockaddr *addr;
socklen_t socklen;
broker->get_broker_addr(&addr, &socklen);
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
addr, socklen,
this->get_userinfo(),
this->retry_max,
nullptr);
}
else
{
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
}
task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
broker->get_host(),
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
@@ -1709,14 +1639,12 @@ int WFKafkaClient::init(const std::string& broker)
int WFKafkaClient::init(const std::string& broker, const std::string& group)
{
if (this->init(broker) == 0)
{
this->member->cgroup.set_group(group);
this->member->cgroup_status = KAFKA_CGROUP_UNINIT;
return 0;
}
else
if (this->init(broker) < 0)
return -1;
this->member->cgroup.set_group(group);
this->member->cgroup_status = KAFKA_CGROUP_UNINIT;
return 0;
}
int WFKafkaClient::deinit()

View File

@@ -118,7 +118,6 @@ private:
virtual int keep_alive_timeout();
virtual int first_timeout();
bool has_next();
bool check_redirect();
bool process_produce();
bool process_fetch();
bool process_metadata();
@@ -407,49 +406,6 @@ int __ComplexKafkaTask::first_timeout()
return ret + KAFKA_ROUNDTRIP_TIMEOUT;
}
bool __ComplexKafkaTask::check_redirect()
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
KafkaBroker *coordinator = this->get_req()->get_cgroup()->get_coordinator();
//always success
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
if (!coordinator->is_equal(paddr, addrlen))
{
if (coordinator->is_to_addr())
{
const struct sockaddr *addr_coord;
socklen_t addrlen_coord;
coordinator->get_broker_addr(&addr_coord, &addrlen_coord);
set_redirect(this->get_transport_type(), addr_coord, addrlen_coord,
this->WFComplexClientTask::info_);
}
else
{
std::string url(uri_.scheme);
url += "://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());
ParsedURI uri;
URIParser::parse(url, uri);
set_redirect(std::move(uri));
}
return true;
}
else
{
this->init(this->get_transport_type(), paddr, addrlen,
this->WFComplexClientTask::info_);
return false;
}
}
bool __ComplexKafkaTask::process_find_coordinator()
{
KafkaCgroup *cgroup = this->get_resp()->get_cgroup();
@@ -463,8 +419,18 @@ bool __ComplexKafkaTask::process_find_coordinator()
else
{
this->get_req()->set_cgroup(*cgroup);
is_redirect_ = check_redirect();
KafkaBroker *coordinator = cgroup->get_coordinator();
std::string url(uri_.scheme);
url += "://";
url += user_info_ + "@";
url += coordinator->get_host();
url += ":" + std::to_string(coordinator->get_port());
ParsedURI uri;
URIParser::parse(url, uri);
set_redirect(std::move(uri));
this->get_req()->set_api_type(Kafka_JoinGroup);
is_redirect_ = true;
return true;
}
}
@@ -472,16 +438,6 @@ bool __ComplexKafkaTask::process_find_coordinator()
bool __ComplexKafkaTask::process_join_group()
{
KafkaResponse *msg = this->get_resp();
if (!msg->get_cgroup()->get_coordinator()->is_to_addr())
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
msg->get_cgroup()->get_coordinator()->set_broker_addr(paddr, addrlen);
msg->get_cgroup()->get_coordinator()->set_to_addr(1);
}
switch(msg->get_cgroup()->get_error())
{
case KAFKA_MEMBER_ID_REQUIRED:
@@ -681,18 +637,6 @@ bool __ComplexKafkaTask::process_sasl_authenticate()
bool __ComplexKafkaTask::has_next()
{
struct sockaddr_storage addr;
socklen_t addrlen = sizeof addr;
//always success
this->get_peer_addr((struct sockaddr *)&addr, &addrlen);
const struct sockaddr *paddr = (const struct sockaddr *)&addr;
if (!this->get_resp()->get_broker()->is_to_addr())
{
this->get_resp()->get_broker()->set_broker_addr(paddr, addrlen);
this->get_resp()->get_broker()->set_to_addr(1);
}
switch (this->get_resp()->get_api_type())
{
case Kafka_Produce:
@@ -809,20 +753,6 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri,
return task;
}
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type,
const struct sockaddr *addr,
socklen_t addrlen,
const std::string& info,
int retry_max,
__kafka_callback_t callback)
{
auto *task = new __ComplexKafkaTask(retry_max, std::move(callback));
task->init(type, addr, addrlen, info);
task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
return task;
}
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type,
const char *host,
unsigned short port,

View File

@@ -40,13 +40,6 @@ public:
int retry_max,
__kafka_callback_t callback);
static __WFKafkaTask *create_kafka_task(TransportType type,
const struct sockaddr *addr,
socklen_t addrlen,
const std::string& info,
int retry_max,
__kafka_callback_t callback);
static __WFKafkaTask *create_kafka_task(TransportType type,
const char *host,
unsigned short port,

View File

@@ -1115,54 +1115,6 @@ public:
struct rb_node *get_rb() { return &this->rb; }
bool is_equal(const struct sockaddr *addr, socklen_t socklen) const
{
if (this->ptr->addrlen == socklen && socklen)
return memcmp(addr, &this->ptr->addr, this->ptr->addrlen) == 0;
return false;
}
bool is_equal(const char *host, int port) const
{
if (port == this->ptr->port)
return strcmp(host, this->ptr->host) == 0;
return false;
}
bool is_equal(int node_id) const
{
return this->ptr->node_id == node_id;
}
bool is_equal(const KafkaBroker& broker) const
{
return is_equal(broker.ptr->host, broker.ptr->port);
}
void get_broker_addr(const struct sockaddr **addr, socklen_t *socklen) const
{
if (this->ptr->addrlen)
{
*addr = (const struct sockaddr *)&this->ptr->addr;
*socklen = this->ptr->addrlen;
}
else
{
*addr = NULL;
*socklen = 0;
}
}
void set_broker_addr(const struct sockaddr *addr, socklen_t socklen)
{
memcpy(&this->ptr->addr, addr, socklen);
this->ptr->addrlen = socklen;
}
bool is_to_addr() const { return this->ptr->to_addr == 1; }
void set_to_addr(int to_addr) { this->ptr->to_addr = to_addr; }
int get_node_id() const { return this->ptr->node_id; }
int get_id () const { return this->ptr->node_id; }
@@ -1392,14 +1344,6 @@ public:
return strcmp(this->ptr->leader_id, this->ptr->member_id) == 0;
}
bool is_equal_coordinator(const struct sockaddr *addr, socklen_t addrlen) const
{
if (addrlen == this->ptr->coordinator.addrlen)
return memcmp(addr, &this->ptr->coordinator.addr, addrlen) == 0;
else
return false;
}
struct list_head *get_group_protocol()
{
return &this->ptr->group_protocol_list;

View File

@@ -1562,44 +1562,6 @@ int KafkaMessage::parse_records(void **buf, size_t *size, bool check_crcs,
return 0;
}
static bool __to_addr(const char *host, int port, struct sockaddr *sockaddr,
socklen_t *addrlen)
{
size_t len = strlen(host);
struct sockaddr_in *addr;
struct sockaddr_in6 *addr6;
bool ret = true;
if (!host)
ret = false;
else if (isdigit(host[0]) && isdigit(host[len - 1]))
{
addr = (struct sockaddr_in *)sockaddr;
if (inet_pton(AF_INET, host, &addr->sin_addr) == 1)
{
addr->sin_family = AF_INET;
*addrlen = sizeof(struct sockaddr_in);
addr->sin_port = htons(port);
}
else
ret = false;
}
else
{
addr6 = (struct sockaddr_in6 *)sockaddr;
if (inet_pton(AF_INET6, host, &addr6->sin6_addr) == 1)
{
addr6->sin6_family = AF_INET6;
*addrlen = sizeof(struct sockaddr_in6);
addr6->sin6_port = htons(port);
}
else
ret = false;
}
return ret;
}
KafkaMessage::KafkaMessage()
{
static struct Crc32cInitializer
@@ -2867,11 +2829,6 @@ static int kafka_meta_parse_broker(void **buf, size_t *size,
if (api_version >= 1)
CHECK_RET(parse_string(buf, size, &ptr->rack));
if (__to_addr(ptr->host, ptr->port, (struct sockaddr *)&ptr->addr, &ptr->addrlen))
{
ptr->to_addr = 1;
}
broker_list->rewind();
KafkaBroker *last;
@@ -3364,12 +3321,6 @@ int KafkaResponse::parse_findcoordinator(void **buf, size_t *size)
CHECK_RET(parse_string(buf, size, &cgroup->coordinator.host));
CHECK_RET(parse_i32(buf, size, &cgroup->coordinator.port));
if (__to_addr(cgroup->coordinator.host, cgroup->coordinator.port,
(struct sockaddr *)&cgroup->coordinator.addr, &cgroup->coordinator.addrlen))
{
cgroup->coordinator.to_addr = 1;
}
return 0;
}

View File

@@ -397,9 +397,6 @@ void kafka_broker_init(kafka_broker_t *broker)
broker->port = 0;
broker->host = NULL;
broker->rack = NULL;
broker->to_addr = 0;
memset(&broker->addr, 0, sizeof(broker->addr));
broker->addrlen = 0;
broker->error = 0;
broker->status = KAFKA_BROKER_UNINIT;
}

View File

@@ -305,9 +305,6 @@ typedef struct __kafka_broker
int port;
char *host;
char *rack;
int to_addr;
struct sockaddr_storage addr;
socklen_t addrlen;
short error;
int status;
} kafka_broker_t;