Merge pull request #1381 from kedixa/kafka

kafka task support retry
This commit is contained in:
xiehan
2023-09-27 18:50:54 +08:00
committed by GitHub
3 changed files with 24 additions and 51 deletions

View File

@@ -46,8 +46,7 @@
using namespace protocol;
using ComplexKafkaTask = WFComplexClientTask<KafkaRequest, KafkaResponse,
struct __ComplexKafkaTaskCtx>;
using ComplexKafkaTask = WFComplexClientTask<KafkaRequest, KafkaResponse, int>;
class KafkaMember
{
@@ -256,7 +255,7 @@ void KafkaClientTask::kafka_offsetcommit_callback(__WFKafkaTask *task)
t->finish = true;
t->state = task->get_state();
t->error = task->get_error();
t->kafka_error = static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx()->kafka_error;
t->kafka_error = *static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx();
}
void KafkaClientTask::kafka_leavegroup_callback(__WFKafkaTask *task)
@@ -265,7 +264,7 @@ void KafkaClientTask::kafka_leavegroup_callback(__WFKafkaTask *task)
t->finish = true;
t->state = task->get_state();
t->error = task->get_error();
t->kafka_error = static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx()->kafka_error;
t->kafka_error = *static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx();
}
void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
@@ -459,7 +458,7 @@ void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task)
t->member->mutex.lock();
t->state = task->get_state();
t->error = task->get_error();
t->kafka_error = static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx()->kafka_error;
t->kafka_error = *static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx();
if (t->state == WFT_STATE_SUCCESS)
{
kafka_merge_meta_list(&t->member->meta_list,
@@ -505,7 +504,7 @@ void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task)
t->member->mutex.lock();
t->state = task->get_state();
t->error = task->get_error();
t->kafka_error = static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx()->kafka_error;
t->kafka_error = *static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx();
if (t->state == WFT_STATE_SUCCESS)
{
@@ -640,17 +639,9 @@ void KafkaClientTask::kafka_move_task_callback(__WFKafkaTask *task)
int16_t state = task->get_state();
int16_t error = task->get_error();
/* This function is called before WFClientTask::done. Need to transfer
the state and error. */
if (state == WFT_STATE_SYS_ERROR && error < 0)
{
state = WFT_STATE_SSL_ERROR;
error = -error;
}
/* 'state' is always positive. */
state_error->first = (state << 16) | error;
state_error->second = static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx()->kafka_error;
state_error->second = *static_cast<ComplexKafkaTask *>(task)->get_mutable_ctx();
series_of(task)->set_context(state_error);
KafkaTopparList *toppar_list = task->get_resp()->get_toppar_list();
@@ -933,14 +924,12 @@ int KafkaClientTask::dispatch_locked()
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
std::move(cb));
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
task->get_req()->set_api_type(Kafka_Produce);
task->user_data = (void *)parallel->size();
ComplexKafkaTask *ctask = static_cast<ComplexKafkaTask *>(task);
ctask->get_mutable_ctx()->cb = cb;
series = Workflow::create_series_work(task, nullptr);
parallel->add_series(series);
}
@@ -970,15 +959,13 @@ int KafkaClientTask::dispatch_locked()
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
std::move(cb));
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
task->get_req()->set_api_type(Kafka_Fetch);
task->user_data = (void *)parallel->size();
ComplexKafkaTask *ctask = static_cast<ComplexKafkaTask *>(task);
ctask->get_mutable_ctx()->cb = cb;
series = Workflow::create_series_work(task, nullptr);
parallel->add_series(series);
}
@@ -1067,14 +1054,12 @@ int KafkaClientTask::dispatch_locked()
broker->get_port(),
this->get_userinfo(),
this->retry_max,
nullptr);
std::move(cb));
task->get_req()->set_config(this->config);
task->get_req()->set_toppar_list(v.second);
task->get_req()->set_broker(*broker);
task->get_req()->set_api_type(Kafka_ListOffsets);
task->user_data = (void *)parallel->size();
ComplexKafkaTask *ctask = static_cast<ComplexKafkaTask *>(task);
ctask->get_mutable_ctx()->cb = cb;
series = Workflow::create_series_work(task, nullptr);
parallel->add_series(series);
}

