add kafka client (#96)

* add kafka client

Co-authored-by: wangzhulei <wangzhulei@sogou-inc.com>
This commit is contained in:
wzl12356
2020-11-13 16:38:40 +08:00
committed by GitHub
parent b41ef2858e
commit 5d42d68ab3
27 changed files with 10486 additions and 9 deletions

View File

@@ -84,3 +84,15 @@ set(INCLUDE_HEADERS
src/factory/WFOperator.h
)
if(KAFKA STREQUAL "y")
set(INCLUDE_HEADERS
${INCLUDE_HEADERS}
src/util/crc32c.h
src/protocol/KafkaMessage.h
src/protocol/KafkaDataTypes.h
src/protocol/KafkaResult.h
src/protocol/kafka_parser.h
src/client/WFKafkaClient.h
src/factory/KafkaTaskImpl.inl
)
endif()

View File

@@ -13,12 +13,19 @@ all: base
base:
mkdir -p $(BUILD_DIR)
ifeq ($(DEBUG),y)
cd $(BUILD_DIR) && $(CMAKE3) -DCMAKE_BUILD_TYPE=Debug $(ROOT_DIR)
else ifneq ("${INSTALL_PREFIX}install_prefix", "install_prefix")
cd $(BUILD_DIR) && $(CMAKE3) -DCMAKE_INSTALL_PREFIX:STRING=${INSTALL_PREFIX} $(ROOT_DIR)
ifeq ($(KAFKA),y)
KAFKA=y
else
cd $(BUILD_DIR) && $(CMAKE3) $(ROOT_DIR)
KAFKA=n
endif
ifeq ($(DEBUG),y)
cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug -D KAFKA=$(KAFKA) $(ROOT_DIR)
else ifneq ("${INSTALL_PREFIX}install_prefix", "install_prefix")
cd $(BUILD_DIR) && $(CMAKE3) -DCMAKE_INSTALL_PREFIX:STRING=${INSTALL_PREFIX} -D KAFKA=$(KAFKA) $(ROOT_DIR)
else
cd $(BUILD_DIR) && $(CMAKE3) -D KAFKA=$(KAFKA) $(ROOT_DIR)
endif
tutorial: all

View File

@@ -0,0 +1,195 @@
# 异步Kafka客户端kafka_cli
# 示例代码
[tutorial-13-kafka_cli.cc](../tutorial/tutorial-13-kafka_cli.cc)
# 关于编译选项
通过命令make KAFKA=y 编译支持kafka协议的workflow系统需要预先安装zlib,snappy,lz4(>=1.7.5),zstd等第三方库。
# 关于kafka_cli
程序从命令行读取一个kafka broker服务器地址和本次任务的类型(produce/fetch/meta)
./kafka_cli \<broker_url\> [p/c/m]
程序会在执行完任务后自动退出
其中broker_url可以由多个url组成，多个url之间以,分割
- 形式如kafka://host:port,kafka://host1:port...
- port默认为9092;
- 如果用户在这一层有upstream选取需求可以参考[upstream文档](../docs/about-upstream.md)。
Kafka broker_url示例
kafka://127.0.0.1/
kafka://kafka.host:9090/
kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou
# 创建并启动Kafka任务
由于Kafka需要保存broker、meta和group之类的全局信息因此建议用户使用WFKafkaClient这个二级工厂来创建kafka任务
~~~cpp
using kafka_callback_t = std::function<void (WFKafkaTask *)>;
WFKafkaTask *create_kafka_task(const std::string& query, int retry_max, kafka_callback_t cb);
WFKafkaTask *create_kafka_task(int retry_max, kafka_callback_t cb);
~~~
用户有两种方式设置任务的详细信息:
1、在query中直接指定任务类型、topic等信息
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
~~~
2、在创建完WFKafkaTask之后根据任务的类型先调用set_api_type设置然后调用add接口准备输入
关于二级工厂的更多接口,可以在[WFKafkaClient.h](../src/client/WFKafkaClient.h)中查看
比如针对produce任务先创建KafkaRecord然后调用set_key, set_value, add_header_pair等方法构建KafkaRecord
接着调用add_produce_record添加record关于KafkaRecord的更多接口可以在[KafkaDataTypes.h](../src/protocol/KafkaDataTypes.h)中查看
针对fetch和meta任务需要调用add_topic指定topic
其他包括callback、series、user_data等与workflow其他task用法类似。
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url);
task = client_fetch->create_kafka_task("api=produce&topic=xxx&topic=yyy", 3, kafka_callback);
KafkaRecord record;
record.set_key("key1", strlen("key1"));
record.set_value(buf, sizeof(buf));
record.add_header_pair("hk1", 3, "hv1", 3);
task->add_produce_record("workflow_test1", -1, std::move(record));
...
task->start();
...
}
~~~
# fetch任务
fetch任务支持消费者组模式和手动模式
1、消费者组模式
在初始化client的时候需要指定消费者组的名称
使用示例如下:
~~~cpp
int main(int argc, char *argv[])
{
...
WFKafkaClient *client_fetch = new WFKafkaClient();
client_fetch->init(url, cgroup_name);
task = client_fetch->create_kafka_task("api=fetch&topic=xxx&topic=yyy", 3, kafka_callback);
...
task->start();
...
}
~~~
2、手动模式
无需指定消费者组同时需要用户指定topic、partition和offset
使用示例如下:
~~~cpp
client = new WFKafkaClient();
client->init(url);
task = client->create_kafka_task("api=fetch", 3, kafka_callback);
KafkaToppar toppar;
toppar.set_topic_partition("workflow_test1", 0);
toppar.set_offset(0);
task->add_toppar(toppar);
~~~
# 关于client的关闭
在消费者组模式下client在关闭之前需要调用create_leavegroup_task创建leavegroup_task
它会发送leavegroup协议包否则会导致消费者组没有正确退出
# 处理kafka结果
处理结果的函数和其他的示例一样既可以使用普通函数也可以使用std::function来处理结果
~~~cpp
void kafka_callback(WFKafkaTask *task)
{
int state = task->get_state();
int error = task->get_error();
// handle error states
...
protocol::KafkaResult *result = task->get_result();
result->fetch_records(records);
for (auto &v : records)
{
for (auto &w: v)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
printf("produce\ttopic: %s, partition: %d, status: %d, offset: %lld, val_len: %zu\n",
w->get_topic(), w->get_partition(), w->get_status(), w->get_offset(), value_len);
}
}
...
protocol::KafkaResult new_result = std::move(*task->get_result());
if (new_result.fetch_records(records))
{
for (auto &v : records)
{
if (v.empty())
continue;
for (auto &w: v)
{
if (fp)
{
const void *value;
size_t value_len;
w->get_value(&value, &value_len);
fwrite(value, value_len, 1, fp);
}
}
}
}
...
}
~~~
在这个callback中task就是二级工厂产生的task任务的结果集类型是protocol::KafkaResult。
结果集对象可以通过task->get_result()直接得到,获得结果。
在[KafkaResult.h](../src/protocol/KafkaResult.h)中可以看到KafkaResult的定义。

View File

