Merge pull request #799 from flaprocwang/master

add consul client
This commit is contained in:
xiehan
2022-03-17 16:02:46 +08:00
committed by GitHub
11 changed files with 1729 additions and 3 deletions

21
BUILD
View File

@@ -219,6 +219,27 @@ cc_library(
],
)
cc_library(
name = 'consul',
hdrs = [
'src/client/WFConsulClient.h',
'src/protocol/ConsulDataTypes.h',
],
includes = [
'src/client',
'src/factory',
'src/protocol',
'src/util',
],
srcs = [
'src/client/WFConsulClient.cc',
],
deps = [
':common',
],
visibility = ["//visibility:public"],
)
cc_binary(
name = 'helloworld',
srcs = ['tutorial/tutorial-00-helloworld.cc'],

View File

@@ -53,12 +53,14 @@ set(INCLUDE_HEADERS
src/protocol/dns_parser.h
src/protocol/DnsMessage.h
src/protocol/DnsUtil.h
src/protocol/ConsulDataTypes.h
src/server/WFServer.h
src/server/WFDnsServer.h
src/server/WFHttpServer.h
src/server/WFRedisServer.h
src/server/WFMySQLServer.h
src/client/WFMySQLConnection.h
src/client/WFConsulClient.h
src/client/WFDnsClient.h
src/manager/DnsCache.h
src/manager/WFGlobal.h
@@ -104,3 +106,4 @@ if(KAFKA STREQUAL "y")
src/factory/KafkaTaskImpl.inl
)
endif()

View File

@@ -15,11 +15,11 @@ base:
mkdir -p $(BUILD_DIR)
ifeq ($(DEBUG),y)
cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) -D UPSTREAM=$(UPSTREAM) $(ROOT_DIR)
cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug -D CONSUL=$(CONSUL) -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) -D UPSTREAM=$(UPSTREAM) $(ROOT_DIR)
else ifneq ("${INSTALL_PREFIX}install_prefix", "install_prefix")
cd $(BUILD_DIR) && $(CMAKE3) -DCMAKE_INSTALL_PREFIX:STRING=${INSTALL_PREFIX} -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) -D UPSTREAM=$(UPSTREAM) $(ROOT_DIR)
cd $(BUILD_DIR) && $(CMAKE3) -DCMAKE_INSTALL_PREFIX:STRING=${INSTALL_PREFIX} -D CONSUL=$(CONSUL) -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) -D UPSTREAM=$(UPSTREAM) $(ROOT_DIR)
else
cd $(BUILD_DIR) && $(CMAKE3) -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) -D UPSTREAM=$(UPSTREAM) $(ROOT_DIR)
cd $(BUILD_DIR) && $(CMAKE3) -D CONSUL=$(CONSUL) -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) -D UPSTREAM=$(UPSTREAM) $(ROOT_DIR)
endif
tutorial: all

View File

@@ -12,6 +12,13 @@ if (NOT MYSQL STREQUAL "n")
)
endif ()
if (NOT CONSUL STREQUAL "n")
set(SRC
${SRC}
WFConsulClient.cc
)
endif ()
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (KAFKA STREQUAL "y")
@@ -21,3 +28,4 @@ if (KAFKA STREQUAL "y")
add_library("client_kafka" OBJECT ${SRC})
set_property(SOURCE WFKafkaClient.cc APPEND PROPERTY COMPILE_OPTIONS "-fno-rtti")
endif ()

1175
src/client/WFConsulClient.cc Normal file

File diff suppressed because it is too large Load Diff

155
src/client/WFConsulClient.h Normal file
View File

