rename WFServiceGovernance; update UpstreamPolicies constructor; move EndpointGroup;

This commit is contained in:
holmes1412
2021-03-03 21:32:32 +08:00
parent e43c757488
commit a44c2cd023
7 changed files with 95 additions and 91 deletions

View File

@@ -84,7 +84,7 @@ set(INCLUDE_HEADERS
src/factory/WFOperator.h
src/nameservice/WFNameService.h
src/nameservice/WFDNSResolver.h
src/nameservice/ServiceGovernance.h
src/nameservice/WFServiceGovernance.h
src/nameservice/UpstreamPolicies.h
)

View File

@@ -23,7 +23,7 @@
#include <functional>
#include "URIParser.h"
#include "EndpointParams.h"
#include "ServiceGovernance.h"
#include "WFServiceGovernance.h"
#include "UpstreamPolicies.h"
#include "WFGlobal.h"

View File

@@ -4,7 +4,7 @@ project(nameservice)
set(SRC
WFNameService.cc
WFDNSResolver.cc
ServiceGovernance.cc
WFServiceGovernance.cc
UpstreamPolicies.cc
)

View File

@@ -18,10 +18,40 @@
#include <pthread.h>
#include <algorithm>
#include <random>
#include "URIParser.h"
#include "StringUtil.h"
#include "UpstreamPolicies.h"
class EndpointGroup
{
public:
EndpointGroup(int group_id, UPSGroupPolicy *policy) :
mutex(PTHREAD_MUTEX_INITIALIZER),
gen(rd())
{
this->id = group_id;
this->policy = policy;
this->nalives = 0;
this->weight = 0;
}
const EndpointAddress *get_one();
const EndpointAddress *get_one_backup();
public:
int id;
UPSGroupPolicy *policy;
struct rb_node rb;
pthread_mutex_t mutex;
std::random_device rd;
std::mt19937 gen;
std::vector<EndpointAddress *> mains;
std::vector<EndpointAddress *> backups;
std::atomic<int> nalives;
int weight;
};
UPSAddrParams::UPSAddrParams() :
PolicyAddrParams(&ADDRESS_PARAMS_DEFAULT)
{
@@ -82,6 +112,29 @@ UPSGroupPolicy::~UPSGroupPolicy()
}
}
inline bool UPSGroupPolicy::is_alive_or_group_alive(const EndpointAddress *addr) const
{
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
return ((params->group_id < 0 &&
addr->fail_count < addr->params->max_fails) ||
(params->group_id >= 0 &&
params->group->nalives > 0));
}
void UPSGroupPolicy::recover_one_server(const EndpointAddress *addr)
{
this->nalives++;
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
params->group->nalives++;
}
void UPSGroupPolicy::fuse_one_server(const EndpointAddress *addr)
{
this->nalives--;
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
params->group->nalives--;
}
void UPSGroupPolicy::add_server(const std::string& address,
const AddressParams *params)
{

View File

@@ -20,17 +20,15 @@
#define _UPSTREAM_POLICIES_H_
#include <pthread.h>
#include <algorithm>
#include <vector>
#include <atomic>
#include <random>
#include "URIParser.h"
#include "EndpointParams.h"
#include "WFNameService.h"
#include "WFDNSResolver.h"
#include "WFGlobal.h"
#include "WFTaskError.h"
#include "ServiceGovernance.h"
#include "WFServiceGovernance.h"
using upstream_route_t = std::function<unsigned int (const char *, const char *, const char *)>;
@@ -51,36 +49,7 @@ public:
const std::string& address);
};
class EndpointGroup
{
public:
int id;
UPSGroupPolicy *policy;
struct rb_node rb;
pthread_mutex_t mutex;
std::random_device rd;
std::mt19937 gen;
std::vector<EndpointAddress *> mains;
std::vector<EndpointAddress *> backups;
std::atomic<int> nalives;
int weight;
EndpointGroup(int group_id, UPSGroupPolicy *policy) :
mutex(PTHREAD_MUTEX_INITIALIZER),
gen(rd())
{
this->id = group_id;
this->policy = policy;
this->nalives = 0;
this->weight = 0;
}
public:
const EndpointAddress *get_one();
const EndpointAddress *get_one_backup();
};
class UPSGroupPolicy : public ServiceGovernance
class UPSGroupPolicy : public WFServiceGovernance
{
public:
UPSGroupPolicy();
@@ -98,19 +67,8 @@ protected:
EndpointGroup *default_group;
private:
virtual void recover_one_server(const EndpointAddress *addr)
{
this->nalives++;
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
params->group->nalives++;
}
virtual void fuse_one_server(const EndpointAddress *addr)
{
this->nalives--;
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
params->group->nalives--;
}
virtual void recover_one_server(const EndpointAddress *addr);
virtual void fuse_one_server(const EndpointAddress *addr);
protected:
virtual void add_server_locked(EndpointAddress *addr);
@@ -119,14 +77,7 @@ protected:
const EndpointAddress *consistent_hash_with_group(unsigned int hash);
const EndpointAddress *check_and_get(const EndpointAddress *addr, bool flag);
inline bool is_alive_or_group_alive(const EndpointAddress *addr) const
{
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
return ((params->group_id < 0 &&
addr->fail_count < addr->params->max_fails) ||
(params->group_id >= 0 &&
params->group->nalives > 0));
}
bool is_alive_or_group_alive(const EndpointAddress *addr) const;
};
class UPSWeightedRandomPolicy : public UPSGroupPolicy
@@ -155,14 +106,14 @@ private:
class UPSConsistentHashPolicy : public UPSGroupPolicy
{
public:
UPSConsistentHashPolicy()
UPSConsistentHashPolicy() :
consistent_hash(UPSConsistentHashPolicy::default_consistent_hash)
{
this->consistent_hash = this->default_consistent_hash;
}
UPSConsistentHashPolicy(upstream_route_t consistent_hash)
UPSConsistentHashPolicy(upstream_route_t&& consistent_hash) :
consistent_hash(std::move(consistent_hash))
{
this->consistent_hash = std::move(consistent_hash);
}
protected:
@@ -188,12 +139,12 @@ public:
class UPSManualPolicy : public UPSGroupPolicy
{
public:
UPSManualPolicy(bool try_another, upstream_route_t select,
upstream_route_t try_another_select)
UPSManualPolicy(bool try_another, upstream_route_t&& select,
upstream_route_t&& try_another_select) :
manual_select(std::move(select)),
try_another_select(std::move(try_another_select))
{
this->try_another = try_another;
this->manual_select = select;
this->try_another_select = try_another_select;
}
const EndpointAddress *first_strategy(const ParsedURI& uri);

View File

@@ -21,7 +21,7 @@
#include "StringUtil.h"
#include "WFNameService.h"
#include "WFDNSResolver.h"
#include "ServiceGovernance.h"
#include "WFServiceGovernance.h"
#include "UpstreamManager.h"
#define GET_CURRENT_SECOND std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count()
@@ -117,8 +117,8 @@ EndpointAddress::EndpointAddress(const std::string& address,
this->port = arr[1];
}
WFRouterTask *ServiceGovernance::create_router_task(const struct WFNSParams *params,
router_callback_t callback)
WFRouterTask *WFServiceGovernance::create_router_task(const struct WFNSParams *params,
router_callback_t callback)
{
EndpointAddress *addr;
WFRouterTask *task;
@@ -140,7 +140,7 @@ WFRouterTask *ServiceGovernance::create_router_task(const struct WFNSParams *par
return task;
}
inline void ServiceGovernance::recover_server_from_breaker(EndpointAddress *addr)
inline void WFServiceGovernance::recover_server_from_breaker(EndpointAddress *addr)
{
addr->fail_count = 0;
pthread_mutex_lock(&this->breaker_lock);
@@ -154,7 +154,7 @@ inline void ServiceGovernance::recover_server_from_breaker(EndpointAddress *addr
pthread_mutex_unlock(&this->breaker_lock);
}
inline void ServiceGovernance::fuse_server_to_breaker(EndpointAddress *addr)
inline void WFServiceGovernance::fuse_server_to_breaker(EndpointAddress *addr)
{
pthread_mutex_lock(&this->breaker_lock);
if (!addr->entry.list.next)
@@ -167,8 +167,8 @@ inline void ServiceGovernance::fuse_server_to_breaker(EndpointAddress *addr)
pthread_mutex_unlock(&this->breaker_lock);
}
void ServiceGovernance::success(RouteManager::RouteResult *result, void *cookie,
CommTarget *target)
void WFServiceGovernance::success(RouteManager::RouteResult *result, void *cookie,
CommTarget *target)
{
pthread_rwlock_rdlock(&this->rwlock);
this->recover_server_from_breaker((EndpointAddress *)cookie);
@@ -177,8 +177,8 @@ void ServiceGovernance::success(RouteManager::RouteResult *result, void *cookie,
WFDNSResolver::success(result, NULL, target);
}
void ServiceGovernance::failed(RouteManager::RouteResult *result, void *cookie,
CommTarget *target)
void WFServiceGovernance::failed(RouteManager::RouteResult *result, void *cookie,
CommTarget *target)
{
EndpointAddress *server = (EndpointAddress *)cookie;
@@ -192,7 +192,7 @@ void ServiceGovernance::failed(RouteManager::RouteResult *result, void *cookie,
WFDNSResolver::failed(result, NULL, target);
}
void ServiceGovernance::check_breaker()
void WFServiceGovernance::check_breaker()
{
pthread_mutex_lock(&this->breaker_lock);
if (!list_empty(&this->breaker_list))
@@ -224,18 +224,18 @@ void ServiceGovernance::check_breaker()
pthread_mutex_unlock(&this->breaker_lock);
}
const EndpointAddress *ServiceGovernance::first_strategy(const ParsedURI& uri)
const EndpointAddress *WFServiceGovernance::first_strategy(const ParsedURI& uri)
{
unsigned int idx = rand() % this->servers.size();
return this->servers[idx];
}
const EndpointAddress *ServiceGovernance::another_strategy(const ParsedURI& uri)
const EndpointAddress *WFServiceGovernance::another_strategy(const ParsedURI& uri)
{
return this->first_strategy(uri);
}
bool ServiceGovernance::select(const ParsedURI& uri, EndpointAddress **addr)
bool WFServiceGovernance::select(const ParsedURI& uri, EndpointAddress **addr)
{
pthread_rwlock_rdlock(&this->rwlock);
unsigned int n = (unsigned int)this->servers.size();
@@ -274,7 +274,7 @@ bool ServiceGovernance::select(const ParsedURI& uri, EndpointAddress **addr)
return false;
}
void ServiceGovernance::add_server_locked(EndpointAddress *addr)
void WFServiceGovernance::add_server_locked(EndpointAddress *addr)
{
this->addresses.push_back(addr);
this->server_map[addr->address].push_back(addr);
@@ -283,7 +283,7 @@ void ServiceGovernance::add_server_locked(EndpointAddress *addr)
this->server_list_change(addr, ADD_SERVER);
}
int ServiceGovernance::remove_server_locked(const std::string& address)
int WFServiceGovernance::remove_server_locked(const std::string& address)
{
const auto map_it = this->server_map.find(address);
if (map_it != this->server_map.cend())
@@ -325,8 +325,8 @@ int ServiceGovernance::remove_server_locked(const std::string& address)
return ret;
}
void ServiceGovernance::add_server(const std::string& address,
const AddressParams *params)
void WFServiceGovernance::add_server(const std::string& address,
const AddressParams *params)
{
EndpointAddress *addr = new EndpointAddress(address,
new PolicyAddrParams(params));
@@ -336,7 +336,7 @@ void ServiceGovernance::add_server(const std::string& address,
pthread_rwlock_unlock(&this->rwlock);
}
int ServiceGovernance::remove_server(const std::string& address)
int WFServiceGovernance::remove_server(const std::string& address)
{
int ret;
pthread_rwlock_wrlock(&this->rwlock);
@@ -345,8 +345,8 @@ int ServiceGovernance::remove_server(const std::string& address)
return ret;
}
int ServiceGovernance::replace_server(const std::string& address,
const AddressParams *params)
int WFServiceGovernance::replace_server(const std::string& address,
const AddressParams *params)
{
int ret;
EndpointAddress *addr = new EndpointAddress(address,
@@ -359,7 +359,7 @@ int ServiceGovernance::replace_server(const std::string& address,
return ret;
}
void ServiceGovernance::enable_server(const std::string& address)
void WFServiceGovernance::enable_server(const std::string& address)
{
pthread_rwlock_rdlock(&this->rwlock);
const auto map_it = this->server_map.find(address);
@@ -371,7 +371,7 @@ void ServiceGovernance::enable_server(const std::string& address)
pthread_rwlock_unlock(&this->rwlock);
}
void ServiceGovernance::disable_server(const std::string& address)
void WFServiceGovernance::disable_server(const std::string& address)
{
pthread_rwlock_rdlock(&this->rwlock);
const auto map_it = this->server_map.find(address);
@@ -386,7 +386,7 @@ void ServiceGovernance::disable_server(const std::string& address)
pthread_rwlock_unlock(&this->rwlock);
}
void ServiceGovernance::get_current_address(std::vector<std::string>& addr_list)
void WFServiceGovernance::get_current_address(std::vector<std::string>& addr_list)
{
pthread_rwlock_rdlock(&this->rwlock);

View File

@@ -102,7 +102,7 @@ public:
virtual ~EndpointAddress() { delete this->params; }
};
class ServiceGovernance : public WFDNSResolver
class WFServiceGovernance : public WFDNSResolver
{
public:
virtual WFRouterTask *create_router_task(const struct WFNSParams *params,
@@ -126,7 +126,7 @@ public:
void set_mttr_second(unsigned int second) { this->mttr_second = second; }
public:
ServiceGovernance() :
WFServiceGovernance() :
breaker_lock(PTHREAD_MUTEX_INITIALIZER),
rwlock(PTHREAD_RWLOCK_INITIALIZER)
{
@@ -136,7 +136,7 @@ public:
INIT_LIST_HEAD(&this->breaker_list);
}
virtual ~ServiceGovernance()
virtual ~WFServiceGovernance()
{
for (EndpointAddress *addr : this->addresses)
delete addr;