diff --git a/src/nameservice/UpstreamPolicies.cc b/src/nameservice/UpstreamPolicies.cc index ced00439..e5359a0c 100644 --- a/src/nameservice/UpstreamPolicies.cc +++ b/src/nameservice/UpstreamPolicies.cc @@ -13,7 +13,8 @@ See the License for the specific language governing permissions and limitations under the License. - Authors: Wu Jiaxu (wujiaxu@sogou-inc.com) + Authors: Li Yingxin (liyingxin@sogou-inc.com) + Wang Zhulei (wangzhulei@sogou-inc.com) */ #include @@ -56,9 +57,6 @@ UPSAddrParams::UPSAddrParams(const struct AddressParams *params, const std::string& address) : PolicyAddrParams(params) { - for (int i = 0; i < VIRTUAL_GROUP_SIZE; i++) - this->consistent_hash[i] = rand(); - this->weight = params->weight; this->server_type = params->server_type; this->group_id = params->group_id; @@ -399,35 +397,23 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address) EndpointAddress *UPSGroupPolicy::consistent_hash_with_group(unsigned int hash, WFNSTracing *tracing) { - const UPSAddrParams *params; - EndpointAddress *addr = NULL; - unsigned int min_dis = (unsigned int)-1; - - for (EndpointAddress *server : this->servers) - { - if (this->is_alive(server)) - { - params = static_cast(server->params); - - for (int i = 0; i < VIRTUAL_GROUP_SIZE; i++) - { - unsigned int dis = std::min - (hash - params->consistent_hash[i], - params->consistent_hash[i] - hash); - - if (dis < min_dis) - { - min_dis = dis; - addr = server; - } - } - } - } - - if (!addr) + if (this->nalives == 0) return NULL; - return this->check_and_get(addr, false, tracing); + std::map::iterator it; + it = this->addr_hash.lower_bound(hash); + + if (it == this->addr_hash.end()) + it = this->addr_hash.begin(); + + while (!this->is_alive(it->second)) + { + it++; + if (it == this->addr_hash.end()) + it = this->addr_hash.begin(); + } + + return this->check_and_get(it->second, false, tracing); } void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr) @@ -437,6 +423,7 @@ void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr) UPSGroupPolicy::add_server_locked(addr); if (params->server_type == 0) this->total_weight += params->weight; + return; } @@ -639,6 +626,31 @@ EndpointAddress *UPSConsistentHashPolicy::first_strategy(const ParsedURI& uri, return this->consistent_hash_with_group(hash_value, tracing); } +void UPSConsistentHashPolicy::add_server_locked(EndpointAddress *addr) +{ + UPSGroupPolicy::add_server_locked(addr); + + for (int i = 0; i < VIRTUAL_GROUP_SIZE; i++) + this->addr_hash.insert(std::make_pair(rand(), addr)); + + return; +} + +int UPSConsistentHashPolicy::remove_server_locked(const std::string& address) +{ + std::map::iterator it; + + for (it = this->addr_hash.begin(); it != this->addr_hash.end();) + { + if (it->second->address == address) + this->addr_hash.erase(it++); + else + it++; + } + + return UPSGroupPolicy::remove_server_locked(address); +} + EndpointAddress *UPSManualPolicy::first_strategy(const ParsedURI& uri, WFNSTracing *tracing) { @@ -662,3 +674,33 @@ EndpointAddress *UPSManualPolicy::another_strategy(const ParsedURI& uri, return this->consistent_hash_with_group(hash_value, tracing); } +void UPSManualPolicy::add_server_locked(EndpointAddress *addr) +{ + UPSGroupPolicy::add_server_locked(addr); + + if (!this->try_another) + return; + + for (int i = 0; i < VIRTUAL_GROUP_SIZE; i++) + this->addr_hash.insert(std::make_pair(rand(), addr)); + + return; +} + +int UPSManualPolicy::remove_server_locked(const std::string& address) +{ + if (this->try_another) + { + std::map::iterator it; + for (it = this->addr_hash.begin(); it != this->addr_hash.end();) + { + if (it->second->address == address) + this->addr_hash.erase(it++); + else + it++; + } + } + + return UPSGroupPolicy::remove_server_locked(address); +} + diff --git a/src/nameservice/UpstreamPolicies.h b/src/nameservice/UpstreamPolicies.h index 75a4df2c..7d809dd2 100644 --- a/src/nameservice/UpstreamPolicies.h +++ b/src/nameservice/UpstreamPolicies.h @@ -13,13 +13,15 @@ See the License for the specific language governing permissions and limitations under the License. - Authors: Wu Jiaxu (wujiaxu@sogou-inc.com) + Authors: Li Yingxin (liyingxin@sogou-inc.com) + Wang Zhulei (wangzhulei@sogou-inc.com) */ #ifndef _UPSTREAMPOLICIES_H_ #define _UPSTREAMPOLICIES_H_ #include +#include #include #include #include "URIParser.h" @@ -39,7 +41,6 @@ public: short server_type; int group_id; EndpointGroup *group; - unsigned int consistent_hash[VIRTUAL_GROUP_SIZE]; UPSAddrParams(const struct AddressParams *params, const std::string& address); @@ -71,12 +72,16 @@ protected: virtual void add_server_locked(EndpointAddress *addr); virtual int remove_server_locked(const std::string& address); - EndpointAddress *consistent_hash_with_group(unsigned int hash, - WFNSTracing *tracing); EndpointAddress *check_and_get(EndpointAddress *addr, bool addr_failed, WFNSTracing *tracing); bool is_alive(const EndpointAddress *addr) const; + +protected: + EndpointAddress *consistent_hash_with_group(unsigned int hash, + WFNSTracing *tracing); + + std::map addr_hash; }; class UPSWeightedRandomPolicy : public UPSGroupPolicy @@ -139,6 +144,8 @@ protected: WFNSTracing *tracing); private: + virtual void add_server_locked(EndpointAddress *addr); + virtual int remove_server_locked(const std::string& address); upstream_route_t consistent_hash; }; @@ -159,6 +166,8 @@ public: WFNSTracing *tracing); private: + virtual void add_server_locked(EndpointAddress *addr); + virtual int remove_server_locked(const std::string& address); upstream_route_t manual_select; upstream_route_t another_select; }; diff --git a/test/upstream_unittest.cc b/test/upstream_unittest.cc index bcb37bfd..b91bc693 100644 --- a/test/upstream_unittest.cc +++ b/test/upstream_unittest.cc @@ -73,7 +73,7 @@ void register_upstream_hosts() }, true, [](const char *path, const char *query, const char *fragment) -> unsigned int { - return 1; // according to consistent_hash this will hit server[0] + return 511702306; // according to consistent_hash this will hit server[0] }); UpstreamManager::upstream_add_server("manual", "127.0.0.1:8001"); UpstreamManager::upstream_add_server("manual", "127.0.0.1:8002");