@@ -0,0 +1,155 @@
/*
Copyright (c) 2022 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhenpeng (wangzhenpeng@sogou-inc.com)
*/
#ifndef _WFCONSULCLIENT_H_
#define _WFCONSULCLIENT_H_
#include <string>
#include <vector>
#include <functional>
#include "HttpMessage.h"
#include "ConsulDataTypes.h"
#include "WFTaskFactory.h"
class WFConsulTask;
using consul_callback_t = std::function<void (WFConsulTask *)>;
enum
{
CONSUL_API_TYPE_UNKNOWN = 0,
CONSUL_API_TYPE_DISCOVER,
CONSUL_API_TYPE_LIST_SERVICE,
CONSUL_API_TYPE_REGISTER,
CONSUL_API_TYPE_DEREGISTER,
};
class WFConsulTask : public WFGenericTask
{
public:
bool get_discover_result(std::vector<struct protocol::ConsulServiceInstance>& result);
bool get_list_service_result(std::vector<struct protocol::ConsulServiceTags>& result);
public:
void set_service(const struct protocol::ConsulService *service);
void set_api_type(int api_type)
{
this->api_type = api_type;
}
int get_api_type() const
{
return this->api_type;
}
void set_callback(consul_callback_t cb)
{
this->callback = std::move(cb);
}
void set_consul_index(long long consul_index)
{
this->consul_index = consul_index;
}
long long get_consul_index() const { return this->consul_index; }
std::string get_error_reason() const
{
return this->error_reason;
}
protected:
void set_config(protocol::ConsulConfig conf)
{
this->config = std::move(conf);
}
protected:
virtual void dispatch();
virtual SubTask *done();
WFHttpTask *create_discover_task();
WFHttpTask *create_list_service_task();
WFHttpTask *create_register_task();
WFHttpTask *create_deregister_task();
std::string generate_discover_request();
long long get_consul_index(protocol::HttpResponse *resp);
static bool check_task_result(WFHttpTask *task, WFConsulTask *consul_task);
static void discover_callback(WFHttpTask *task);
static void list_service_callback(WFHttpTask *task);
static void register_callback(WFHttpTask *task);
protected:
protocol::ConsulConfig config;
struct protocol::ConsulService service;
std::string proxy_url;
std::string error_reason;
std::string discover_res;
std::string list_service_res;
int retry_max;
int api_type;
bool finish;
long long consul_index;
consul_callback_t callback;
protected:
WFConsulTask(const std::string& proxy_url,
const std::string& service_namespace,
const std::string& service_name,
const std::string& service_id,
int retry_max, consul_callback_t&& cb);
virtual ~WFConsulTask() {}
friend class WFConsulClient;
};
class WFConsulClient
{
public:
// example: http://127.0.0.1:8500
int init(const std::string& proxy_url);
int init(const std::string& proxy_url, protocol::ConsulConfig config);
void deinit() { }
WFConsulTask *create_discover_task(const std::string& service_namespace,
const std::string& service_name,
int retry_max, consul_callback_t cb);
WFConsulTask *create_list_service_task(const std::string& service_namespace,
int retry_max, consul_callback_t cb);
WFConsulTask *create_register_task(const std::string& service_namespace,
const std::string& service_name,
const std::string& service_id,
int retry_max, consul_callback_t cb);
WFConsulTask *create_deregister_task(const std::string& service_namespace,
const std::string& service_id,
int retry_max, consul_callback_t cb);
private:
std::string proxy_url;
protocol::ConsulConfig config;
public:
virtual ~WFConsulClient() { }
};
#endif

View File

@@ -63,6 +63,10 @@ enum
WFT_ERR_KAFKA_API_UNKNOWN = 5008, ///< api type not supported
WFT_ERR_KAFKA_VERSION_DISALLOWED = 5009, ///< broker version not supported
WFT_ERR_KAFKA_SASL_DISALLOWED = 5010, ///< sasl not supported
//CONSUL
WFT_ERR_CONSUL_API_UNKNOWN = 6001, ///< api type not supported
WFT_ERR_CONSUL_CHECK_RESPONSE_FAILED = 6002, ///< Consul http code failed
};
#endif

View File

@@ -0,0 +1 @@
../../protocol/ConsulDataTypes.h

View File

@@ -0,0 +1 @@
../../client/WFConsulClient.h

View File

@@ -825,6 +825,12 @@ static inline const char *__get_task_error_string(int error)
case WFT_ERR_KAFKA_VERSION_DISALLOWED:
return "Kafka broker version not supported";
case WFT_ERR_CONSUL_API_UNKNOWN:
return "Consul api type unknown";
case WFT_ERR_CONSUL_CHECK_RESPONSE_FAILED:
return "Consul check response failed";
default:
break;
}

