mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
kafka set failed state when produce or fetch failed (#1687)
This commit is contained in:
@@ -521,30 +521,18 @@ bool __ComplexKafkaTask::process_fetch()
|
||||
this->get_resp()->get_toppar_list()->rewind();
|
||||
while ((toppar = this->get_resp()->get_toppar_list()->get_next()) != NULL)
|
||||
{
|
||||
if (toppar->get_error() == KAFKA_OFFSET_OUT_OF_RANGE)
|
||||
int toppar_error = toppar->get_error();
|
||||
|
||||
if (toppar_error == KAFKA_OFFSET_OUT_OF_RANGE)
|
||||
{
|
||||
toppar->set_offset(KAFKA_OFFSET_OVERFLOW);
|
||||
toppar->set_low_watermark(KAFKA_OFFSET_UNINIT);
|
||||
toppar->set_high_watermark(KAFKA_OFFSET_UNINIT);
|
||||
ret = true;
|
||||
}
|
||||
|
||||
switch (toppar->get_error())
|
||||
else if (toppar_error)
|
||||
{
|
||||
case KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
|
||||
case KAFKA_LEADER_NOT_AVAILABLE:
|
||||
case KAFKA_NOT_LEADER_FOR_PARTITION:
|
||||
case KAFKA_BROKER_NOT_AVAILABLE:
|
||||
case KAFKA_REPLICA_NOT_AVAILABLE:
|
||||
case KAFKA_KAFKA_STORAGE_ERROR:
|
||||
case KAFKA_FENCED_LEADER_EPOCH:
|
||||
this->get_req()->set_api_type(Kafka_Metadata);
|
||||
return true;
|
||||
case 0:
|
||||
case KAFKA_OFFSET_OUT_OF_RANGE:
|
||||
break;
|
||||
default:
|
||||
ctx_ = toppar->get_error();
|
||||
ctx_ = toppar_error;
|
||||
this->error = WFT_ERR_KAFKA_FETCH_FAILED;
|
||||
this->state = WFT_STATE_TASK_ERROR;
|
||||
return false;
|
||||
@@ -580,21 +568,10 @@ bool __ComplexKafkaTask::process_produce()
|
||||
return true;
|
||||
}
|
||||
|
||||
switch (toppar->get_error())
|
||||
if (toppar->get_error())
|
||||
{
|
||||
case KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
|
||||
case KAFKA_LEADER_NOT_AVAILABLE:
|
||||
case KAFKA_NOT_LEADER_FOR_PARTITION:
|
||||
case KAFKA_BROKER_NOT_AVAILABLE:
|
||||
case KAFKA_REPLICA_NOT_AVAILABLE:
|
||||
case KAFKA_KAFKA_STORAGE_ERROR:
|
||||
case KAFKA_FENCED_LEADER_EPOCH:
|
||||
this->get_req()->set_api_type(Kafka_Metadata);
|
||||
return true;
|
||||
case 0:
|
||||
break;
|
||||
default:
|
||||
this->error = toppar->get_error();
|
||||
ctx_ = toppar->get_error();
|
||||
this->error = WFT_ERR_KAFKA_PRODUCE_FAILED;
|
||||
this->state = WFT_STATE_TASK_ERROR;
|
||||
return false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user