Merge branch 'master' of https://github.com/sogou/workflow into nossl

This commit is contained in:
Xie Han
2025-02-19 19:06:21 +08:00
6 changed files with 41 additions and 63 deletions

View File

@@ -97,7 +97,7 @@ counter->start()调用可以放在for循环之前。counter只要被创建
# Server与其它异步引擎结合使用
某些情况下我们的server可能需要调用非本框架的异步客户端等待结果。简单的方法我们可以在process里同步等待通过条件变量来唤醒。
做的缺点是我们占用了一个处理线程把其它框架的异步客户端变为同步客户端。但通过counter的方法,我们可以不占线程地等待。
做的缺点是我们占用了一个处理线程把其它框架的异步客户端变为同步客户端。但通过counter我们可以不占线程地等待。
方法很简单:
~~~cpp
@@ -120,6 +120,7 @@ void process(WFHttpTask *task)
}
~~~
在这里我们可以把server任务所在的series理解为一个协程而目标值为1的counter可以理解为一个条件变量。
Counter的缺点是count操作不传递数据。如果业务有数据传达的需求可以使用[Mailbox任务](https://github.com/sogou/workflow/blob/master/src/factory/WFTaskFactory.h#L268)。
# 命名计数器

View File

@@ -493,8 +493,6 @@ static bool create_tagged_address(const ConsulAddress& consul_address,
return false;
json_object_t *obj = json_value_object(val);
if (!obj)
return false;
if (!json_object_append(obj, "Address", JSON_VALUE_STRING,
consul_address.first.c_str()))
@@ -520,8 +518,6 @@ static bool create_health_check(const ConsulConfig& config, json_object_t *obj)
return false;
obj = json_value_object(val);
if (!obj)
return false;
str = config.get_check_name();
if (!json_object_append(obj, "Name", JSON_VALUE_STRING, str.c_str()))
@@ -550,8 +546,6 @@ static bool create_health_check(const ConsulConfig& config, json_object_t *obj)
return false;
json_object_t *header_obj = json_value_object(val);
if (!header_obj)
return false;
for (const auto& header : *config.get_http_headers())
{
@@ -561,8 +555,6 @@ static bool create_health_check(const ConsulConfig& config, json_object_t *obj)
return false;
json_array_t *arr = json_value_array(val);
if (!arr)
return false;
for (const auto& value : header.second)
{
@@ -638,8 +630,6 @@ static bool create_register_request(const json_value_t *root,
return false;
json_array_t *arr = json_value_array(val);
if (!arr)
return false;
for (const auto& tag : service->tags)
{
@@ -660,8 +650,6 @@ static bool create_register_request(const json_value_t *root,
return false;
json_object_t *meta_obj = json_value_object(val);
if (!meta_obj)
return false;
for (const auto& meta_kv : service->meta)
{
@@ -820,11 +808,11 @@ static bool parse_discover_node(const json_object_t *obj,
}
val = json_object_find("CreateIndex", obj);
if (val)
if (val && json_value_type(val) == JSON_VALUE_NUMBER)
instance->create_index = json_value_number(val);
val = json_object_find("ModifyIndex", obj);
if (val)
if (val && json_value_type(val) == JSON_VALUE_NUMBER)
instance->modify_index = json_value_number(val);
return true;
@@ -860,7 +848,7 @@ static bool parse_tagged_address(const char *name,
tagged_address.first = str;
val = json_object_find("Port", obj);
if (!val)
if (!val || json_value_type(val) != JSON_VALUE_NUMBER)
return false;
tagged_address.second = json_value_number(val);

View File

@@ -1112,19 +1112,12 @@ void Communicator::handler_thread_routine(void *context)
break;
default:
free(res);
if (comm->thrdpool)
thrdpool_exit(comm->thrdpool);
continue;
thrdpool_exit(comm->thrdpool);
return;
}
free(res);
}
if (!comm->thrdpool)
{
mpoller_destroy(comm->mpoller);
msgqueue_destroy(comm->msgqueue);
}
}
int Communicator::append_message(const void *buf, size_t *size,
@@ -1478,15 +1471,12 @@ int Communicator::init(size_t poller_threads, size_t handler_threads)
void Communicator::deinit()
{
int in_handler = this->is_handler_thread();
this->stop_flag = 1;
mpoller_stop(this->mpoller);
msgqueue_set_nonblock(this->msgqueue);
thrdpool_destroy(NULL, this->thrdpool);
this->thrdpool = NULL;
if (!in_handler)
Communicator::handler_thread_routine(this);
mpoller_destroy(this->mpoller);
msgqueue_destroy(this->msgqueue);
}
int Communicator::nonblock_connect(CommTarget *target)

View File

@@ -211,10 +211,8 @@ using thread_dns_callback_t = std::function<void (ThreadDnsTask *)>;
struct DnsContext
{
int state;
int error;
int eai_error;
unsigned short port;
int eai_error;
struct addrinfo *ai;
};
@@ -647,8 +645,8 @@ void WFResolverTask::dns_single_callback(void *net_dns_task)
}
else
{
this->state = dns_task->get_state();
this->error = dns_task->get_error();
this->state = WFT_STATE_DNS_ERROR;
this->error = EAI_AGAIN;
}
task_callback();
@@ -660,57 +658,50 @@ void WFResolverTask::dns_partial_callback(void *net_dns_task)
WFGlobal::get_dns_respool()->post(NULL);
struct DnsContext *ctx = (struct DnsContext *)dns_task->user_data;
ctx->ai = NULL;
ctx->state = dns_task->get_state();
ctx->error = dns_task->get_error();
if (ctx->state == WFT_STATE_SUCCESS)
if (dns_task->get_state() == WFT_STATE_SUCCESS)
{
protocol::DnsResponse *resp = dns_task->get_resp();
ctx->eai_error = protocol::DnsUtil::getaddrinfo(resp, ctx->port,
&ctx->ai);
}
else
ctx->eai_error = EAI_NONAME;
ctx->eai_error = EAI_AGAIN;
}
void WFResolverTask::dns_parallel_callback(const void *parallel)
{
const ParallelWork *pwork = (const ParallelWork *)parallel;
struct DnsContext *c4 = (struct DnsContext *)(pwork->get_context());
struct DnsContext *c4 = (struct DnsContext *)pwork->get_context();
struct DnsContext *c6 = c4 + 1;
DnsOutput out;
if (c4->state != WFT_STATE_SUCCESS && c6->state != WFT_STATE_SUCCESS)
if (c4->eai_error == 0 || c6->eai_error == 0)
{
this->state = c4->state;
this->error = c4->error;
}
else if (c4->eai_error != 0 && c6->eai_error != 0)
{
DnsRoutine::create(&out, c4->eai_error, NULL);
struct addrinfo *ai = NULL;
struct addrinfo **pai = &ai;
DnsOutput out;
*pai = c4->ai;
while (*pai)
pai = &(*pai)->ai_next;
*pai = c6->ai;
DnsRoutine::create(&out, 0, ai);
dns_callback_internal(&out, dns_ttl_default_, dns_ttl_min_);
}
else
{
struct addrinfo *ai = NULL;
struct addrinfo **pai = &ai;
int eai_error = c4->eai_error;
if (c4->ai != NULL)
{
*pai = c4->ai;
while (*pai)
pai = &(*pai)->ai_next;
}
if (c6->eai_error == EAI_AGAIN)
eai_error = EAI_AGAIN;
if (c6->ai != NULL)
*pai = c6->ai;
DnsRoutine::create(&out, 0, ai);
dns_callback_internal(&out, dns_ttl_default_, dns_ttl_min_);
this->state = WFT_STATE_DNS_ERROR;
this->error = eai_error;
}
delete[] c4;
delete []c4;
task_callback();
}

View File

@@ -1180,6 +1180,12 @@ const json_value_t *json_object_prev_value(const json_value_t *val,
return &list_entry(pos->prev, json_member_t, list)->value;
}
const char *json_object_value_name(const json_value_t *val,
const json_object_t *obj)
{
return list_entry(val, json_member_t, value)->name;
}
static const json_value_t *__json_object_insert(const char *name,
int type, va_list ap,
struct list_head *pos,

View File

@@ -60,6 +60,8 @@ const char *json_object_prev_name(const char *name,
const json_object_t *obj);
const json_value_t *json_object_prev_value(const json_value_t *val,
const json_object_t *obj);
const char *json_object_value_name(const json_value_t *val,
const json_object_t *obj);
const json_value_t *json_object_append(json_object_t *obj,
const char *name,
int type, ...);