View File

@@ -0,0 +1,352 @@
/*
Copyright (c) 2022 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhenpeng (wangzhenpeng@sogou-inc.com)
*/
#ifndef _CONSULDATATYPES_H_
#define _CONSULDATATYPES_H_
#include <assert.h>
#include <atomic>
#include <map>
#include <vector>
#include <string>
namespace protocol
{
class ConsulConfig
{
public:
// common config
void set_token(const std::string& token) { this->ptr->token = token; }
std::string get_token() const { return this->ptr->token; }
// discover config
void set_datacenter(const std::string& data_center)
{
this->ptr->dc = data_center;
}
std::string get_datacenter() const { return this->ptr->dc; }
void set_near_node(const std::string& near_node)
{
this->ptr->near = near_node;
}
std::string get_near_node() const { return this->ptr->near; }
void set_filter_expr(const std::string& filter_expr)
{
this->ptr->filter = filter_expr;
}
std::string get_filter_expr() const { return this->ptr->filter; }
// blocking query wait, limited to 10 minutes, default:5m, unit:ms
void set_wait_ttl(int wait_ttl) { this->ptr->wait_ttl = wait_ttl; }
int get_wait_ttl() const { return this->ptr->wait_ttl; }
// enable blocking query
void set_blocking_query(bool enable_flag)
{
this->ptr->blocking_query = enable_flag;
}
bool blocking_query() const { return this->ptr->blocking_query; }
// only get health passing status service instance
void set_passing(bool passing) { this->ptr->passing = passing; }
bool get_passing() const { return this->ptr->passing; }
// register config
void set_replace_checks(bool replace_checks)
{
this->ptr->replace_checks = replace_checks;
}
bool get_replace_checks() const
{
return this->ptr->replace_checks;
}
void set_check_name(const std::string& check_name)
{
this->ptr->check_cfg.check_name = check_name;
}
std::string get_check_name() const { return this->ptr->check_cfg.check_name; }
void set_check_http_url(const std::string& http_url)
{
this->ptr->check_cfg.http_url = http_url;
}
std::string get_check_http_url() const
{
return this->ptr->check_cfg.http_url;
}
void set_check_http_method(const std::string& method)
{
this->ptr->check_cfg.http_method = method;
}
std::string get_check_http_method() const
{
return this->ptr->check_cfg.http_method;
}
void add_http_header(const std::string& key,
const std::vector<std::string>& values)
{
this->ptr->check_cfg.headers.emplace(key, values);
}
const std::map<std::string, std::vector<std::string>> *get_http_headers() const
{
return &this->ptr->check_cfg.headers;
}
void set_http_body(const std::string& body)
{
this->ptr->check_cfg.http_body = body;
}
std::string get_http_body() const { return this->ptr->check_cfg.http_body; }
void set_check_interval(int interval)
{
this->ptr->check_cfg.interval = interval;
}
int get_check_interval() const { return this->ptr->check_cfg.interval; }
void set_check_timeout(int timeout)
{
this->ptr->check_cfg.timeout = timeout;
}
int get_check_timeout() const { return this->ptr->check_cfg.timeout; }
void set_check_notes(const std::string& notes)
{
this->ptr->check_cfg.notes = notes;
}
std::string get_check_notes() const { return this->ptr->check_cfg.notes; }
void set_check_tcp(const std::string& tcp_address)
{
this->ptr->check_cfg.tcp_address = tcp_address;
}
std::string get_check_tcp() const { return this->ptr->check_cfg.tcp_address; }
void set_initial_status(const std::string& initial_status)
{
this->ptr->check_cfg.initial_status = initial_status;
}
std::string get_initial_status() const
{
return this->ptr->check_cfg.initial_status;
}
void set_auto_deregister_time(int milliseconds)
{
this->ptr->check_cfg.auto_deregister_time = milliseconds;
}
int get_auto_deregister_time() const
{
return this->ptr->check_cfg.auto_deregister_time;
}
// set success times before passing, refer to success_before_passing, default:0
void set_success_times(int times)
{
this->ptr->check_cfg.success_times = times;
}
int get_success_times() const { return this->ptr->check_cfg.success_times; }
// set failure times before critical, refer to failures_before_critical, default:0
void set_failure_times(int times) { this->ptr->check_cfg.failure_times = times; }
int get_failure_times() const { return this->ptr->check_cfg.failure_times; }
void set_health_check(bool enable_flag)
{
this->ptr->check_cfg.health_check = enable_flag;
}
bool get_health_check() const
{
return this->ptr->check_cfg.health_check;
}
public:
ConsulConfig()
{
this->ptr = new Config;
this->ptr->blocking_query = false;
this->ptr->passing = false;
this->ptr->replace_checks = false;
this->ptr->wait_ttl = 300 * 1000;
this->ptr->check_cfg.interval = 5000; //default:5s
this->ptr->check_cfg.timeout = 10000; //default:10s
this->ptr->check_cfg.http_method = "GET";
this->ptr->check_cfg.initial_status = "critical";
this->ptr->check_cfg.auto_deregister_time = 10 * 60 * 1000;
this->ptr->check_cfg.success_times = 0;
this->ptr->check_cfg.failure_times = 0;
this->ptr->check_cfg.health_check = false;
this->ref = new std::atomic<int>(1);
}
virtual ~ConsulConfig()
{
if (--*this->ref == 0)
{
delete this->ptr;
delete this->ref;
}
}
ConsulConfig(ConsulConfig&& move)
{
this->ptr = move.ptr;
this->ref = move.ref;
move.ptr = new Config;
move.ref = new std::atomic<int>(1);
}
ConsulConfig(const ConsulConfig& copy)
{
this->ptr = copy.ptr;
this->ref = copy.ref;
++(*this->ref);
}
ConsulConfig& operator= (ConsulConfig&& move)
{
if (this != &move)
{
this->~ConsulConfig();
this->ptr = move.ptr;
this->ref = move.ref;
move.ptr = new Config;
move.ref = new std::atomic<int>(1);
}
return *this;
}
ConsulConfig& operator= (const ConsulConfig& copy)
{
if (this != &copy)
{
this->~ConsulConfig();
this->ptr = copy.ptr;
this->ref = copy.ref;
++(*this->ref);
}
return *this;
}
// register health check config
struct HealthCheckConfig
{
std::string check_name;
std::string notes;
std::string http_url;
std::string http_method;
std::string http_body;
std::string tcp_address;
std::string initial_status; // passing or critical, default:critical
std::map<std::string, std::vector<std::string>> headers;
int auto_deregister_time; // refer to deregister_critical_service_after
int interval;
int timeout; // default 10000
int success_times; // default:0 success times before passing
int failure_times; // default:0 failure_before_critical
bool health_check;
};
struct Config
{
// common config
std::string token;
// discover config
std::string dc;
std::string near;
std::string filter;
int wait_ttl;
bool blocking_query;
bool passing;
// register config
bool replace_checks; //refer to replace_existing_checks
HealthCheckConfig check_cfg;
};
// private:
struct Config *ptr;
std::atomic<int> *ref;
};
// k:address, v:port
using ConsulAddress = std::pair<std::string, unsigned short>;
struct ConsulService
{
std::string service_name;
std::string service_namespace;
std::string service_id;
std::vector<std::string> tags;
ConsulAddress service_address;
ConsulAddress lan;
ConsulAddress lan_ipv4;
ConsulAddress lan_ipv6;
ConsulAddress virtual_address;
ConsulAddress wan;
ConsulAddress wan_ipv4;
ConsulAddress wan_ipv6;
std::map<std::string, std::string> meta;
bool tag_override;
};
struct ConsulServiceInstance
{
// node info
std::string node_id;
std::string node_name;
std::string node_address;
std::string dc;
std::map<std::string, std::string> node_meta;
long long create_index;
long long modify_index;
// service info
struct ConsulService service;
// service health check
std::string check_name;
std::string check_id;
std::string check_notes;
std::string check_output;
std::string check_status;
std::string check_type;
};
struct ConsulServiceTags
{
std::string service_name;
std::vector<std::string> tags;
};
}
#endif