Fix some kafka problems. (#1468)

This commit is contained in:
xiehan
2024-01-09 17:20:41 +08:00
committed by GitHub
parent 3a8c14ce6b
commit 244913d348
5 changed files with 51 additions and 55 deletions

View File

@@ -28,8 +28,9 @@
#include <atomic>
#include <mutex>
#include "WFTaskError.h"
#include "WFKafkaClient.h"
#include "StringUtil.h"
#include "KafkaTaskImpl.inl"
#include "WFKafkaClient.h"
#define KAFKA_HEARTBEAT_INTERVAL (3 * 1000 * 1000)

View File

@@ -23,10 +23,9 @@
#include <string>
#include <vector>
#include <functional>
#include "WFTask.h"
#include "KafkaMessage.h"
#include "KafkaResult.h"
#include "KafkaTaskImpl.inl"
class WFKafkaTask;
class WFKafkaClient;

View File

@@ -16,21 +16,50 @@
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <errno.h>
#include <assert.h>
#include <algorithm>
#include "KafkaDataTypes.h"
#define MIN(x, y) ((x) <= (y) ? (x) : (y))
namespace protocol
{
#define MIN(x, y) ((x) <= (y) ? (x) : (y))
static int compare_member(const void *p1, const void *p2)
std::string KafkaConfig::get_sasl_info() const
{
kafka_member_t *member1 = (kafka_member_t *)p1;
kafka_member_t *member2 = (kafka_member_t *)p2;
return strcmp(member1->member_id, member2->member_id);
std::string info;
if (strcasecmp(this->ptr->mechanisms, "plain") == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}
else if (strncasecmp(this->ptr->mechanisms, "SCRAM", 5) == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}
return info;
}
static bool compare_member(const kafka_member_t *m1, const kafka_member_t *m2)
{
return strcmp(m1->member_id, m2->member_id) < 0;
}
inline void KafkaMetaSubscriber::sort_by_member()
{
std::sort(this->member_vec.begin(), this->member_vec.end(), compare_member);
}
static bool operator<(const KafkaMetaSubscriber& s1, const KafkaMetaSubscriber& s2)
@@ -114,7 +143,7 @@ int KafkaCgroup::kafka_roundrobin_assignor(kafka_member_t **members,
int next = -1;
std::sort(subscribers->begin(), subscribers->end());
qsort(members, member_elements, sizeof (kafka_member_t *), compare_member);
std::sort(members, members + member_elements, compare_member);
for (const auto& subscriber : *subscribers)
{

View File

@@ -19,15 +19,14 @@
#ifndef _KAFKA_DATATYPES_H_
#define _KAFKA_DATATYPES_H_
#include <assert.h>
#include <algorithm>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <utility>
#include <string.h>
#include <vector>
#include <string>
#include <string.h>
#include <atomic>
#include <snappy.h>
#include <snappy-sinksource.h>
@@ -35,7 +34,6 @@
#include "rbtree.h"
#include "kafka_parser.h"
namespace protocol
{
@@ -553,30 +551,7 @@ public:
return kafka_sasl_set_password(password, this->ptr) == 0;
}
std::string get_sasl_info() const
{
std::string info;
if (strcasecmp(this->ptr->mechanisms, "plain") == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}
else if (strncasecmp(this->ptr->mechanisms, "SCRAM", 5) == 0)
{
info += this->ptr->mechanisms;
info += "|";
info += this->ptr->username;
info += "|";
info += this->ptr->password;
info += "|";
}
return info;
}
std::string get_sasl_info() const;
bool new_client(kafka_sasl_t *sasl)
{
@@ -1284,15 +1259,7 @@ public:
return &this->member_vec;
}
static bool cmp(const kafka_member_t *m1, const kafka_member_t *m2)
{
return strcmp(m1->member_id, m2->member_id) < 0;
}
void sort_by_member()
{
std::sort(this->member_vec.begin(), this->member_vec.end(), cmp);
}
void sort_by_member();
private:
KafkaMeta *meta;

View File

@@ -16,14 +16,14 @@
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <openssl/sha.h>
#include <openssl/hmac.h>
#include <openssl/evp.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <ctype.h>
#include "kafka_parser.h"
static kafka_api_version_t kafka_api_version_queryable[] = {
@@ -610,13 +610,13 @@ int kafka_parser_append_message(const void *buf, size_t *size,
if (s > parser->message_size - parser->cur_size)
{
memcpy(parser->msgbuf + parser->cur_size, buf,
memcpy((char *)parser->msgbuf + parser->cur_size, buf,
parser->message_size - parser->cur_size);
parser->cur_size = parser->message_size;
}
else
{
memcpy(parser->msgbuf + parser->cur_size, buf, s);
memcpy((char *)parser->msgbuf + parser->cur_size, buf, s);
parser->cur_size += s;
}