View File

@@ -46,8 +46,7 @@ static KafkaCgroup __create_cgroup(const KafkaCgroup *c)
/**********Client**********/
class __ComplexKafkaTask : public WFComplexClientTask<KafkaRequest, KafkaResponse,
struct __ComplexKafkaTaskCtx>
class __ComplexKafkaTask : public WFComplexClientTask<KafkaRequest, KafkaResponse, int>
{
public:
__ComplexKafkaTask(int retry_max, __kafka_callback_t&& callback) :
@@ -55,7 +54,7 @@ public:
{
is_user_request_ = true;
is_redirect_ = false;
ctx_.kafka_error = 0;
ctx_ = 0;
}
protected:
@@ -409,8 +408,8 @@ int __ComplexKafkaTask::first_timeout()
bool __ComplexKafkaTask::process_find_coordinator()
{
KafkaCgroup *cgroup = this->get_resp()->get_cgroup();
ctx_.kafka_error = cgroup->get_error();
if (ctx_.kafka_error)
ctx_ = cgroup->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
@@ -454,7 +453,7 @@ bool __ComplexKafkaTask::process_join_group()
break;
default:
ctx_.kafka_error = msg->get_cgroup()->get_error();
ctx_ = msg->get_cgroup()->get_error();
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
@@ -465,8 +464,8 @@ bool __ComplexKafkaTask::process_join_group()
bool __ComplexKafkaTask::process_sync_group()
{
ctx_.kafka_error = this->get_resp()->get_cgroup()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_cgroup()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
@@ -494,7 +493,7 @@ bool __ComplexKafkaTask::process_metadata()
case 0:
break;
default:
ctx_.kafka_error = meta->get_error();
ctx_ = meta->get_error();
this->error = WFT_ERR_KAFKA_META_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
@@ -554,7 +553,7 @@ bool __ComplexKafkaTask::process_fetch()
case KAFKA_OFFSET_OUT_OF_RANGE:
break;
default:
ctx_.kafka_error = toppar->get_error();
ctx_ = toppar->get_error();
this->error = WFT_ERR_KAFKA_FETCH_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
@@ -614,8 +613,8 @@ bool __ComplexKafkaTask::process_produce()
bool __ComplexKafkaTask::process_sasl_handshake()
{
ctx_.kafka_error = this->get_resp()->get_broker()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_broker()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_SASL_DISALLOWED;
this->state = WFT_STATE_TASK_ERROR;
@@ -626,8 +625,8 @@ bool __ComplexKafkaTask::process_sasl_handshake()
bool __ComplexKafkaTask::process_sasl_authenticate()
{
ctx_.kafka_error = this->get_resp()->get_broker()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_broker()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_SASL_DISALLOWED;
this->state = WFT_STATE_TASK_ERROR;
@@ -662,8 +661,8 @@ bool __ComplexKafkaTask::has_next()
case Kafka_LeaveGroup:
case Kafka_DescribeGroups:
case Kafka_Heartbeat:
ctx_.kafka_error = this->get_resp()->get_cgroup()->get_error();
if (ctx_.kafka_error)
ctx_ = this->get_resp()->get_cgroup()->get_error();
if (ctx_)
{
this->error = WFT_ERR_KAFKA_CGROUP_FAILED;
this->state = WFT_STATE_TASK_ERROR;
@@ -716,14 +715,10 @@ bool __ComplexKafkaTask::finish_once()
}
else
{
this->disable_retry();
this->get_resp()->set_api_type(this->get_req()->get_api_type());
this->get_resp()->set_api_version(this->get_req()->get_api_version());
}
if (ctx_.cb)
ctx_.cb(this);
return true;
}

View File

@@ -47,10 +47,3 @@ public:
int retry_max,
__kafka_callback_t callback);
};
struct __ComplexKafkaTaskCtx
{
int kafka_error;
__kafka_callback_t cb;
};