diff --git a/docs/about-counter.md b/docs/about-counter.md index a15ac85f..75ad1e5c 100644 --- a/docs/about-counter.md +++ b/docs/about-counter.md @@ -97,7 +97,7 @@ counter->start()调用可以放在for循环之前。counter只要被创建,就 # Server与其它异步引擎结合使用 某些情况下,我们的server可能需要调用非本框架的异步客户端等待结果。简单的方法我们可以在process里同步等待,通过条件变量来唤醒。 -这个做的缺点是我们占用了一个处理线程,把其它框架的异步客户端变为同步客户端。但通过counter的方法,我们可以不占线程地等待。 +这么做的缺点是我们占用了一个处理线程,把其它框架的异步客户端变为同步客户端。但通过counter,我们可以不占线程地等待。 方法很简单: ~~~cpp @@ -120,6 +120,7 @@ void process(WFHttpTask *task) } ~~~ 在这里,我们可以把server任务所在的series理解为一个协程,而目标值为1的counter,可以理解为一个条件变量。 +Counter的缺点是count操作不传递数据。如果业务有数据传达的需求,可以使用[Mailbox任务](https://github.com/sogou/workflow/blob/master/src/factory/WFTaskFactory.h#L268)。 # 命名计数器 diff --git a/src/client/WFConsulClient.cc b/src/client/WFConsulClient.cc index 1a9a65e0..4f80be99 100644 --- a/src/client/WFConsulClient.cc +++ b/src/client/WFConsulClient.cc @@ -493,8 +493,6 @@ static bool create_tagged_address(const ConsulAddress& consul_address, return false; json_object_t *obj = json_value_object(val); - if (!obj) - return false; if (!json_object_append(obj, "Address", JSON_VALUE_STRING, consul_address.first.c_str())) @@ -520,8 +518,6 @@ static bool create_health_check(const ConsulConfig& config, json_object_t *obj) return false; obj = json_value_object(val); - if (!obj) - return false; str = config.get_check_name(); if (!json_object_append(obj, "Name", JSON_VALUE_STRING, str.c_str())) @@ -550,8 +546,6 @@ static bool create_health_check(const ConsulConfig& config, json_object_t *obj) return false; json_object_t *header_obj = json_value_object(val); - if (!header_obj) - return false; for (const auto& header : *config.get_http_headers()) { @@ -561,8 +555,6 @@ static bool create_health_check(const ConsulConfig& config, json_object_t *obj) return false; json_array_t *arr = json_value_array(val); - if (!arr) - return false; for (const auto& value : header.second) { @@ -638,8 +630,6 @@ static bool create_register_request(const json_value_t *root, return false; json_array_t *arr = json_value_array(val); - if (!arr) - return false; for (const auto& tag : service->tags) { @@ -660,8 +650,6 @@ static bool create_register_request(const json_value_t *root, return false; json_object_t *meta_obj = json_value_object(val); - if (!meta_obj) - return false; for (const auto& meta_kv : service->meta) { @@ -820,11 +808,11 @@ static bool parse_discover_node(const json_object_t *obj, } val = json_object_find("CreateIndex", obj); - if (val) + if (val && json_value_type(val) == JSON_VALUE_NUMBER) instance->create_index = json_value_number(val); val = json_object_find("ModifyIndex", obj); - if (val) + if (val && json_value_type(val) == JSON_VALUE_NUMBER) instance->modify_index = json_value_number(val); return true; @@ -860,7 +848,7 @@ static bool parse_tagged_address(const char *name, tagged_address.first = str; val = json_object_find("Port", obj); - if (!val) + if (!val || json_value_type(val) != JSON_VALUE_NUMBER) return false; tagged_address.second = json_value_number(val); diff --git a/src/kernel/Communicator.cc b/src/kernel/Communicator.cc index dcd2774f..d5fd7a10 100644 --- a/src/kernel/Communicator.cc +++ b/src/kernel/Communicator.cc @@ -1112,19 +1112,12 @@ void Communicator::handler_thread_routine(void *context) break; default: free(res); - if (comm->thrdpool) - thrdpool_exit(comm->thrdpool); - continue; + thrdpool_exit(comm->thrdpool); + return; } free(res); } - - if (!comm->thrdpool) - { - mpoller_destroy(comm->mpoller); - msgqueue_destroy(comm->msgqueue); - } } int Communicator::append_message(const void *buf, size_t *size, @@ -1478,15 +1471,12 @@ int Communicator::init(size_t poller_threads, size_t handler_threads) void Communicator::deinit() { - int in_handler = this->is_handler_thread(); - this->stop_flag = 1; mpoller_stop(this->mpoller); msgqueue_set_nonblock(this->msgqueue); thrdpool_destroy(NULL, this->thrdpool); - this->thrdpool = NULL; - if (!in_handler) - Communicator::handler_thread_routine(this); + mpoller_destroy(this->mpoller); + msgqueue_destroy(this->msgqueue); } int Communicator::nonblock_connect(CommTarget *target) diff --git a/src/nameservice/WFDnsResolver.cc b/src/nameservice/WFDnsResolver.cc index b718e0bf..a1334000 100644 --- a/src/nameservice/WFDnsResolver.cc +++ b/src/nameservice/WFDnsResolver.cc @@ -211,10 +211,8 @@ using thread_dns_callback_t = std::function; struct DnsContext { - int state; - int error; - int eai_error; unsigned short port; + int eai_error; struct addrinfo *ai; }; @@ -647,8 +645,8 @@ void WFResolverTask::dns_single_callback(void *net_dns_task) } else { - this->state = dns_task->get_state(); - this->error = dns_task->get_error(); + this->state = WFT_STATE_DNS_ERROR; + this->error = EAI_AGAIN; } task_callback(); @@ -660,57 +658,50 @@ void WFResolverTask::dns_partial_callback(void *net_dns_task) WFGlobal::get_dns_respool()->post(NULL); struct DnsContext *ctx = (struct DnsContext *)dns_task->user_data; + ctx->ai = NULL; - ctx->state = dns_task->get_state(); - ctx->error = dns_task->get_error(); - if (ctx->state == WFT_STATE_SUCCESS) + if (dns_task->get_state() == WFT_STATE_SUCCESS) { protocol::DnsResponse *resp = dns_task->get_resp(); ctx->eai_error = protocol::DnsUtil::getaddrinfo(resp, ctx->port, &ctx->ai); } else - ctx->eai_error = EAI_NONAME; + ctx->eai_error = EAI_AGAIN; } void WFResolverTask::dns_parallel_callback(const void *parallel) { const ParallelWork *pwork = (const ParallelWork *)parallel; - struct DnsContext *c4 = (struct DnsContext *)(pwork->get_context()); + struct DnsContext *c4 = (struct DnsContext *)pwork->get_context(); struct DnsContext *c6 = c4 + 1; - DnsOutput out; - if (c4->state != WFT_STATE_SUCCESS && c6->state != WFT_STATE_SUCCESS) + if (c4->eai_error == 0 || c6->eai_error == 0) { - this->state = c4->state; - this->error = c4->error; - } - else if (c4->eai_error != 0 && c6->eai_error != 0) - { - DnsRoutine::create(&out, c4->eai_error, NULL); + struct addrinfo *ai = NULL; + struct addrinfo **pai = &ai; + DnsOutput out; + + *pai = c4->ai; + while (*pai) + pai = &(*pai)->ai_next; + + *pai = c6->ai; + DnsRoutine::create(&out, 0, ai); dns_callback_internal(&out, dns_ttl_default_, dns_ttl_min_); } else { - struct addrinfo *ai = NULL; - struct addrinfo **pai = &ai; + int eai_error = c4->eai_error; - if (c4->ai != NULL) - { - *pai = c4->ai; - while (*pai) - pai = &(*pai)->ai_next; - } + if (c6->eai_error == EAI_AGAIN) + eai_error = EAI_AGAIN; - if (c6->ai != NULL) - *pai = c6->ai; - - DnsRoutine::create(&out, 0, ai); - dns_callback_internal(&out, dns_ttl_default_, dns_ttl_min_); + this->state = WFT_STATE_DNS_ERROR; + this->error = eai_error; } - delete[] c4; - + delete []c4; task_callback(); } diff --git a/src/util/json_parser.c b/src/util/json_parser.c index 647b3ec7..9d3f418e 100644 --- a/src/util/json_parser.c +++ b/src/util/json_parser.c @@ -1180,6 +1180,12 @@ const json_value_t *json_object_prev_value(const json_value_t *val, return &list_entry(pos->prev, json_member_t, list)->value; } +const char *json_object_value_name(const json_value_t *val, + const json_object_t *obj) +{ + return list_entry(val, json_member_t, value)->name; +} + static const json_value_t *__json_object_insert(const char *name, int type, va_list ap, struct list_head *pos, diff --git a/src/util/json_parser.h b/src/util/json_parser.h index 4f21ebd4..5ca97c75 100644 --- a/src/util/json_parser.h +++ b/src/util/json_parser.h @@ -60,6 +60,8 @@ const char *json_object_prev_name(const char *name, const json_object_t *obj); const json_value_t *json_object_prev_value(const json_value_t *val, const json_object_t *obj); +const char *json_object_value_name(const json_value_t *val, + const json_object_t *obj); const json_value_t *json_object_append(json_object_t *obj, const char *name, int type, ...);