Optimize kafka client code structure.

This commit is contained in:
Xie Han
2024-02-21 19:29:43 +08:00
parent 315b237f2a
commit 37a3aec078

View File

@@ -314,7 +314,10 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
WFTaskFactory::signal_by_name(name, NULL, max);
}
else
{
kafka_rebalance_proc(member, series);
member->mutex.unlock();
}
}
void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *series)
@@ -337,8 +340,6 @@ void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *seri
member->cgroup_outdated = false;
series->push_back(task);
member->mutex.unlock();
}
void KafkaClientTask::kafka_heartbeat_callback(__WFKafkaTask *task)
@@ -356,12 +357,7 @@ void KafkaClientTask::kafka_heartbeat_callback(__WFKafkaTask *task)
return;
}
if (resp->get_cgroup()->get_error() != 0)
{
kafka_rebalance_proc(member, series);
return;
}
else
if (resp->get_cgroup()->get_error() == 0)
{
member->heartbeat_status = KAFKA_HEARTBEAT_DONE;
WFTimerTask *timer_task;
@@ -370,6 +366,8 @@ void KafkaClientTask::kafka_heartbeat_callback(__WFKafkaTask *task)
timer_task->user_data = member;
series->push_back(timer_task);
}
else
kafka_rebalance_proc(member, series);
member->mutex.unlock();
}