diff --git a/src/manager/UpstreamManager.cc b/src/manager/UpstreamManager.cc index a9f95798..8bfce200 100644 --- a/src/manager/UpstreamManager.cc +++ b/src/manager/UpstreamManager.cc @@ -55,6 +55,22 @@ private: std::vector upstream_policies; }; +int UpstreamManager::upstream_create_round_robin(const std::string& name, + bool try_another) +{ + WFNameService *ns = WFGlobal::get_name_service(); + auto *policy = new UPSRoundRobinPolicy(try_another); + + if (ns->add_policy(name.c_str(), policy) >= 0) + { + __UpstreamManager::get_instance()->add_upstream_policy(policy); + return 0; + } + + delete policy; + return -1; +} + static unsigned int __default_consistent_hash(const char *path, const char *query, const char *fragment) @@ -70,7 +86,7 @@ static unsigned int __default_consistent_hash(const char *path, int UpstreamManager::upstream_create_consistent_hash(const std::string& name, upstream_route_t consistent_hash) { - auto *ns = WFGlobal::get_name_service(); + WFNameService *ns = WFGlobal::get_name_service(); UPSConsistentHashPolicy *policy; policy = new UPSConsistentHashPolicy( @@ -89,8 +105,8 @@ int UpstreamManager::upstream_create_consistent_hash(const std::string& name, int UpstreamManager::upstream_create_weighted_random(const std::string& name, bool try_another) { - auto *ns = WFGlobal::get_name_service(); - UPSWeightedRandomPolicy *policy = new UPSWeightedRandomPolicy(try_another); + WFNameService *ns = WFGlobal::get_name_service(); + auto *policy = new UPSWeightedRandomPolicy(try_another); if (ns->add_policy(name.c_str(), policy) >= 0) { @@ -104,8 +120,8 @@ int UpstreamManager::upstream_create_weighted_random(const std::string& name, int UpstreamManager::upstream_create_vnswrr(const std::string& name) { - auto *ns = WFGlobal::get_name_service(); - UPSWeightedRandomPolicy *policy = new UPSVNSWRRPolicy(); + WFNameService *ns = WFGlobal::get_name_service(); + auto *policy = new UPSVNSWRRPolicy(); if (ns->add_policy(name.c_str(), policy) >= 0) { @@ -122,7 +138,7 @@ int UpstreamManager::upstream_create_manual(const std::string& name, bool try_another, upstream_route_t consistent_hash) { - auto *ns = WFGlobal::get_name_service(); + WFNameService *ns = WFGlobal::get_name_service(); UPSManualPolicy *policy; policy = new UPSManualPolicy(try_another, std::move(select), diff --git a/src/manager/UpstreamManager.h b/src/manager/UpstreamManager.h index aef93df2..7188a1ae 100644 --- a/src/manager/UpstreamManager.h +++ b/src/manager/UpstreamManager.h @@ -77,6 +77,21 @@ class UpstreamManager { public: + /** + * @brief MODE 0: round-robin select + * @param[in] name upstream name + * @param[in] try_another when first choice is failed, try another one or not + * @return success/fail + * @retval 0 success + * @retval -1 fail, more info see errno + * @note + * when first choose server is already down: + * - if try_another==false, request will be failed + * - if try_another==true, upstream will choose the next + */ + static int upstream_create_round_robin(const std::string& name, + bool try_another); + /** * @brief MODE 1: consistent-hashing select * @param[in] name upstream name diff --git a/src/nameservice/UpstreamPolicies.cc b/src/nameservice/UpstreamPolicies.cc index 013ce248..26b2eedc 100644 --- a/src/nameservice/UpstreamPolicies.cc +++ b/src/nameservice/UpstreamPolicies.cc @@ -165,7 +165,7 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing, this->check_breaker(); - // select_addr == NULL will only happened in consistent_hash + // select_addr == NULL will happen in consistent_hash EndpointAddress *select_addr = this->first_strategy(uri, tracing); if (!select_addr || select_addr->fail_count >= select_addr->params->max_fails) @@ -331,8 +331,6 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr) group->backups.push_back(addr); pthread_mutex_unlock(&group->mutex); this->server_list_change(addr, ADD_SERVER); - - return; } int UPSGroupPolicy::remove_server_locked(const std::string& address) @@ -448,6 +446,19 @@ void UPSGroupPolicy::hash_map_remove_addr(const std::string& address) } } +EndpointAddress *UPSRoundRobinPolicy::first_strategy(const ParsedURI& uri, + WFNSTracing *tracing) +{ + return this->servers[this->cur_idx++ % this->servers.size()]; +} + +EndpointAddress *UPSRoundRobinPolicy::another_strategy(const ParsedURI& uri, + WFNSTracing *tracing) +{ + EndpointAddress *addr = this->servers[this->cur_idx++ % this->servers.size()]; + return this->check_and_get(addr, false, tracing); +} + void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr) { UPSAddrParams *params = static_cast(addr->params); @@ -455,8 +466,6 @@ void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr) UPSGroupPolicy::add_server_locked(addr); if (params->server_type == 0) this->total_weight += params->weight; - - return; } int UPSWeightedRandomPolicy::remove_server_locked(const std::string& address) @@ -638,7 +647,6 @@ void UPSVNSWRRPolicy::add_server_locked(EndpointAddress *addr) { UPSWeightedRandomPolicy::add_server_locked(addr); init(); - return; } int UPSVNSWRRPolicy::remove_server_locked(const std::string& address) @@ -662,8 +670,6 @@ void UPSConsistentHashPolicy::add_server_locked(EndpointAddress *addr) { UPSGroupPolicy::add_server_locked(addr); this->hash_map_add_addr(addr); - - return; } int UPSConsistentHashPolicy::remove_server_locked(const std::string& address) @@ -702,8 +708,6 @@ void UPSManualPolicy::add_server_locked(EndpointAddress *addr) if (this->try_another) this->hash_map_add_addr(addr); - - return; } int UPSManualPolicy::remove_server_locked(const std::string& address) diff --git a/src/nameservice/UpstreamPolicies.h b/src/nameservice/UpstreamPolicies.h index f31648c2..147128f5 100644 --- a/src/nameservice/UpstreamPolicies.h +++ b/src/nameservice/UpstreamPolicies.h @@ -23,13 +23,16 @@ #include #include #include +#include #include #include "URIParser.h" #include "EndpointParams.h" #include "WFNameService.h" #include "WFServiceGovernance.h" -using upstream_route_t = std::function; +using upstream_route_t = std::function; class EndpointGroup; class UPSGroupPolicy; @@ -50,8 +53,9 @@ class UPSGroupPolicy : public WFServiceGovernance { public: UPSGroupPolicy(); - ~UPSGroupPolicy(); + virtual ~UPSGroupPolicy(); +public: virtual bool select(const ParsedURI& uri, WFNSTracing *tracing, EndpointAddress **addr); virtual void add_server(const std::string& address, @@ -72,8 +76,8 @@ protected: virtual void add_server_locked(EndpointAddress *addr); virtual int remove_server_locked(const std::string& address); - EndpointAddress *check_and_get(EndpointAddress *addr, - bool addr_failed, WFNSTracing *tracing); + EndpointAddress *check_and_get(EndpointAddress *addr, bool addr_failed, + WFNSTracing *tracing); bool is_alive(const EndpointAddress *addr) const; @@ -86,6 +90,24 @@ protected: std::map addr_hash; }; +class UPSRoundRobinPolicy : public UPSGroupPolicy +{ +public: + UPSRoundRobinPolicy(bool try_another) + { + this->try_another = try_another; + } + +protected: + virtual EndpointAddress *first_strategy(const ParsedURI& uri, + WFNSTracing *tracing); + virtual EndpointAddress *another_strategy(const ParsedURI& uri, + WFNSTracing *tracing); + +protected: + std::atomic cur_idx; +}; + class UPSWeightedRandomPolicy : public UPSGroupPolicy { public: @@ -95,10 +117,12 @@ public: this->available_weight = 0; this->try_another = try_another; } - EndpointAddress *first_strategy(const ParsedURI& uri, - WFNSTracing *tracing); - EndpointAddress *another_strategy(const ParsedURI& uri, - WFNSTracing *tracing); + +protected: + virtual EndpointAddress *first_strategy(const ParsedURI& uri, + WFNSTracing *tracing); + virtual EndpointAddress *another_strategy(const ParsedURI& uri, + WFNSTracing *tracing); protected: virtual void add_server_locked(EndpointAddress *addr); @@ -120,8 +144,10 @@ public: this->cur_idx = 0; this->try_another = false; }; - EndpointAddress *first_strategy(const ParsedURI& uri, - WFNSTracing *tracing); + +protected: + virtual EndpointAddress *first_strategy(const ParsedURI& uri, + WFNSTracing *tracing); private: virtual void add_server_locked(EndpointAddress *addr); @@ -142,8 +168,8 @@ public: } protected: - EndpointAddress *first_strategy(const ParsedURI& uri, - WFNSTracing *tracing); + virtual EndpointAddress *first_strategy(const ParsedURI& uri, + WFNSTracing *tracing); private: virtual void add_server_locked(EndpointAddress *addr);