@@ -38,6 +38,16 @@ add_library(
$<TARGET_OBJECTS:client>
)
if (KAFKA STREQUAL "y")
add_library(
"wfkafka" STATIC
$<TARGET_OBJECTS:client_kafka>
$<TARGET_OBJECTS:util_kafka>
$<TARGET_OBJECTS:protocol_kafka>
$<TARGET_OBJECTS:factory_kafka>
)
endif ()
install(
TARGETS ${PROJECT_NAME}
ARCHIVE
@@ -56,6 +66,15 @@ else ()
add_dependencies(SCRIPT_SHARED_LIB ${PROJECT_NAME})
endif ()
if (KAFKA STREQUAL "y")
set(LIBSOKAFKA ${LIB_DIR}/libwfkafka.so)
add_custom_target(
SCRIPT_SHARED_LIB_KAFKA ALL
COMMAND ${CMAKE_COMMAND} -E echo 'GROUP ( libwfkafka.a AS_NEEDED ( libpthread.so libssl.so libcrypto.so ) ) ' > ${LIBSOKAFKA}
)
add_dependencies(SCRIPT_SHARED_LIB_KAFKA "wfkafka")
endif ()
install(
FILES ${LIBSO}
DESTINATION ${CMAKE_INSTALL_LIBDIR}

View File

@@ -7,3 +7,9 @@ set(SRC
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (KAFKA STREQUAL "y")
set(SRC
WFKafkaClient.cc
)
add_library("client_kafka" OBJECT ${SRC})
endif ()

1419
src/client/WFKafkaClient.cc Normal file

File diff suppressed because it is too large Load Diff

157
src/client/WFKafkaClient.h Normal file
View File

@@ -0,0 +1,157 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#ifndef _WFKAFKACLIENT_H_
#define _WFKAFKACLIENT_H_
#include <string>
#include <vector>
#include <functional>
#include "KafkaMessage.h"
#include "KafkaResult.h"
#include "KafkaTaskImpl.inl"
class WFKafkaTask;
class WFKafkaClient;
using kafka_callback_t = std::function<void (WFKafkaTask *)>;
using kafka_partitioner_t = std::function<int (const char *topic_name,
const void *key,
size_t key_len,
int partition_num)>;
/* Kafka task created by the secondary factory WFKafkaClient.
 * One task carries one Kafka api operation (produce/fetch/meta/commit/...),
 * selected via set_api_type() or by the factory's query string. */
class WFKafkaTask : public WFGenericTask
{
public:
	/* Subscribe a topic by name. Returns false on failure. */
	virtual bool add_topic(const std::string& topic) = 0;

	/* Add an explicit topic/partition pair (e.g. manual fetch mode). */
	virtual bool add_toppar(const protocol::KafkaToppar& toppar) = 0;

	/* Append a record to be produced to `topic`.
	 * NOTE(review): partition == -1 presumably means "choose via the
	 * partitioner" -- confirm against the implementation. */
	virtual bool add_produce_record(const std::string& topic, int partition,
									protocol::KafkaRecord record) = 0;

	/* Queue this record's topic/partition/offset for an offset commit. */
	void add_commit_record(const protocol::KafkaRecord& record)
	{
		protocol::KafkaToppar toppar;

		toppar.set_topic_partition(record.get_topic(), record.get_partition());
		toppar.set_offset(record.get_offset());
		this->toppar_list.add_item(std::move(toppar));
	}

	/* Kafka api type of this task (produce/fetch/meta/...). */
	void set_api_type(int api_type)
	{
		this->api_type = api_type;
	}

	int get_api_type() const
	{
		return this->api_type;
	}

	/* Per-task protocol configuration. */
	void set_config(protocol::KafkaConfig conf)
	{
		this->config = std::move(conf);
	}

	/* User hook that chooses the partition for a produced record. */
	void set_partitioner(kafka_partitioner_t partitioner)
	{
		this->partitioner = std::move(partitioner);
	}

	/* Result set of the finished task. */
	protocol::KafkaResult *get_result()
	{
		return &this->result;
	}

	void set_callback(kafka_callback_t cb)
	{
		this->callback = std::move(cb);
	}

protected:
	WFKafkaTask(int retry_max, kafka_callback_t&& cb)
	{
		this->callback = std::move(cb);
		this->retry_max = retry_max;
		this->finish = false;
	}

	virtual ~WFKafkaTask() {}

	virtual SubTask *done();

protected:
	protocol::KafkaConfig config;
	protocol::KafkaTopparList toppar_list;	// toppars to fetch/commit
	protocol::KafkaMetaList meta_list;
	protocol::KafkaResult result;
	kafka_callback_t callback;
	kafka_partitioner_t partitioner;
	int api_type;
	int retry_max;
	bool finish;

private:
	friend class WFKafkaClient;
};
/* Secondary factory holding the global Kafka state (brokers, metas,
 * consumer group) shared by the tasks it creates. */
class WFKafkaClient
{
public:
	WFKafkaClient();

	/* Initialize with broker url(s). Multiple urls are comma-separated.
	 * example: kafka://10.160.23.23:9000
	 * example: kafka://kafka.sogou
	 * example: kafka.sogou:9090
	 * example: kafka://10.160.23.23:9000,10.123.23.23,kafka://kafka.sogou */
	int init(const std::string& broker_url);

	/* Same, plus join consumer group `group` (cgroup fetch mode). */
	int init(const std::string& broker_url, const std::string& group);

	void set_heartbeat_interval(size_t interval_ms);

	void deinit();

	/* Create a task whose type/topics are given in the query string.
	 * example: topic=xxx&topic=yyy&api=fetch
	 * example: api=commit */
	WFKafkaTask *create_kafka_task(const std::string& query,
								   int retry_max,
								   kafka_callback_t cb);

	/* Create a task to be configured via set_api_type()/add_* later. */
	WFKafkaTask *create_kafka_task(int retry_max, kafka_callback_t cb);

public:
	/* If you don't leavegroup manually, rebalance would be triggered */
	WFKafkaTask *create_leavegroup_task(int retry_max,
										kafka_callback_t callback);

public:
	virtual ~WFKafkaClient();

	protocol::KafkaMetaList *get_meta_list();

	protocol::KafkaBrokerList *get_broker_list();

private:
	// Opaque shared state; defined in WFKafkaClient.cc.
	class KafkaMember *member;
	friend class ComplexKafkaTask;
};
#endif

View File

@@ -12,3 +12,9 @@ set(SRC
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (KAFKA STREQUAL "y")
set(SRC
KafkaTaskImpl.cc
)
add_library("factory_kafka" OBJECT ${SRC})
endif ()

View File

@@ -0,0 +1,462 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <assert.h>
#include <stdio.h>
#include <string>
#include <set>
#include "KafkaTaskImpl.inl"
using namespace protocol;
#define KAFKA_KEEPALIVE_DEFAULT (60 * 1000)
#define KAFKA_ROUNDTRIP_TIMEOUT (5 * 1000)
/**********Client**********/
/* Internal client task doing the actual protocol exchange with one broker.
 * The callback type is __kafka_callback_t, i.e.
 * std::function<void (__WFKafkaTask *)>. */
class __ComplexKafkaTask : public WFComplexClientTask<KafkaRequest, KafkaResponse, std::function<void (__WFKafkaTask *)>>
{
public:
	__ComplexKafkaTask(int retry_max, __kafka_callback_t&& callback) :
		WFComplexClientTask(retry_max, std::move(callback))
	{
		update_metadata_ = false;	// set when JoinGroup reports a missing topic
		is_user_request_ = true;	// false while an internal request is in flight
		is_redirect_ = false;		// set when FindCoordinator causes a redirect
	}

protected:
	virtual CommMessageOut *message_out();
	virtual CommMessageIn *message_in();
	virtual bool finish_once();

private:
	virtual int first_timeout();
	bool has_next();
	bool check_redirect();

	bool update_metadata_;
	bool is_user_request_;
	bool is_redirect_;
};
/* Build the next outgoing message. Before the user's request can be sent,
 * the broker's supported api versions must be known; and a fetch on toppars
 * without a resolved offset first needs a ListOffsets round. In both cases
 * an internal request is returned and is_user_request_ is cleared so that
 * finish_once() swallows the internal response. */
CommMessageOut *__ComplexKafkaTask::message_out()
{
	KafkaBroker *broker = this->get_req()->get_broker();

	if (!broker->get_api())
	{
		/* Broker api-version table not known yet. */
		if (!this->get_req()->get_config()->get_broker_version())
		{
			/* No broker version configured: query it (ApiVersions). */
			KafkaRequest *req = new KafkaRequest;
			req->duplicate(*this->get_req());
			req->set_api(Kafka_ApiVersions);
			is_user_request_ = false;
			return req;
		}
		else
		{
			kafka_api_version_t *api;
			size_t api_cnt;
			const char *brk_ver = this->get_req()->get_config()->get_broker_version();
			int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt);

			if (ret == 1)
			{
				/* Configured version still requires querying the broker. */
				KafkaRequest *req = new KafkaRequest;
				req->duplicate(*this->get_req());
				req->set_api(Kafka_ApiVersions);
				is_user_request_ = false;
				return req;
			}
			else if (ret == 0)
			{
				/* Version table is known statically: copy it in. */
				broker->allocate_api_version(api_cnt);
				memcpy(broker->get_api(), api,
					   sizeof(kafka_api_version_t) * api_cnt);
			}
			else
			{
				this->state = WFT_STATE_TASK_ERROR;
				this->error = WFT_ERR_KAFKA_VERSION_DISALLOWED;
				return NULL;
			}
		}
	}

	if (this->get_req()->get_api() == Kafka_Fetch)
	{
		KafkaRequest *req = this->get_req();
		req->get_toppar_list()->rewind();

		KafkaToppar *toppar;
		KafkaTopparList toppar_list;
		bool flag = false;

		/* Collect toppars whose offset must be resolved first.
		 * NOTE(review): low_watermark == -2 and offset == -1 are used as
		 * "unknown" sentinels here -- confirm against kafka_parser.h. */
		while ((toppar = req->get_toppar_list()->get_next()) != NULL)
		{
			if (toppar->get_low_watermark() == -2)
			{
				toppar->set_offset_timestamp(-2);
				toppar_list.add_item(*toppar);
				flag = true;
			}
			else if (toppar->get_offset() == -1)
			{
				toppar->set_offset_timestamp(this->get_req()->get_config()->get_offset_timestamp());
				toppar_list.add_item(*toppar);
				flag = true;
			}
		}

		if (flag)
		{
			/* Internal ListOffsets request for the collected toppars. */
			KafkaRequest *new_req = new KafkaRequest;
			new_req->set_broker(*req->get_broker());
			new_req->set_toppar_list(toppar_list);
			new_req->set_config(*req->get_config());
			new_req->set_api(Kafka_ListOffsets);
			is_user_request_ = false;
			return new_req;
		}
	}

	return this->WFClientTask::message_out();
}
/* Prepare the response object before reading from the connection: the
 * parser needs the api/version (and, via duplicate(), the other state)
 * of the request actually written, which may be an internal one. */
CommMessageIn *__ComplexKafkaTask::message_in()
{
	KafkaRequest *req = static_cast<KafkaRequest *>(this->get_message_out());
	KafkaResponse *resp = this->get_resp();

	resp->set_api(req->get_api());
	resp->set_api_version(req->get_api_version());
	/* NOTE(review): assumes duplicate() preserves the api fields set
	 * just above -- confirm in KafkaMessage. */
	resp->duplicate(*req);
	return this->WFClientTask::message_in();
}
int __ComplexKafkaTask::first_timeout()
{
KafkaRequest *client_req = this->get_req();
int ret = 0;
switch(client_req->get_api())
{
case Kafka_Fetch:
ret = client_req->get_config()->get_fetch_timeout();
break;
case Kafka_JoinGroup:
ret = client_req->get_config()->get_session_timeout();
break;
case Kafka_SyncGroup:
ret = client_req->get_config()->get_rebalance_timeout();
break;
case Kafka_Produce:
ret = client_req->get_config()->get_produce_timeout();
break;
default:
return 0;
}
return ret + KAFKA_ROUNDTRIP_TIMEOUT;
}
/* Compare the coordinator reported by FindCoordinator with the address of
 * the current connection. If they differ, redirect the task to the
 * coordinator and return true; otherwise re-init on the current address
 * and return false. */
bool __ComplexKafkaTask::check_redirect()
{
	struct sockaddr_storage addr;
	socklen_t addrlen = sizeof addr;
	const struct sockaddr *paddr = (const struct sockaddr *)&addr;
	KafkaBroker *coordinator = this->get_req()->get_cgroup()->get_coordinator();

	//always success
	this->get_peer_addr((struct sockaddr *)&addr, &addrlen);

	if (!coordinator->is_equal(paddr, addrlen))
	{
		if (coordinator->is_to_addr())
		{
			/* Coordinator address already resolved: redirect directly. */
			const struct sockaddr *addr_coord;
			socklen_t addrlen_coord;

			coordinator->get_broker_addr(&addr_coord, &addrlen_coord);
			set_redirect(TT_TCP, addr_coord, addrlen_coord, "");
		}
		else
		{
			/* Otherwise redirect via a kafka:// URI built from host:port. */
			std::string url = "kafka://";
			url += coordinator->get_host();
			url += ":" + std::to_string(coordinator->get_port());

			ParsedURI uri;
			URIParser::parse(url, uri);
			set_redirect(std::move(uri));
		}

		return true;
	}
	else
	{
		/* Already connected to the coordinator. */
		this->init(TT_TCP, paddr, addrlen, "");
		return false;
	}
}
/* Decide, from the parsed response, whether another request must be sent
 * on this task (returns true) or the exchange is finished (false). Drives
 * the cgroup state machine:
 * FindCoordinator -> JoinGroup -> (Metadata) -> SyncGroup -> OffsetFetch. */
bool __ComplexKafkaTask::has_next()
{
	bool ret = true;
	KafkaResponse *msg = this->get_resp();
	struct sockaddr_storage addr;
	socklen_t addrlen = sizeof addr;
	const struct sockaddr *paddr = (const struct sockaddr *)&addr;

	//always success
	this->get_peer_addr((struct sockaddr *)&addr, &addrlen);

	/* Remember the broker's resolved address on first contact. */
	if (!msg->get_broker()->is_to_addr())
	{
		msg->get_broker()->set_broker_addr(paddr, addrlen);
		msg->get_broker()->set_to_addr(1);
	}

	switch (msg->get_api())
	{
	case Kafka_FindCoordinator:
		if (msg->get_cgroup()->get_error())
		{
			this->error = msg->get_cgroup()->get_error();
			this->state = WFT_STATE_TASK_ERROR;
			ret = false;
		}
		else
		{
			/* May redirect to the coordinator; then join the group. */
			is_redirect_ = check_redirect();
			this->get_req()->set_api(Kafka_JoinGroup);
		}

		break;

	case Kafka_JoinGroup:
		if (!msg->get_cgroup()->get_coordinator()->is_to_addr())
		{
			msg->get_cgroup()->get_coordinator()->set_broker_addr(paddr, addrlen);
			msg->get_cgroup()->get_coordinator()->set_to_addr(1);
		}

		if (msg->get_cgroup()->get_error() == KAFKA_MISSING_TOPIC)
		{
			/* Need fresh metadata before the group can be synced. */
			this->get_req()->set_api(Kafka_Metadata);
			update_metadata_ = true;
		}
		else if (msg->get_cgroup()->get_error() == KAFKA_MEMBER_ID_REQUIRED)
		{
			/* Broker assigned a member id; rejoin with it. */
			this->get_req()->set_api(Kafka_JoinGroup);
		}
		else if (msg->get_cgroup()->get_error() == KAFKA_UNKNOWN_MEMBER_ID)
		{
			/* Stale member id: clear it and rejoin. */
			msg->get_cgroup()->set_member_id("");
			this->get_req()->set_api(Kafka_JoinGroup);
		}
		else if (msg->get_cgroup()->get_error())
		{
			this->error = msg->get_cgroup()->get_error();
			this->state = WFT_STATE_TASK_ERROR;
			ret = false;
		}
		else
			this->get_req()->set_api(Kafka_SyncGroup);

		break;

	case Kafka_SyncGroup:
		if (msg->get_cgroup()->get_error())
		{
			this->error = msg->get_cgroup()->get_error();
			this->state = WFT_STATE_TASK_ERROR;
			ret = false;
		}
		else
			this->get_req()->set_api(Kafka_OffsetFetch);

		break;

	case Kafka_Metadata:
		if (update_metadata_)
		{
			/* Metadata fetched for the cgroup: run partition assignment,
			 * then sync the group. */
			KafkaCgroup *cgroup = msg->get_cgroup();

			if (cgroup->run_assignor(msg->get_meta_list(),
									 cgroup->get_protocol_name()) < 0)
			{
				this->error = errno;
				this->state = WFT_STATE_TASK_ERROR;
			}
			else
				this->get_req()->set_api(Kafka_SyncGroup);
		}
		else
		{
			/* Plain metadata request: retry only while some topic's
			 * leader is not yet available. */
			ret = false;
			msg->get_meta_list()->rewind();

			KafkaMeta *meta;
			while ((meta = msg->get_meta_list()->get_next()) != NULL)
			{
				if (meta->get_error() == KAFKA_LEADER_NOT_AVAILABLE)
				{
					ret = true;
					this->get_req()->set_api(Kafka_Metadata);
					break;
				}
			}
		}

		break;

	case Kafka_Produce:
	{
		/* Keep producing until every toppar's record list is drained. */
		msg->get_toppar_list()->rewind();

		KafkaToppar *toppar;
		while ((toppar = msg->get_toppar_list()->get_next()) != NULL)
		{
			if (!toppar->record_reach_end())
			{
				this->get_req()->set_api(Kafka_Produce);
				return true;
			}
		}
	}
	/* fall through: all records sent, treat like the terminal apis below */

	case Kafka_Fetch:
	case Kafka_OffsetCommit:
	case Kafka_OffsetFetch:
	case Kafka_ListOffsets:
	case Kafka_Heartbeat:
	case Kafka_LeaveGroup:
	case Kafka_ApiVersions:
		ret = false;
		break;

	default:
		ret = false;
		this->state = WFT_STATE_TASK_ERROR;
		this->error = WFT_ERR_KAFKA_API_UNKNOWN;
		break;
	}

	return ret;
}
/* Called when one request/response round completes. Returns true when the
 * task is really finished; returning false keeps the task alive for the
 * next round (internal request, multi-step cgroup exchange, redirect). */
bool __ComplexKafkaTask::finish_once()
{
	if (this->state == WFT_STATE_SUCCESS)
	{
		if (this->get_resp()->parse_response() < 0)
		{
			this->disable_retry();
			this->state = WFT_STATE_TASK_ERROR;
			this->error = WFT_ERR_KAFKA_PARSE_RESPONSE_FAILED;
		}
		else if (has_next() && is_user_request_)
		{
			/* More rounds needed on the user's own request. */
			this->get_req()->clear_buf();

			if (is_redirect_)
			{
				/* Redirecting to another broker: finish this connection,
				 * the framework re-dispatches. */
				is_redirect_ = false;
				return true;
			}

			this->clear_resp();
			return false;
		}

		if (!is_user_request_)
		{
			/* The round just completed was an internal request
			 * (ApiVersions/ListOffsets) allocated in message_out():
			 * free it and go send the real request. */
			is_user_request_ = true;
			delete this->get_message_out();
			this->get_resp()->clear_buf();
			return false;
		}

		/* Per-message user hook for streaming-style apis. */
		if (this->get_resp()->get_api() == Kafka_Fetch ||
			this->get_resp()->get_api() == Kafka_Produce)
		{
			if (*get_mutable_ctx())
				(*get_mutable_ctx())(this);
		}
	}

	return true;
}
/**********Factory**********/
/* Create an internal kafka task from a kafka:// URL string. */
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const std::string& url,
													   int retry_max,
													   __kafka_callback_t callback)
{
	ParsedURI uri;

	URIParser::parse(url, uri);

	auto *kafka_task = new __ComplexKafkaTask(retry_max, std::move(callback));

	kafka_task->init(std::move(uri));
	kafka_task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
	return kafka_task;
}
/* Create an internal kafka task from an already-parsed URI. */
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const ParsedURI& uri,
													   int retry_max,
													   __kafka_callback_t callback)
{
	auto *kafka_task = new __ComplexKafkaTask(retry_max, std::move(callback));

	kafka_task->init(uri);
	kafka_task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
	return kafka_task;
}
/* Create an internal kafka task connecting to an explicit socket address. */
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const struct sockaddr *addr,
													   socklen_t addrlen,
													   int retry_max,
													   __kafka_callback_t callback)
{
	auto *kafka_task = new __ComplexKafkaTask(retry_max, std::move(callback));

	kafka_task->init(TT_TCP, addr, addrlen, "");
	kafka_task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
	return kafka_task;
}
/* Create an internal kafka task from host/port; builds a kafka:// URL
 * and delegates to URI parsing. */
__WFKafkaTask *__WFKafkaTaskFactory::create_kafka_task(const char *host,
													   int port,
													   int retry_max,
													   __kafka_callback_t callback)
{
	std::string url("kafka://");

	url.append(host);
	url.append(":");
	url.append(std::to_string(port));

	ParsedURI uri;
	URIParser::parse(url, uri);

	auto *kafka_task = new __ComplexKafkaTask(retry_max, std::move(callback));

	kafka_task->init(std::move(uri));
	kafka_task->set_keep_alive(KAFKA_KEEPALIVE_DEFAULT);
	return kafka_task;
}

View File

@@ -0,0 +1,51 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include "WFTaskFactory.h"
#include "KafkaMessage.h"
// Kafka internal task. For __ComplexKafkaTask usage only
using __WFKafkaTask = WFNetworkTask<protocol::KafkaRequest,
protocol::KafkaResponse>;
using __kafka_callback_t = std::function<void (__WFKafkaTask *)>;
class __WFKafkaTaskFactory
{
public:
	/* __WFKafkaTask is created by __ComplexKafkaTask. These are internal
	 * interfaces for creating internal tasks. They should not be called
	 * directly by common user code. */

	/* Create a task connecting to an already-parsed URI. */
	static __WFKafkaTask *create_kafka_task(const ParsedURI& uri,
											int retry_max,
											__kafka_callback_t callback);

	/* Create a task from a kafka:// URL string. */
	static __WFKafkaTask *create_kafka_task(const std::string& url,
											int retry_max,
											__kafka_callback_t callback);

	/* Create a task connecting to an explicit socket address. */
	static __WFKafkaTask *create_kafka_task(const struct sockaddr *addr,
											socklen_t addrlen,
											int retry_max,
											__kafka_callback_t callback);

	/* Create a task from host/port. (Fixed: the parameter was misspelled
	 * `retyr_max`; declaration-only rename, no caller impact.) */
	static __WFKafkaTask *create_kafka_task(const char *host,
											int port,
											int retry_max,
											__kafka_callback_t callback);
};

View File

@@ -48,7 +48,18 @@ enum
WFT_ERR_MYSQL_ACCESS_DENIED = 4002, ///< MySQL, authentication failed
WFT_ERR_MYSQL_INVALID_CHARACTER_SET = 4003, ///< MySQL, invalid charset, not found in MySQL-Documentation
WFT_ERR_MYSQL_COMMAND_DISALLOWED = 4004, ///< MySQL, sql command disabled, cannot be "USE"/"SET NAMES"/"SET CHARSET"/"SET CHARACTER SET"
WFT_ERR_MYSQL_QUERY_NOT_SET = 4005 ///< MySQL, query not set sql, maybe forget please check
WFT_ERR_MYSQL_QUERY_NOT_SET = 4005, ///< MySQL, query not set sql, maybe forget please check
//KAFKA
WFT_ERR_KAFKA_PARSE_RESPONSE_FAILED = 5001, ///< Kafka parse response failed
WFT_ERR_KAFKA_PRODUCE_FAILED = 5002,
WFT_ERR_KAFKA_FETCH_FAILED = 5003,
WFT_ERR_KAFKA_CGROUP_FAILED = 5004,
WFT_ERR_KAFKA_COMMIT_FAILED = 5005,
WFT_ERR_KAFKA_META_FAILED = 5006,
WFT_ERR_KAFKA_LEAVEGROUP_FAILED = 5007,
WFT_ERR_KAFKA_API_UNKNOWN = 5008, ///< api type not supported
WFT_ERR_KAFKA_VERSION_DISALLOWED = 5009, ///< broker version not supported
};
#endif

View File

@@ -663,6 +663,33 @@ static inline const char *__get_task_error_string(int error)
case WFT_ERR_MYSQL_COMMAND_DISALLOWED:
return "MySQL Command Disallowed";
case WFT_ERR_KAFKA_PARSE_RESPONSE_FAILED:
return "Kafka parse response failed";
case WFT_ERR_KAFKA_PRODUCE_FAILED:
return "Kafka produce api failed";
case WFT_ERR_KAFKA_FETCH_FAILED:
return "Kafka fetch api failed";
case WFT_ERR_KAFKA_CGROUP_FAILED:
return "Kafka cgroup failed";
case WFT_ERR_KAFKA_COMMIT_FAILED:
return "Kafka commit api failed";
case WFT_ERR_KAFKA_META_FAILED:
return "Kafka meta api failed";
case WFT_ERR_KAFKA_LEAVEGROUP_FAILED:
return "Kafka leavegroup failed";
case WFT_ERR_KAFKA_API_UNKNOWN:
return "Kafka api type unknown";
case WFT_ERR_KAFKA_VERSION_DISALLOWED:
return "Kafka broker version not supported";
default:
break;
}

View File

