kafka bugfix: cannot fetch without group

This commit is contained in:
kedixa
2023-04-20 16:09:41 +08:00
parent e103f8521e
commit 856bc734c6
2 changed files with 7 additions and 2 deletions

View File

@@ -29,6 +29,7 @@
#define KAFKA_CGROUP_UNINIT 0
#define KAFKA_CGROUP_DOING 1
#define KAFKA_CGROUP_DONE 2
#define KAFKA_CGROUP_NONE 3
#define KAFKA_HEARTBEAT_UNINIT 0
#define KAFKA_HEARTBEAT_DOING 1
@@ -53,7 +54,7 @@ class KafkaMember
public:
KafkaMember() : ref(1)
{
cgroup_status = KAFKA_CGROUP_UNINIT;
cgroup_status = KAFKA_CGROUP_NONE;
heartbeat_status = KAFKA_HEARTBEAT_UNINIT;
heartbeat_series = NULL;
cgroup_outdated = false;
@@ -1588,6 +1589,7 @@ int WFKafkaClient::init(const std::string& broker, const std::string& group)
{
this->init(broker);
this->member->cgroup.set_group(group);
this->member->cgroup_status = KAFKA_CGROUP_UNINIT;
return 0;
}

View File

@@ -287,6 +287,7 @@ CommMessageIn *__ComplexKafkaTask::message_in()
{
KafkaRequest *req = static_cast<KafkaRequest *>(this->get_message_out());
KafkaResponse *resp = this->get_resp();
KafkaCgroup *cgroup;
resp->set_api_type(req->get_api_type());
resp->set_api_version(req->get_api_version());
@@ -296,7 +297,9 @@ CommMessageIn *__ComplexKafkaTask::message_in()
{
case Kafka_FindCoordinator:
case Kafka_Heartbeat:
resp->set_cgroup(__create_cgroup(req->get_cgroup()));
cgroup = req->get_cgroup();
if (cgroup->get_group())
resp->set_cgroup(__create_cgroup(cgroup));
break;
default:
break;