Merge pull request #959 from kedixa/dev

set kafka correlation id
This commit is contained in:
xiehan
2022-06-24 17:52:15 +08:00
committed by GitHub
4 changed files with 10 additions and 2 deletions

View File

@@ -105,7 +105,7 @@ if (KAFKA STREQUAL "y")
workflow z lz4 zstd ${SNAPPY_LIB}
OpenSSL::SSL OpenSSL::Crypto pthread)
endif ()
set_target_properties(${KAFKA_SHARED_LIB_NAME} PROPERTIES OUTPUT_NAME "wfkafka" VERSION ${CMAKE_PROJECT_VERSION} SOVERSION ${PROJECT_VERSION_MAJOR})
set_target_properties(${KAFKA_SHARED_LIB_NAME} PROPERTIES OUTPUT_NAME "wfkafka" VERSION ${PROJECT_VERSION} SOVERSION ${PROJECT_VERSION_MAJOR})
endif ()
install(

View File

@@ -183,6 +183,7 @@ CommMessageOut *__ComplexKafkaTask::message_out()
req->set_api_type(Kafka_SaslHandshake);
else
req->set_api_type(Kafka_SaslAuthenticate);
req->set_correlation_id(1);
is_user_request_ = false;
return req;
}
@@ -256,11 +257,13 @@ CommMessageOut *__ComplexKafkaTask::message_out()
new_req->set_toppar_list(toppar_list);
new_req->set_config(*req->get_config());
new_req->set_api_type(Kafka_ListOffsets);
new_req->set_correlation_id(seqid);
is_user_request_ = false;
return new_req;
}
}
this->get_req()->set_correlation_id(seqid);
return this->WFComplexClientTask::message_out();
}

View File

@@ -1614,6 +1614,7 @@ KafkaMessage::KafkaMessage()
kafka_parser_init(this->parser);
this->stream = new EncodeStream;
this->api_type = Kafka_Unknown;
this->correlation_id = 0;
this->cur_size = 0;
}
@@ -1757,7 +1758,7 @@ int KafkaMessage::encode_head()
append_i32(this->headbuf, 0);
append_i16(this->headbuf, this->api_type);
append_i16(this->headbuf, this->api_version);
append_i32(this->headbuf, 0);
append_i32(this->headbuf, this->correlation_id);
append_string(this->headbuf, this->config.get_client_id());
return 0;

View File

@@ -60,6 +60,9 @@ public:
void set_api_version(int ver) { this->api_version = ver; }
int get_api_version() const { return this->api_version; }
void set_correlation_id(int id) { this-> correlation_id = id; }
int get_correlation_id() const { return this->correlation_id; }
void set_config(const KafkaConfig& conf)
{
this->config = conf;
@@ -197,6 +200,7 @@ protected:
int api_type;
int api_version;
int correlation_id;
int message_version;
std::map<int, int> api_mver_map;