@@ -16,3 +16,13 @@ set(SRC
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (KAFKA STREQUAL "y")
set(SRC
kafka_parser.c
KafkaMessage.cc
KafkaDataTypes.cc
KafkaResult.cc
)
add_library("protocol_kafka" OBJECT ${SRC})
endif ()

View File

@@ -0,0 +1,619 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <errno.h>
#include <assert.h>
#include "KafkaDataTypes.h"
namespace protocol
{
#define MIN(x, y) ((x) <= (y) ? (x) : (y))
static int compare_member(const void *p1, const void *p2)
{
kafka_member_t *member1 = (kafka_member_t *)p1;
kafka_member_t *member2 = (kafka_member_t *)p2;
return strcmp(member1->member_id, member2->member_id);
}
/* Order subscribers lexicographically by topic name (used by std::sort
 * in the roundrobin assignor). */
static bool operator<(const KafkaMetaSubscriber& s1, const KafkaMetaSubscriber& s2)
{
	const char *topic1 = s1.get_meta()->get_topic();
	const char *topic2 = s2.get_meta()->get_topic();

	return strcmp(topic1, topic2) < 0;
}
/*
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */
int KafkaCgroup::kafka_range_assignor(kafka_member_t **members,
									  int member_elements,
									  void *meta_topic)
{
	std::vector<KafkaMetaSubscriber> *subscribers =
		static_cast<std::vector<KafkaMetaSubscriber> *>(meta_topic);

	/* The range assignor works on a per-topic basis. */
	for (auto& subscriber : *subscribers)
	{
		subscriber.sort_by_member();

		/* Each consumer gets a contiguous range of partitions. */
		int num_partitions_per_consumer =
			subscriber.get_meta()->get_partition_elements() /
			subscriber.get_member()->size();

		/* If it does not evenly divide, then the first few consumers
		 * will have one extra partition. */
		int consumers_with_extra_partition =
			subscriber.get_meta()->get_partition_elements() %
			subscriber.get_member()->size();

		for (int i = 0 ; i < (int)subscriber.get_member()->size(); i++)
		{
			/* Start index for consumer i: earlier consumers that carry an
			 * extra partition shift it by one each. */
			int start = num_partitions_per_consumer * i +
						MIN(i, consumers_with_extra_partition);
			int length = num_partitions_per_consumer +
						 (i + 1 > consumers_with_extra_partition ? 0 : 1);

			if (length == 0)
				continue;

			for (int j = start; j < length + start; ++j)
			{
				KafkaToppar *toppar = new KafkaToppar;

				if (!toppar->set_topic_partition(subscriber.get_meta()->get_topic(), j))
				{
					delete toppar;
					return -1;	// allocation/setup failure aborts assignment
				}

				list_add_tail(&toppar->list, &subscriber.get_member()->at(i)->assigned_toppar_list);
			}
		}
	}

	return 0;
}
/*
 * For example, suppose there are two consumers C0 and C1, two topics t0 and
 * t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
 * t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 */
int KafkaCgroup::kafka_roundrobin_assignor(kafka_member_t **members,
										   int member_elements,
										   void *meta_topic)
{
	std::vector<KafkaMetaSubscriber> *subscribers =
		static_cast<std::vector<KafkaMetaSubscriber> *>(meta_topic);
	int next = -1;

	/* Deterministic order on both sides so every member computes the
	 * same assignment: topics by name, members by member_id. */
	std::sort(subscribers->begin(), subscribers->end());
	qsort(members, member_elements, sizeof (kafka_member_t *), compare_member);

	for (const auto& subscriber : *subscribers)
	{
		int partition_elements = subscriber.get_meta()->get_partition_elements();

		for (int partition = 0; partition < partition_elements; ++partition)
		{
			next = (next + 1) % subscriber.get_member()->size();

			/* Skip members not subscribed to this topic.
			 * NOTE(review): `next++` advances the candidate inside the
			 * list walk while `pos` keeps iterating the same member's
			 * toppar_list -- looks intentional (members is sorted the
			 * same way), but worth confirming against the upstream
			 * RoundRobinAssignor semantics. */
			struct list_head *pos;
			KafkaToppar *toppar;

			list_for_each(pos, &members[next]->toppar_list)
			{
				toppar = list_entry(pos, KafkaToppar, list);
				if (strcmp(subscriber.get_meta()->get_topic(), toppar->get_topic()) == 0)
					break;
				else
					next++;

				if (next >= (int) subscriber.get_member()->size())
					abort();
			}

			toppar = new KafkaToppar;
			if (!toppar->set_topic_partition(subscriber.get_meta()->get_topic(),
											 partition))
			{
				delete toppar;
				return -1;
			}

			list_add_tail(toppar->get_list(), &members[next]->assigned_toppar_list);
		}
	}

	return 0;
}
/* (Re)allocate the partition array to hold `partition_cnt` entries,
 * releasing any previously held partitions. Returns false on allocation
 * failure, leaving the existing state untouched. */
bool KafkaMeta::create_partitions(int partition_cnt)
{
	if (partition_cnt <= 0)
		return true;

	kafka_partition_t **partitions;

	partitions = (kafka_partition_t **)malloc(sizeof(void *) * partition_cnt);
	if (!partitions)
		return false;

	int i;

	for (i = 0; i < partition_cnt; ++i)
	{
		partitions[i] = (kafka_partition_t *)malloc(sizeof(kafka_partition_t));
		if (!partitions[i])
			break;

		kafka_partition_init(partitions[i]);
	}

	if (i != partition_cnt)
	{
		/* partitions[i] is NULL (the allocation that failed), so only
		 * the entries in [0, i) were initialized and may be deinited.
		 * The old do-while started at index i and passed NULL to
		 * kafka_partition_deinit(). */
		while (--i >= 0)
		{
			kafka_partition_deinit(partitions[i]);
			free(partitions[i]);
		}

		free(partitions);
		return false;
	}

	/* Success: release the previous partition array and install the new one. */
	for (i = 0; i < this->ptr->partition_elements; ++i)
	{
		kafka_partition_deinit(this->ptr->partitions[i]);
		free(this->ptr->partitions[i]);
	}

	free(this->ptr->partitions);

	this->ptr->partitions = partitions;
	this->ptr->partition_elements = partition_cnt;
	return true;
}
/* Run the partition assignor named `protocol_name` over the group's
 * members for every topic in `meta_list`. Returns 0 on success; -1 with
 * errno = EBADMSG when the protocol name is unknown, or -1 propagated
 * from the assignor. */
int KafkaCgroup::run_assignor(KafkaMetaList *meta_list,
							  const char *protocol_name)
{
	std::vector<KafkaMetaSubscriber> subscribers;

	/* Build, per topic, the list of members subscribed to it. */
	meta_list->rewind();

	KafkaMeta *meta;
	while ((meta = meta_list->get_next()) != NULL)
	{
		KafkaMetaSubscriber subscriber;

		subscriber.set_meta(meta);

		for (int i = 0; i < this->get_member_elements(); ++i)
		{
			struct list_head *pos;
			KafkaToppar *toppar;
			bool flag = false;

			/* A member subscribes the topic if any of its toppars
			 * carries the same topic name. */
			list_for_each(pos, &this->get_members()[i]->toppar_list)
			{
				toppar = list_entry(pos, KafkaToppar, list);
				if (strcmp(meta->get_topic(), toppar->get_topic()) == 0)
				{
					flag = true;
					break;
				}
			}

			if (flag)
				subscriber.add_member(this->get_members()[i]);
		}

		if (!subscriber.get_member()->empty())
			subscribers.emplace_back(subscriber);
	}

	/* Find the assignor registered under protocol_name. */
	struct list_head *pos;
	kafka_group_protocol_t *protocol;
	bool flag = false;

	list_for_each(pos, this->get_group_protocol())
	{
		protocol = list_entry(pos, kafka_group_protocol_t, list);
		if (strcmp(protocol_name, protocol->protocol_name) == 0)
		{
			flag = true;
			break;
		}
	}

	if (!flag)
	{
		errno = EBADMSG;
		return -1;
	}

	protocol->assignor(this->get_members(), this->get_member_elements(),
					   &subscribers);
	return 0;
}
/* Default-construct an empty consumer group with the two built-in
 * assignment protocols ("range" and "roundrobin") registered, a fresh
 * reference count of 1 and no coordinator yet. */
KafkaCgroup::KafkaCgroup()
{
	this->ptr = new kafka_cgroup_t;
	kafka_cgroup_init(this->ptr);

	kafka_group_protocol_t *proto;

	/* "range" assignor. */
	proto = new kafka_group_protocol_t;
	proto->protocol_name = new char[sizeof ("range")];
	strcpy(proto->protocol_name, "range");
	proto->assignor = kafka_range_assignor;
	list_add_tail(&proto->list, &this->ptr->group_protocol_list);

	/* "roundrobin" assignor. */
	proto = new kafka_group_protocol_t;
	proto->protocol_name = new char[sizeof ("roundrobin")];
	strcpy(proto->protocol_name, "roundrobin");
	proto->assignor = kafka_roundrobin_assignor;
	list_add_tail(&proto->list, &this->ptr->group_protocol_list);

	this->ref = new std::atomic<int>(1);
	this->coordinator = NULL;
}
/* Destructor: ref-counted shared state is torn down only by the last
 * owner; the per-object coordinator copy is always deleted. */
KafkaCgroup::~KafkaCgroup()
{
	if (--*this->ref == 0)
	{
		/* Free the toppar lists of every member. */
		for (int i = 0; i < this->ptr->member_elements; ++i)
		{
			kafka_member_t *member = this->ptr->members[i];
			KafkaToppar *toppar;
			struct list_head *pos, *tmp;

			list_for_each_safe(pos, tmp, &member->toppar_list)
			{
				toppar = list_entry(pos, KafkaToppar, list);
				list_del(pos);
				delete toppar;
			}

			list_for_each_safe(pos, tmp, &member->assigned_toppar_list)
			{
				toppar = list_entry(pos, KafkaToppar, list);
				list_del(pos);
				delete toppar;
			}
		}

		kafka_cgroup_deinit(this->ptr);

		/* NOTE(review): ptr's lists are still walked after
		 * kafka_cgroup_deinit() -- assumes deinit does not free these
		 * list heads; confirm against kafka_parser.c. */
		struct list_head *tmp, *pos;
		KafkaToppar *toppar;

		list_for_each_safe(pos, tmp, &this->ptr->assigned_toppar_list)
		{
			toppar = list_entry(pos, KafkaToppar, list);
			list_del(pos);
			delete toppar;
		}

		kafka_group_protocol_t *protocol;

		list_for_each_safe(pos, tmp, &this->ptr->group_protocol_list)
		{
			protocol = list_entry(pos, kafka_group_protocol_t, list);
			list_del(pos);
			delete []protocol->protocol_name;
			delete protocol;
		}

		delete []this->ptr->group_name;
		delete this->ptr;
		delete this->ref;
	}

	delete this->coordinator;
}
/* Move constructor: steals ptr/coordinator and leaves `move` holding a
 * fresh empty cgroup.
 * NOTE(review): the source keeps its old `ref` counter while this object
 * starts a new count of 1, and `curpos` is left unset -- both appear to
 * follow this file's existing pattern, but confirm that moved-from
 * objects are never shared. */
KafkaCgroup::KafkaCgroup(KafkaCgroup&& move)
{
	this->ptr = move.ptr;
	move.ptr = new kafka_cgroup_t;
	kafka_cgroup_init(move.ptr);

	this->ref = new std::atomic<int>(1);

	this->coordinator = move.coordinator;
	move.coordinator = NULL;
}
/* Move assignment: destroys the current state via the destructor (which
 * honors the reference count), then steals ptr/coordinator from `move`
 * and leaves it with a fresh empty cgroup. Self-assignment safe. */
KafkaCgroup& KafkaCgroup::operator= (KafkaCgroup&& move)
{
	if (this != &move)
	{
		this->~KafkaCgroup();

		this->ptr = move.ptr;
		move.ptr = new kafka_cgroup_t;
		kafka_cgroup_init(move.ptr);

		this->ref = new std::atomic<int>(1);

		this->coordinator = move.coordinator;
		move.coordinator = NULL;
	}

	return *this;
}
/* Copy constructor: shares ptr via the reference count and deep-copies
 * the coordinator. Fixed: `coordinator` was left uninitialized when the
 * source had none (there is no in-class initializer -- the default
 * constructor assigns NULL explicitly), making the destructor's
 * `delete this->coordinator` undefined behavior. */
KafkaCgroup::KafkaCgroup(const KafkaCgroup& copy)
{
	this->ptr = copy.ptr;
	this->ref = copy.ref;
	++*this->ref;

	if (copy.coordinator)
		this->coordinator = new KafkaBroker(copy.coordinator->get_raw_ptr());
	else
		this->coordinator = NULL;
}
/* Copy assignment: releases the current state (destructor honors the
 * reference count), then shares ptr and deep-copies the coordinator.
 * Fixed two defects: (1) no self-assignment guard -- `*this = *this`
 * destroyed the state before copying from it (the move assignment above
 * already guards); (2) when `copy.coordinator` was NULL, `coordinator`
 * kept pointing at the broker the destructor just deleted, causing a
 * double delete later. */
KafkaCgroup& KafkaCgroup::operator= (const KafkaCgroup& copy)
{
	if (this != &copy)
	{
		this->~KafkaCgroup();

		this->ptr = copy.ptr;
		this->ref = copy.ref;
		++*this->ref;

		if (copy.coordinator)
			this->coordinator = new KafkaBroker(copy.coordinator->get_raw_ptr());
		else
			this->coordinator = NULL;
	}

	return *this;
}
// Replace this cgroup's member array with 'member_cnt' freshly initialized
// members. Returns false (leaving the existing array intact) if any
// allocation fails.
// Fix: on allocation failure the original cleanup loop started at the failed
// index i, dereferencing the NULL members[i]; it must start at i - 1. The
// rolled-back members were only just initialized, so their toppar lists are
// still empty and need no traversal.
bool KafkaCgroup::create_members(int member_cnt)
{
	if (member_cnt == 0)
		return true;

	kafka_member_t **members;

	members = (kafka_member_t **)malloc(sizeof (void *) * member_cnt);
	if (!members)
		return false;

	int i;

	for (i = 0; i < member_cnt; ++i)
	{
		members[i] = (kafka_member_t *)malloc(sizeof (kafka_member_t));
		if (!members[i])
			break;

		kafka_member_init(members[i]);
		INIT_LIST_HEAD(&members[i]->toppar_list);
		INIT_LIST_HEAD(&members[i]->assigned_toppar_list);
	}

	if (i != member_cnt)
	{
		// Roll back members [0, i); members[i] itself is NULL.
		while (--i >= 0)
		{
			kafka_member_deinit(members[i]);
			free(members[i]);
		}

		free(members);
		return false;
	}

	// Destroy the previous member array, including any topprs attached
	// to each member.
	for (i = 0; i < this->ptr->member_elements; ++i)
	{
		KafkaToppar *toppar;
		struct list_head *pos, *tmp;

		list_for_each_safe(pos, tmp, &this->ptr->members[i]->toppar_list)
		{
			toppar = list_entry(pos, KafkaToppar, list);
			list_del(pos);
			delete toppar;
		}

		list_for_each_safe(pos, tmp, &this->ptr->members[i]->assigned_toppar_list)
		{
			toppar = list_entry(pos, KafkaToppar, list);
			list_del(pos);
			delete toppar;
		}

		kafka_member_deinit(this->ptr->members[i]);
		free(this->ptr->members[i]);
	}

	free(this->ptr->members);
	this->ptr->members = members;
	this->ptr->member_elements = member_cnt;
	return true;
}
// Append a toppar to the list of partitions assigned to this consumer.
void KafkaCgroup::add_assigned_toppar(KafkaToppar *toppar)
{
	list_add_tail(toppar->get_list(), &this->ptr->assigned_toppar_list);
}

// Reset the cursor used by get_assigned_toppar_next() to the list head.
void KafkaCgroup::assigned_toppar_rewind()
{
	this->curpos = &this->ptr->assigned_toppar_list;
}
// Advance the assigned-toppar cursor and return the next entry, or NULL
// once the cursor would wrap back to the list head.
KafkaToppar *KafkaCgroup::get_assigned_toppar_next()
{
	struct list_head *next = this->curpos->next;

	if (next == &this->ptr->assigned_toppar_list)
		return NULL;

	this->curpos = next;
	return list_entry(next, KafkaToppar, list);
}
void KafkaCgroup::del_assigned_toppar_cur()
{
assert(this->curpos != &this->ptr->assigned_toppar_list);
this->curpos = this->curpos->prev;
list_del(this->curpos->next);
}
bool KafkaRecord::add_header_pair(const void *key, size_t key_len,
const void *val, size_t val_len)
{
kafka_record_header_t *header;
header = (kafka_record_header_t *)malloc(sizeof(kafka_record_header_t));
if (!header)
return false;
kafka_record_header_init(header);
if (kafka_record_header_set_kv(key, key_len, val, val_len, header) < 0)
{
free(header);
return false;
}
list_add_tail(&header->list, &this->ptr->header_list);
return true;
}
bool KafkaRecord::add_header_pair(const std::string& key,
const std::string& val)
{
return add_header_pair(key.c_str(), key.size(), val.c_str(), val.size());
}
// Destructor. Toppar state is refcounted; only the last owner releases the
// record list and the underlying kafka_topic_partition_t.
KafkaToppar::~KafkaToppar()
{
	if (--*this->ref == 0)
	{
		kafka_topic_partition_deinit(this->ptr);

		struct list_head *tmp, *pos;
		KafkaRecord *record;

		list_for_each_safe(pos, tmp, &this->ptr->record_list)
		{
			record = list_entry(pos, KafkaRecord, list);
			list_del(pos);
			delete record;
		}

		delete this->ptr;
		delete this->ref;
	}
}
// Splice the donor buffer's whole block list into this buffer at insert_pos,
// replacing the insert_buf_size bytes previously accounted there, and update
// the byte total. After __list_splice, the donor's head node is manually
// re-linked so this buffer's circular list stays consistent.
// NOTE(review): highly order-dependent raw pointer surgery -- kept verbatim.
void KafkaBuffer::list_splice(KafkaBuffer *buffer)
{
	struct list_head *pre_insert;
	struct list_head *pre_tail;

	// Bytes staged at the insert position are superseded by the donor.
	this->buf_size -= this->insert_buf_size;
	pre_insert = this->insert_pos->next;
	__list_splice(buffer->get_head(), this->insert_pos);
	pre_tail = this->block_list.get_tail();
	buffer->get_head()->prev->next = this->block_list.get_head();
	this->block_list.get_head()->prev = buffer->get_head()->prev;
	buffer->get_head()->next = pre_insert;
	buffer->get_head()->prev = pre_tail;
	pre_tail->next = buffer->get_head();
	pre_insert->prev = buffer->get_head();
	this->buf_size += buffer->get_size();
}
// Expose the next unread contiguous region: sets *buf to its start and
// returns its length. On the first call the cursor is lazily positioned at
// the first block. Returns 0 with *buf == NULL once the tail block has been
// fully consumed.
// NOTE(review): cur_pos.second (the intra-block offset) is not advanced
// here -- presumably the caller advances it after consuming; confirm against
// KafkaMessage.cc usage.
size_t KafkaBuffer::peek(const char **buf)
{
	if (!this->inited)
	{
		this->inited = true;
		this->cur_pos = std::make_pair(this->block_list.get_next(), 0);
	}

	// End condition: cursor sits at the very end of the tail block.
	if (this->cur_pos.first == this->block_list.get_tail_entry() &&
		this->cur_pos.second == this->block_list.get_tail_entry()->get_len())
	{
		*buf = NULL;
		return 0;
	}

	KafkaBlock *block = this->cur_pos.first;

	// Current block exhausted: step to the next one.
	if (this->cur_pos.second >= block->get_len())
	{
		block = this->block_list.get_next();
		this->cur_pos = std::make_pair(block, 0);
	}

	*buf = (char *)block->get_block() + this->cur_pos.second;
	return block->get_len() - this->cur_pos.second;
}
// Linear search for the toppar matching (topic, partition); NULL if absent.
KafkaToppar *get_toppar(const char *topic, int partition,
						KafkaTopparList *toppar_list)
{
	struct list_head *entry;

	list_for_each(entry, toppar_list->get_head())
	{
		KafkaToppar *tp = list_entry(entry, KafkaToppar, list);

		if (tp->get_partition() == partition &&
			strcmp(tp->get_topic(), topic) == 0)
			return tp;
	}

	return NULL;
}
// Linear search for the metadata entry of 'topic'; NULL if absent.
const KafkaMeta *get_meta(const char *topic, KafkaMetaList *meta_list)
{
	struct list_head *entry;

	list_for_each(entry, meta_list->get_head())
	{
		const KafkaMeta *m = list_entry(entry, KafkaMeta, list);

		if (strcmp(m->get_topic(), topic) == 0)
			return m;
	}

	return NULL;
}
} /* namespace protocol */

File diff suppressed because it is too large Load Diff

3643
src/protocol/KafkaMessage.cc Normal file

File diff suppressed because it is too large Load Diff

247
src/protocol/KafkaMessage.h Normal file
View File

@@ -0,0 +1,247 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#ifndef _KAFKAMESSAGE_H_
#define _KAFKAMESSAGE_H_
#include <string.h>
#include <utility>
#include <string>
#include <vector>
#include <map>
#include <functional>
#include "kafka_parser.h"
#include "ProtocolMessage.h"
#include "EncodeStream.h"
#include "KafkaDataTypes.h"
namespace protocol
{
// Base class for all kafka protocol messages. Holds the protocol-level state
// shared by requests and responses (api type/version, broker, config, and the
// meta/toppar/cgroup/broker lists) plus the generic encode/append plumbing.
class KafkaMessage : public ProtocolMessage
{
public:
	KafkaMessage();
	virtual ~KafkaMessage();

private:
	// ProtocolMessage hooks: serialize into iovecs / consume inbound bytes.
	virtual int encode(struct iovec vectors[], int max);
	virtual int append(const void *buf, size_t *size);

	int encode_head();

public:
	KafkaMessage(KafkaMessage&& msg);
	KafkaMessage& operator= (KafkaMessage&& msg);

public:
	// Encode the body for the given api type into 'vectors' (up to 'max').
	int encode_message(int api_type, struct iovec vectors[], int max);

	void set_api(int api_type) { this->api_type = api_type; }
	int get_api() const { return this->api_type; }

	void set_api_version(int ver) { this->api_version = ver; }
	int get_api_version() const { return this->api_version; }

	void set_config(const KafkaConfig& conf)
	{
		this->config = conf;
	}

	const KafkaConfig *get_config() const { return &this->config; }

	void set_cgroup(const KafkaCgroup& cgroup)
	{
		this->cgroup = cgroup;
	}

	KafkaCgroup *get_cgroup()
	{
		return &this->cgroup;
	}

	void set_broker(const KafkaBroker& broker)
	{
		this->broker = broker;
	}

	KafkaBroker *get_broker()
	{
		return &this->broker;
	}

	void set_meta_list(const KafkaMetaList& meta_list)
	{
		this->meta_list = meta_list;
	}

	KafkaMetaList *get_meta_list()
	{
		return &this->meta_list;
	}

	void set_toppar_list(const KafkaTopparList& toppar_list)
	{
		this->toppar_list = toppar_list;
	}

	KafkaTopparList *get_toppar_list()
	{
		return &this->toppar_list;
	}

	void set_broker_list(const KafkaBrokerList& broker_list)
	{
		this->broker_list = broker_list;
	}

	KafkaBrokerList *get_broker_list()
	{
		return &this->broker_list;
	}

	// Copy msg's protocol state into this message.
	void duplicate(KafkaMessage& msg)
	{
		this->config = msg.config;
		this->cgroup = msg.cgroup;
		this->broker = msg.broker;
		this->meta_list = msg.meta_list;
		this->broker_list = msg.broker_list;
		this->toppar_list = msg.toppar_list;
	}

	// Like duplicate(), but also takes over msg's parser and uncompressed
	// buffer, leaving msg with a fresh parser.
	void duplicate2(KafkaMessage& msg)
	{
		kafka_parser_deinit(this->parser);
		delete this->parser;

		this->config = msg.config;
		this->cgroup = msg.cgroup;
		this->broker = msg.broker;
		this->meta_list = msg.meta_list;
		this->broker_list = msg.broker_list;
		this->toppar_list = msg.toppar_list;
		this->uncompressed = msg.uncompressed;

		this->parser = msg.parser;
		msg.parser = new kafka_parser_t;
		kafka_parser_init(msg.parser);
	}

	// Reset all encode/parse buffers so the message object can be reused.
	void clear_buf()
	{
		this->msgbuf.clear();
		this->headbuf.clear();
		kafka_parser_deinit(this->parser);
		kafka_parser_init(this->parser);
		this->cur_size = 0;
		// Plain prvalue assignment; the original's std::move on a
		// temporary was redundant and inhibited copy elision.
		this->serialized = KafkaBuffer();
	}

protected:
	static int parse_message_set(void **buf, size_t *size, int msg_vers,
								 struct list_head *record_list,
								 KafkaBuffer *uncompressed,
								 KafkaToppar *toppar);

	static int parse_records(void **buf, size_t *size,
							 struct list_head *record_list,
							 KafkaBuffer *uncompressed,
							 KafkaToppar *toppar);

	static std::string get_member_assignment(kafka_member_t *member);

	static KafkaToppar *find_toppar_by_name(const std::string& topic, int partition,
											struct list_head *toppar_list);

	static KafkaToppar *find_toppar_by_name(const std::string& topic, int partition,
											KafkaTopparList *toppar_list);

	static int kafka_parse_member_assignment(const char *bbuf, size_t n,
											 KafkaCgroup *cgroup);

protected:
	kafka_parser_t *parser;

	// Per-api-type encoder/parser dispatch tables.
	using encode_func = std::function<int (struct iovec vectors[], int max)>;
	std::map<int, encode_func> encode_func_map;

	using parse_func = std::function<int (void **buf, size_t *size)>;
	std::map<int, parse_func> parse_func_map;

	EncodeStream *stream;
	std::string msgbuf;
	std::string headbuf;

	KafkaConfig config;
	KafkaCgroup cgroup;
	KafkaBroker broker;
	KafkaMetaList meta_list;
	KafkaBrokerList broker_list;
	KafkaTopparList toppar_list;

	KafkaBuffer serialized;
	KafkaBuffer uncompressed;

	int api_type;
	int api_version;
	int message_version;
	std::map<int, int> api_mver_map;

	void *compress_env;
	size_t cur_size;
};
// Client-to-broker message: one private encoder per supported api type,
// registered by the constructor.
class KafkaRequest : public KafkaMessage
{
public:
	KafkaRequest();

private:
	int encode_produce(struct iovec vectors[], int max);
	int encode_fetch(struct iovec vectors[], int max);
	int encode_metadata(struct iovec vectors[], int max);
	int encode_findcoordinator(struct iovec vectors[], int max);
	int encode_listoffset(struct iovec vectors[], int max);
	int encode_joingroup(struct iovec vectors[], int max);
	int encode_syncgroup(struct iovec vectors[], int max);
	int encode_leavegroup(struct iovec vectors[], int max);
	int encode_heartbeat(struct iovec vectors[], int max);
	int encode_offsetcommit(struct iovec vectors[], int max);
	int encode_offsetfetch(struct iovec vectors[], int max);
	int encode_apiversions(struct iovec vectors[], int max);
};
// Broker-to-client message: parse_response() dispatches to the private
// per-api parser matching this message's api type.
class KafkaResponse : public KafkaMessage
{
public:
	KafkaResponse();

	int parse_response();

private:
	int parse_produce(void **buf, size_t *size);
	int parse_fetch(void **buf, size_t *size);
	int parse_metadata(void **buf, size_t *size);
	int parse_findcoordinator(void **buf, size_t *size);
	int parse_joingroup(void **buf, size_t *size);
	int parse_syncgroup(void **buf, size_t *size);
	int parse_leavegroup(void **buf, size_t *size);
	int parse_listoffset(void **buf, size_t *size);
	int parse_offsetcommit(void **buf, size_t *size);
	int parse_offsetfetch(void **buf, size_t *size);
	int parse_heartbeat(void **buf, size_t *size);
	int parse_apiversions(void **buf, size_t *size);
};
}
#endif

120
src/protocol/KafkaResult.cc Normal file
View File

@@ -0,0 +1,120 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include "KafkaResult.h"
namespace protocol
{
// Result-iteration states.
// NOTE(review): not referenced anywhere in the visible part of this file --
// possibly reserved for future use.
enum
{
	KAFKA_STATUS_GET_RESULT,
	KAFKA_STATUS_END,
};
// Default-construct an empty result (no responses owned yet).
KafkaResult::KafkaResult() :
	resp_vec(NULL),
	resp_num(0)
{
}
// Move assignment: frees the currently-owned response array, then takes
// ownership of the source's array, leaving the source empty.
KafkaResult& KafkaResult::operator= (KafkaResult&& move)
{
	if (this != &move)
	{
		delete []this->resp_vec;
		this->resp_vec = move.resp_vec;
		move.resp_vec = NULL;
		this->resp_num = move.resp_num;
		move.resp_num = 0;
	}

	return *this;
}
// Move constructor: takes ownership of the source's response array,
// leaving the source empty.
KafkaResult::KafkaResult(KafkaResult&& move)
{
	this->resp_vec = move.resp_vec;
	move.resp_vec = NULL;
	this->resp_num = move.resp_num;
	move.resp_num = 0;
}
// (Re)allocate room for n responses, discarding any previously held ones.
void KafkaResult::create(size_t n)
{
	delete []this->resp_vec;
	this->resp_vec = new KafkaResponse[n];
	this->resp_num = n;
}
// Move a response into slot i. 'i' must be < resp_num (see create()).
void KafkaResult::set_resp(KafkaResponse&& resp, size_t i)
{
	assert(i < this->resp_num);
	this->resp_vec[i] = std::move(resp);
}
// Collect (as raw pointers into the owned responses) the topprs of every
// offset-commit response. Previous contents of 'toppars' are discarded.
void KafkaResult::fetch_toppars(std::vector<KafkaToppar *>& toppars)
{
	toppars.clear();

	for (size_t i = 0; i < this->resp_num; ++i)
	{
		if (this->resp_vec[i].get_api() != Kafka_OffsetCommit)
			continue;

		KafkaTopparList *list = this->resp_vec[i].get_toppar_list();
		KafkaToppar *tp;

		list->rewind();
		while ((tp = list->get_next()) != NULL)
			toppars.push_back(tp);
	}
}
// Collect the records of every produce/fetch response: one inner vector per
// topic-partition that actually holds records. Previous contents of
// 'records' are discarded.
void KafkaResult::fetch_records(std::vector<std::vector<KafkaRecord *>>& records)
{
	records.clear();

	for (size_t i = 0; i < this->resp_num; ++i)
	{
		int api = this->resp_vec[i].get_api();

		if (api != Kafka_Produce && api != Kafka_Fetch)
			continue;

		KafkaTopparList *list = this->resp_vec[i].get_toppar_list();
		KafkaToppar *tp;

		list->rewind();
		while ((tp = list->get_next()) != NULL)
		{
			std::vector<KafkaRecord *> batch;
			KafkaRecord *rec;

			tp->record_rewind();
			while ((rec = tp->get_record_next()) != NULL)
				batch.push_back(rec);

			if (!batch.empty())
				records.emplace_back(std::move(batch));
		}
	}
}
}

View File

@@ -0,0 +1,64 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#ifndef _KAFKARESULT_H_
#define _KAFKARESULT_H_
#include <map>
#include <vector>
#include <string>
#include "KafkaMessage.h"
#include "KafkaDataTypes.h"
namespace protocol
{
// Aggregated result of a kafka task: owns one KafkaResponse per request that
// was issued. Movable but not copyable (copy operations implicitly deleted
// by the user-declared move members).
class KafkaResult
{
public:
	// Collect the topprs of all offset-commit responses.
	void fetch_toppars(std::vector<KafkaToppar *>& toppars);

	// Collect the records of all produce/fetch responses, one inner vector
	// per non-empty topic-partition.
	void fetch_records(std::vector<std::vector<KafkaRecord *>>& records);

public:
	// Allocate room for n responses; fill with set_resp().
	void create(size_t n);
	void set_resp(KafkaResponse&& resp, size_t i);

public:
	KafkaResult();
	virtual ~KafkaResult()
	{
		delete []this->resp_vec;
	}

	KafkaResult& operator= (KafkaResult&& move);
	KafkaResult(KafkaResult&& move);

private:
	KafkaResponse *resp_vec;	// owned array of resp_num responses
	size_t resp_num;
};
}
#endif

687
src/protocol/kafka_parser.c Normal file
View File

@@ -0,0 +1,687 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <arpa/inet.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <ctype.h>
#include "kafka_parser.h"
#define MIN(a, b) ((x) <= (y) ? (x) : (y))
/* ApiVersions is the only request that can be sent to a broker whose
 * version is still unknown. */
static kafka_api_version_t kafka_api_version_queryable[] = {
	{ Kafka_ApiVersions, 0, 0 }
};

/* Hard-coded api ranges for legacy brokers that predate the ApiVersions
 * request. Each entry is { api_key, min_ver, max_ver }. */
static kafka_api_version_t kafka_api_version_0_9_0[] = {
	{ Kafka_Produce, 0, 1 },
	{ Kafka_Fetch, 0, 1 },
	{ Kafka_ListOffsets, 0, 0 },
	{ Kafka_Metadata, 0, 0 },
	{ Kafka_OffsetCommit, 0, 2 },
	{ Kafka_OffsetFetch, 0, 1 },
	{ Kafka_FindCoordinator, 0, 0 },
	{ Kafka_JoinGroup, 0, 0 },
	{ Kafka_Heartbeat, 0, 0 },
	{ Kafka_LeaveGroup, 0, 0 },
	{ Kafka_SyncGroup, 0, 0 },
	{ Kafka_DescribeGroups, 0, 0 },
	{ Kafka_ListGroups, 0, 0 }
};

static kafka_api_version_t kafka_api_version_0_8_2[] = {
	{ Kafka_Produce, 0, 0 },
	{ Kafka_Fetch, 0, 0 },
	{ Kafka_ListOffsets, 0, 0 },
	{ Kafka_Metadata, 0, 0 },
	{ Kafka_OffsetCommit, 0, 1 },
	{ Kafka_OffsetFetch, 0, 1 },
	{ Kafka_FindCoordinator, 0, 0 }
};

static kafka_api_version_t kafka_api_version_0_8_1[] = {
	{ Kafka_Produce, 0, 0 },
	{ Kafka_Fetch, 0, 0 },
	{ Kafka_ListOffsets, 0, 0 },
	{ Kafka_Metadata, 0, 0 },
	{ Kafka_OffsetCommit, 0, 1 },
	{ Kafka_OffsetFetch, 0, 0 }
};

static kafka_api_version_t kafka_api_version_0_8_0[] = {
	{ Kafka_Produce, 0, 0 },
	{ Kafka_Fetch, 0, 0 },
	{ Kafka_ListOffsets, 0, 0 },
	{ Kafka_Metadata, 0, 0 }
};
/* Maps each optional client feature to the api version ranges the broker
 * must advertise for the feature to be enabled. Each depends[] array is
 * terminated by a Kafka_Unknown entry.
 * NOTE(review): kafka_get_features() stops its scan at api_key != -1, so
 * this relies on Kafka_Unknown == -1 -- verify in kafka_parser.h. */
static const struct kafka_feature_map {
	unsigned feature;
	kafka_api_version_t depends[Kafka_ApiNums];
} kafka_feature_map[] = {
	{
		.feature = KAFKA_FEATURE_MSGVER1,
		.depends = {
			{ Kafka_Produce, 2, 2 },
			{ Kafka_Fetch, 2, 2 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_MSGVER2,
		.depends = {
			{ Kafka_Produce, 3, 3 },
			{ Kafka_Fetch, 4, 4 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_APIVERSION,
		.depends = {
			{ Kafka_ApiVersions, 0, 0 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_BROKER_GROUP_COORD,
		.depends = {
			{ Kafka_FindCoordinator, 0, 0 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_BROKER_BALANCED_CONSUMER,
		.depends = {
			{ Kafka_FindCoordinator, 0, 0 },
			{ Kafka_OffsetCommit, 1, 2 },
			{ Kafka_OffsetFetch, 1, 1 },
			{ Kafka_JoinGroup, 0, 0 },
			{ Kafka_SyncGroup, 0, 0 },
			{ Kafka_Heartbeat, 0, 0 },
			{ Kafka_LeaveGroup, 0, 0 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_THROTTLETIME,
		.depends = {
			{ Kafka_Produce, 1, 2 },
			{ Kafka_Fetch, 1, 2 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_LZ4,
		.depends = {
			{ Kafka_FindCoordinator, 0, 0 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		.feature = KAFKA_FEATURE_OFFSET_TIME,
		.depends = {
			{ Kafka_ListOffsets, 1, 1 },
			{ Kafka_Unknown, 0, 0 },
		}
	},
	{
		.feature = KAFKA_FEATURE_ZSTD,
		.depends = {
			{ Kafka_Produce, 7, 7 },
			{ Kafka_Fetch, 10, 10 },
			{ Kafka_Unknown, 0, 0 },
		},
	},
	{
		/* Sentinel terminating the map. */
		.feature = 0,
	},
};
/* Select the hard-coded api table for a legacy broker version string by
 * longest-known prefix. Returns 0 on success with *api / *api_cnt set (the
 * "" catch-all entry selects the queryable table for modern brokers) or -1
 * for unsupported versions (0.6 / 0.7).
 * Note: *api and *api_cnt are left untouched when -1 is returned, and the
 * function never returns a positive value. */
static int kafka_get_legacy_api_version(const char *broker_version,
					kafka_api_version_t **api,
					size_t *api_cnt)
{
	static const struct {
		const char *pfx;
		kafka_api_version_t *api;
		size_t api_cnt;
	} vermap[] = {
		{ "0.9.0",
			kafka_api_version_0_9_0,
			sizeof(kafka_api_version_0_9_0) / sizeof(kafka_api_version_t)
		},
		{ "0.8.2",
			kafka_api_version_0_8_2,
			sizeof(kafka_api_version_0_8_2) / sizeof(kafka_api_version_t)
		},
		{ "0.8.1",
			kafka_api_version_0_8_1,
			sizeof(kafka_api_version_0_8_1) / sizeof(kafka_api_version_t)
		},
		{ "0.8.0",
			kafka_api_version_0_8_0,
			sizeof(kafka_api_version_0_8_0) / sizeof(kafka_api_version_t)
		},
		{ "0.7.", NULL, 0 },
		{ "0.6", NULL, 0 },
		/* Empty prefix matches everything: modern broker, use
		 * the ApiVersions request. */
		{ "", kafka_api_version_queryable, 1 },
		{ NULL, NULL, 0 }
	};
	int i, ret = 0;

	for (i = 0 ; vermap[i].pfx ; i++)
	{
		if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx)))
		{
			if (!vermap[i].api)
				return -1;

			*api = vermap[i].api;
			*api_cnt = vermap[i].api_cnt;
			break;
		}
	}

	return ret;
}
/* Returns 1 if the broker must be probed with the ApiVersions request,
 * 0 if a hard-coded legacy table was selected, -1 for an unsupported
 * broker version.
 * Fix: kafka_get_legacy_api_version() only ever returns 0 or -1, so the
 * original 'ret <= 0' test made the queryable comparison unreachable and
 * the function always returned 0 on success. */
int kafka_api_version_is_queryable(const char *broker_version,
				   kafka_api_version_t **api,
				   size_t *api_cnt)
{
	int ret = kafka_get_legacy_api_version(broker_version, api, api_cnt);

	if (ret < 0)
		return ret;

	return *api == kafka_api_version_queryable;
}
/* bsearch/qsort comparator ordering kafka_api_version_t by api_key. */
static int kafka_api_version_key_cmp(const void *_a, const void *_b)
{
	const kafka_api_version_t *a = _a;
	const kafka_api_version_t *b = _b;

	return (a->api_key > b->api_key) - (a->api_key < b->api_key);
}
/* Return nonzero when the broker-advertised range for match->api_key
 * overlaps the range required by 'match'; 0 when the api is absent or
 * the ranges are disjoint. 'apis' must be sorted by api_key. */
static int kafka_api_version_check(const kafka_api_version_t *apis,
				   size_t api_cnt,
				   const kafka_api_version_t *match)
{
	const kafka_api_version_t *found;

	found = bsearch(match, apis, api_cnt, sizeof *apis,
			kafka_api_version_key_cmp);
	return found && match->min_ver <= found->max_ver &&
	       found->min_ver <= match->max_ver;
}
/* Compute the feature bitmask enabled by the broker's advertised api
 * ranges: a feature is enabled only if every entry of its depends[] list
 * is satisfied. The inner scan stops at api_key == -1, the Kafka_Unknown
 * terminator (assumes Kafka_Unknown == -1 -- TODO confirm). */
unsigned kafka_get_features(kafka_api_version_t *api, size_t api_cnt)
{
	unsigned features = 0;
	int i, fails, r;
	const kafka_api_version_t *match;

	for (i = 0 ; kafka_feature_map[i].feature != 0 ; i++)
	{
		fails = 0;
		for (match = &kafka_feature_map[i].depends[0];
			 match->api_key != -1 ; match++)
		{
			r = kafka_api_version_check(api, api_cnt, match);
			fails += !r;
		}

		if (!fails)
			features |= kafka_feature_map[i].feature;
	}

	return features;
}
/* Pick the version of 'api_key' to use given the client's supported range
 * [min_ver, max_ver] and the broker's advertised range. Returns -1 if the
 * api is unknown to the broker or no acceptable version exists.
 * NOTE(review): when the broker covers max_ver but its min exceeds
 * min_ver, overlapping intermediate versions are rejected (-1) instead of
 * being negotiated -- confirm this is intended (callers may always pass
 * min_ver = 0). */
int kafka_broker_get_api_version(const kafka_broker_t *broker,
				 int api_key,
				 int min_ver, int max_ver)
{
	kafka_api_version_t sk = { .api_key = api_key };
	kafka_api_version_t *retp;

	retp = bsearch(&sk, broker->api, broker->api_elements,
		       sizeof(*broker->api), kafka_api_version_key_cmp);
	if (!retp)
		return -1;

	if (retp->max_ver < max_ver)
	{
		if (retp->max_ver < min_ver)
			return -1;
		else
			return retp->max_ver;
	}
	else if (retp->min_ver > min_ver)
		return -1;
	else
		return max_ver;
}
/* Reset a parser to the "nothing received yet" state. */
void kafka_parser_init(kafka_parser_t *parser)
{
	parser->complete = 0;
	parser->message_size = 0;
	parser->msgbuf = NULL;
	parser->cur_size = 0;
	parser->hsize = 0;
}

/* Release the parser's message buffer (free(NULL) is safe on a
 * freshly-initialized parser). */
void kafka_parser_deinit(kafka_parser_t *parser)
{
	free(parser->msgbuf);
}

/* Fill a config with the client defaults: timeouts are in milliseconds,
 * sizes in bytes. */
void kafka_config_init(kafka_config_t *conf)
{
	conf->produce_timeout = 100;
	conf->produce_msg_max_bytes = 1000000;
	conf->produce_msgset_cnt = 10000;
	conf->produce_msgset_max_bytes = 1000000;
	conf->fetch_timeout = 100;
	conf->fetch_min_bytes = 1;
	conf->fetch_max_bytes = 50 * 1024 * 1024;
	conf->fetch_msg_max_bytes = 1024 * 1024;
	/* -2: presumably Kafka's OFFSET_EARLIEST sentinel -- TODO confirm. */
	conf->offset_timestamp = -2;
	conf->commit_timestamp = 0;
	conf->session_timeout = 10*1000;
	conf->rebalance_timeout = 10000;
	conf->retention_time_period = 20000;
	/* -1: require acknowledgement from all in-sync replicas. */
	conf->produce_acks = -1;
	conf->allow_auto_topic_creation = 1;
	conf->api_version_request = 0;
	conf->api_version_timeout = 10000;
	conf->broker_version = NULL;
	conf->compress_type = Kafka_NoCompress;
	conf->compress_level = 0;
	conf->client_id = NULL;
}

/* Release the config's owned strings. */
void kafka_config_deinit(kafka_config_t *conf)
{
	free(conf->broker_version);
	free(conf->client_id);
}
/* Initialize a partition record to the "unknown" state. */
void kafka_partition_init(kafka_partition_t *partition)
{
	partition->error = KAFKA_NONE;
	partition->partition_index = -1;
	kafka_broker_init(&partition->leader);
	partition->replica_nodes = NULL;
	partition->replica_node_elements = 0;
	partition->isr_nodes = NULL;
	partition->isr_node_elements = 0;
}

/* Release a partition's leader broker and node arrays. */
void kafka_partition_deinit(kafka_partition_t *partition)
{
	kafka_broker_deinit(&partition->leader);
	free(partition->replica_nodes);
	free(partition->isr_nodes);
}

/* Initialize a broker record to the "unknown" state (no address, no
 * advertised apis, no features). */
void kafka_broker_init(kafka_broker_t *broker)
{
	broker->node_id = -1;
	broker->port = 0;
	broker->host = NULL;
	broker->rack = NULL;
	broker->to_addr = 0;
	memset(&broker->addr, 0, sizeof(broker->addr));
	broker->addrlen = 0;
	broker->features = 0;
	broker->api = NULL;
	broker->api_elements = 0;
}

/* Release a broker's owned strings and api table. */
void kafka_broker_deinit(kafka_broker_t *broker)
{
	free(broker->host);
	free(broker->rack);
	free(broker->api);
}
/* Initialize topic metadata to the "unknown" state. */
void kafka_meta_init(kafka_meta_t *meta)
{
	meta->error = KAFKA_NONE;
	meta->topic_name = NULL;
	meta->error_message = NULL;
	meta->is_internal = 0;
	meta->partitions = NULL;
	meta->partition_elements = 0;
}

/* Release topic metadata, including every owned partition record. */
void kafka_meta_deinit(kafka_meta_t *meta)
{
	int i;

	free(meta->topic_name);
	free(meta->error_message);

	for (i = 0; i < meta->partition_elements; ++i)
	{
		kafka_partition_deinit(meta->partitions[i]);
		free(meta->partitions[i]);
	}

	free(meta->partitions);
}

/* Initialize a topic-partition to the "unknown offsets" state. */
void kafka_topic_partition_init(kafka_topic_partition_t *toppar)
{
	toppar->error = KAFKA_NONE;
	toppar->topic_name = NULL;
	toppar->partition = -1;
	toppar->offset = -1;
	toppar->high_watermark = -1;
	toppar->low_watermark = -2;
	toppar->last_stable_offset = -1;
	toppar->log_start_offset = -1;
	toppar->offset_timestamp = -1;
	toppar->committed_metadata = NULL;
	INIT_LIST_HEAD(&toppar->record_list);
}

/* Release a topic-partition's owned strings.
 * NOTE(review): record_list entries are not freed here -- the C++ layer
 * (KafkaToppar's destructor) deletes them. */
void kafka_topic_partition_deinit(kafka_topic_partition_t *toppar)
{
	free(toppar->topic_name);
	free(toppar->committed_metadata);
}

/* Initialize a record header; *_is_move flags mark borrowed (not owned)
 * key/value buffers. */
void kafka_record_header_init(kafka_record_header_t *header)
{
	header->key = NULL;
	header->key_len = 0;
	header->key_is_move = 0;
	header->value = NULL;
	header->value_len = 0;
	header->value_is_move = 0;
}

/* Release a header's key/value buffers unless they are borrowed. */
void kafka_record_header_deinit(kafka_record_header_t *header)
{
	if (!header->key_is_move)
		free(header->key);

	if (!header->value_is_move)
		free(header->value);
}
/* Initialize a record; *_is_move flags mark borrowed (not owned) buffers. */
void kafka_record_init(kafka_record_t *record)
{
	record->key = NULL;
	record->key_len = 0;
	record->key_is_move = 0;
	record->value = NULL;
	record->value_len = 0;
	record->value_is_move = 0;
	record->timestamp = 0;
	record->offset = 0;
	INIT_LIST_HEAD(&record->header_list);
	record->status = KAFKA_UNKNOWN_SERVER_ERROR;
	record->toppar = NULL;
}

/* Release a record's owned key/value buffers and all of its headers. */
void kafka_record_deinit(kafka_record_t *record)
{
	struct list_head *tmp, *pos;
	kafka_record_header_t *header;

	if (!record->key_is_move)
		free(record->key);

	if (!record->value_is_move)
		free(record->value);

	list_for_each_safe(pos, tmp, &record->header_list)
	{
		header = list_entry(pos, kafka_record_header_t, list);
		list_del(pos);
		kafka_record_header_deinit(header);
		free(header);
	}
}

/* Initialize a consumer-group member record. */
void kafka_member_init(kafka_member_t *member)
{
	member->member_id = NULL;
	member->client_id = NULL;
	member->client_host = NULL;
	member->member_metadata = NULL;
	member->member_metadata_len = 0;
}

/* Release a member's owned strings.
 * member_metadata is deliberately NOT freed: per the original comment it
 * is not owned here (presumably points into the parsed message buffer --
 * TODO confirm against KafkaMessage.cc). */
void kafka_member_deinit(kafka_member_t *member)
{
	free(member->member_id);
	free(member->client_id);
	free(member->client_host);
	//do not need free!
	//free(member->member_metadata);
}

/* Initialize a consumer group to the "not joined" state. */
void kafka_cgroup_init(kafka_cgroup_t *cgroup)
{
	INIT_LIST_HEAD(&cgroup->assigned_toppar_list);
	cgroup->error = KAFKA_NONE;
	cgroup->error_msg = NULL;
	kafka_broker_init(&cgroup->coordinator);
	cgroup->leader_id = NULL;
	cgroup->member_id = NULL;
	cgroup->members = NULL;
	cgroup->member_elements = 0;
	cgroup->generation_id = -1;
	cgroup->group_name = NULL;
	/* String literal: must never be passed to free(). */
	cgroup->protocol_type = "consumer";
	cgroup->protocol_name = NULL;
	INIT_LIST_HEAD(&cgroup->group_protocol_list);
}

/* Release a cgroup's owned fields.
 * group_name, the assigned_toppar_list and the group_protocol_list are
 * NOT released here; the C++ layer (KafkaCgroup's destructor) owns them. */
void kafka_cgroup_deinit(kafka_cgroup_t *cgroup)
{
	int i;

	free(cgroup->error_msg);
	kafka_broker_deinit(&cgroup->coordinator);
	free(cgroup->leader_id);
	free(cgroup->member_id);

	for (i = 0; i < cgroup->member_elements; ++i)
	{
		kafka_member_deinit(cgroup->members[i]);
		free(cgroup->members[i]);
	}

	free(cgroup->members);
	free(cgroup->protocol_name);
}
/* Initialize an empty data block; is_move marks a borrowed buffer. */
void kafka_block_init(kafka_block_t *block)
{
	block->buf = NULL;
	block->len = 0;
	block->is_move = 0;
}

/* Release the block's buffer unless it is borrowed. */
void kafka_block_deinit(kafka_block_t *block)
{
	if (!block->is_move)
		free(block->buf);
}
/* Feed inbound bytes to the parser. A message is a 4-byte big-endian length
 * prefix followed by the payload. Returns 1 when the message is complete
 * (with *size reduced to the number of bytes actually consumed from this
 * buffer), 0 when more data is needed (whole buffer consumed), -1 on
 * allocation failure.
 * Fixes vs. original:
 *  - parser->complete is now set on completion (it was tested at the top
 *    but never assigned, so the early-return path was dead);
 *  - *size is now decremented by the number of *unconsumed* trailing bytes;
 *    the original subtracted 'message_size - cur_size', which is always 0
 *    at that point, so callers saw the full buffer as consumed even when
 *    the next message had already started. */
int kafka_parser_append_message(const void *buf, size_t *size,
				kafka_parser_t *parser)
{
	int totaln;
	size_t s;
	size_t n;

	if (parser->complete)
	{
		*size = 0;
		return 1;
	}

	s = *size;
	if (parser->hsize + s < 4)
	{
		/* Still accumulating the 4-byte length prefix. */
		memcpy(parser->headbuf + parser->hsize, buf, s);
		parser->hsize += s;
		return 0;
	}
	else if (!parser->msgbuf)
	{
		/* Length prefix complete: allocate the payload buffer. */
		memcpy(parser->headbuf + parser->hsize, buf, 4 - parser->hsize);
		buf = (const char *)buf + 4 - parser->hsize;
		s -= 4 - parser->hsize;
		parser->hsize = 4;
		memcpy(&totaln, parser->headbuf, 4);
		parser->message_size = ntohl(totaln);
		parser->msgbuf = malloc(parser->message_size);
		if (!parser->msgbuf)
			return -1;

		parser->cur_size = 0;
	}

	n = parser->message_size - parser->cur_size;
	if (s < n)
	{
		/* Partial payload: take everything and wait for more. */
		memcpy(parser->msgbuf + parser->cur_size, buf, s);
		parser->cur_size += s;
		return 0;
	}

	/* Payload complete; s - n bytes of this buffer remain unconsumed. */
	memcpy(parser->msgbuf + parser->cur_size, buf, n);
	parser->cur_size = parser->message_size;
	parser->complete = 1;
	*size -= s - n;
	return 1;
}
/* Set a toppar's topic (deep-copied) and partition index. Returns 0 on
 * success, -1 on allocation failure (toppar left unchanged). */
int kafka_topic_partition_set_tp(const char *topic_name, int partition,
				 kafka_topic_partition_t *toppar)
{
	char *dup = strdup(topic_name);

	if (!dup)
		return -1;

	free(toppar->topic_name);
	toppar->topic_name = dup;
	toppar->partition = partition;
	return 0;
}
/* Replace a record's key with a deep copy of key[0..key_len). Returns 0 on
 * success, -1 on allocation failure (record left unchanged). */
int kafka_record_set_key(const void *key, size_t key_len,
			 kafka_record_t *record)
{
	void *copy = malloc(key_len);

	if (!copy)
		return -1;

	memcpy(copy, key, key_len);
	free(record->key);
	record->key = copy;
	record->key_len = key_len;
	return 0;
}
/* Replace a record's value with a deep copy of val[0..val_len). Returns 0
 * on success, -1 on allocation failure (record left unchanged). */
int kafka_record_set_value(const void *val, size_t val_len,
			   kafka_record_t *record)
{
	void *copy = malloc(val_len);

	if (!copy)
		return -1;

	memcpy(copy, val, val_len);
	free(record->value);
	record->value = copy;
	record->value_len = val_len;
	return 0;
}
/* Store deep copies of key/value in the header. All-or-nothing: on any
 * allocation failure nothing is leaked, the header is untouched and -1 is
 * returned; 0 on success. Intended for freshly-initialized headers (any
 * previously owned buffers are not released). */
int kafka_record_header_set_kv(const void *key, size_t key_len,
			       const void *val, size_t val_len,
			       kafka_record_header_t *header)
{
	void *kcopy = malloc(key_len);
	void *vcopy = kcopy ? malloc(val_len) : NULL;

	if (!vcopy)
	{
		free(kcopy);	/* free(NULL) is a no-op */
		return -1;
	}

	memcpy(kcopy, key, key_len);
	memcpy(vcopy, val, val_len);
	header->key = kcopy;
	header->key_len = key_len;
	header->value = vcopy;
	header->value_len = val_len;
	return 0;
}
/* Set the metadata's topic name (deep-copied). Returns 0 on success, -1 on
 * allocation failure (meta left unchanged). */
int kafka_meta_set_topic(const char *topic, kafka_meta_t *meta)
{
	char *dup = strdup(topic);

	if (!dup)
		return -1;

	free(meta->topic_name);
	meta->topic_name = dup;
	return 0;
}
/* Set the consumer group name (deep-copied). Returns 0 on success, -1 on
 * allocation failure (cgroup left unchanged). */
int kafka_cgroup_set_group(const char *group, kafka_cgroup_t *cgroup)
{
	char *dup = strdup(group);

	if (!dup)
		return -1;

	free(cgroup->group_name);
	cgroup->group_name = dup;
	return 0;
}

425
src/protocol/kafka_parser.h Normal file
View File

@@ -0,0 +1,425 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#ifndef _KAFKA_PARSER_H_
#define _KAFKA_PARSER_H_
#include <arpa/inet.h>
#include <stddef.h>
#include <stdint.h>
#include "list.h"
enum
{
KAFKA_UNKNOWN_SERVER_ERROR = -1,
KAFKA_NONE = 0,
KAFKA_OFFSET_OUT_OF_RANGE = 1,
KAFKA_CORRUPT_MESSAGE = 2,
KAFKA_UNKNOWN_TOPIC_OR_PARTITION = 3,
KAFKA_INVALID_FETCH_SIZE = 4,
KAFKA_LEADER_NOT_AVAILABLE = 5,
KAFKA_NOT_LEADER_FOR_PARTITION = 6,
KAFKA_REQUEST_TIMED_OUT = 7,
KAFKA_BROKER_NOT_AVAILABLE = 8,
KAFKA_REPLICA_NOT_AVAILABLE = 9,
KAFKA_MESSAGE_TOO_LARGE = 10,
KAFKA_STALE_CONTROLLER_EPOCH = 11,
KAFKA_OFFSET_METADATA_TOO_LARGE = 12,
KAFKA_NETWORK_EXCEPTION = 13,
KAFKA_COORDINATOR_LOAD_IN_PROGRESS = 14,
KAFKA_COORDINATOR_NOT_AVAILABLE = 15,
KAFKA_NOT_COORDINATOR = 16,
KAFKA_INVALID_TOPIC_EXCEPTION = 17,
KAFKA_RECORD_LIST_TOO_LARGE = 18,
KAFKA_NOT_ENOUGH_REPLICAS = 19,
KAFKA_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
KAFKA_INVALID_REQUIRED_ACKS = 21,
KAFKA_ILLEGAL_GENERATION = 22,
KAFKA_INCONSISTENT_GROUP_PROTOCOL = 23,
KAFKA_INVALID_GROUP_ID = 24,
KAFKA_UNKNOWN_MEMBER_ID = 25,
KAFKA_INVALID_SESSION_TIMEOUT = 26,
KAFKA_REBALANCE_IN_PROGRESS = 27,
KAFKA_INVALID_COMMIT_OFFSET_SIZE = 28,
KAFKA_TOPIC_AUTHORIZATION_FAILED = 29,
KAFKA_GROUP_AUTHORIZATION_FAILED = 30,
KAFKA_CLUSTER_AUTHORIZATION_FAILED = 31,
KAFKA_INVALID_TIMESTAMP = 32,
KAFKA_UNSUPPORTED_SASL_MECHANISM = 33,
KAFKA_ILLEGAL_SASL_STATE = 34,
KAFKA_UNSUPPORTED_VERSION = 35,
KAFKA_TOPIC_ALREADY_EXISTS = 36,
KAFKA_INVALID_PARTITIONS = 37,
KAFKA_INVALID_REPLICATION_FACTOR = 38,
KAFKA_INVALID_REPLICA_ASSIGNMENT = 39,
KAFKA_INVALID_CONFIG = 40,
KAFKA_NOT_CONTROLLER = 41,
KAFKA_INVALID_REQUEST = 42,
KAFKA_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
KAFKA_POLICY_VIOLATION = 44,
KAFKA_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
KAFKA_DUPLICATE_SEQUENCE_NUMBER = 46,
KAFKA_INVALID_PRODUCER_EPOCH = 47,
KAFKA_INVALID_TXN_STATE = 48,
KAFKA_INVALID_PRODUCER_ID_MAPPING = 49,
KAFKA_INVALID_TRANSACTION_TIMEOUT = 50,
KAFKA_CONCURRENT_TRANSACTIONS = 51,
KAFKA_TRANSACTION_COORDINATOR_FENCED = 52,
KAFKA_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
KAFKA_SECURITY_DISABLED = 54,
KAFKA_OPERATION_NOT_ATTEMPTED = 55,
KAFKA_KAFKA_STORAGE_ERROR = 56,
KAFKA_LOG_DIR_NOT_FOUND = 57,
KAFKA_SASL_AUTHENTICATION_FAILED = 58,
KAFKA_UNKNOWN_PRODUCER_ID = 59,
KAFKA_REASSIGNMENT_IN_PROGRESS = 60,
KAFKA_DELEGATION_TOKEN_AUTH_DISABLED = 61,
KAFKA_DELEGATION_TOKEN_NOT_FOUND = 62,
KAFKA_DELEGATION_TOKEN_OWNER_MISMATCH = 63,
KAFKA_DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64,
KAFKA_DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65,
KAFKA_DELEGATION_TOKEN_EXPIRED = 66,
KAFKA_INVALID_PRINCIPAL_TYPE = 67,
KAFKA_NON_EMPTY_GROUP = 68,
KAFKA_GROUP_ID_NOT_FOUND = 69,
KAFKA_FETCH_SESSION_ID_NOT_FOUND = 70,
KAFKA_INVALID_FETCH_SESSION_EPOCH = 71,
KAFKA_LISTENER_NOT_FOUND = 72,
KAFKA_TOPIC_DELETION_DISABLED = 73,
KAFKA_FENCED_LEADER_EPOCH = 74,
KAFKA_UNKNOWN_LEADER_EPOCH = 75,
KAFKA_UNSUPPORTED_COMPRESSION_TYPE = 76,
KAFKA_STALE_BROKER_EPOCH = 77,
KAFKA_OFFSET_NOT_AVAILABLE = 78,
KAFKA_MEMBER_ID_REQUIRED = 79,
KAFKA_PREFERRED_LEADER_NOT_AVAILABLE = 80,
KAFKA_GROUP_MAX_SIZE_REACHED = 81,
KAFKA_FENCED_INSTANCE_ID = 82,
KAFKA_MISSING_TOPIC = 256 + 1,
};
enum
{
Kafka_Unknown = -1,
Kafka_Produce = 0,
Kafka_Fetch = 1,
Kafka_ListOffsets = 2,
Kafka_Metadata = 3,
Kafka_LeaderAndIsr = 4,
Kafka_StopReplica = 5,
Kafka_UpdateMetadata = 6,
Kafka_ControlledShutdown = 7,
Kafka_OffsetCommit = 8,
Kafka_OffsetFetch = 9,
Kafka_FindCoordinator = 10,
Kafka_JoinGroup = 11,
Kafka_Heartbeat = 12,
Kafka_LeaveGroup = 13,
Kafka_SyncGroup = 14,
Kafka_DescribeGroups = 15,
Kafka_ListGroups = 16,
Kafka_SaslHandshake = 17,
Kafka_ApiVersions = 18,
Kafka_CreateTopics = 19,
Kafka_DeleteTopics = 20,
Kafka_DeleteRecords = 21,
Kafka_InitProducerId = 22,
Kafka_OffsetForLeaderEpoch = 23,
Kafka_AddPartitionsToTxn = 24,
Kafka_AddOffsetsToTxn = 25,
Kafka_EndTxn = 26,
Kafka_WriteTxnMarkers = 27,
Kafka_TxnOffsetCommit = 28,
Kafka_DescribeAcls = 29,
Kafka_CreateAcls = 30,
Kafka_DeleteAcls = 31,
Kafka_DescribeConfigs = 32,
Kafka_AlterConfigs = 33,
Kafka_AlterReplicaLogDirs = 34,
Kafka_DescribeLogDirs = 35,
Kafka_SaslAuthenticate = 36,
Kafka_CreatePartitions = 37,
Kafka_CreateDelegationToken = 38,
Kafka_RenewDelegationToken = 39,
Kafka_ExpireDelegationToken = 40,
Kafka_DescribeDelegationToken = 41,
Kafka_DeleteGroups = 42,
Kafka_ElectPreferredLeaders = 43,
Kafka_IncrementalAlterConfigs = 44,
Kafka_ApiNums,
};
enum
{
Kafka_NoCompress,
Kafka_Gzip,
Kafka_Snappy,
Kafka_Lz4,
Kafka_Zstd,
};
enum
{
KAFKA_FEATURE_APIVERSION = 1<<0,
KAFKA_FEATURE_BROKER_BALANCED_CONSUMER = 1<<1,
KAFKA_FEATURE_THROTTLETIME = 1<<2,
KAFKA_FEATURE_BROKER_GROUP_COORD = 1<<3,
KAFKA_FEATURE_LZ4 = 1<<4,
KAFKA_FEATURE_OFFSET_TIME = 1<<5,
KAFKA_FEATURE_MSGVER2 = 1<<6,
KAFKA_FEATURE_MSGVER1 = 1<<7,
KAFKA_FEATURE_ZSTD = 1<<8,
};
typedef struct __kafka_api_version
{
short api_key;
short min_ver;
short max_ver;
} kafka_api_version_t;
/* Incremental parser state for a single Kafka wire message: a 4-byte
 * length header (staged in headbuf/hsize) followed by the message body
 * (accumulated in msgbuf/cur_size). */
typedef struct __kafka_parser
{
	int complete;		/* nonzero once the whole message has been received */
	size_t message_size;	/* body size, presumably decoded from headbuf */
	void *msgbuf;		/* buffer accumulating the message body */
	size_t cur_size;	/* bytes of the body received so far */
	char headbuf[4];	/* staging area for the 4-byte length header */
	size_t hsize;		/* bytes of the header received so far */
} kafka_parser_t;
typedef struct __kafka_config
{
int produce_timeout;
int produce_msg_max_bytes;
int produce_msgset_cnt;
int produce_msgset_max_bytes;
int fetch_timeout;
int fetch_min_bytes;
int fetch_max_bytes;
int fetch_msg_max_bytes;
long long offset_timestamp;
long long commit_timestamp;
int session_timeout;
int rebalance_timeout;
long long retention_time_period;
int produce_acks;
int allow_auto_topic_creation;
int api_version_request;
int api_version_timeout;
char *broker_version;
int compress_type;
int compress_level;
char *client_id;
} kafka_config_t;
typedef struct __kafka_broker
{
int node_id;
int port;
char *host;
char *rack;
int to_addr;
struct sockaddr_storage addr;
socklen_t addrlen;
unsigned features;
kafka_api_version_t *api;
int api_elements;
} kafka_broker_t;
typedef struct __kafka_partition
{
short error;
int partition_index;
kafka_broker_t leader;
int *replica_nodes;
int replica_node_elements;
int *isr_nodes;
int isr_node_elements;
} kafka_partition_t;
typedef struct __kafka_meta
{
short error;
char *topic_name;
char *error_message;
signed char is_internal;
kafka_partition_t **partitions;
int partition_elements;
} kafka_meta_t;
typedef struct __kafka_topic_partition
{
short error;
char *topic_name;
int partition;
long long offset;
long long high_watermark;
long long low_watermark;
long long last_stable_offset;
long long log_start_offset;
long long offset_timestamp;
char *committed_metadata;
struct list_head record_list;
} kafka_topic_partition_t;
typedef struct __kafka_record_header
{
struct list_head list;
void *key;
size_t key_len;
int key_is_move;
void *value;
size_t value_len;
int value_is_move;
} kafka_record_header_t;
typedef struct __kafka_record
{
void *key;
size_t key_len;
int key_is_move;
void *value;
size_t value_len;
int value_is_move;
long long timestamp;
long long offset;
struct list_head header_list;
short status;
kafka_topic_partition_t *toppar;
} kafka_record_t;
typedef struct __kafka_memeber
{
char *member_id;
char *client_id;
char *client_host;
void *member_metadata;
size_t member_metadata_len;
struct list_head toppar_list;
struct list_head assigned_toppar_list;
} kafka_member_t;
typedef int (*kafka_assignor_t)(kafka_member_t **members, int member_elements,
void *meta_topic);
typedef struct __kafka_group_protocol
{
struct list_head list;
char *protocol_name;
kafka_assignor_t assignor;
} kafka_group_protocol_t;
typedef struct __kafka_cgroup
{
struct list_head assigned_toppar_list;
short error;
char *error_msg;
kafka_broker_t coordinator;
char *leader_id;
char *member_id;
kafka_member_t **members;
int member_elements;
int generation_id;
char *group_name;
char *protocol_type;
char *protocol_name;
struct list_head group_protocol_list;
} kafka_cgroup_t;
typedef struct __kafka_block
{
void *buf;
size_t len;
int is_move;
} kafka_block_t;
#ifdef __cplusplus
extern "C"
{
#endif
int kafka_parser_append_message(const void *buf, size_t *size,
kafka_parser_t *parser);
void kafka_parser_init(kafka_parser_t *parser);
void kafka_parser_deinit(kafka_parser_t *parser);
void kafka_topic_partition_init(kafka_topic_partition_t *toppar);
void kafka_topic_partition_deinit(kafka_topic_partition_t *toppar);
void kafka_cgroup_init(kafka_cgroup_t *cgroup);
void kafka_cgroup_deinit(kafka_cgroup_t *cgroup);
void kafka_block_init(kafka_block_t *block);
void kafka_block_deinit(kafka_block_t *block);
/* Fix parameter-name typo "brock" -> "broker" (prototype-only name; no ABI
 * or caller impact), matching kafka_broker_deinit() below. */
void kafka_broker_init(kafka_broker_t *broker);
void kafka_broker_deinit(kafka_broker_t *broker);
void kafka_config_init(kafka_config_t *config);
void kafka_config_deinit(kafka_config_t *config);
void kafka_meta_init(kafka_meta_t *meta);
void kafka_meta_deinit(kafka_meta_t *meta);
void kafka_partition_init(kafka_partition_t *partition);
void kafka_partition_deinit(kafka_partition_t *partition);
void kafka_member_init(kafka_member_t *member);
void kafka_member_deinit(kafka_member_t *member);
void kafka_record_init(kafka_record_t *record);
void kafka_record_deinit(kafka_record_t *record);
void kafka_record_header_init(kafka_record_header_t *header);
void kafka_record_header_deinit(kafka_record_header_t *header);
int kafka_topic_partition_set_tp(const char *topic_name, int partition,
kafka_topic_partition_t *toppar);
int kafka_record_set_key(const void *key, size_t key_len,
kafka_record_t *record);
int kafka_record_set_value(const void *val, size_t val_len,
kafka_record_t *record);
int kafka_record_header_set_kv(const void *key, size_t key_len,
const void *val, size_t val_len,
kafka_record_header_t *header);
int kafka_meta_set_topic(const char *topic_name, kafka_meta_t *meta);
int kafka_cgroup_set_group(const char *group_name, kafka_cgroup_t *cgroup);
int kafka_broker_get_api_version(const kafka_broker_t *broker,
int api_key,
int min_ver, int max_ver);
unsigned kafka_get_features(kafka_api_version_t *api, size_t api_cnt);
int kafka_api_version_is_queryable(const char *broker_version,
kafka_api_version_t **api,
size_t *api_cnt);
#ifdef __cplusplus
}
#endif
#endif

View File

@@ -10,3 +10,9 @@ set(SRC
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (KAFKA STREQUAL "y")
set(SRC
crc32c.c
)
add_library("util_kafka" OBJECT ${SRC})
endif ()

519
src/util/crc32c.c Normal file
View File

@@ -0,0 +1,519 @@
/* Copied from http://stackoverflow.com/a/17646775/1821055
* with the following modifications:
* * remove test code
* * global hw/sw initialization to be called once per process
* * HW support is determined by configure's WITH_CRC32C_HW
* * Windows porting (no hardware support on Windows yet)
*
* FIXME:
* * Hardware support on Windows (MSVC assembler)
* * Hardware support on ARM
*/
/* crc32c.c -- compute CRC-32C using the Intel crc32 instruction
* Copyright (C) 2013 Mark Adler
* Version 1.1 1 Aug 2013 Mark Adler
*/
/*
This software is provided 'as-is', without any express or implied
warranty. In no event will the author be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
Mark Adler
madler@alumni.caltech.edu
*/
/* Use hardware CRC instruction on Intel SSE 4.2 processors. This computes a
CRC-32C, *not* the CRC-32 used by Ethernet and zip, gzip, etc. A software
version is provided as a fall-back, as well as for speed comparisons. */
/* Version history:
1.0 10 Feb 2013 First version
1.1 1 Aug 2013 Correct comments on why three crc instructions in parallel
*/
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
/**
* Provides portable endian-swapping macros/functions.
*
* be64toh()
* htobe64()
* be32toh()
* htobe32()
* be16toh()
* htobe16()
* le64toh()
*/
#ifdef __FreeBSD__
#include <sys/endian.h>
#elif defined __GLIBC__
#include <endian.h>
#ifndef be64toh
/* Support older glibc (<2.9) which lack be64toh */
#include <byteswap.h>
#if __BYTE_ORDER == __BIG_ENDIAN
#define be16toh(x) (x)
#define be32toh(x) (x)
#define be64toh(x) (x)
#define le64toh(x) __bswap_64 (x)
#define le32toh(x) __bswap_32 (x)
#else
#define be16toh(x) __bswap_16 (x)
#define be32toh(x) __bswap_32 (x)
#define be64toh(x) __bswap_64 (x)
#define le64toh(x) (x)
#define le32toh(x) (x)
#endif
#endif
#elif defined __CYGWIN__
#include <endian.h>
#elif defined __BSD__
#include <sys/endian.h>
#elif defined __sun
#include <sys/byteorder.h>
#include <sys/isa_defs.h>
#define __LITTLE_ENDIAN 1234
#define __BIG_ENDIAN 4321
#ifdef _BIG_ENDIAN
#define __BYTE_ORDER __BIG_ENDIAN
#define be64toh(x) (x)
#define be32toh(x) (x)
#define be16toh(x) (x)
#define le16toh(x) ((uint16_t)BSWAP_16(x))
#define le32toh(x) BSWAP_32(x)
#define le64toh(x) BSWAP_64(x)
# else
#define __BYTE_ORDER __LITTLE_ENDIAN
#define be64toh(x) BSWAP_64(x)
#define be32toh(x) ntohl(x)
#define be16toh(x) ntohs(x)
#define le16toh(x) (x)
#define le32toh(x) (x)
#define le64toh(x) (x)
#define htole16(x) (x)
#define htole64(x) (x)
#endif /* __sun */
#elif defined __APPLE__
#include <machine/endian.h>
#include <libkern/OSByteOrder.h>
#if __DARWIN_BYTE_ORDER == __DARWIN_BIG_ENDIAN
#define be64toh(x) (x)
#define be32toh(x) (x)
#define be16toh(x) (x)
#define le16toh(x) OSSwapInt16(x)
#define le32toh(x) OSSwapInt32(x)
#define le64toh(x) OSSwapInt64(x)
#else
#define be64toh(x) OSSwapInt64(x)
#define be32toh(x) OSSwapInt32(x)
#define be16toh(x) OSSwapInt16(x)
#define le16toh(x) (x)
#define le32toh(x) (x)
#define le64toh(x) (x)
#endif
#elif defined(_WIN32)
#include <intrin.h>
#define be64toh(x) _byteswap_uint64(x)
#define be32toh(x) _byteswap_ulong(x)
#define be16toh(x) _byteswap_ushort(x)
#define le16toh(x) (x)
#define le32toh(x) (x)
#define le64toh(x) (x)
#elif defined _AIX /* AIX is always big endian */
#define be64toh(x) (x)
#define be32toh(x) (x)
#define be16toh(x) (x)
#define le32toh(x) \
((((x) & 0xff) << 24) | \
(((x) & 0xff00) << 8) | \
(((x) & 0xff0000) >> 8) | \
(((x) & 0xff000000) >> 24))
#define le64toh(x) \
((((x) & 0x00000000000000ffL) << 56) | \
(((x) & 0x000000000000ff00L) << 40) | \
(((x) & 0x0000000000ff0000L) << 24) | \
(((x) & 0x00000000ff000000L) << 8) | \
(((x) & 0x000000ff00000000L) >> 8) | \
(((x) & 0x0000ff0000000000L) >> 24) | \
(((x) & 0x00ff000000000000L) >> 40) | \
(((x) & 0xff00000000000000L) >> 56))
#else
#include <endian.h>
#endif
/*
* On Solaris, be64toh is a function, not a macro, so there's no need to error
* if it's not defined.
*/
#if !defined(__sun) && !defined(be64toh)
#error Missing definition for be64toh
#endif
#ifndef be32toh
#define be32toh(x) ntohl(x)
#endif
#ifndef be16toh
#define be16toh(x) ntohs(x)
#endif
#ifndef htobe64
#define htobe64(x) be64toh(x)
#endif
#ifndef htobe32
#define htobe32(x) be32toh(x)
#endif
#ifndef htobe16
#define htobe16(x) be16toh(x)
#endif
#ifndef htole32
#define htole32(x) le32toh(x)
#endif
/* CRC-32C (iSCSI/Castagnoli) polynomial in reversed bit order. */
#define POLY 0x82f63b78

/* Slice-by-8 lookup tables for the software (quadword-at-a-time) CRC. */
static uint32_t crc32c_table[8][256];

/* Build the software lookup tables.  Table 0 is the classic byte-at-a-time
   CRC table; tables 1..7 extend it so that eight input bytes can be folded
   into the running CRC per iteration. */
static void crc32c_init_sw(void)
{
    uint32_t n, crc, k;

    for (n = 0; n < 256; n++) {
        crc = n;
        for (k = 0; k < 8; k++)
            crc = crc & 1 ? (crc >> 1) ^ POLY : crc >> 1;
        crc32c_table[0][n] = crc;
    }
    for (n = 0; n < 256; n++) {
        crc = crc32c_table[0][n];
        for (k = 1; k < 8; k++) {
            crc = crc32c_table[0][crc & 0xff] ^ (crc >> 8);
            crc32c_table[k][n] = crc;
        }
    }
}

/* Table-driven software fall-back (about 15x slower than the hardware
   instruction).  Handles unaligned leading bytes one at a time, then eight
   bytes per step through the slice-by-8 tables, then the trailing remainder.
   Quadword loads go through memcpy + le64toh, so the routine is safe on any
   alignment and assumes only little-endian folding order. */
static uint32_t crc32c_sw(uint32_t crci, const void *buf, size_t len)
{
    const unsigned char *p = (const unsigned char *)buf;
    uint64_t crc = crci ^ 0xffffffff;

    /* byte-at-a-time until p reaches an 8-byte boundary */
    while (len > 0 && ((uintptr_t)p & 7) != 0) {
        crc = crc32c_table[0][(crc ^ *p++) & 0xff] ^ (crc >> 8);
        len--;
    }
    /* eight bytes per iteration */
    while (len >= 8) {
        uint64_t word;

        memcpy(&word, p, sizeof word);      /* alignment-safe load */
        crc ^= le64toh(word);
        crc = crc32c_table[7][crc & 0xff] ^
              crc32c_table[6][(crc >> 8) & 0xff] ^
              crc32c_table[5][(crc >> 16) & 0xff] ^
              crc32c_table[4][(crc >> 24) & 0xff] ^
              crc32c_table[3][(crc >> 32) & 0xff] ^
              crc32c_table[2][(crc >> 40) & 0xff] ^
              crc32c_table[1][(crc >> 48) & 0xff] ^
              crc32c_table[0][crc >> 56];
        p += 8;
        len -= 8;
    }
    /* trailing remainder */
    while (len > 0) {
        crc = crc32c_table[0][(crc ^ *p++) & 0xff] ^ (crc >> 8);
        len--;
    }
    return (uint32_t)crc ^ 0xffffffff;
}
#if WITH_CRC32C_HW
static int sse42; /* Cached SSE42 support */
/* Multiply a matrix times a vector over the Galois field of two elements,
   GF(2).  Each element is a bit in an unsigned integer.  mat must have at
   least as many entries as the power of two for most significant one bit in
   vec. */
/* FIX: the original declared this `static RD_INLINE`; RD_INLINE is a
   librdkafka macro that is never defined in this file, so the
   WITH_CRC32C_HW path failed to compile.  Plain `inline` is the portable
   equivalent. */
static inline uint32_t gf2_matrix_times(uint32_t *mat, uint32_t vec)
{
    uint32_t sum;

    sum = 0;
    while (vec) {
        if (vec & 1)
            sum ^= *mat;        /* XOR in the matrix row for this bit */
        vec >>= 1;
        mat++;
    }
    return sum;
}
/* Multiply a matrix by itself over GF(2).  Both mat and square must have 32
   rows. */
/* FIX: `RD_INLINE` is undefined in this file (librdkafka leftover); use
   plain `inline` so the WITH_CRC32C_HW build compiles. */
static inline void gf2_matrix_square(uint32_t *square, uint32_t *mat)
{
    int n;

    for (n = 0; n < 32; n++)
        square[n] = gf2_matrix_times(mat, mat[n]);
}
/* Construct an operator to apply len zeros to a crc.  len must be a power of
   two.  If len is not a power of two, then the result is the same as for the
   largest power of two less than len.  The result for len == 0 is the same as
   for len == 1.  A version of this routine could be easily written for any
   len, but that is not needed for this application. */
/* The operator is a 32x32 bit-matrix over GF(2); repeatedly squaring the
   one-bit-of-zero operator doubles the number of zero bits it applies, so
   log2(len) squarings suffice.  `even` receives the final operator. */
static void crc32c_zeros_op(uint32_t *even, size_t len)
{
    int n;
    uint32_t row;
    uint32_t odd[32];       /* odd-power-of-two zeros operator */

    /* put operator for one zero bit in odd */
    odd[0] = POLY;          /* CRC-32C polynomial */
    row = 1;
    for (n = 1; n < 32; n++) {
        odd[n] = row;
        row <<= 1;
    }

    /* put operator for two zero bits in even */
    gf2_matrix_square(even, odd);

    /* put operator for four zero bits in odd */
    gf2_matrix_square(odd, even);

    /* first square will put the operator for one zero byte (eight zero bits),
       in even -- next square puts operator for two zero bytes in odd, and so
       on, until len has been rotated down to zero */
    do {
        gf2_matrix_square(even, odd);
        len >>= 1;
        if (len == 0)
            return;         /* answer already in even */
        gf2_matrix_square(odd, even);
        len >>= 1;
    } while (len);

    /* answer ended up in odd -- copy to even */
    for (n = 0; n < 32; n++)
        even[n] = odd[n];
}
/* Take a length and build four lookup tables for applying the zeros operator
   for that length, byte-by-byte on the operand. */
/* zeros[i][b] is the operator applied to byte b placed in byte position i
   of the CRC, so crc32c_shift() can apply the operator with four table
   lookups instead of a full matrix-vector multiply. */
static void crc32c_zeros(uint32_t zeros[][256], size_t len)
{
    uint32_t n;
    uint32_t op[32];

    crc32c_zeros_op(op, len);
    for (n = 0; n < 256; n++) {
        zeros[0][n] = gf2_matrix_times(op, n);
        zeros[1][n] = gf2_matrix_times(op, n << 8);
        zeros[2][n] = gf2_matrix_times(op, n << 16);
        zeros[3][n] = gf2_matrix_times(op, n << 24);
    }
}
/* Apply the zeros operator table to crc: look up each of the four CRC bytes
   in its table and XOR the results, advancing the CRC as if the table's
   number of zero bytes had been processed. */
/* FIX: `RD_INLINE` is undefined in this file (librdkafka leftover); use
   plain `inline` so the WITH_CRC32C_HW build compiles. */
static inline uint32_t crc32c_shift(uint32_t zeros[][256], uint32_t crc)
{
    return zeros[0][crc & 0xff] ^ zeros[1][(crc >> 8) & 0xff] ^
        zeros[2][(crc >> 16) & 0xff] ^ zeros[3][crc >> 24];
}
/* Block sizes for three-way parallel crc computation.  LONG and SHORT must
   both be powers of two.  The associated string constants must be set
   accordingly, for use in constructing the assembler instructions. */
/* LONGx1/LONGx2 must equal LONG and 2*LONG spelled as decimal strings (same
   for SHORTx1/SHORTx2): they are pasted into the inline-asm operand text of
   crc32c_hw() below. */
#define LONG 8192
#define LONGx1 "8192"
#define LONGx2 "16384"
#define SHORT 256
#define SHORTx1 "256"
#define SHORTx2 "512"

/* Tables for hardware crc that shift a crc by LONG and SHORT zeros. */
static uint32_t crc32c_long[4][256];
static uint32_t crc32c_short[4][256];

/* Initialize tables for shifting crcs. */
static void crc32c_init_hw(void)
{
    crc32c_zeros(crc32c_long, LONG);
    crc32c_zeros(crc32c_short, SHORT);
}
/* Compute CRC-32C using the Intel hardware instruction. */
/* Three-way parallel version: runs three independent crc32q chains over
   adjacent LONG (then SHORT) sized sub-blocks and recombines them with the
   precomputed zero-shift tables, hiding the instruction's 3-cycle latency.
   x86-64 GCC/Clang inline assembly only. */
static uint32_t crc32c_hw(uint32_t crc, const void *buf, size_t len)
{
    const unsigned char *next = buf;
    const unsigned char *end;
    uint64_t crc0, crc1, crc2;      /* need to be 64 bits for crc32q */

    /* pre-process the crc */
    crc0 = crc ^ 0xffffffff;

    /* compute the crc for up to seven leading bytes to bring the data pointer
       to an eight-byte boundary */
    while (len && ((uintptr_t)next & 7) != 0) {
        __asm__("crc32b\t" "(%1), %0"
                : "=r"(crc0)
                : "r"(next), "0"(crc0));
        next++;
        len--;
    }

    /* compute the crc on sets of LONG*3 bytes, executing three independent crc
       instructions, each on LONG bytes -- this is optimized for the Nehalem,
       Westmere, Sandy Bridge, and Ivy Bridge architectures, which have a
       throughput of one crc per cycle, but a latency of three cycles */
    while (len >= LONG*3) {
        crc1 = 0;
        crc2 = 0;
        end = next + LONG;
        do {
            __asm__("crc32q\t" "(%3), %0\n\t"
                    "crc32q\t" LONGx1 "(%3), %1\n\t"
                    "crc32q\t" LONGx2 "(%3), %2"
                    : "=r"(crc0), "=r"(crc1), "=r"(crc2)
                    : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2));
            next += 8;
        } while (next < end);
        /* fold the three partial CRCs into one by shifting past LONG zeros */
        crc0 = crc32c_shift(crc32c_long, crc0) ^ crc1;
        crc0 = crc32c_shift(crc32c_long, crc0) ^ crc2;
        next += LONG*2;
        len -= LONG*3;
    }

    /* do the same thing, but now on SHORT*3 blocks for the remaining data less
       than a LONG*3 block */
    while (len >= SHORT*3) {
        crc1 = 0;
        crc2 = 0;
        end = next + SHORT;
        do {
            __asm__("crc32q\t" "(%3), %0\n\t"
                    "crc32q\t" SHORTx1 "(%3), %1\n\t"
                    "crc32q\t" SHORTx2 "(%3), %2"
                    : "=r"(crc0), "=r"(crc1), "=r"(crc2)
                    : "r"(next), "0"(crc0), "1"(crc1), "2"(crc2));
            next += 8;
        } while (next < end);
        crc0 = crc32c_shift(crc32c_short, crc0) ^ crc1;
        crc0 = crc32c_shift(crc32c_short, crc0) ^ crc2;
        next += SHORT*2;
        len -= SHORT*3;
    }

    /* compute the crc on the remaining eight-byte units less than a SHORT*3
       block */
    end = next + (len - (len & 7));
    while (next < end) {
        __asm__("crc32q\t" "(%1), %0"
                : "=r"(crc0)
                : "r"(next), "0"(crc0));
        next += 8;
    }
    len &= 7;

    /* compute the crc for up to seven trailing bytes */
    while (len) {
        __asm__("crc32b\t" "(%1), %0"
                : "=r"(crc0)
                : "r"(next), "0"(crc0));
        next++;
        len--;
    }

    /* return a post-processed crc */
    return (uint32_t)crc0 ^ 0xffffffff;
}

/* Check for SSE 4.2.  SSE 4.2 was first supported in Nehalem processors
   introduced in November, 2008.  This does not check for the existence of the
   cpuid instruction itself, which was introduced on the 486SL in 1992, so this
   will fail on earlier x86 processors.  cpuid works on all Pentium and later
   processors. */
/* Sets `have` to 1 iff CPUID.1:ECX bit 20 (SSE4.2) is set. */
#define SSE42(have) \
    do { \
        uint32_t eax, ecx; \
        eax = 1; \
        __asm__("cpuid" \
                : "=c"(ecx) \
                : "a"(eax) \
                : "%ebx", "%edx"); \
        (have) = (ecx >> 20) & 1; \
    } while (0)

#endif /* WITH_CRC32C_HW */
/* Compute a CRC-32C. If the crc32 instruction is available, use the hardware
version. Otherwise, use the software version. */
uint32_t crc32c(uint32_t crc, const void *buf, size_t len)
{
#if WITH_CRC32C_HW
if (sse42)
return crc32c_hw(crc, buf, len);
else
#endif
return crc32c_sw(crc, buf, len);
}
/**
 * @brief Populate shift tables once
 *
 * Call once per process before any crc32c() call: detects SSE4.2 and builds
 * either the hardware shift tables or the software lookup tables.  The
 * tables are plain static globals written without synchronization, so this
 * must not race with concurrent crc32c() use.
 */
void crc32c_global_init (void) {
#if WITH_CRC32C_HW
        SSE42(sse42);
        if (sse42)
                crc32c_init_hw();
        else
#endif
                crc32c_init_sw();
}

46
src/util/crc32c.h Normal file
View File

@@ -0,0 +1,46 @@
/*
* Copyright (c) 2017 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _CRC32C_H_
#define _CRC32C_H_
#include <stdint.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C"
{
#endif
uint32_t crc32c(uint32_t crc, const void *buf, size_t len);
void crc32c_global_init (void);
#ifdef __cplusplus
}
#endif
#endif /* _CRC32C_H_ */

View File

@@ -48,6 +48,11 @@ foreach(src ${TUTORIAL_LIST})
target_link_libraries(${bin_name} ${WORKFLOW_LIB})
endforeach()
if (KAFKA STREQUAL "y")
add_executable("kafka_cli" "tutorial-13-kafka_cli.cc")
target_link_libraries("kafka_cli" wfkafka workflow z snappy lz4 zstd rt)
endif ()
set(DIR10 tutorial-10-user_defined_protocol)
add_executable(server ${DIR10}/server.cc ${DIR10}/message.cc)
add_executable(client ${DIR10}/client.cc ${DIR10}/message.cc)

View File

@@ -10,10 +10,17 @@ CMAKE3 := $(shell if which cmake3>/dev/null ; then echo cmake3; else echo cmake;
all:
mkdir -p $(BUILD_DIR)
ifeq ($(DEBUG),y)
cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug $(ROOT_DIR)
ifeq ($(KAFKA),y)
KAFKA=y
else
cd $(BUILD_DIR) && $(CMAKE3) $(ROOT_DIR)
KAFKA=n
endif
ifeq ($(DEBUG),y)
cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug -D KAFKA=$(KAFKA) $(ROOT_DIR)
else
cd $(BUILD_DIR) && $(CMAKE3) -D KAFKA=$(KAFKA) $(ROOT_DIR)
endif
make -C $(BUILD_DIR) -f Makefile

View File

@@ -0,0 +1,290 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Wang Zhulei (wangzhulei@sogou-inc.com)
*/
#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include "workflow/WFKafkaClient.h"
#include "workflow/KafkaMessage.h"
#include "workflow/KafkaResult.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#include "workflow/WFGlobal.h"
using namespace protocol;
static WFFacilities::WaitGroup wait_group(1);
std::string url;
bool no_cgroup = false;
WFKafkaClient client;
/* Shared callback for every Kafka task this tutorial creates.  Dispatches on
 * the finished task's api type:
 *  - Kafka_Produce:      print the broker's ack (partition/status/offset) per record;
 *  - Kafka_Fetch:        dump each partition's records to /tmp/kafka.<topic>.<part>.<offset>,
 *                        and (when consuming via a consumer group) chain a commit task;
 *  - Kafka_Metadata:     print topic/partition counts from the client's meta list;
 *  - Kafka_OffsetCommit: print commit results and chain a leavegroup task.
 * When no follow-up task was chained, deinit the client and release main(). */
void kafka_callback(WFKafkaTask *task)
{
	int state = task->get_state();
	int error = task->get_error();

	/* on any failure: report and wake the main thread */
	if (state != WFT_STATE_SUCCESS)
	{
		fprintf(stderr, "error msg: %s\n",
				WFGlobal::get_error_string(state, error));
		fprintf(stderr, "Failed. Press Ctrl-C to exit.\n");
		wait_group.done();
		return;
	}

	WFKafkaTask *next_task = NULL;
	std::vector<std::vector<KafkaRecord *>> records;
	std::vector<KafkaToppar *> toppars;

	int api_type = task->get_api_type();
	protocol::KafkaResult new_result;

	switch (api_type)
	{
	case Kafka_Produce:
		task->get_result()->fetch_records(records);
		for (const auto &v : records)
		{
			for (const auto &w: v)
			{
				const void *value;
				size_t value_len;
				w->get_value(&value, &value_len);
				printf("produce\ttopic: %s, partition: %d, status: %d, \
offset: %lld, val_len: %zu\n",
					   w->get_topic(), w->get_partition(), w->get_status(),
					   w->get_offset(), value_len);
			}
		}

		break;

	case Kafka_Fetch:
		/* take ownership of the result before creating the follow-up task */
		new_result = std::move(*task->get_result());
		new_result.fetch_records(records);
		if (!records.empty())
		{
			if (!no_cgroup)
				next_task = client.create_kafka_task("api=commit", 3, kafka_callback);

			std::string out;
			for (const auto &v : records)
			{
				if (v.empty())
					continue;
				/* one dump file per partition batch, named after its last record */
				char fn[1024];
				snprintf(fn, 1024, "/tmp/kafka.%s.%d.%llu",
						 v.back()->get_topic(), v.back()->get_partition(),
						 v.back()->get_offset());
				FILE *fp = fopen(fn, "w+");
				long long offset = 0;
				int partition = 0;
				std::string topic;
				for (const auto &w : v)
				{
					const void *value;
					size_t value_len;
					w->get_value(&value, &value_len);
					if (fp)
						fwrite(value, value_len, 1, fp);
					offset = w->get_offset();
					partition = w->get_partition();
					topic = w->get_topic();
					/* queue each consumed record for the commit task */
					if (!no_cgroup)
						next_task->add_commit_record(*w);
				}
				if (!topic.empty())
				{
					out += "topic: " + topic;
					out += ",partition: " + std::to_string(partition);
					out += ",offset: " + std::to_string(offset) + ";";
				}
				if (fp)
					fclose(fp);
			}
			printf("fetch\t%s\n", out.c_str());
			if (!no_cgroup)
				series_of(task)->push_back(next_task);
		}
		break;

	case Kafka_Metadata:
	{
		protocol::KafkaMetaList *meta_list = client.get_meta_list();
		KafkaMeta *meta;
		while ((meta = meta_list->get_next()) != NULL)
		{
			printf("meta\ttopic: %s, partition_num: %d\n",
				   meta->get_topic(), meta->get_partition_elements());
		}
		break;
	}

	case Kafka_OffsetCommit:
		task->get_result()->fetch_toppars(toppars);
		if (!toppars.empty())
		{
			for (const auto& v : toppars)
			{
				printf("commit\ttopic: %s, partition: %d, \
offset: %llu, error: %d\n",
					   v->get_topic(), v->get_partition(),
					   v->get_offset(), v->get_error());
			}
		}
		/* leave the consumer group after committing offsets */
		next_task = client.create_leavegroup_task(3, kafka_callback);
		series_of(task)->push_back(next_task);
		break;

	case Kafka_LeaveGroup:
		printf("leavegroup callback\n");
		break;

	default:
		break;
	}

	/* end of the chain: tear down the client and release main() */
	if (!next_task)
	{
		client.deinit();
		wait_group.done();
	}
}
/* SIGINT handler: intentionally empty -- installing it (instead of the
   default action) lets Ctrl-C interrupt blocking waits without killing the
   process outright. */
void sig_handler(int signo) { }
/* Usage: kafka_cli <broker_url> [p|c|m] [extra]
 *   p [compress_type] - produce two ~512KB random records to two topics
 *   c                 - consume via consumer group "workflow_group"
 *   c d               - consume directly from fixed topic/partition/offset
 *                       (no consumer group, so no offset commit)
 *   m                 - query topic metadata
 * The task chain runs asynchronously; main() blocks on wait_group until the
 * callback chain finishes. */
int main(int argc, char *argv[])
{
	if (argc < 3)
	{
		fprintf(stderr, "USAGE: %s url [p/c/m]\n", argv[0]);
		exit(1);
	}

	signal(SIGINT, sig_handler);

	/* normalize the broker url to a kafka:// scheme */
	url = argv[1];
	if (strncmp(argv[1], "kafka://", 8) != 0)
		url = "kafka://" + url;

	char buf[512 * 1024];
	WFKafkaTask *task;

	if (argv[2][0] == 'p')
	{
		int compress_type = Kafka_NoCompress;

		if (argc > 3)
			compress_type = atoi(argv[3]);

		if (compress_type > Kafka_Zstd)
			exit(1);

		client.init(url);
		task = client.create_kafka_task("api=produce", 3, kafka_callback);

		KafkaConfig config;
		KafkaRecord record;
		config.set_compress_type(compress_type);
		task->set_config(std::move(config));

		/* fill the payload with pseudo-random bytes.
		   NOTE(review): '1' + rand() % 128 can exceed CHAR_MAX on
		   signed-char platforms; harmless for opaque payload bytes but
		   implementation-defined -- confirm intent. */
		for (size_t i = 0; i < sizeof (buf); ++i)
			buf[i] = '1' + rand() % 128;

		/* partition -1 lets the client pick the partition */
		record.set_key("key1", strlen("key1"));
		record.set_value(buf, sizeof (buf));
		record.add_header_pair("hk1", 3, "hv1", 3);
		task->add_produce_record("workflow_test1", -1, std::move(record));

		record.set_key("key2", strlen("key2"));
		record.set_value(buf, sizeof (buf));
		record.add_header_pair("hk2", 3, "hv2", 3);
		task->add_produce_record("workflow_test2", -1, std::move(record));
	}
	else if (argv[2][0] == 'm')
	{
		client.init(url);
		/* NOTE(review): other branches pass an "api=..." query string here;
		   passing the full url relies on set_api_type() below -- confirm
		   this is the intended form for metadata tasks. */
		task = client.create_kafka_task(url, 3, kafka_callback);
		task->add_topic("workflow_test1");
		task->add_topic("workflow_test2");
		task->set_api_type(Kafka_Metadata);
	}
	else if (argv[2][0] == 'c')
	{
		if (argc > 3 && argv[3][0] == 'd')
		{
			/* direct fetch from explicit topic/partition/offset pairs */
			client.init(url);
			task = client.create_kafka_task("api=fetch", 3, kafka_callback);
			KafkaToppar toppar;
			toppar.set_topic_partition("workflow_test1", 0);
			toppar.set_offset(0);
			task->add_toppar(toppar);
			toppar.set_topic_partition("workflow_test2", 0);
			toppar.set_offset(1);
			task->add_toppar(toppar);
			no_cgroup = true;
		}
		else
		{
			/* consumer-group fetch; offsets are committed in the callback */
			client.init(url, "workflow_group");
			task = client.create_kafka_task("topic=workflow_test1&topic=workflow_test2&api=fetch",
											3, kafka_callback);
			KafkaConfig config;
			config.set_client_id("workflow");
			task->set_config(std::move(config));
		}
	}
	else
	{
		fprintf(stderr, "USAGE: %s url [p/c/m]\n", argv[0]);
		exit(1);
	}

	task->start();
	wait_group.wait();
	return 0;
}