diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 9a84e004..44146ce4 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -291,13 +291,10 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) { __WFKafkaTask *kafka_task; KafkaBroker *coordinator = member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; kafka_task->get_req()->set_api_type(Kafka_Heartbeat); @@ -323,14 +320,11 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *series) { KafkaBroker *coordinator = member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - __WFKafkaTask *task; task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_rebalance_callback); task->user_data = member; task->get_req()->set_config(member->config); @@ -397,13 +391,10 @@ void KafkaClientTask::kafka_timer_callback(WFTimerTask *task) __WFKafkaTask *kafka_task; KafkaBroker *coordinator = member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_heartbeat_callback); kafka_task->user_data = member; @@ -539,13 +530,10 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task) { __WFKafkaTask *kafka_task; KafkaBroker *coordinator = t->member->cgroup.get_coordinator(); - - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(t->member->transport_type, - addr, socklen, "", 0, + coordinator->get_host(), + coordinator->get_port(), + "", 0, kafka_heartbeat_callback); kafka_task->user_data = t->member; t->member->incref(); @@ -896,6 +884,7 @@ bool KafkaClientTask::check_meta() int KafkaClientTask::dispatch_locked() { KafkaMember *member = this->member; + KafkaBroker *coordinator; __WFKafkaTask *task; ParallelWork *parallel; SeriesWork *series; @@ -939,28 +928,12 @@ int KafkaClientTask::dispatch_locked() auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, std::placeholders::_1); KafkaBroker *broker = get_broker(v.first); - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - nullptr); - } - else - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - this->get_userinfo(), - this->retry_max, - nullptr); - } - + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + this->get_userinfo(), + this->retry_max, + nullptr); task->get_req()->set_config(this->config); task->get_req()->set_toppar_list(v.second); task->get_req()->set_broker(*broker); @@ -992,27 +965,12 @@ int KafkaClientTask::dispatch_locked() auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, std::placeholders::_1); KafkaBroker *broker = get_broker(v.first); - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - nullptr); - } - else - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - this->get_userinfo(), - this->retry_max, - nullptr); - } + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + this->get_userinfo(), + this->retry_max, + nullptr); task->get_req()->set_config(this->config); task->get_req()->set_toppar_list(v.second); @@ -1041,30 +999,24 @@ int KafkaClientTask::dispatch_locked() this->finish = true; break; } - else - { - this->result.create(1); - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - kafka_offsetcommit_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_cgroup(member->cgroup); - task->get_req()->set_broker(*coordinator); - task->get_req()->set_toppar_list(this->toppar_list); - task->get_req()->set_api_type(this->api_type); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - break; - } + this->result.create(1); + coordinator = member->cgroup.get_coordinator(); + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + this->get_userinfo(), + this->retry_max, + kafka_offsetcommit_callback); + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_cgroup(member->cgroup); + task->get_req()->set_broker(*coordinator); + task->get_req()->set_toppar_list(this->toppar_list); + task->get_req()->set_api_type(this->api_type); + series_of(this)->push_front(this); + series_of(this)->push_front(task); + break; case Kafka_LeaveGroup: if (!member->cgroup.get_group()) @@ -1074,29 +1026,23 @@ int KafkaClientTask::dispatch_locked() this->finish = true; break; } - else - { - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - const struct sockaddr *addr; - socklen_t socklen; - coordinator->get_broker_addr(&addr, &socklen); + coordinator = member->cgroup.get_coordinator(); + if (!coordinator->get_host()) + break; - if (coordinator->is_to_addr()) - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), 0, - kafka_leavegroup_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_api_type(Kafka_LeaveGroup); - task->get_req()->set_broker(*coordinator); - task->get_req()->set_cgroup(member->cgroup); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - } - } + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + this->get_userinfo(), 0, + kafka_leavegroup_callback); + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_api_type(Kafka_LeaveGroup); + task->get_req()->set_broker(*coordinator); + task->get_req()->set_cgroup(member->cgroup); + series_of(this)->push_front(this); + series_of(this)->push_front(task); break; case Kafka_ListOffsets: @@ -1116,28 +1062,12 @@ int KafkaClientTask::dispatch_locked() auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, std::placeholders::_1); KafkaBroker *broker = get_broker(v.first); - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - addr, socklen, - this->get_userinfo(), - this->retry_max, - nullptr); - } - else - { - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - this->get_userinfo(), - this->retry_max, - nullptr); - } - + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + this->get_userinfo(), + this->retry_max, + nullptr); task->get_req()->set_config(this->config); task->get_req()->set_toppar_list(v.second); task->get_req()->set_broker(*broker); @@ -1709,14 +1639,12 @@ int WFKafkaClient::init(const std::string& broker) int WFKafkaClient::init(const std::string& broker, const std::string& group) { - if (this->init(broker) == 0) - { - this->member->cgroup.set_group(group); - this->member->cgroup_status = KAFKA_CGROUP_UNINIT; - return 0; - } - else + if (this->init(broker) < 0) return -1; + + this->member->cgroup.set_group(group); + this->member->cgroup_status = KAFKA_CGROUP_UNINIT; + return 0; } int WFKafkaClient::deinit() diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index dcddbc05..7da90453 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -118,7 +118,6 @@ private: virtual int keep_alive_timeout(); virtual int first_timeout(); bool has_next(); - bool check_redirect(); bool process_produce(); bool process_fetch(); bool process_metadata(); @@ -407,49 +406,6 @@ int __ComplexKafkaTask::first_timeout() return ret + KAFKA_ROUNDTRIP_TIMEOUT; } -bool __ComplexKafkaTask::check_redirect() -{ - struct sockaddr_storage addr; - socklen_t addrlen = sizeof addr; - const struct sockaddr *paddr = (const struct sockaddr *)&addr; - KafkaBroker *coordinator = this->get_req()->get_cgroup()->get_coordinator(); - - //always success - this->get_peer_addr((struct sockaddr *)&addr, &addrlen); - if (!coordinator->is_equal(paddr, addrlen)) - { - if (coordinator->is_to_addr()) - { - const struct sockaddr *addr_coord; - socklen_t addrlen_coord; - - coordinator->get_broker_addr(&addr_coord, &addrlen_coord); - set_redirect(this->get_transport_type(), addr_coord, addrlen_coord, - this->WFComplexClientTask::info_); - } - else - { - std::string url(uri_.scheme); - url += "://"; - url += user_info_ + "@"; - url += coordinator->get_host(); - url += ":" + std::to_string(coordinator->get_port()); - - ParsedURI uri; - URIParser::parse(url, uri); - set_redirect(std::move(uri)); - } - - return true; - } - else - { - this->init(this->get_transport_type(), paddr, addrlen, - this->WFComplexClientTask::info_); - return false; - } -} - bool __ComplexKafkaTask::process_find_coordinator() { KafkaCgroup *cgroup = this->get_resp()->get_cgroup(); @@ -463,8 +419,18 @@ bool __ComplexKafkaTask::process_find_coordinator() else { this->get_req()->set_cgroup(*cgroup); - is_redirect_ = check_redirect(); + KafkaBroker *coordinator = cgroup->get_coordinator(); + std::string url(uri_.scheme); + url += "://"; + url += user_info_ + "@"; + url += coordinator->get_host(); + url += ":" + std::to_string(coordinator->get_port()); + + ParsedURI uri; + URIParser::parse(url, uri); + set_redirect(std::move(uri)); this->get_req()->set_api_type(Kafka_JoinGroup); + is_redirect_ = true; return true; } } @@ -472,16 +438,6 @@ bool __ComplexKafkaTask::process_find_coordinator() bool __ComplexKafkaTask::process_join_group() { KafkaResponse *msg = this->get_resp(); - if (!msg->get_cgroup()->get_coordinator()->is_to_addr()) - { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof addr; - const struct sockaddr *paddr = (const struct sockaddr *)&addr; - this->get_peer_addr((struct sockaddr *)&addr, &addrlen); - msg->get_cgroup()->get_coordinator()->set_broker_addr(paddr, addrlen); - msg->get_cgroup()->get_coordinator()->set_to_addr(1); - } - switch(msg->get_cgroup()->get_error()) { case KAFKA_MEMBER_ID_REQUIRED: @@ -681,18 +637,6 @@ bool __ComplexKafkaTask::process_sasl_authenticate() bool __ComplexKafkaTask::has_next() { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof addr; - //always success - this->get_peer_addr((struct sockaddr *)&addr, &addrlen); - - const struct sockaddr *paddr = (const struct sockaddr *)&addr; - if (!this->get_resp()->get_broker()->is_to_addr()) - { - this->get_resp()->get_broker()->set_broker_addr(paddr, addrlen); - this->get_resp()->get_broker()->set_to_addr(1); - } - switch (this->get_resp()->get_api_type()) { case Kafka_Produce: @@ -809,20 +753,6 @@ __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri, return task; } -__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type, - const struct sockaddr *addr, - socklen_t addrlen, - const std::string& info, - int retry_max, - __kafka_callback_t callback) -{ - auto *task = new __ComplexKafkaTask(retry_max, std::move(callback)); - - task->init(type, addr, addrlen, info); - task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT); - return task; -} - __WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(TransportType type, const char *host, unsigned short port, diff --git a/src/factory/KafkaTaskImpl.inl b/src/factory/KafkaTaskImpl.inl index fd40f4a3..5b7f9c9b 100644 --- a/src/factory/KafkaTaskImpl.inl +++ b/src/factory/KafkaTaskImpl.inl @@ -40,13 +40,6 @@ public: int retry_max, __kafka_callback_t callback); - static __WFKafkaTask *create_kafka_task(TransportType type, - const struct sockaddr *addr, - socklen_t addrlen, - const std::string& info, - int retry_max, - __kafka_callback_t callback); - static __WFKafkaTask *create_kafka_task(TransportType type, const char *host, unsigned short port, diff --git a/src/protocol/KafkaDataTypes.h b/src/protocol/KafkaDataTypes.h index 1912574d..2c0d9329 100644 --- a/src/protocol/KafkaDataTypes.h +++ b/src/protocol/KafkaDataTypes.h @@ -1115,54 +1115,6 @@ public: struct rb_node *get_rb() { return &this->rb; } - bool is_equal(const struct sockaddr *addr, socklen_t socklen) const - { - if (this->ptr->addrlen == socklen && socklen) - return memcmp(addr, &this->ptr->addr, this->ptr->addrlen) == 0; - - return false; - } - - bool is_equal(const char *host, int port) const - { - if (port == this->ptr->port) - return strcmp(host, this->ptr->host) == 0; - return false; - } - - bool is_equal(int node_id) const - { - return this->ptr->node_id == node_id; - } - - bool is_equal(const KafkaBroker& broker) const - { - return is_equal(broker.ptr->host, broker.ptr->port); - } - - void get_broker_addr(const struct sockaddr **addr, socklen_t *socklen) const - { - if (this->ptr->addrlen) - { - *addr = (const struct sockaddr *)&this->ptr->addr; - *socklen = this->ptr->addrlen; - } - else - { - *addr = NULL; - *socklen = 0; - } - } - - void set_broker_addr(const struct sockaddr *addr, socklen_t socklen) - { - memcpy(&this->ptr->addr, addr, socklen); - this->ptr->addrlen = socklen; - } - - bool is_to_addr() const { return this->ptr->to_addr == 1; } - void set_to_addr(int to_addr) { this->ptr->to_addr = to_addr; } - int get_node_id() const { return this->ptr->node_id; } int get_id () const { return this->ptr->node_id; } @@ -1392,14 +1344,6 @@ public: return strcmp(this->ptr->leader_id, this->ptr->member_id) == 0; } - bool is_equal_coordinator(const struct sockaddr *addr, socklen_t addrlen) const - { - if (addrlen == this->ptr->coordinator.addrlen) - return memcmp(addr, &this->ptr->coordinator.addr, addrlen) == 0; - else - return false; - } - struct list_head *get_group_protocol() { return &this->ptr->group_protocol_list; diff --git a/src/protocol/KafkaMessage.cc b/src/protocol/KafkaMessage.cc index 660f3845..af06e7ec 100644 --- a/src/protocol/KafkaMessage.cc +++ b/src/protocol/KafkaMessage.cc @@ -1562,44 +1562,6 @@ int KafkaMessage::parse_records(void **buf, size_t *size, bool check_crcs, return 0; } -static bool __to_addr(const char *host, int port, struct sockaddr *sockaddr, - socklen_t *addrlen) -{ - size_t len = strlen(host); - struct sockaddr_in *addr; - struct sockaddr_in6 *addr6; - bool ret = true; - - if (!host) - ret = false; - else if (isdigit(host[0]) && isdigit(host[len - 1])) - { - addr = (struct sockaddr_in *)sockaddr; - if (inet_pton(AF_INET, host, &addr->sin_addr) == 1) - { - addr->sin_family = AF_INET; - *addrlen = sizeof(struct sockaddr_in); - addr->sin_port = htons(port); - } - else - ret = false; - } - else - { - addr6 = (struct sockaddr_in6 *)sockaddr; - if (inet_pton(AF_INET6, host, &addr6->sin6_addr) == 1) - { - addr6->sin6_family = AF_INET6; - *addrlen = sizeof(struct sockaddr_in6); - addr6->sin6_port = htons(port); - } - else - ret = false; - } - - return ret; -} - KafkaMessage::KafkaMessage() { static struct Crc32cInitializer @@ -2867,11 +2829,6 @@ static int kafka_meta_parse_broker(void **buf, size_t *size, if (api_version >= 1) CHECK_RET(parse_string(buf, size, &ptr->rack)); - if (__to_addr(ptr->host, ptr->port, (struct sockaddr *)&ptr->addr, &ptr->addrlen)) - { - ptr->to_addr = 1; - } - broker_list->rewind(); KafkaBroker *last; @@ -3364,12 +3321,6 @@ int KafkaResponse::parse_findcoordinator(void **buf, size_t *size) CHECK_RET(parse_string(buf, size, &cgroup->coordinator.host)); CHECK_RET(parse_i32(buf, size, &cgroup->coordinator.port)); - if (__to_addr(cgroup->coordinator.host, cgroup->coordinator.port, - (struct sockaddr *)&cgroup->coordinator.addr, &cgroup->coordinator.addrlen)) - { - cgroup->coordinator.to_addr = 1; - } - return 0; } diff --git a/src/protocol/kafka_parser.c b/src/protocol/kafka_parser.c index 4f852437..3822baeb 100644 --- a/src/protocol/kafka_parser.c +++ b/src/protocol/kafka_parser.c @@ -397,9 +397,6 @@ void kafka_broker_init(kafka_broker_t *broker) broker->port = 0; broker->host = NULL; broker->rack = NULL; - broker->to_addr = 0; - memset(&broker->addr, 0, sizeof(broker->addr)); - broker->addrlen = 0; broker->error = 0; broker->status = KAFKA_BROKER_UNINIT; } diff --git a/src/protocol/kafka_parser.h b/src/protocol/kafka_parser.h index f5eb9788..df613d4c 100644 --- a/src/protocol/kafka_parser.h +++ b/src/protocol/kafka_parser.h @@ -305,9 +305,6 @@ typedef struct __kafka_broker int port; char *host; char *rack; - int to_addr; - struct sockaddr_storage addr; - socklen_t addrlen; short error; int status; } kafka_broker_t;