support kafka kip 329

This commit is contained in:
wzl12356
2021-11-09 13:38:55 +08:00
parent 17f9516d72
commit 75eb145c9e
6 changed files with 51 additions and 1 deletions

3
.gitignore vendored
View File

@@ -33,3 +33,6 @@
# bazel env
bazel-*
# vscode configs
.vscode

View File

@@ -347,6 +347,9 @@ private:
int ComplexKafkaTask::get_node_id(const KafkaToppar *toppar)
{
if (toppar->get_preferred_read_replica() >= 0)
return toppar->get_preferred_read_replica();
bool flag = false;
this->client_meta_list.rewind();
KafkaMeta *meta;

View File

@@ -495,6 +495,21 @@ public:
this->ptr->offset_store = offset_store;
}
const char *get_rack_id() const
{
return this->ptr->rack_id;
}
bool set_rack_id(const char *rack_id)
{
char *p = strdup(rack_id);
if (!p)
return false;
free(this->ptr->rack_id);
this->ptr->rack_id = p;
return true;
}
const char *get_sasl_mech() const
{
return this->ptr->mechanisms;
@@ -756,6 +771,11 @@ public:
this->ptr) == 0;
}
int get_preferred_read_replica() const
{
return this->ptr->preferred_read_replica;
}
bool set_topic(const char *topic)
{
this->ptr->topic_name = strdup(topic);

View File

@@ -223,6 +223,17 @@ static inline size_t append_varint_i32(std::string& buf, int32_t num)
return append_varint_i64(buf, num);
}
static size_t append_compact_string(std::string& buf, const char *str)
{
if (!str || str[0] == '\0')
append_string(buf, "");
size_t len = strlen(str);
size_t r = append_varint_u64(buf, len + 1);
append_string_raw(buf, str, len);
return r + len;
}
static inline int parse_i8(void **buf, size_t *size, int8_t *val)
{
if (*size >= 1)
@@ -2403,7 +2414,12 @@ int KafkaRequest::encode_fetch(struct iovec vectors[], int max)
//rackid
if (this->api_version >= 11)
append_string(this->msgbuf, "");
{
if (this->config.get_rack_id())
append_compact_string(this->msgbuf, this->config.get_rack_id());
else
append_string(this->msgbuf, "");
}
this->cur_size = this->msgbuf.size();
@@ -3233,7 +3249,10 @@ int KafkaResponse::parse_fetch(void **buf, size_t *size)
}
if (this->api_version >= 11)
{
CHECK_RET(parse_i32(buf, size, &preferred_read_replica));
ptr->preferred_read_replica = preferred_read_replica;
}
parse_records(buf, size, this->config.get_check_crcs(),
toppar->get_record(),

View File

@@ -344,6 +344,7 @@ void kafka_config_init(kafka_config_t *conf)
conf->client_id = NULL;
conf->check_crcs = 0;
conf->offset_store = KAFKA_OFFSET_AUTO;
conf->rack_id = NULL;
conf->mechanisms = NULL;
conf->username = NULL;
conf->password = NULL;
@@ -355,6 +356,7 @@ void kafka_config_deinit(kafka_config_t *conf)
{
free(conf->broker_version);
free(conf->client_id);
free(conf->rack_id);
free(conf->mechanisms);
free(conf->username);
free(conf->password);
@@ -439,6 +441,7 @@ void kafka_topic_partition_init(kafka_topic_partition_t *toppar)
toppar->error = KAFKA_NONE;
toppar->topic_name = NULL;
toppar->partition = -1;
toppar->preferred_read_replica = -1;
toppar->offset = -1;
toppar->high_watermark = -1;
toppar->low_watermark = -2;

View File

@@ -286,6 +286,7 @@ typedef struct __kafka_config
char *client_id;
int check_crcs;
int offset_store;
char *rack_id;
char *mechanisms;
char *username;
@@ -333,6 +334,7 @@ typedef struct __kafka_topic_partition
short error;
char *topic_name;
int partition;
int preferred_read_replica;
long long offset;
long long high_watermark;
long long low_watermark;