From aba3d1ece82589ba75100dd6f1748d4e7f7d1716 Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 11 Mar 2021 17:21:56 +0800 Subject: [PATCH 01/53] fix redis message warning --- src/factory/RedisTaskImpl.cc | 2 +- src/protocol/RedisMessage.cc | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/factory/RedisTaskImpl.cc b/src/factory/RedisTaskImpl.cc index cef05fa5..aa71f5e9 100644 --- a/src/factory/RedisTaskImpl.cc +++ b/src/factory/RedisTaskImpl.cc @@ -210,7 +210,7 @@ bool ComplexRedisTask::need_redirect() url.append(uri_.scheme); url.append("://"); url.append(hostport); - + URIParser::parse(url, uri); std::swap(uri.host, uri_.host); std::swap(uri.port, uri_.port); diff --git a/src/protocol/RedisMessage.cc b/src/protocol/RedisMessage.cc index 740bb2ec..cb63d2dd 100644 --- a/src/protocol/RedisMessage.cc +++ b/src/protocol/RedisMessage.cc @@ -663,7 +663,7 @@ bool RedisRequest::get_params(std::vector& params) const #define REDIS_ASK_COMMAND "ASKING" #define REDIS_ASK_REQUEST "*1\r\n$6\r\nASKING\r\n" -#define REDIS_ASK_RESPONSE "+OK\r\n" +#define REDIS_OK_RESPONSE "+OK\r\n" int RedisRequest::encode(struct iovec vectors[], int max) { @@ -684,6 +684,7 @@ int RedisRequest::append(const void *buf, size_t *size) if (ret > 0) { std::string command; + if (get_command(command) && strcasecmp(command.c_str(), REDIS_ASK_COMMAND) == 0) { @@ -691,8 +692,8 @@ int RedisRequest::append(const void *buf, size_t *size) redis_parser_init(this->parser_); set_asking(true); - size_t size = strlen(REDIS_ASK_RESPONSE); - if (this->feedback(REDIS_ASK_RESPONSE, size) != size) + ret = this->feedback(REDIS_OK_RESPONSE, strlen(REDIS_OK_RESPONSE)); + if (ret != strlen(REDIS_OK_RESPONSE)) { errno = EAGAIN; ret = -1; From 3f6a7c208ae0a57d5e278e1779b2c874e53860b0 Mon Sep 17 00:00:00 2001 From: wangzhulei Date: Fri, 9 Apr 2021 14:56:48 +0800 Subject: [PATCH 02/53] modify logic of apiversion --- src/client/WFKafkaClient.cc | 254 +++++++++++++++++++++++++++------- src/factory/KafkaTaskImpl.cc | 25 ++-- src/protocol/KafkaDataTypes.h | 120 ++++++++++++++++ src/protocol/KafkaMessage.cc | 1 - 4 files changed, 340 insertions(+), 60 deletions(-) diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index f334ed13..a5e777ac 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -119,6 +119,7 @@ public: this->meta_list = new KafkaMetaList; this->broker_list = new KafkaBrokerList; this->lock_status = new KafkaLockStatus; + this->broker_map = new KafkaBrokerMap; } ~KafkaMember() @@ -130,6 +131,7 @@ public: delete this->cgroup; delete this->meta_list; delete this->broker_list; + delete this->broker_map; delete this->lock_status; } } @@ -138,6 +140,7 @@ public: KafkaCgroup *cgroup; KafkaMetaList *meta_list; KafkaBrokerList *broker_list; + KafkaBrokerMap *broker_map; KafkaLockStatus *lock_status; private: @@ -231,6 +234,7 @@ public: this->cgroup = *client->member->cgroup; this->client_meta_list = *client->member->meta_list; this->client_broker_list = *client->member->broker_list; + this->client_broker_map = *client->member->broker_map; this->query = query; if (!client->member->broker_hosts->empty()) @@ -268,6 +272,10 @@ private: static void kafka_meta_callback(__WFKafkaTask *task); + void kafka_broker_api_callback(__WFKafkaTask *task); + + static void kafka_broker_callback(const ParallelWork *pwork); + static void kafka_cgroup_callback(__WFKafkaTask *task); static void kafka_offsetcommit_callback(__WFKafkaTask *task); @@ -303,6 +311,7 @@ private: KafkaLockStatus lock_status; KafkaMetaList client_meta_list; KafkaBrokerList client_broker_list; + KafkaBrokerMap client_broker_map; KafkaCgroup cgroup; std::map toppar_list_map; ParsedURI uri; @@ -313,22 +322,7 @@ private: KafkaBroker *ComplexKafkaTask::get_broker(int node_id) { - bool flag = false; - this->client_broker_list.rewind(); - KafkaBroker *broker; - while ((broker = this->client_broker_list.get_next()) != NULL) - { - if (broker->get_node_id() == node_id) - { - flag = true; - break; - } - } - - if (flag) - return broker; - else - return NULL; + return this->client_broker_map.find_item(node_id); } int ComplexKafkaTask::get_node_id(const KafkaToppar *toppar) @@ -578,6 +572,143 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task) kafka_merge_broker_list(&t->client_broker_list, task->get_resp()->get_broker_list()); + if (t->config.get_broker_version()) + { + t->client_broker_list.rewind(); + KafkaBroker *broker; + while ((broker = t->client_broker_list.get_next()) != NULL) + { + kafka_api_version_t *api; + size_t api_cnt; + const char *brk_ver = t->config.get_broker_version(); + int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt); + + if (ret == 0) + { + if (!broker->allocate_api_version(api_cnt)) + { + t->state = WFT_STATE_TASK_ERROR; + t->error = errno; + t->lock_status.get_mutex()->unlock(); + return; + } + + memcpy(broker->get_api(), api, + sizeof(kafka_api_version_t) * api_cnt); + + t->client_broker_map.add_item(*broker); + } + else + { + t->state = WFT_STATE_TASK_ERROR; + t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED; + t->lock_status.get_mutex()->unlock(); + return; + } + } + + *t->lock_status.get_status() |= KAFKA_META_DONE; + *t->lock_status.get_status() &= (~(KAFKA_META_INIT|KAFKA_META_DOING)); + + t->state = WFT_STATE_SUCCESS; + t->error = 0; + } + else + { + SeriesWork *series; + ParallelWork *parallel = Workflow::create_parallel_work(kafka_broker_callback); + parallel->set_context(t); + t->client_broker_list.rewind(); + + KafkaBroker *broker; + while ((broker = t->client_broker_list.get_next()) != NULL) + { + auto cb = std::bind(&ComplexKafkaTask::kafka_broker_api_callback, t, + std::placeholders::_1); + __WFKafkaTask *ntask; + if (broker->is_to_addr()) + { + const struct sockaddr *addr; + socklen_t socklen; + broker->get_broker_addr(&addr, &socklen); + + ntask = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + t->retry_max, + nullptr); + } + else + { + ntask = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), + broker->get_port(), + t->retry_max, + nullptr); + } + + ntask->get_req()->set_config(t->config); + ntask->get_req()->set_broker(*broker); + ntask->get_req()->set_api(Kafka_ApiVersions); + ntask->user_data = broker; + KafkaComplexTask *ctask = static_cast(ntask); + *ctask->get_mutable_ctx() = cb; + series = Workflow::create_series_work(ntask, nullptr); + parallel->add_series(series); + } + series_of(task)->push_front(parallel); + t->lock_status.get_mutex()->unlock(); + return; + } + } + else + { + *t->lock_status.get_status() |= KAFKA_META_INIT; + *t->lock_status.get_status() &= (~(KAFKA_META_DONE|KAFKA_META_DOING)); + + t->state = WFT_STATE_TASK_ERROR; + t->error = WFT_ERR_KAFKA_META_FAILED; + t->finish = true; + } + + char name[64]; + snprintf(name, 64, "%p.meta", t->client); + t->lock_status.get_mutex()->unlock(); + WFTaskFactory::count_by_name(name, (unsigned int)-1); +} + +void ComplexKafkaTask::kafka_broker_api_callback(__WFKafkaTask *task) +{ + using KafkaTuple = std::tuple; + KafkaTuple * state_error_broker = new KafkaTuple{task->get_state(), + task->get_error(), + task->user_data}; + series_of(task)->set_context(state_error_broker); +} + +void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork) +{ + ComplexKafkaTask *t = (ComplexKafkaTask *)pwork->get_context(); + t->state = WFT_STATE_SUCCESS; + t->error = 0; + + t->lock_status.get_mutex()->lock(); + using KafkaTuple = std::tuple; + KafkaTuple *state_error_broker; + + for (size_t i = 0; i < pwork->size(); i++) + { + state_error_broker = (KafkaTuple *)pwork->series_at(i)->get_context(); + if (std::get<0>(*state_error_broker) != WFT_STATE_SUCCESS) + { + t->state = std::get<0>(*state_error_broker); + t->error = std::get<1>(*state_error_broker); + } + else + t->client_broker_map.add_item(*std::get<2>(*state_error_broker)); + + delete state_error_broker; + } + + if (t->state == WFT_STATE_SUCCESS) + { *t->lock_status.get_status() |= KAFKA_META_DONE; *t->lock_status.get_status() &= (~(KAFKA_META_INIT|KAFKA_META_DOING)); @@ -596,8 +727,8 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task) char name[64]; snprintf(name, 64, "%p.meta", t->client); - WFTaskFactory::count_by_name(name, (unsigned int)-1); t->lock_status.get_mutex()->unlock(); + WFTaskFactory::count_by_name(name, (unsigned int)-1); } void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task) @@ -654,8 +785,8 @@ void ComplexKafkaTask::kafka_cgroup_callback(__WFKafkaTask *task) char name[64]; snprintf(name, 64, "%p.cgroup", t->client); - WFTaskFactory::count_by_name(name, (unsigned int)-1); t->lock_status.get_mutex()->unlock(); + WFTaskFactory::count_by_name(name, (unsigned int)-1); } void ComplexKafkaTask::kafka_parallel_callback(const ParallelWork *pwork) @@ -782,7 +913,18 @@ void ComplexKafkaTask::dispatch() this->lock_status.get_mutex()->lock(); - if (*this->lock_status.get_status() & KAFKA_META_INIT) + if (*this->lock_status.get_status() & KAFKA_META_DOING) + { + char name[64]; + snprintf(name, 64, "%p.meta", this->client); + counter = WFTaskFactory::create_counter_task(name, 1, nullptr); + series_of(this)->push_front(this); + series_of(this)->push_front(counter); + this->lock_status.get_mutex()->unlock(); + this->subtask_done(); + return; + } + else if (*this->lock_status.get_status() & KAFKA_META_INIT) { task = __WFKafkaTaskFactory::create_kafka_task(this->uri, this->retry_max, @@ -798,37 +940,8 @@ void ComplexKafkaTask::dispatch() this->subtask_done(); return; } - else if (*this->lock_status.get_status() & KAFKA_META_DOING) - { - char name[64]; - snprintf(name, 64, "%p.meta", this->client); - counter = WFTaskFactory::create_counter_task(name, 1, nullptr); - series_of(this)->push_front(this); - series_of(this)->push_front(counter); - this->lock_status.get_mutex()->unlock(); - this->subtask_done(); - return; - } - if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) && - (*this->lock_status.get_status() & KAFKA_CGROUP_INIT)) - { - task = __WFKafkaTaskFactory::create_kafka_task(this->uri, - this->retry_max, - kafka_cgroup_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_api(Kafka_FindCoordinator); - task->get_req()->set_cgroup(this->cgroup); - task->get_req()->set_meta_list(this->client_meta_list); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - *this->lock_status.get_status() |= KAFKA_CGROUP_DOING; - this->lock_status.get_mutex()->unlock(); - this->subtask_done(); - return; - } - else if (*this->lock_status.get_status() & KAFKA_CGROUP_DOING) + if (*this->lock_status.get_status() & KAFKA_CGROUP_DOING) { char name[64]; snprintf(name, 64, "%p.cgroup", this->client); @@ -839,6 +952,49 @@ void ComplexKafkaTask::dispatch() this->subtask_done(); return; } + else if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) && + (*this->lock_status.get_status() & KAFKA_CGROUP_INIT)) + { + KafkaBroker *broker = this->client_broker_map.get_first_entry(); + if (!broker) + { + this->state = WFT_STATE_TASK_ERROR; + this->error = WFT_ERR_KAFKA_CGROUP_FAILED; + this->finish = true; + return; + } + + if (broker->is_to_addr()) + { + const struct sockaddr *addr; + socklen_t socklen; + broker->get_broker_addr(&addr, &socklen); + + task = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + this->retry_max, + kafka_cgroup_callback); + } + else + { + task = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), + broker->get_port(), + this->retry_max, + kafka_cgroup_callback); + } + + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_api(Kafka_FindCoordinator); + task->get_req()->set_broker(*broker); + task->get_req()->set_cgroup(this->cgroup); + task->get_req()->set_meta_list(this->client_meta_list); + series_of(this)->push_front(this); + series_of(this)->push_front(task); + *this->lock_status.get_status() |= KAFKA_CGROUP_DOING; + this->lock_status.get_mutex()->unlock(); + this->subtask_done(); + return; + } SeriesWork *series; switch(this->api_type) diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index 15f7254e..4dae9bde 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -61,7 +61,7 @@ CommMessageOut *__ComplexKafkaTask::message_out() long long seqid = this->get_seq(); KafkaBroker *broker = this->get_req()->get_broker(); - if (seqid == 0 || !broker->get_api()) + if (!broker->get_api()) { if (!this->get_req()->get_config()->get_broker_version()) { @@ -92,16 +92,16 @@ CommMessageOut *__ComplexKafkaTask::message_out() return NULL; } } + } - if (this->get_req()->get_config()->get_sasl_mechanisms()) - { - KafkaRequest *req = new KafkaRequest; + if (seqid == 0 && this->get_req()->get_config()->get_sasl_mechanisms()) + { + KafkaRequest *req = new KafkaRequest; - req->duplicate(*this->get_req()); - req->set_api(Kafka_SaslHandshake); - is_user_request_ = false; - return req; - } + req->duplicate(*this->get_req()); + req->set_api(Kafka_SaslHandshake); + is_user_request_ = false; + return req; } if (this->get_req()->get_api() == Kafka_Fetch) @@ -156,6 +156,10 @@ CommMessageIn *__ComplexKafkaTask::message_in() bool __ComplexKafkaTask::init_success() { TransportType type = TT_TCP; + if (uri_.scheme && strcasecmp(uri_.scheme, "kafka") == 0) + type = TT_TCP; + //else if (uri_.scheme && strcasecmp(uri_.scheme, "kafkas") == 0) + // type = TT_TCP_SSL; if (this->get_req()->get_config()->get_sasl_mechanisms()) { @@ -424,7 +428,8 @@ bool __ComplexKafkaTask::finish_once() } if (this->get_resp()->get_api() == Kafka_Fetch || - this->get_resp()->get_api() == Kafka_Produce) + this->get_resp()->get_api() == Kafka_Produce || + this->get_resp()->get_api() == Kafka_ApiVersions) { if (*get_mutable_ctx()) (*get_mutable_ctx())(this); diff --git a/src/protocol/KafkaDataTypes.h b/src/protocol/KafkaDataTypes.h index b7cb1f53..84c5ff9b 100644 --- a/src/protocol/KafkaDataTypes.h +++ b/src/protocol/KafkaDataTypes.h @@ -32,6 +32,7 @@ #include #include #include "list.h" +#include "rbtree.h" #include "kafka_parser.h" @@ -188,6 +189,118 @@ private: struct list_head *curpos; }; +template +class KafkaMap +{ +public: + KafkaMap() + { + this->t_map = new struct rb_root; + this->t_map->rb_node = NULL; + + this->ref = new std::atomic(1); + } + + ~KafkaMap() + { + if (--*this->ref == 0) + { + T *t; + while (this->t_map->rb_node) + { + t = rb_entry(this->t_map->rb_node, T, rb); + rb_erase(this->t_map->rb_node, this->t_map); + delete t; + } + + delete this->t_map; + delete this->ref; + } + } + + KafkaMap(const KafkaMap& copy) + { + this->ref = copy.ref; + ++*this->ref; + this->t_map = copy.t_map; + } + + KafkaMap& operator= (const KafkaMap& copy) + { + this->~KafkaMap(); + this->ref = copy.ref; + ++*this->ref; + this->t_map = copy.t_map; + return *this; + } + + T *find_item(int id) + { + rb_node **p = &this->t_map->rb_node; + T *t; + + while (*p) + { + t = rb_entry(*p, T, rb); + + if (id < t->get_id()) + p = &(*p)->rb_left; + else if (id > t->get_id()) + p = &(*p)->rb_right; + else + break; + } + + return *p ? t : NULL; + } + + void add_item(T& obj) + { + rb_node **p = &this->t_map->rb_node; + rb_node *parent = NULL; + T *t; + int id = obj.get_id(); + + while (*p) + { + parent = *p; + t = rb_entry(*p, T, rb); + + if (id < t->get_id()) + p = &(*p)->rb_left; + else if (id > t->get_id()) + p = &(*p)->rb_right; + else + break; + } + + if (*p == NULL) + { + T *nt = new T; + + *nt = obj; + rb_link_node(nt->get_rb(), parent, p); + rb_insert_color(nt->get_rb(), this->t_map); + } + } + + T *get_first_entry() + { + struct rb_node *p = rb_first(this->t_map); + return rb_entry(p, T, rb); + } + + T *get_tail_entry() + { + struct rb_node *p = rb_last(this->t_map); + return rb_entry(p, T, rb); + } + +private: + struct rb_root *t_map; + std::atomic *ref; +}; + class KafkaConfig { public: @@ -570,6 +683,7 @@ class KafkaToppar; using KafkaMetaList = KafkaList; using KafkaBrokerList = KafkaList; +using KafkaBrokerMap = KafkaMap; using KafkaTopparList = KafkaList; using KafkaRecordList = KafkaList; @@ -861,6 +975,8 @@ public: struct list_head *get_list() { return &this->list; } + struct rb_node *get_rb() { return &this->rb; } + void set_feature(unsigned features) { this->ptr->features = features; } bool is_equal(const struct sockaddr *addr, socklen_t socklen) const @@ -913,6 +1029,8 @@ public: int get_node_id() const { return this->ptr->node_id; } + int get_id () const { return this->ptr->node_id; } + bool allocate_api_version(size_t len) { void *p = malloc(len * sizeof(kafka_api_version_t)); @@ -942,10 +1060,12 @@ public: private: struct list_head list; + struct rb_node rb; kafka_broker_t *ptr; std::atomic *ref; friend class KafkaList; + friend class KafkaMap; }; class KafkaMeta diff --git a/src/protocol/KafkaMessage.cc b/src/protocol/KafkaMessage.cc index 369801f1..0cda048e 100644 --- a/src/protocol/KafkaMessage.cc +++ b/src/protocol/KafkaMessage.cc @@ -3035,7 +3035,6 @@ static int kafka_meta_parse_topic(void **buf, size_t *size, int KafkaResponse::parse_metadata(void **buf, size_t *size) { - int throttle_time, controller_id; std::string cluster_id; From 032e729459acbc3df36341bc259295947841c996 Mon Sep 17 00:00:00 2001 From: wangzhulei Date: Fri, 9 Apr 2021 17:56:52 +0800 Subject: [PATCH 03/53] add kafka_process_broker_api --- src/client/WFKafkaClient.cc | 186 ++++++++++++++++++------------------ 1 file changed, 95 insertions(+), 91 deletions(-) diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index a5e777ac..096bacc9 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -272,6 +272,8 @@ private: static void kafka_meta_callback(__WFKafkaTask *task); + static void kafka_process_broker_api(ComplexKafkaTask *t, __WFKafkaTask *task); + void kafka_broker_api_callback(__WFKafkaTask *task); static void kafka_broker_callback(const ParallelWork *pwork); @@ -302,7 +304,10 @@ private: int arrange_commit(); - KafkaBroker *get_broker(int node_id); + inline KafkaBroker *get_broker(int node_id) + { + return this->client_broker_map.find_item(node_id); + } int get_node_id(const KafkaToppar *toppar); @@ -320,11 +325,6 @@ private: friend class WFKafkaClient; }; -KafkaBroker *ComplexKafkaTask::get_broker(int node_id) -{ - return this->client_broker_map.find_item(node_id); -} - int ComplexKafkaTask::get_node_id(const KafkaToppar *toppar) { bool flag = false; @@ -561,6 +561,94 @@ void ComplexKafkaTask::kafka_merge_broker_list(KafkaBrokerList *dst, } } +void ComplexKafkaTask::kafka_process_broker_api(ComplexKafkaTask *t, __WFKafkaTask *task) +{ + if (t->config.get_broker_version()) + { + t->client_broker_list.rewind(); + KafkaBroker *broker; + while ((broker = t->client_broker_list.get_next()) != NULL) + { + kafka_api_version_t *api; + size_t api_cnt; + const char *brk_ver = t->config.get_broker_version(); + int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt); + + if (ret == 0) + { + if (!broker->allocate_api_version(api_cnt)) + { + t->state = WFT_STATE_TASK_ERROR; + t->error = errno; + t->lock_status.get_mutex()->unlock(); + return; + } + + memcpy(broker->get_api(), api, + sizeof(kafka_api_version_t) * api_cnt); + + t->client_broker_map.add_item(*broker); + } + else + { + t->state = WFT_STATE_TASK_ERROR; + t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED; + t->lock_status.get_mutex()->unlock(); + return; + } + } + + *t->lock_status.get_status() |= KAFKA_META_DONE; + *t->lock_status.get_status() &= (~(KAFKA_META_INIT|KAFKA_META_DOING)); + + t->state = WFT_STATE_SUCCESS; + t->error = 0; + } + else + { + SeriesWork *series; + ParallelWork *parallel = Workflow::create_parallel_work(kafka_broker_callback); + parallel->set_context(t); + t->client_broker_list.rewind(); + + KafkaBroker *broker; + while ((broker = t->client_broker_list.get_next()) != NULL) + { + auto cb = std::bind(&ComplexKafkaTask::kafka_broker_api_callback, t, + std::placeholders::_1); + __WFKafkaTask *ntask; + if (broker->is_to_addr()) + { + const struct sockaddr *addr; + socklen_t socklen; + broker->get_broker_addr(&addr, &socklen); + + ntask = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, + t->retry_max, + nullptr); + } + else + { + ntask = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), + broker->get_port(), + t->retry_max, + nullptr); + } + + ntask->get_req()->set_config(t->config); + ntask->get_req()->set_broker(*broker); + ntask->get_req()->set_api(Kafka_ApiVersions); + ntask->user_data = broker; + KafkaComplexTask *ctask = static_cast(ntask); + *ctask->get_mutable_ctx() = cb; + series = Workflow::create_series_work(ntask, nullptr); + parallel->add_series(series); + } + series_of(task)->push_front(parallel); + t->lock_status.get_mutex()->unlock(); + } +} + void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task) { ComplexKafkaTask *t = (ComplexKafkaTask *)task->user_data; @@ -572,91 +660,7 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task) kafka_merge_broker_list(&t->client_broker_list, task->get_resp()->get_broker_list()); - if (t->config.get_broker_version()) - { - t->client_broker_list.rewind(); - KafkaBroker *broker; - while ((broker = t->client_broker_list.get_next()) != NULL) - { - kafka_api_version_t *api; - size_t api_cnt; - const char *brk_ver = t->config.get_broker_version(); - int ret = kafka_api_version_is_queryable(brk_ver, &api, &api_cnt); - - if (ret == 0) - { - if (!broker->allocate_api_version(api_cnt)) - { - t->state = WFT_STATE_TASK_ERROR; - t->error = errno; - t->lock_status.get_mutex()->unlock(); - return; - } - - memcpy(broker->get_api(), api, - sizeof(kafka_api_version_t) * api_cnt); - - t->client_broker_map.add_item(*broker); - } - else - { - t->state = WFT_STATE_TASK_ERROR; - t->error = WFT_ERR_KAFKA_VERSION_DISALLOWED; - t->lock_status.get_mutex()->unlock(); - return; - } - } - - *t->lock_status.get_status() |= KAFKA_META_DONE; - *t->lock_status.get_status() &= (~(KAFKA_META_INIT|KAFKA_META_DOING)); - - t->state = WFT_STATE_SUCCESS; - t->error = 0; - } - else - { - SeriesWork *series; - ParallelWork *parallel = Workflow::create_parallel_work(kafka_broker_callback); - parallel->set_context(t); - t->client_broker_list.rewind(); - - KafkaBroker *broker; - while ((broker = t->client_broker_list.get_next()) != NULL) - { - auto cb = std::bind(&ComplexKafkaTask::kafka_broker_api_callback, t, - std::placeholders::_1); - __WFKafkaTask *ntask; - if (broker->is_to_addr()) - { - const struct sockaddr *addr; - socklen_t socklen; - broker->get_broker_addr(&addr, &socklen); - - ntask = __WFKafkaTaskFactory::create_kafka_task(addr, socklen, - t->retry_max, - nullptr); - } - else - { - ntask = __WFKafkaTaskFactory::create_kafka_task(broker->get_host(), - broker->get_port(), - t->retry_max, - nullptr); - } - - ntask->get_req()->set_config(t->config); - ntask->get_req()->set_broker(*broker); - ntask->get_req()->set_api(Kafka_ApiVersions); - ntask->user_data = broker; - KafkaComplexTask *ctask = static_cast(ntask); - *ctask->get_mutable_ctx() = cb; - series = Workflow::create_series_work(ntask, nullptr); - parallel->add_series(series); - } - series_of(task)->push_front(parallel); - t->lock_status.get_mutex()->unlock(); - return; - } + kafka_process_broker_api(t, task); } else { From bf63ed32637d5a191badaa1a04c4af7ab1999fa4 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sun, 11 Apr 2021 18:50:06 +0800 Subject: [PATCH 04/53] remove std::tuple in kafka client --- src/client/WFKafkaClient.cc | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 096bacc9..60c5b619 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -678,13 +678,20 @@ void ComplexKafkaTask::kafka_meta_callback(__WFKafkaTask *task) WFTaskFactory::count_by_name(name, (unsigned int)-1); } +struct __broker_status +{ + KafkaBroker *broker; + int state; + int error; +}; + void ComplexKafkaTask::kafka_broker_api_callback(__WFKafkaTask *task) { - using KafkaTuple = std::tuple; - KafkaTuple * state_error_broker = new KafkaTuple{task->get_state(), - task->get_error(), - task->user_data}; - series_of(task)->set_context(state_error_broker); + struct __broker_status *status = new struct __broker_status; + status->broker = (KafkaBroker *)task->user_data; + status->state = task->get_state(); + status->error = task->get_error(); + series_of(task)->set_context(status); } void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork) @@ -694,21 +701,20 @@ void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork) t->error = 0; t->lock_status.get_mutex()->lock(); - using KafkaTuple = std::tuple; - KafkaTuple *state_error_broker; + struct __broker_status *status; for (size_t i = 0; i < pwork->size(); i++) { - state_error_broker = (KafkaTuple *)pwork->series_at(i)->get_context(); - if (std::get<0>(*state_error_broker) != WFT_STATE_SUCCESS) + status = (struct __broker_status *)pwork->series_at(i)->get_context(); + if (status->error != WFT_STATE_SUCCESS) { - t->state = std::get<0>(*state_error_broker); - t->error = std::get<1>(*state_error_broker); + t->state = status->state; + t->error = status->error; } else - t->client_broker_map.add_item(*std::get<2>(*state_error_broker)); + t->client_broker_map.add_item(*status->broker); - delete state_error_broker; + delete status; } if (t->state == WFT_STATE_SUCCESS) From a2ea22a9a57239d6a7e09e38a3021120a2aa209e Mon Sep 17 00:00:00 2001 From: XieHan Date: Sun, 11 Apr 2021 18:53:34 +0800 Subject: [PATCH 05/53] error->state --- src/client/WFKafkaClient.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 60c5b619..524dc2d7 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -706,7 +706,7 @@ void ComplexKafkaTask::kafka_broker_callback(const ParallelWork *pwork) for (size_t i = 0; i < pwork->size(); i++) { status = (struct __broker_status *)pwork->series_at(i)->get_context(); - if (status->error != WFT_STATE_SUCCESS) + if (status->state != WFT_STATE_SUCCESS) { t->state = status->state; t->error = status->error; From fd54c3ff96513472d928fd75e4774390a1e20ab1 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sun, 11 Apr 2021 19:15:13 +0800 Subject: [PATCH 06/53] fix some code style --- src/protocol/kafka_parser.c | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/src/protocol/kafka_parser.c b/src/protocol/kafka_parser.c index a0c35679..acb7cfe7 100644 --- a/src/protocol/kafka_parser.c +++ b/src/protocol/kafka_parser.c @@ -23,9 +23,6 @@ #include #include "kafka_parser.h" - -#define MIN(a, b) ((x) <= (y) ? (x) : (y)) - static kafka_api_version_t kafka_api_version_queryable[] = { { Kafka_ApiVersions, 0, 0 } }; @@ -207,8 +204,8 @@ static int kafka_get_legacy_api_version(const char *broker_version, { "", kafka_api_version_queryable, 1 }, { NULL, NULL, 0 } }; - int i; + for (i = 0 ; vermap[i].pfx ; i++) { if (!strncmp(vermap[i].pfx, broker_version, strlen(vermap[i].pfx))) @@ -263,7 +260,7 @@ unsigned kafka_get_features(kafka_api_version_t *api, size_t api_cnt) int i, fails, r; const kafka_api_version_t *match; - for (i = 0 ; kafka_feature_map[i].feature != 0 ; i++) + for (i = 0; kafka_feature_map[i].feature != 0; i++) { fails = 0; for (match = &kafka_feature_map[i].depends[0]; @@ -574,6 +571,7 @@ void kafka_block_deinit(kafka_block_t *block) int kafka_parser_append_message(const void *buf, size_t *size, kafka_parser_t *parser) { + size_t s = *size; int totaln; if (parser->complete) @@ -582,9 +580,7 @@ int kafka_parser_append_message(const void *buf, size_t *size, return 1; } - size_t s = *size; - - if (parser->hsize + *size < 4) + if (parser->hsize + s < 4) { memcpy(parser->headbuf + parser->hsize, buf, s); parser->hsize += s; @@ -672,15 +668,12 @@ int kafka_record_header_set_kv(const void *key, size_t key_len, kafka_record_header_t *header) { void *k = malloc(key_len); - - if (!k) - return -1; - void *v = malloc(val_len); - if (!v) + if (!k || !v) { free(k); + free(v); return -1; } @@ -728,11 +721,12 @@ int kafka_sasl_plain_client_new(void *p) size_t ulen = strlen(conf->sasl.username); size_t plen = strlen(conf->sasl.passwd); size_t blen = ulen + plen + 3; - char *buf = malloc(blen); + size_t off = 0; + char *buf = (char *)malloc(blen); + if (!buf) return -1; - size_t off = 0; buf[off++] = '\0'; memcpy(buf + off, conf->sasl.username, ulen); @@ -783,3 +777,4 @@ int kafka_sasl_set_passwd(const char *passwd, kafka_config_t *conf) conf->sasl.passwd = t; return 0; } + From 3f273ad116d16797d441cd91e4845dc127814fb4 Mon Sep 17 00:00:00 2001 From: liyingxin Date: Mon, 12 Apr 2021 20:18:57 +0800 Subject: [PATCH 07/53] add define guard for WFMySQLConnection.h (#335) --- src/client/WFMySQLConnection.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/client/WFMySQLConnection.h b/src/client/WFMySQLConnection.h index 8fb119a2..66bacf47 100644 --- a/src/client/WFMySQLConnection.h +++ b/src/client/WFMySQLConnection.h @@ -16,6 +16,9 @@ Author: Xie Han (xiehan@sogou-inc.com) */ +#ifndef _WFMYSQLCONNECTION_H_ +#define _WFMYSQLCONNECTION_H_ + #include #include #include @@ -73,3 +76,5 @@ WFMySQLConnection::create_disconnect_task(mysql_callback_t callback) return task; } +#endif + From 758eddf6bfc6f7d94b9cda8751c22739a713405b Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 15 Apr 2021 01:09:39 +0800 Subject: [PATCH 08/53] fix addrinfo with unix domain socket --- src/algorithm/DNSRoutine.cc | 57 ++++++++++++++++++++++++------------- src/algorithm/DNSRoutine.h | 3 ++ 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/src/algorithm/DNSRoutine.cc b/src/algorithm/DNSRoutine.cc index ba3ad2e1..abbfc51e 100644 --- a/src/algorithm/DNSRoutine.cc +++ b/src/algorithm/DNSRoutine.cc @@ -16,8 +16,14 @@ Authors: Wu Jiaxu (wujiaxu@sogou-inc.com) */ -#include +#include +#include #include +#include +#include +#include +#include +#include #include "DNSRoutine.h" #define PORT_STR_MAX 5 @@ -48,27 +54,40 @@ DNSOutput& DNSOutput::operator= (DNSOutput&& move) return *this; } +void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) +{ + struct sockaddr_un *sun = NULL; + + if (path.size() <= sizeof sun->sun_path) + { + size_t size = sizeof (struct addrinfo) + sizeof (struct sockaddr_un); + + out->addrinfo_ = (struct addrinfo *)calloc(size, 1); + if (out->addrinfo_) + { + sun = (struct sockaddr_un *)(out->addrinfo_ + 1); + sun->sun_family = AF_UNIX; + memcpy(sun->sun_path, path.c_str(), path.size()); + + out->addrinfo_->ai_family = AF_UNIX; + out->addrinfo_->ai_socktype = SOCK_STREAM; + out->addrinfo_->ai_addr = (struct sockaddr *)sun; + size = offsetof(struct sockaddr_un, sun_path) + path.size(); + out->addrinfo_->ai_addrlen = size; + out->error_ = 0; + return; + } + } + else + errno = EINVAL; + + out->error_ = EAI_SYSTEM; +} + void DNSRoutine::run(const DNSInput *in, DNSOutput *out) { if (!in->host_.empty() && in->host_[0] == '/') - { - out->error_ = 0; - out->addrinfo_ = (addrinfo*)malloc(sizeof (struct addrinfo) + sizeof (struct sockaddr_un)); - out->addrinfo_->ai_flags = AI_ADDRCONFIG; - out->addrinfo_->ai_family = AF_UNIX; - out->addrinfo_->ai_socktype = SOCK_STREAM; - out->addrinfo_->ai_protocol = 0; - out->addrinfo_->ai_addrlen = sizeof (struct sockaddr_un); - out->addrinfo_->ai_addr = (struct sockaddr *)((char *)(out->addrinfo_) + sizeof (struct addrinfo)); - out->addrinfo_->ai_canonname = NULL; - out->addrinfo_->ai_next = NULL; - struct sockaddr_un *sun = (struct sockaddr_un *)(out->addrinfo_->ai_addr); - - sun->sun_family = AF_UNIX; - memset(sun->sun_path, 0, sizeof (sun->sun_path)); - strcpy(sun->sun_path, in->host_.c_str()); - return; - } + run_local_path(in->host_, out); struct addrinfo hints = { #ifdef AI_ADDRCONFIG diff --git a/src/algorithm/DNSRoutine.h b/src/algorithm/DNSRoutine.h index 785cae14..0ec69aad 100644 --- a/src/algorithm/DNSRoutine.h +++ b/src/algorithm/DNSRoutine.h @@ -92,6 +92,9 @@ class DNSRoutine { public: static void run(const DNSInput *in, DNSOutput *out); + +private: + static void run_local_path(const std::string& path, DNSOutput *out); }; //new WFDNSTask(queue, executor, dns_routine, callback) From 7fee812f16be22b7a9b4c414b5ebd921e6e4e34a Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 15 Apr 2021 01:11:47 +0800 Subject: [PATCH 09/53] add a return --- src/algorithm/DNSRoutine.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/algorithm/DNSRoutine.cc b/src/algorithm/DNSRoutine.cc index abbfc51e..f5eb698c 100644 --- a/src/algorithm/DNSRoutine.cc +++ b/src/algorithm/DNSRoutine.cc @@ -87,7 +87,10 @@ void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) void DNSRoutine::run(const DNSInput *in, DNSOutput *out) { if (!in->host_.empty() && in->host_[0] == '/') + { run_local_path(in->host_, out); + return; + } struct addrinfo hints = { #ifdef AI_ADDRCONFIG From 02f68313732310343945ae4b698b27ec6fd9fb32 Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 15 Apr 2021 01:19:37 +0800 Subject: [PATCH 10/53] update workflow package version --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 69a245f8..51829e42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_SKIP_RPATH TRUE) project( workflow - VERSION 0.9.5 + VERSION 0.9.6 LANGUAGES C CXX ) From d4f0fa0f5bb6ecffefd990c7b4c32be3469bc0d6 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Thu, 15 Apr 2021 01:24:41 +0800 Subject: [PATCH 11/53] fix addrinfo with unix domain socket (#337) * fix addrinfo with unix domain socket * add a return * update workflow package version --- CMakeLists.txt | 2 +- src/algorithm/DNSRoutine.cc | 54 ++++++++++++++++++++++++++----------- src/algorithm/DNSRoutine.h | 3 +++ 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 69a245f8..51829e42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_SKIP_RPATH TRUE) project( workflow - VERSION 0.9.5 + VERSION 0.9.6 LANGUAGES C CXX ) diff --git a/src/algorithm/DNSRoutine.cc b/src/algorithm/DNSRoutine.cc index ba3ad2e1..f5eb698c 100644 --- a/src/algorithm/DNSRoutine.cc +++ b/src/algorithm/DNSRoutine.cc @@ -16,8 +16,14 @@ Authors: Wu Jiaxu (wujiaxu@sogou-inc.com) */ -#include +#include +#include #include +#include +#include +#include +#include +#include #include "DNSRoutine.h" #define PORT_STR_MAX 5 @@ -48,25 +54,41 @@ DNSOutput& DNSOutput::operator= (DNSOutput&& move) return *this; } +void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) +{ + struct sockaddr_un *sun = NULL; + + if (path.size() <= sizeof sun->sun_path) + { + size_t size = sizeof (struct addrinfo) + sizeof (struct sockaddr_un); + + out->addrinfo_ = (struct addrinfo *)calloc(size, 1); + if (out->addrinfo_) + { + sun = (struct sockaddr_un *)(out->addrinfo_ + 1); + sun->sun_family = AF_UNIX; + memcpy(sun->sun_path, path.c_str(), path.size()); + + out->addrinfo_->ai_family = AF_UNIX; + out->addrinfo_->ai_socktype = SOCK_STREAM; + out->addrinfo_->ai_addr = (struct sockaddr *)sun; + size = offsetof(struct sockaddr_un, sun_path) + path.size(); + out->addrinfo_->ai_addrlen = size; + out->error_ = 0; + return; + } + } + else + errno = EINVAL; + + out->error_ = EAI_SYSTEM; +} + void DNSRoutine::run(const DNSInput *in, DNSOutput *out) { if (!in->host_.empty() && in->host_[0] == '/') { - out->error_ = 0; - out->addrinfo_ = (addrinfo*)malloc(sizeof (struct addrinfo) + sizeof (struct sockaddr_un)); - out->addrinfo_->ai_flags = AI_ADDRCONFIG; - out->addrinfo_->ai_family = AF_UNIX; - out->addrinfo_->ai_socktype = SOCK_STREAM; - out->addrinfo_->ai_protocol = 0; - out->addrinfo_->ai_addrlen = sizeof (struct sockaddr_un); - out->addrinfo_->ai_addr = (struct sockaddr *)((char *)(out->addrinfo_) + sizeof (struct addrinfo)); - out->addrinfo_->ai_canonname = NULL; - out->addrinfo_->ai_next = NULL; - struct sockaddr_un *sun = (struct sockaddr_un *)(out->addrinfo_->ai_addr); - - sun->sun_family = AF_UNIX; - memset(sun->sun_path, 0, sizeof (sun->sun_path)); - strcpy(sun->sun_path, in->host_.c_str()); + run_local_path(in->host_, out); return; } diff --git a/src/algorithm/DNSRoutine.h b/src/algorithm/DNSRoutine.h index 785cae14..0ec69aad 100644 --- a/src/algorithm/DNSRoutine.h +++ b/src/algorithm/DNSRoutine.h @@ -92,6 +92,9 @@ class DNSRoutine { public: static void run(const DNSInput *in, DNSOutput *out); + +private: + static void run_local_path(const std::string& path, DNSOutput *out); }; //new WFDNSTask(queue, executor, dns_routine, callback) From 1b3cc5d513004329352bfcdf8403dc71498e1c7e Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Thu, 15 Apr 2021 02:36:59 +0800 Subject: [PATCH 12/53] Update README_cn.md --- README_cn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_cn.md b/README_cn.md index 5a397113..06adcb6b 100644 --- a/README_cn.md +++ b/README_cn.md @@ -30,7 +30,7 @@ int main() * 作为万能异步客户端。目前支持``http``,``redis``,``mysql``和``kafka``协议。 * 轻松构建效率极高的spider。 * 实现自定义协议client/server,构建自己的RPC系统。 - * [srpc](https://github.com/sogou/srpc)就是以它为基础,作为独立项目开源。支持``srpc``,``brpc``和``thrift``等协议。 + * [srpc](https://github.com/sogou/srpc)就是以它为基础,作为独立项目开源。支持``srpc``,``brpc``,``trpc``和``thrift``等协议。 * 构建异步任务流,支持常用的串并联,也支持更加复杂的DAG结构。 * 作为并行计算工具使用。除了网络任务,我们也包含计算任务的调度。所有类型的任务都可以放入同一个流中。 * 在``Linux``系统下作为文件异步IO工具使用,性能超过任何标准调用。磁盘IO也是一种任务。 From c90f7abf31ad406fe19ec2b5dfff38d81ff4350f Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Thu, 15 Apr 2021 02:38:49 +0800 Subject: [PATCH 13/53] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1af208d4..91723d04 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ int main() * As a **multifunctional asynchronous client**, it currently supports `HTTP`, `Redis`, `MySQL` and `Kafka` protocols. * To implement **client/server on user-defined protocol** and build your own **RPC system**. - * [srpc](https://github.com/sogou/srpc) is based on it and it is an independent open source project, which supports srpc, brpc and thrift protocols. + * [srpc](https://github.com/sogou/srpc) is based on it and it is an independent open source project, which supports srpc, brpc, trpc and thrift protocols. * To build **asynchronous workflow**; support common **series** and **parallel** structures, and also support any **DAG** structures. * As a **parallel computing tool**. In addition to **networking tasks**, Sogou C++ Workflow also includes **the scheduling of computing tasks**. All types of tasks can be put into **the same** flow. * As a **asynchronous file IO tool** in `Linux` system, with high performance exceeding any system call. Disk file IO is also a task. From 6eba304d136dd11f874953bacbe71cc134cdd2bb Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 15 Apr 2021 15:36:16 +0800 Subject: [PATCH 14/53] fix unix domain socket path --- src/algorithm/DNSRoutine.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/algorithm/DNSRoutine.cc b/src/algorithm/DNSRoutine.cc index f5eb698c..40a45184 100644 --- a/src/algorithm/DNSRoutine.cc +++ b/src/algorithm/DNSRoutine.cc @@ -58,7 +58,7 @@ void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) { struct sockaddr_un *sun = NULL; - if (path.size() <= sizeof sun->sun_path) + if (path.size() + 1 <= sizeof sun->sun_path) { size_t size = sizeof (struct addrinfo) + sizeof (struct sockaddr_un); @@ -72,7 +72,7 @@ void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) out->addrinfo_->ai_family = AF_UNIX; out->addrinfo_->ai_socktype = SOCK_STREAM; out->addrinfo_->ai_addr = (struct sockaddr *)sun; - size = offsetof(struct sockaddr_un, sun_path) + path.size(); + size = offsetof(struct sockaddr_un, sun_path) + path.size() + 1; out->addrinfo_->ai_addrlen = size; out->error_ = 0; return; From d7064330aaef1bdce5a400abc95f1b1477045c2a Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Thu, 15 Apr 2021 15:42:22 +0800 Subject: [PATCH 15/53] Fix unix domain socket path and addrlen (#338) * fix addrinfo with unix domain socket * add a return * update workflow package version * fix unix domain socket path --- src/algorithm/DNSRoutine.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/algorithm/DNSRoutine.cc b/src/algorithm/DNSRoutine.cc index f5eb698c..40a45184 100644 --- a/src/algorithm/DNSRoutine.cc +++ b/src/algorithm/DNSRoutine.cc @@ -58,7 +58,7 @@ void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) { struct sockaddr_un *sun = NULL; - if (path.size() <= sizeof sun->sun_path) + if (path.size() + 1 <= sizeof sun->sun_path) { size_t size = sizeof (struct addrinfo) + sizeof (struct sockaddr_un); @@ -72,7 +72,7 @@ void DNSRoutine::run_local_path(const std::string& path, DNSOutput *out) out->addrinfo_->ai_family = AF_UNIX; out->addrinfo_->ai_socktype = SOCK_STREAM; out->addrinfo_->ai_addr = (struct sockaddr *)sun; - size = offsetof(struct sockaddr_un, sun_path) + path.size(); + size = offsetof(struct sockaddr_un, sun_path) + path.size() + 1; out->addrinfo_->ai_addrlen = size; out->error_ = 0; return; From 8aab9c50a6193eed70e8f80c9e9c6db82d5c5eaf Mon Sep 17 00:00:00 2001 From: XieHan Date: Fri, 16 Apr 2021 01:11:13 +0800 Subject: [PATCH 16/53] remove some RedisValue constructors --- src/protocol/RedisMessage.cc | 60 ------------------------------------ src/protocol/RedisMessage.h | 18 ----------- 2 files changed, 78 deletions(-) diff --git a/src/protocol/RedisMessage.cc b/src/protocol/RedisMessage.cc index cb63d2dd..e0a8535b 100644 --- a/src/protocol/RedisMessage.cc +++ b/src/protocol/RedisMessage.cc @@ -31,66 +31,6 @@ typedef int64_t Rint; typedef std::string Rstr; typedef std::vector Rarr; -RedisValue::RedisValue(int64_t intv): - type_(REDIS_REPLY_TYPE_INTEGER), - data_(new Rint(intv)) -{ -} - -RedisValue::RedisValue(const char *str): - type_(REDIS_REPLY_TYPE_STRING), - data_(new Rstr(str)) -{ -} - -RedisValue::RedisValue(const char *str, size_t len): - type_(REDIS_REPLY_TYPE_STRING), - data_(new Rstr(str, len)) -{ -} - -RedisValue::RedisValue(const std::string& strv): - type_(REDIS_REPLY_TYPE_STRING), - data_(new Rstr(strv)) -{ -} - -RedisValue::RedisValue(const char *str, StatusTag status_tag): - type_(REDIS_REPLY_TYPE_STATUS), - data_(new Rstr(str)) -{ -} - -RedisValue::RedisValue(const char *str, size_t len, StatusTag status_tag): - type_(REDIS_REPLY_TYPE_STATUS), - data_(new Rstr(str, len)) -{ -} - -RedisValue::RedisValue(const std::string& strv, StatusTag status_tag): - type_(REDIS_REPLY_TYPE_STATUS), - data_(new Rstr(strv)) -{ -} - -RedisValue::RedisValue(const char *str, ErrorTag error_tag): - type_(REDIS_REPLY_TYPE_ERROR), - data_(new Rstr(str)) -{ -} - -RedisValue::RedisValue(const char *str, size_t len, ErrorTag error_tag): - type_(REDIS_REPLY_TYPE_ERROR), - data_(new Rstr(str, len)) -{ -} - -RedisValue::RedisValue(const std::string& strv, ErrorTag error_tag): - type_(REDIS_REPLY_TYPE_ERROR), - data_(new Rstr(strv)) -{ -} - RedisValue& RedisValue::operator= (const RedisValue& copy) { if (this != ©) diff --git a/src/protocol/RedisMessage.h b/src/protocol/RedisMessage.h index 067f9516..1083e5b6 100644 --- a/src/protocol/RedisMessage.h +++ b/src/protocol/RedisMessage.h @@ -107,24 +107,6 @@ public: // format data to text std::string debug_string() const; -public: - struct StatusTag {}; - struct ErrorTag {}; - // integer - RedisValue(int64_t intv); - // string - RedisValue(const char *str); - RedisValue(const char *str, size_t len); - RedisValue(const std::string& strv); - // status - RedisValue(const char *str, StatusTag status_tag); - RedisValue(const char *str, size_t len, StatusTag status_tag); - RedisValue(const std::string& strv, StatusTag status_tag); - // error - RedisValue(const char *str, ErrorTag error_tag); - RedisValue(const char *str, size_t len, ErrorTag error_tag); - RedisValue(const std::string& strv, ErrorTag error_tag); - private: void free_data(); void only_set_string_data(const std::string& strv); From cb17e68238d32753c11c98642e49b1adbe517f84 Mon Sep 17 00:00:00 2001 From: XieHan Date: Fri, 16 Apr 2021 20:17:05 +0800 Subject: [PATCH 17/53] add include --- src/algorithm/DNSRoutine.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/algorithm/DNSRoutine.cc b/src/algorithm/DNSRoutine.cc index 40a45184..c0938641 100644 --- a/src/algorithm/DNSRoutine.cc +++ b/src/algorithm/DNSRoutine.cc @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include From acb446c6a63c0ce20ac0739176e02d14298334fa Mon Sep 17 00:00:00 2001 From: XieHan Date: Fri, 23 Apr 2021 00:16:52 +0800 Subject: [PATCH 18/53] set_type->set_transport_type --- src/factory/HttpTaskImpl.cc | 2 +- src/factory/KafkaTaskImpl.cc | 2 +- src/factory/MySQLTaskImpl.cc | 2 +- src/factory/RedisTaskImpl.cc | 2 +- src/factory/WFTaskFactory.inl | 41 ++++++++++++++++++----------------- src/manager/WFFacilities.inl | 2 +- 6 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/factory/HttpTaskImpl.cc b/src/factory/HttpTaskImpl.cc index 8baba64c..c07272cf 100644 --- a/src/factory/HttpTaskImpl.cc +++ b/src/factory/HttpTaskImpl.cc @@ -255,7 +255,7 @@ bool ComplexHttpTask::init_success() } } - this->WFComplexClientTask::set_type(is_ssl ? TT_TCP_SSL : TT_TCP); + this->WFComplexClientTask::set_transport_type(is_ssl ? TT_TCP_SSL : TT_TCP); client_req->set_request_uri(request_uri.c_str()); client_req->set_header_pair("Host", header_host.c_str()); diff --git a/src/factory/KafkaTaskImpl.cc b/src/factory/KafkaTaskImpl.cc index 4dae9bde..a5584c96 100644 --- a/src/factory/KafkaTaskImpl.cc +++ b/src/factory/KafkaTaskImpl.cc @@ -167,7 +167,7 @@ bool __ComplexKafkaTask::init_success() this->WFComplexClientTask::set_info(info); } - this->WFComplexClientTask::set_type(type); + this->WFComplexClientTask::set_transport_type(type); return true; } diff --git a/src/factory/MySQLTaskImpl.cc b/src/factory/MySQLTaskImpl.cc index 1264685b..641a8293 100644 --- a/src/factory/MySQLTaskImpl.cc +++ b/src/factory/MySQLTaskImpl.cc @@ -464,7 +464,7 @@ bool ComplexMySQLTask::init_success() "charset:%d|rcharset:%s", username_.c_str(), password_.c_str(), db_.c_str(), character_set_, res_charset_.c_str()); - this->WFComplexClientTask::set_type(type); + this->WFComplexClientTask::set_transport_type(type); if (!transaction.empty()) { diff --git a/src/factory/RedisTaskImpl.cc b/src/factory/RedisTaskImpl.cc index aa71f5e9..cfda13b3 100644 --- a/src/factory/RedisTaskImpl.cc +++ b/src/factory/RedisTaskImpl.cc @@ -167,7 +167,7 @@ bool ComplexRedisTask::init_success() char *info = new char[info_len]; sprintf(info, "redis|pass:%s|db:%d", password_.c_str(), db_num_); - this->WFComplexClientTask::set_type(type); + this->WFComplexClientTask::set_transport_type(type); this->WFComplexClientTask::set_info(info); delete []info; diff --git a/src/factory/WFTaskFactory.inl b/src/factory/WFTaskFactory.inl index 8a689680..2cf42fb1 100644 --- a/src/factory/WFTaskFactory.inl +++ b/src/factory/WFTaskFactory.inl @@ -160,21 +160,6 @@ protected: virtual bool finish_once() { return true; } public: - void set_info(const std::string& info) - { - info_.assign(info); - } - - void set_info(const char *info) - { - info_.assign(info); - } - - void set_type(TransportType type) - { - type_ = type; - } - void init(const ParsedURI& uri) { uri_ = uri; @@ -192,6 +177,13 @@ public: socklen_t addrlen, const std::string& info); + void set_transport_type(TransportType type) + { + type_ = type; + } + + TransportType get_transport_type() const { return type_; } + const ParsedURI *get_current_uri() const { return &uri_; } void set_redirect(const ParsedURI& uri) @@ -207,6 +199,17 @@ public: init(type, addr, addrlen, info); } +protected: + void set_info(const std::string& info) + { + info_.assign(info); + } + + void set_info(const char *info) + { + info_.assign(info); + } + protected: virtual void dispatch(); virtual SubTask *done(); @@ -225,8 +228,6 @@ protected: retry_times_ = retry_max_; } - TransportType get_transport_type() const { return type_; } - protected: TransportType type_; ParsedURI uri_; @@ -541,7 +542,7 @@ WFNetworkTaskFactory::create_client_task(TransportType type, url += buf; URIParser::parse(url, uri); task->init(std::move(uri)); - task->set_type(type); + task->set_transport_type(type); return task; } @@ -557,7 +558,7 @@ WFNetworkTaskFactory::create_client_task(TransportType type, URIParser::parse(url, uri); task->init(std::move(uri)); - task->set_type(type); + task->set_transport_type(type); return task; } @@ -571,7 +572,7 @@ WFNetworkTaskFactory::create_client_task(TransportType type, auto *task = new WFComplexClientTask(retry_max, std::move(callback)); task->init(uri); - task->set_type(type); + task->set_transport_type(type); return task; } diff --git a/src/manager/WFFacilities.inl b/src/manager/WFFacilities.inl index 7cce7c16..79dba6f9 100644 --- a/src/manager/WFFacilities.inl +++ b/src/manager/WFFacilities.inl @@ -66,7 +66,7 @@ WFFuture> WFFacilities::async_request(Transp URIParser::parse(url, uri); task->init(std::move(uri)); - task->set_type(type); + task->set_transport_type(type); *task->get_req() = std::forward(req); task->start(); return fr; From 818f2debdaa5d6b20cd29bba39b87a05c6124363 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sat, 24 Apr 2021 00:21:40 +0800 Subject: [PATCH 19/53] fix kafka heartbeat interval. --- src/client/WFKafkaClient.cc | 9 ++------- src/client/WFKafkaClient.h | 2 -- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 524dc2d7..f5781e7c 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -23,7 +23,7 @@ #include #include "WFKafkaClient.h" -static size_t KAFKA_HEARTBEAT_TIMEOUT = 3 * 1000; +#define KAFKA_HEARTBEAT_INTERVAL (3 * 1000 * 1000) #define KAFKA_META_INIT (1<<0) #define KAFKA_META_DOING (1<<1) @@ -469,7 +469,7 @@ void ComplexKafkaTask::kafka_heartbeat_callback(__WFKafkaTask *task) *t->get_lock_status()->get_status() |= KAFKA_HEARTBEAT_DONE; *t->get_lock_status()->get_status() &= ~KAFKA_HEARTBEAT_DOING; WFTimerTask *timer_task; - timer_task = WFTaskFactory::create_timer_task(KAFKA_HEARTBEAT_TIMEOUT, + timer_task = WFTaskFactory::create_timer_task(KAFKA_HEARTBEAT_INTERVAL, kafka_timer_callback); timer_task->user_data = t; timer_task->start(); @@ -1660,11 +1660,6 @@ int WFKafkaClient::init(const std::string& broker, const std::string& group) return -1; } -void WFKafkaClient::set_heartbeat_interval(size_t interval_ms) -{ - KAFKA_HEARTBEAT_TIMEOUT = interval_ms; -} - void WFKafkaClient::deinit() { this->member->lock_status->dec_cnt(); diff --git a/src/client/WFKafkaClient.h b/src/client/WFKafkaClient.h index d140b97f..d4231672 100644 --- a/src/client/WFKafkaClient.h +++ b/src/client/WFKafkaClient.h @@ -125,8 +125,6 @@ public: int init(const std::string& broker_url, const std::string& group); - void set_heartbeat_interval(size_t interval_ms); - void deinit(); // example: topic=xxx&topic=yyy&api=fetch From 8ee0925cc9bd88f1ada6b4b84533262843b1b50d Mon Sep 17 00:00:00 2001 From: XieHan Date: Mon, 26 Apr 2021 23:18:53 +0800 Subject: [PATCH 20/53] add tutorial-00-helloworld.cc --- tutorial/CMakeLists.txt | 1 + tutorial/tutorial-00-helloworld.cc | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 tutorial/tutorial-00-helloworld.cc diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index c3c08722..a03ac883 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -23,6 +23,7 @@ else () endif () set(TUTORIAL_LIST + tutorial-00-helloworld tutorial-01-wget tutorial-02-redis_cli tutorial-03-wget_to_redis diff --git a/tutorial/tutorial-00-helloworld.cc b/tutorial/tutorial-00-helloworld.cc new file mode 100644 index 00000000..0c210290 --- /dev/null +++ b/tutorial/tutorial-00-helloworld.cc @@ -0,0 +1,17 @@ +#include +#include "workflow/WFHttpServer.h" + +int main() +{ + WFHttpServer server([](WFHttpTask *task) { + task->get_resp()->append_output_body("Hello World!"); + }); + + if (server.start(8888) == 0) { // start server on port 8888 + getchar(); // press "Enter" to end. + server.stop(); + } + + return 0; +} + From 8813c2891a4dc3d1205d4353db59833d5149f61e Mon Sep 17 00:00:00 2001 From: XieHan Date: Fri, 30 Apr 2021 17:00:38 +0800 Subject: [PATCH 21/53] Fix Http 100 continue --- src/protocol/HttpMessage.cc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/protocol/HttpMessage.cc b/src/protocol/HttpMessage.cc index d9b07f1e..6094cde0 100644 --- a/src/protocol/HttpMessage.cc +++ b/src/protocol/HttpMessage.cc @@ -342,11 +342,14 @@ int HttpResponse::append(const void *buf, size_t *size) { int ret = HttpMessage::append(buf, size); - if (ret > 0 && *http_parser_get_code(this->parser) == '1') + if (ret > 0) { - http_parser_deinit(this->parser); - http_parser_init(1, this->parser); - ret = 0; + if (strcmp(http_parser_get_code(this->parser), "100") == 0) + { + http_parser_deinit(this->parser); + http_parser_init(1, this->parser); + ret = 0; + } } return ret; From 0b00534c723cf428501d8f9908ed8dd26d8e5998 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 1 May 2021 17:54:53 +0800 Subject: [PATCH 22/53] Update README_cn.md --- README_cn.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README_cn.md b/README_cn.md index 06adcb6b..67215478 100644 --- a/README_cn.md +++ b/README_cn.md @@ -38,6 +38,8 @@ int main() * 构建微服务系统。 * 项目内置服务治理与负载均衡等功能。 +#### + #### 编译和运行环境 * 项目支持``Linux``,``macOS``,``Windows``,``Android``等操作系统。 * ``Windows``版以[windows](https://github.com/sogou/workflow/tree/windows)分支发布,使用``iocp``实现异步网络。用户接口与``Linux``版一致。 @@ -46,6 +48,14 @@ int main() * 不喜欢SSL的用户可以使用[nossl](https://github.com/sogou/workflow/tree/nossl)分支,代码更简洁。但仍需链接``crypto``。 * 项目使用了``C++11``标准,需要用支持``C++11``的编译器编译。但不依赖``boost``或``asio``。 * 项目无其它依赖。如需使用``kafka``协议,需自行安装``lz4``,``zstd``和``snappy``几个压缩库。 +* 快速开始(Linux) +~~~sh +$ git clone https://github.com/sogou/workflow +$ cd workflow +$ make +$ cd tutorial +$ make +~~~ # 试一下! * Client基础 From d70210d369880a86305cd8be3d4abfb157b654e1 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 1 May 2021 17:57:45 +0800 Subject: [PATCH 23/53] Update README_cn.md --- README_cn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_cn.md b/README_cn.md index 67215478..d0ee080f 100644 --- a/README_cn.md +++ b/README_cn.md @@ -48,7 +48,7 @@ int main() * 不喜欢SSL的用户可以使用[nossl](https://github.com/sogou/workflow/tree/nossl)分支,代码更简洁。但仍需链接``crypto``。 * 项目使用了``C++11``标准,需要用支持``C++11``的编译器编译。但不依赖``boost``或``asio``。 * 项目无其它依赖。如需使用``kafka``协议,需自行安装``lz4``,``zstd``和``snappy``几个压缩库。 -* 快速开始(Linux) +* 快速开始(Linux): ~~~sh $ git clone https://github.com/sogou/workflow $ cd workflow From ce9274a76b5aaab1a560aab8c2c6e9f5e5f1a5bd Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 1 May 2021 18:00:58 +0800 Subject: [PATCH 24/53] Update README.md --- README.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 91723d04..9dbe2fb8 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,15 @@ int main() * Uses the `C++11` standard and therefore, it should be compiled with a compiler which supports `C++11`. Does not rely on `boost` or `asio`. * No other dependencies. However, if you need `Kafka` protocol, some compression libraries should be installed, including `lz4`, `zstd` and `snappy`. -# Try it! +### Get started (Linux): +~~~sh +$ git clone https://github.com/sogou/workflow +$ make +$ cd tutorial +$ make +~~~~ + +# Tutorials * Client * [Creating your first task:wget](docs/en/tutorial-01-wget.md) From b778af84d2a725d9b082b384c02f07b62c1d0d9c Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 1 May 2021 18:03:05 +0800 Subject: [PATCH 25/53] Update README_cn.md --- README_cn.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/README_cn.md b/README_cn.md index d0ee080f..e53a0642 100644 --- a/README_cn.md +++ b/README_cn.md @@ -38,8 +38,6 @@ int main() * 构建微服务系统。 * 项目内置服务治理与负载均衡等功能。 -#### - #### 编译和运行环境 * 项目支持``Linux``,``macOS``,``Windows``,``Android``等操作系统。 * ``Windows``版以[windows](https://github.com/sogou/workflow/tree/windows)分支发布,使用``iocp``实现异步网络。用户接口与``Linux``版一致。 @@ -48,7 +46,8 @@ int main() * 不喜欢SSL的用户可以使用[nossl](https://github.com/sogou/workflow/tree/nossl)分支,代码更简洁。但仍需链接``crypto``。 * 项目使用了``C++11``标准,需要用支持``C++11``的编译器编译。但不依赖``boost``或``asio``。 * 项目无其它依赖。如需使用``kafka``协议,需自行安装``lz4``,``zstd``和``snappy``几个压缩库。 -* 快速开始(Linux): + +#### 快速开始(Linux): ~~~sh $ git clone https://github.com/sogou/workflow $ cd workflow @@ -57,7 +56,7 @@ $ cd tutorial $ make ~~~ -# 试一下! +# 示例教程 * Client基础 * [创建第一个任务:wget](docs/tutorial-01-wget.md) * [实现一次redis写入与读出:redis_cli](docs/tutorial-02-redis_cli.md) From 8e1268d272d6c8e7da754c6699789a10933f7388 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 1 May 2021 19:43:19 +0800 Subject: [PATCH 26/53] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9dbe2fb8..16b23f10 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ int main() * Uses the `C++11` standard and therefore, it should be compiled with a compiler which supports `C++11`. Does not rely on `boost` or `asio`. * No other dependencies. However, if you need `Kafka` protocol, some compression libraries should be installed, including `lz4`, `zstd` and `snappy`. -### Get started (Linux): +### Get started (Linux, macOS): ~~~sh $ git clone https://github.com/sogou/workflow $ make From baec95aa426ce7058d80970265697f71c9f3f786 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 1 May 2021 19:43:45 +0800 Subject: [PATCH 27/53] Update README_cn.md --- README_cn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README_cn.md b/README_cn.md index e53a0642..63c8dca7 100644 --- a/README_cn.md +++ b/README_cn.md @@ -47,7 +47,7 @@ int main() * 项目使用了``C++11``标准,需要用支持``C++11``的编译器编译。但不依赖``boost``或``asio``。 * 项目无其它依赖。如需使用``kafka``协议,需自行安装``lz4``,``zstd``和``snappy``几个压缩库。 -#### 快速开始(Linux): +#### 快速开始(Linux, maxOS): ~~~sh $ git clone https://github.com/sogou/workflow $ cd workflow From f82491fdefb805d90c4de58c69f5e78bbc189ab5 Mon Sep 17 00:00:00 2001 From: XieHan Date: Mon, 3 May 2021 00:51:57 +0800 Subject: [PATCH 28/53] optimize poller on SSL --- src/kernel/poller.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/kernel/poller.c b/src/kernel/poller.c index 7bd33449..bb395b4b 100644 --- a/src/kernel/poller.c +++ b/src/kernel/poller.c @@ -383,6 +383,9 @@ static int __poller_handle_ssl_error(struct __poller_node *node, int ret, return -1; } + if (event == node->event) + return 0; + pthread_mutex_lock(&poller->mutex); if (!node->removed) { From 8e2f36a43d85ccfb29884121f974489a1f60a213 Mon Sep 17 00:00:00 2001 From: XieHan Date: Wed, 5 May 2021 02:18:55 +0800 Subject: [PATCH 29/53] add SSLWrapper --- src/protocol/CMakeLists.txt | 1 + src/protocol/ProtocolMessage.h | 2 +- src/protocol/SSLWrapper.cc | 123 +++++++++++++++++++++++++++++++++ src/protocol/SSLWrapper.h | 54 +++++++++++++++ 4 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 src/protocol/SSLWrapper.cc create mode 100644 src/protocol/SSLWrapper.h diff --git a/src/protocol/CMakeLists.txt b/src/protocol/CMakeLists.txt index 3ddbaa81..a476c510 100644 --- a/src/protocol/CMakeLists.txt +++ b/src/protocol/CMakeLists.txt @@ -12,6 +12,7 @@ set(SRC HttpMessage.cc RedisMessage.cc HttpUtil.cc + SSLWrapper.cc ) add_library(${PROJECT_NAME} OBJECT ${SRC}) diff --git a/src/protocol/ProtocolMessage.h b/src/protocol/ProtocolMessage.h index 0366a527..020b1c4d 100644 --- a/src/protocol/ProtocolMessage.h +++ b/src/protocol/ProtocolMessage.h @@ -33,7 +33,7 @@ namespace protocol class ProtocolMessage : public CommMessageOut, public CommMessageIn { -private: +public: virtual int encode(struct iovec vectors[], int max) { errno = ENOSYS; diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc new file mode 100644 index 00000000..88c0431d --- /dev/null +++ b/src/protocol/SSLWrapper.cc @@ -0,0 +1,123 @@ +/* + Copyright (c) 2021 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#include +#include +#include +#include +#include "SSLWrapper.h" + +namespace protocol +{ + +int SSLWrapper::encode(struct iovec vectors[], int iovcnt) +{ + BIO *bio = SSL_get_wbio(this->ssl); + struct iovec *iov; + void *buf; + int ret; + + ret = this->msg->encode(vectors, iovcnt); + if ((unsigned int)ret > (unsigned int)iovcnt) + return ret; + + for (iov = vectors; iov < vectors + ret; iov++) + { + if (iov->iov_len > 0) + { + ret = SSL_write(this->ssl, iov->iov_base, iov->iov_len); + if (ret <= 0) + { + ret = SSL_get_error(this->ssl, ret); + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + + return -1; + } + } + } + + ret = BIO_pending(bio); + if (ret <= 0) + return ret; + + buf = malloc(ret); + if (buf) + { + ret = BIO_read(bio, buf, ret); + if (ret > 0) + { + free(this->buf); + this->buf = buf; + vectors[0].iov_base = buf; + vectors[0].iov_len = ret; + return 1; + } + + free(buf); + } + + return -1; +} + +#define BUFSIZE 8192 + +int SSLWrapper::append(const void *buf, size_t *size) +{ + BIO *bio = SSL_get_rbio(this->ssl); + char rbuf[BUFSIZE]; + size_t nleft; + size_t n; + int ret; + + ret = BIO_write(bio, buf, *size); + if (ret > 0) + { + *size = ret; + while ((ret = SSL_read(this->ssl, rbuf, BUFSIZE)) > 0) + { + buf = rbuf; + nleft = ret; + do + { + n = nleft; + ret = this->msg->append(buf, &n); + if (ret == 0) + { + buf = (char *)buf + n; + nleft -= n; + } + else + return ret; + + } while (nleft > 0); + } + + ret = SSL_get_error(this->ssl, ret); + if (ret == SSL_ERROR_WANT_READ) + return 0; + + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + } + + return -1; +} + +} + diff --git a/src/protocol/SSLWrapper.h b/src/protocol/SSLWrapper.h new file mode 100644 index 00000000..96961af9 --- /dev/null +++ b/src/protocol/SSLWrapper.h @@ -0,0 +1,54 @@ +/* + Copyright (c) 2021 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#ifndef _SSLWRAPPER_H_ +#define _SSLWRAPPER_H_ + +#include +#include +#include "ProtocolMessage.h" + +namespace protocol +{ + +class SSLWrapper : public ProtocolMessage +{ +protected: + virtual int encode(struct iovec vectors[], int max); + virtual int append(const void *buf, size_t *size); + +protected: + ProtocolMessage *msg; + SSL *ssl; + void *buf; + +public: + SSLWrapper(ProtocolMessage *msg, SSL *ssl) + { + this->msg = msg; + this->ssl = ssl; + this->buf = NULL; + } + + virtual ~SSLWrapper() { free(this->buf); } +}; + +} + +#endif + From 0f1efb8f52780c4d761b80c9119e52d07fde6d26 Mon Sep 17 00:00:00 2001 From: XieHan Date: Wed, 5 May 2021 02:22:45 +0800 Subject: [PATCH 30/53] add include stdlib.h --- src/protocol/SSLWrapper.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 88c0431d..74138f0e 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -17,6 +17,7 @@ */ #include +#include #include #include #include From 497fc5fd557e64f0211e53bf935243d8d88f1730 Mon Sep 17 00:00:00 2001 From: XieHan Date: Wed, 5 May 2021 02:39:28 +0800 Subject: [PATCH 31/53] update code --- src/protocol/SSLWrapper.cc | 55 +++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 74138f0e..32f3cb64 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -87,37 +87,42 @@ int SSLWrapper::append(const void *buf, size_t *size) int ret; ret = BIO_write(bio, buf, *size); - if (ret > 0) + if (ret <= 0) + return -1; + + *size = ret; + while ((ret = SSL_read(this->ssl, rbuf, BUFSIZE)) > 0) { - *size = ret; - while ((ret = SSL_read(this->ssl, rbuf, BUFSIZE)) > 0) + buf = rbuf; + nleft = ret; + do { - buf = rbuf; - nleft = ret; - do + n = nleft; + ret = this->msg->append(buf, &n); + if (ret == 0) { - n = nleft; - ret = this->msg->append(buf, &n); - if (ret == 0) - { - buf = (char *)buf + n; - nleft -= n; - } - else - return ret; - - } while (nleft > 0); - } + buf = (char *)buf + n; + nleft -= n; + } + else + return ret; - ret = SSL_get_error(this->ssl, ret); - if (ret == SSL_ERROR_WANT_READ) - return 0; - - if (ret != SSL_ERROR_SYSCALL) - errno = -ret; + } while (nleft > 0); } - return -1; + if (ret < 0) + { + ret = SSL_get_error(this->ssl, ret); + if (ret != SSL_ERROR_WANT_READ) + { + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + + return -1; + } + } + + return 0; } } From 3026f3a3492804ce7c44fb0dd58538a0634b53b0 Mon Sep 17 00:00:00 2001 From: XieHan Date: Wed, 5 May 2021 02:45:06 +0800 Subject: [PATCH 32/53] iovcnt->max --- src/protocol/SSLWrapper.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 32f3cb64..2ec92909 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -26,15 +26,15 @@ namespace protocol { -int SSLWrapper::encode(struct iovec vectors[], int iovcnt) +int SSLWrapper::encode(struct iovec vectors[], int max) { BIO *bio = SSL_get_wbio(this->ssl); struct iovec *iov; void *buf; int ret; - ret = this->msg->encode(vectors, iovcnt); - if ((unsigned int)ret > (unsigned int)iovcnt) + ret = this->msg->encode(vectors, max); + if ((unsigned int)ret > (unsigned int)max) return ret; for (iov = vectors; iov < vectors + ret; iov++) From cd0cb94f2b743583a669736b4c3388cc89f334e6 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sat, 8 May 2021 17:42:49 +0800 Subject: [PATCH 33/53] add tutoria11-graph_task.cc --- tutorial/CMakeLists.txt | 1 + tutorial/tutorial-11-graph_task.cc | 97 ++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 tutorial/tutorial-11-graph_task.cc diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index a03ac883..56593d01 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -33,6 +33,7 @@ set(TUTORIAL_LIST tutorial-07-sort_task tutorial-08-matrix_multiply tutorial-09-http_file_server + tutorial-11-graph_task tutorial-12-mysql_cli ) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc new file mode 100644 index 00000000..b702852b --- /dev/null +++ b/tutorial/tutorial-11-graph_task.cc @@ -0,0 +1,97 @@ +/* + Copyright (c) 2021 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com) +*/ + +#include +#include "workflow/WFTaskFactory.h" +#include "workflow/WFGraphTask.h" +#include "workflow/HttpMessage.h" +#include "workflow/WFFacilities.h" + +using namespace protocol; + +static WFFacilities::WaitGroup wait_group(1); + +void go_func(size_t *size1, size_t *size2) +{ + printf("page1 size1 = %zu, page2 size = %zu\n", *size1, *size2); +} + +void http_callback(WFHttpTask *task) +{ + size_t *size = (size_t *)task->user_data; + const void *body; + + if (task->get_state() == WFT_STATE_SUCCESS) + task->get_resp()->get_parsed_body(&body, size); + else + *size = (size_t)-1; +} + +#define REDIRECT_MAX 3 +#define RETRY_MAX 1 + +int main(void) +{ + WFTimerTask *timer; + WFHttpTask *http_task1; + WFHttpTask *http_task2; + WFGoTask *go_task; + size_t size1; + size_t size2; + + timer = WFTaskFactory::create_timer_task(1000000, [](WFTimerTask *) { + printf("timer task complete(1s).\n"); + }); + + /* Http task1 */ + http_task1 = WFTaskFactory::create_http_task("https://www.sogou.com/", + REDIRECT_MAX, RETRY_MAX, + http_callback); + http_task1->user_data = &size1; + + /* Http task2 */ + http_task2 = WFTaskFactory::create_http_task("https://www.baidu.com/", + REDIRECT_MAX, RETRY_MAX, + http_callback); + http_task2->user_data = &size2; + + /* go task will print the http pages's size */ + go_task = WFTaskFactory::create_go_task("go", go_func, &size1, &size2); + + /* Greate a graph. graph is also a kind of task */ + WFGraphTask *graph = WFTaskFactory::create_graph_task([](WFGraphTask *) { + printf("Graph task complete. Wakeup main process\n"); + wait_group.done(); + }); + + /* Greate graph nodes */ + WFGraphNode& a = graph->create_graph_node(timer); + WFGraphNode& b = graph->create_graph_node(http_task1); + WFGraphNode& c = graph->create_graph_node(http_task2); + WFGraphNode& d = graph->create_graph_node(go_task); + + /* Build the graph */ + a-->b; + a-->c; + b-->d; + c-->d; + + graph->start(); + wait_group.wait(); +} + From 28ec2945a63de2131e10d7b000297ae83135c535 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sat, 8 May 2021 17:49:10 +0800 Subject: [PATCH 34/53] add tutorial-11-graph_task --- tutorial/CMakeLists.txt | 1 + tutorial/tutorial-11-graph_task.cc | 97 ++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 tutorial/tutorial-11-graph_task.cc diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index a03ac883..56593d01 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -33,6 +33,7 @@ set(TUTORIAL_LIST tutorial-07-sort_task tutorial-08-matrix_multiply tutorial-09-http_file_server + tutorial-11-graph_task tutorial-12-mysql_cli ) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc new file mode 100644 index 00000000..b702852b --- /dev/null +++ b/tutorial/tutorial-11-graph_task.cc @@ -0,0 +1,97 @@ +/* + Copyright (c) 2021 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com) +*/ + +#include +#include "workflow/WFTaskFactory.h" +#include "workflow/WFGraphTask.h" +#include "workflow/HttpMessage.h" +#include "workflow/WFFacilities.h" + +using namespace protocol; + +static WFFacilities::WaitGroup wait_group(1); + +void go_func(size_t *size1, size_t *size2) +{ + printf("page1 size1 = %zu, page2 size = %zu\n", *size1, *size2); +} + +void http_callback(WFHttpTask *task) +{ + size_t *size = (size_t *)task->user_data; + const void *body; + + if (task->get_state() == WFT_STATE_SUCCESS) + task->get_resp()->get_parsed_body(&body, size); + else + *size = (size_t)-1; +} + +#define REDIRECT_MAX 3 +#define RETRY_MAX 1 + +int main(void) +{ + WFTimerTask *timer; + WFHttpTask *http_task1; + WFHttpTask *http_task2; + WFGoTask *go_task; + size_t size1; + size_t size2; + + timer = WFTaskFactory::create_timer_task(1000000, [](WFTimerTask *) { + printf("timer task complete(1s).\n"); + }); + + /* Http task1 */ + http_task1 = WFTaskFactory::create_http_task("https://www.sogou.com/", + REDIRECT_MAX, RETRY_MAX, + http_callback); + http_task1->user_data = &size1; + + /* Http task2 */ + http_task2 = WFTaskFactory::create_http_task("https://www.baidu.com/", + REDIRECT_MAX, RETRY_MAX, + http_callback); + http_task2->user_data = &size2; + + /* go task will print the http pages's size */ + go_task = WFTaskFactory::create_go_task("go", go_func, &size1, &size2); + + /* Greate a graph. graph is also a kind of task */ + WFGraphTask *graph = WFTaskFactory::create_graph_task([](WFGraphTask *) { + printf("Graph task complete. Wakeup main process\n"); + wait_group.done(); + }); + + /* Greate graph nodes */ + WFGraphNode& a = graph->create_graph_node(timer); + WFGraphNode& b = graph->create_graph_node(http_task1); + WFGraphNode& c = graph->create_graph_node(http_task2); + WFGraphNode& d = graph->create_graph_node(go_task); + + /* Build the graph */ + a-->b; + a-->c; + b-->d; + c-->d; + + graph->start(); + wait_group.wait(); +} + From 7f514dab2e63cb6fb1da8c5da29200f048ce76d0 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 8 May 2021 17:51:01 +0800 Subject: [PATCH 35/53] Update tutorial-11-graph_task.cc --- tutorial/tutorial-11-graph_task.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc index b702852b..aecfcbd8 100644 --- a/tutorial/tutorial-11-graph_task.cc +++ b/tutorial/tutorial-11-graph_task.cc @@ -28,7 +28,7 @@ static WFFacilities::WaitGroup wait_group(1); void go_func(size_t *size1, size_t *size2) { - printf("page1 size1 = %zu, page2 size = %zu\n", *size1, *size2); + printf("page1 size = %zu, page2 size = %zu\n", *size1, *size2); } void http_callback(WFHttpTask *task) From 43206527055cb5679c389723a861c2416a231521 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 8 May 2021 18:03:03 +0800 Subject: [PATCH 36/53] Update tutorial-11-graph_task.cc --- tutorial/tutorial-11-graph_task.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc index aecfcbd8..e031d985 100644 --- a/tutorial/tutorial-11-graph_task.cc +++ b/tutorial/tutorial-11-graph_task.cc @@ -93,5 +93,6 @@ int main(void) graph->start(); wait_group.wait(); + return 0; } From ec9055f90a7f42f289b5bcbb460dd8ee46d0e391 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sat, 8 May 2021 23:30:43 +0800 Subject: [PATCH 37/53] Fix typo. --- tutorial/tutorial-11-graph_task.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc index e031d985..febdedc8 100644 --- a/tutorial/tutorial-11-graph_task.cc +++ b/tutorial/tutorial-11-graph_task.cc @@ -73,7 +73,7 @@ int main(void) /* go task will print the http pages's size */ go_task = WFTaskFactory::create_go_task("go", go_func, &size1, &size2); - /* Greate a graph. graph is also a kind of task */ + /* Create a graph. Graph is also a kind of task */ WFGraphTask *graph = WFTaskFactory::create_graph_task([](WFGraphTask *) { printf("Graph task complete. Wakeup main process\n"); wait_group.done(); From d5416f778b9de027af5b2403452b7013cc00dba8 Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sun, 9 May 2021 01:38:43 +0800 Subject: [PATCH 38/53] Fix typo. --- tutorial/tutorial-11-graph_task.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc index febdedc8..d552d808 100644 --- a/tutorial/tutorial-11-graph_task.cc +++ b/tutorial/tutorial-11-graph_task.cc @@ -79,7 +79,7 @@ int main(void) wait_group.done(); }); - /* Greate graph nodes */ + /* Create graph nodes */ WFGraphNode& a = graph->create_graph_node(timer); WFGraphNode& b = graph->create_graph_node(http_task1); WFGraphNode& c = graph->create_graph_node(http_task2); From c705910b2fd3c603bc6ceac968383cdd104b5181 Mon Sep 17 00:00:00 2001 From: "junfeng.fj" Date: Sun, 9 May 2021 21:28:36 +0800 Subject: [PATCH 39/53] optimize tutorial-11 --- tutorial/tutorial-11-graph_task.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tutorial/tutorial-11-graph_task.cc b/tutorial/tutorial-11-graph_task.cc index d552d808..2f44e27a 100644 --- a/tutorial/tutorial-11-graph_task.cc +++ b/tutorial/tutorial-11-graph_task.cc @@ -26,7 +26,7 @@ using namespace protocol; static WFFacilities::WaitGroup wait_group(1); -void go_func(size_t *size1, size_t *size2) +void go_func(const size_t *size1, const size_t *size2) { printf("page1 size = %zu, page2 size = %zu\n", *size1, *size2); } @@ -45,7 +45,7 @@ void http_callback(WFHttpTask *task) #define REDIRECT_MAX 3 #define RETRY_MAX 1 -int main(void) +int main() { WFTimerTask *timer; WFHttpTask *http_task1; @@ -70,7 +70,7 @@ int main(void) http_callback); http_task2->user_data = &size2; - /* go task will print the http pages's size */ + /* go task will print the http pages size */ go_task = WFTaskFactory::create_go_task("go", go_func, &size1, &size2); /* Create a graph. Graph is also a kind of task */ From 12cb6a9fc6fa1fc11f0cdcb4f5a3915254faeacc Mon Sep 17 00:00:00 2001 From: XieHan Date: Mon, 10 May 2021 18:49:13 +0800 Subject: [PATCH 40/53] remove SSLWrapper.cc from CMakeLists --- src/protocol/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/protocol/CMakeLists.txt b/src/protocol/CMakeLists.txt index a476c510..3ddbaa81 100644 --- a/src/protocol/CMakeLists.txt +++ b/src/protocol/CMakeLists.txt @@ -12,7 +12,6 @@ set(SRC HttpMessage.cc RedisMessage.cc HttpUtil.cc - SSLWrapper.cc ) add_library(${PROJECT_NAME} OBJECT ${SRC}) From 566360683be921b10f1fd76587cff0e633492121 Mon Sep 17 00:00:00 2001 From: shopee-jin <40892357+shopee-jin@users.noreply.github.com> Date: Mon, 10 May 2021 21:10:44 +0800 Subject: [PATCH 41/53] Fix Keep-Alive timeout unit (multiple by 1000) --- src/factory/HttpTaskImpl.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/factory/HttpTaskImpl.cc b/src/factory/HttpTaskImpl.cc index c07272cf..1b1d9b2b 100644 --- a/src/factory/HttpTaskImpl.cc +++ b/src/factory/HttpTaskImpl.cc @@ -517,7 +517,8 @@ CommMessageOut *WFHttpServerTask::message_out() if (!(flag & 1) && strcasecmp(key.c_str(), "timeout") == 0) { flag |= 1; - this->keep_alive_timeo = atoi(val.c_str()); + // keep_alive_timeo = 5000ms when Keep-Alive: timeout=5 + this->keep_alive_timeo = atoi(val.c_str()) * 1000; if (flag == 3) break; } From 34810cc03e3d3b02918a18404000422f68030b25 Mon Sep 17 00:00:00 2001 From: XieHan Date: Mon, 10 May 2021 22:43:45 +0800 Subject: [PATCH 42/53] fix another keep alive timeout --- src/factory/HttpTaskImpl.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/factory/HttpTaskImpl.cc b/src/factory/HttpTaskImpl.cc index 1b1d9b2b..e9366223 100644 --- a/src/factory/HttpTaskImpl.cc +++ b/src/factory/HttpTaskImpl.cc @@ -149,7 +149,7 @@ CommMessageOut *ComplexHttpTask::message_out() std::string val = StringUtil::strip(arr[1]); if (strcasecmp(key.c_str(), "timeout") == 0) { - this->keep_alive_timeo = atoi(val.c_str()); + this->keep_alive_timeo = 1000 * atoi(val.c_str()); break; } } @@ -518,7 +518,7 @@ CommMessageOut *WFHttpServerTask::message_out() { flag |= 1; // keep_alive_timeo = 5000ms when Keep-Alive: timeout=5 - this->keep_alive_timeo = atoi(val.c_str()) * 1000; + this->keep_alive_timeo = 1000 * atoi(val.c_str()); if (flag == 3) break; } From 05158644109f7a35823818d1c3f2b43dd6f89459 Mon Sep 17 00:00:00 2001 From: XieHan Date: Tue, 11 May 2021 20:02:15 +0800 Subject: [PATCH 43/53] replace BIO_read with BIO_get_mem_data --- src/protocol/SSLWrapper.cc | 32 +++++++++++--------------------- src/protocol/SSLWrapper.h | 5 ----- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 2ec92909..a6ddaa50 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -17,7 +17,6 @@ */ #include -#include #include #include #include @@ -30,7 +29,8 @@ int SSLWrapper::encode(struct iovec vectors[], int max) { BIO *bio = SSL_get_wbio(this->ssl); struct iovec *iov; - void *buf; + char *ptr; + long len; int ret; ret = this->msg->encode(vectors, max); @@ -53,27 +53,17 @@ int SSLWrapper::encode(struct iovec vectors[], int max) } } - ret = BIO_pending(bio); - if (ret <= 0) - return ret; - - buf = malloc(ret); - if (buf) + len = BIO_get_mem_data(bio, &ptr); + if (len > 0) { - ret = BIO_read(bio, buf, ret); - if (ret > 0) - { - free(this->buf); - this->buf = buf; - vectors[0].iov_base = buf; - vectors[0].iov_len = ret; - return 1; - } - - free(buf); + vectors[0].iov_base = ptr; + vectors[0].iov_len = len; + return 1; } - - return -1; + else if (len == 0) + return 0; + else + return -1; } #define BUFSIZE 8192 diff --git a/src/protocol/SSLWrapper.h b/src/protocol/SSLWrapper.h index 96961af9..35505542 100644 --- a/src/protocol/SSLWrapper.h +++ b/src/protocol/SSLWrapper.h @@ -19,7 +19,6 @@ #ifndef _SSLWRAPPER_H_ #define _SSLWRAPPER_H_ -#include #include #include "ProtocolMessage.h" @@ -35,17 +34,13 @@ protected: protected: ProtocolMessage *msg; SSL *ssl; - void *buf; public: SSLWrapper(ProtocolMessage *msg, SSL *ssl) { this->msg = msg; this->ssl = ssl; - this->buf = NULL; } - - virtual ~SSLWrapper() { free(this->buf); } }; } From 415c03c188f7b5ed307df09f3b8fe5be76ef580c Mon Sep 17 00:00:00 2001 From: XieHan Date: Tue, 11 May 2021 20:37:57 +0800 Subject: [PATCH 44/53] fix SSLWrapper::encode bug --- src/protocol/SSLWrapper.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index a6ddaa50..4ceaffaa 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -37,7 +37,8 @@ int SSLWrapper::encode(struct iovec vectors[], int max) if ((unsigned int)ret > (unsigned int)max) return ret; - for (iov = vectors; iov < vectors + ret; iov++) + max = ret; + for (iov = vectors; iov < vectors + max; iov++) { if (iov->iov_len > 0) { From 1f3d48e2148c864710eaf715ba9fdde09a2726e6 Mon Sep 17 00:00:00 2001 From: XieHan Date: Wed, 12 May 2021 20:49:10 +0800 Subject: [PATCH 45/53] add SSLHandshaker --- src/protocol/SSLWrapper.cc | 60 ++++++++++++++++++++++++++++++++++++++ src/protocol/SSLWrapper.h | 13 +++++++++ 2 files changed, 73 insertions(+) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 4ceaffaa..64616003 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -25,6 +25,63 @@ namespace protocol { +int SSLHandshaker::encode(struct iovec vectors[], int max) +{ + BIO *bio = SSL_get_wbio(this->ssl); + char *ptr; + long len; + + if (BIO_reset(bio) <= 0) + return -1; + + if (SSL_do_handshake(this->ssl) < 0) + return -1; + + len = BIO_get_mem_data(bio, &ptr); + if (len > 0) + { + vectors[0].iov_base = ptr; + vectors[0].iov_len = len; + return 1; + } + else if (len == 0) + return 0; + else + return -1; +} + +int SSLHandshaker::append(const void *buf, size_t *size) +{ + BIO *rbio = SSL_get_rbio(this->ssl); + BIO *wbio = SSL_get_wbio(this->ssl); + char *ptr; + long len; + int ret; + + if (BIO_write(rbio, buf, *size) <= 0) + return -1; + + BIO_reset(wbio); + ret = SSL_do_handshake(this->ssl); + if (ret > 0) + return 1; + + ret = SSL_get_error(this->ssl, ret); + if (ret != SSL_ERROR_WANT_READ) + { + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + + return -1; + } + + len = BIO_get_mem_data(wbio, &ptr); + if (len < 0 || this->feedback(ptr, len) < 0) + return -1; + + return 0; +} + int SSLWrapper::encode(struct iovec vectors[], int max) { BIO *bio = SSL_get_wbio(this->ssl); @@ -37,6 +94,9 @@ int SSLWrapper::encode(struct iovec vectors[], int max) if ((unsigned int)ret > (unsigned int)max) return ret; + if (BIO_reset(bio) <= 0) + return -1; + max = ret; for (iov = vectors; iov < vectors + max; iov++) { diff --git a/src/protocol/SSLWrapper.h b/src/protocol/SSLWrapper.h index 35505542..34774d9a 100644 --- a/src/protocol/SSLWrapper.h +++ b/src/protocol/SSLWrapper.h @@ -25,6 +25,19 @@ namespace protocol { +class SSLHandshaker : public ProtocolMessage +{ +protected: + virtual int encode(struct iovec vectors[], int max); + virtual int append(const void *buf, size_t *size); + +protected: + SSL *ssl; + +public: + SSLHandshaker(SSL *ssl) { this->ssl = ssl; } +}; + class SSLWrapper : public ProtocolMessage { protected: From 86c9097d2e944f31cf1f6beadb1323f0c2cdda75 Mon Sep 17 00:00:00 2001 From: XieHan Date: Wed, 12 May 2021 23:21:34 +0800 Subject: [PATCH 46/53] fix SSLhandshaker::append --- src/protocol/CMakeLists.txt | 1 + src/protocol/SSLWrapper.cc | 63 +++++++++++++++++++++++-------------- 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/src/protocol/CMakeLists.txt b/src/protocol/CMakeLists.txt index 3ddbaa81..a476c510 100644 --- a/src/protocol/CMakeLists.txt +++ b/src/protocol/CMakeLists.txt @@ -12,6 +12,7 @@ set(SRC HttpMessage.cc RedisMessage.cc HttpUtil.cc + SSLWrapper.cc ) add_library(${PROJECT_NAME} OBJECT ${SRC}) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 64616003..e717a6b7 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -27,17 +27,17 @@ namespace protocol int SSLHandshaker::encode(struct iovec vectors[], int max) { - BIO *bio = SSL_get_wbio(this->ssl); + BIO *wbio = SSL_get_wbio(this->ssl); char *ptr; long len; - if (BIO_reset(bio) <= 0) + if (BIO_reset(wbio) <= 0) return -1; if (SSL_do_handshake(this->ssl) < 0) return -1; - len = BIO_get_mem_data(bio, &ptr); + len = BIO_get_mem_data(wbio, &ptr); if (len > 0) { vectors[0].iov_base = ptr; @@ -58,45 +58,62 @@ int SSLHandshaker::append(const void *buf, size_t *size) long len; int ret; - if (BIO_write(rbio, buf, *size) <= 0) + if (BIO_reset(wbio) <= 0) return -1; - BIO_reset(wbio); + ret = BIO_write(rbio, buf, *size); + if (ret <= 0) + return -1; + + *size = ret; ret = SSL_do_handshake(this->ssl); - if (ret > 0) - return 1; - - ret = SSL_get_error(this->ssl, ret); - if (ret != SSL_ERROR_WANT_READ) + if (ret <= 0) { - if (ret != SSL_ERROR_SYSCALL) - errno = -ret; + ret = SSL_get_error(this->ssl, ret); + if (ret != SSL_ERROR_WANT_READ) + { + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; - return -1; + return -1; + } + + ret = 0; } len = BIO_get_mem_data(wbio, &ptr); - if (len < 0 || this->feedback(ptr, len) < 0) - return -1; + if (len >= 0) + { + long n = 0; - return 0; + if (len > 0) + n = this->feedback(ptr, len); + + if (n == len) + return ret; + + if (n >= 0) + errno = EAGAIN; + } + + return -1; } int SSLWrapper::encode(struct iovec vectors[], int max) { - BIO *bio = SSL_get_wbio(this->ssl); + BIO *wbio = SSL_get_wbio(this->ssl); struct iovec *iov; char *ptr; long len; int ret; + if (BIO_reset(wbio) <= 0) + return -1; + ret = this->msg->encode(vectors, max); if ((unsigned int)ret > (unsigned int)max) return ret; - if (BIO_reset(bio) <= 0) - return -1; - max = ret; for (iov = vectors; iov < vectors + max; iov++) { @@ -114,7 +131,7 @@ int SSLWrapper::encode(struct iovec vectors[], int max) } } - len = BIO_get_mem_data(bio, &ptr); + len = BIO_get_mem_data(wbio, &ptr); if (len > 0) { vectors[0].iov_base = ptr; @@ -131,13 +148,13 @@ int SSLWrapper::encode(struct iovec vectors[], int max) int SSLWrapper::append(const void *buf, size_t *size) { - BIO *bio = SSL_get_rbio(this->ssl); + BIO *rbio = SSL_get_rbio(this->ssl); char rbuf[BUFSIZE]; size_t nleft; size_t n; int ret; - ret = BIO_write(bio, buf, *size); + ret = BIO_write(rbio, buf, *size); if (ret <= 0) return -1; From 2a431d479a3d32a0773886d636e14f62d20cbaaf Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 13 May 2021 00:50:18 +0800 Subject: [PATCH 47/53] fix SSLHandshaker::encode --- src/protocol/SSLWrapper.cc | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index e717a6b7..3966d472 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -30,12 +30,23 @@ int SSLHandshaker::encode(struct iovec vectors[], int max) BIO *wbio = SSL_get_wbio(this->ssl); char *ptr; long len; + int ret; if (BIO_reset(wbio) <= 0) return -1; - if (SSL_do_handshake(this->ssl) < 0) - return -1; + ret = SSL_do_handshake(this->ssl); + if (ret <= 0) + { + ret = SSL_get_error(this->ssl, ret); + if (ret != SSL_ERROR_WANT_READ) + { + if (ret != SSL_ERROR_SYSCALL) + errno = -ret; + + return -1; + } + } len = BIO_get_mem_data(wbio, &ptr); if (len > 0) From cacbe51b16ee42743c71400542903dc068029d20 Mon Sep 17 00:00:00 2001 From: XieHan Date: Thu, 13 May 2021 17:20:44 +0800 Subject: [PATCH 48/53] simplify code. Feedback with 0 bytes is OK. --- src/protocol/SSLWrapper.cc | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 3966d472..96140cba 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -95,10 +95,7 @@ int SSLHandshaker::append(const void *buf, size_t *size) len = BIO_get_mem_data(wbio, &ptr); if (len >= 0) { - long n = 0; - - if (len > 0) - n = this->feedback(ptr, len); + long n = this->feedback(ptr, len); if (n == len) return ret; From c3a8fa63867d6f09c19a15f67541a7a91ef3415f Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sun, 16 May 2021 16:09:47 +0800 Subject: [PATCH 49/53] Update tutorial-12-mysql_cli.md --- docs/en/tutorial-12-mysql_cli.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/tutorial-12-mysql_cli.md b/docs/en/tutorial-12-mysql_cli.md index 0476a00f..995d7f08 100644 --- a/docs/en/tutorial-12-mysql_cli.md +++ b/docs/en/tutorial-12-mysql_cli.md @@ -2,7 +2,7 @@ # Sample code -[tutorial-12-mysql\_cli.cc](../tutorial/tutorial-12-mysql_cli.cc) +[tutorial-12-mysql\_cli.cc](../../tutorial/tutorial-12-mysql_cli.cc) # About mysql\_cli From d0bc3614336fce9fde3caf4821b9185a2b3da4bb Mon Sep 17 00:00:00 2001 From: xiehan <52160700+Barenboim@users.noreply.github.com> Date: Sun, 16 May 2021 16:10:37 +0800 Subject: [PATCH 50/53] Update tutorial-12-mysql_cli.md --- docs/en/tutorial-12-mysql_cli.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/tutorial-12-mysql_cli.md b/docs/en/tutorial-12-mysql_cli.md index 995d7f08..838b9b84 100644 --- a/docs/en/tutorial-12-mysql_cli.md +++ b/docs/en/tutorial-12-mysql_cli.md @@ -2,7 +2,7 @@ # Sample code -[tutorial-12-mysql\_cli.cc](../../tutorial/tutorial-12-mysql_cli.cc) +[tutorial-12-mysql\_cli.cc](/tutorial/tutorial-12-mysql_cli.cc) # About mysql\_cli From 6a03669b9187c0d674908302edddd6c5b8557cc0 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sun, 16 May 2021 19:04:39 +0800 Subject: [PATCH 51/53] update tutorials' links in docs --- docs/tutorial-01-wget.md | 2 +- docs/tutorial-02-redis_cli.md | 2 +- docs/tutorial-03-wget_to_redis.md | 2 +- docs/tutorial-04-http_echo_server.md | 2 +- docs/tutorial-05-http_proxy.md | 2 +- docs/tutorial-06-parallel_wget.md | 2 +- docs/tutorial-07-sort_task.md | 2 +- docs/tutorial-08-matrix_multiply.md | 2 +- docs/tutorial-09-http_file_server.md | 2 +- docs/tutorial-10-user_defined_protocol.md | 8 ++++---- docs/tutorial-12-mysql_cli.md | 2 +- docs/tutorial-13-kafka_cli.md | 2 +- 12 files changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/tutorial-01-wget.md b/docs/tutorial-01-wget.md index 2a5514f5..7281b791 100644 --- a/docs/tutorial-01-wget.md +++ b/docs/tutorial-01-wget.md @@ -1,7 +1,7 @@ # 创建第一个任务:wget # 示例代码 -[tutorial-01-wget.cc](../tutorial/tutorial-01-wget.cc) +[tutorial-01-wget.cc](/tutorial/tutorial-01-wget.cc) # 关于wget 程序从stdin读取http/https URL,抓取网页并把内容打印到stdout,并将请求和响应的http header打印在stderr。 diff --git a/docs/tutorial-02-redis_cli.md b/docs/tutorial-02-redis_cli.md index 746d13dc..8b8883d2 100644 --- a/docs/tutorial-02-redis_cli.md +++ b/docs/tutorial-02-redis_cli.md @@ -1,7 +1,7 @@ # 实现一次redis写入与读出:redis_cli # 示例代码 -[tutorial-02-redis_cli.cc](../tutorial/tutorial-02-redis_cli.cc) +[tutorial-02-redis_cli.cc](/tutorial/tutorial-02-redis_cli.cc) # 关于redis_cli diff --git a/docs/tutorial-03-wget_to_redis.md b/docs/tutorial-03-wget_to_redis.md index b2a41d1b..b155c108 100644 --- a/docs/tutorial-03-wget_to_redis.md +++ b/docs/tutorial-03-wget_to_redis.md @@ -1,7 +1,7 @@ # 任务序列的更多功能:wget_to_redis # 示例代码 -[tutorial-03-wget_to_redis.cc](../tutorial/tutorial-03-wget_to_redis.cc) +[tutorial-03-wget_to_redis.cc](/tutorial/tutorial-03-wget_to_redis.cc) # 关于wget_to_redis diff --git a/docs/tutorial-04-http_echo_server.md b/docs/tutorial-04-http_echo_server.md index 06831c8d..2a5c1e5f 100644 --- a/docs/tutorial-04-http_echo_server.md +++ b/docs/tutorial-04-http_echo_server.md @@ -1,7 +1,7 @@ # 第一个server:http_echo_server # 示例代码 -[tutorial-04-http_echo_server.cc](../tutorial/tutorial-04-http_echo_server.cc) +[tutorial-04-http_echo_server.cc](/tutorial/tutorial-04-http_echo_server.cc) # 关于http_echo_server diff --git a/docs/tutorial-05-http_proxy.md b/docs/tutorial-05-http_proxy.md index 0c8d3320..12d86f29 100644 --- a/docs/tutorial-05-http_proxy.md +++ b/docs/tutorial-05-http_proxy.md @@ -1,7 +1,7 @@ # 异步server的示例:http_proxy # 示例代码 -[tutorial-05-http_proxy.cc](../tutorial/tutorial-05-http_proxy.cc) +[tutorial-05-http_proxy.cc](/tutorial/tutorial-05-http_proxy.cc) # 关于http_proxy diff --git a/docs/tutorial-06-parallel_wget.md b/docs/tutorial-06-parallel_wget.md index a8f92087..b8314646 100644 --- a/docs/tutorial-06-parallel_wget.md +++ b/docs/tutorial-06-parallel_wget.md @@ -1,7 +1,7 @@ # 一个简单的并行抓取:parallel_wget # 示例代码 -[tutorial-06-parallel_wget.cc](../tutorial/tutorial-06-parallel_wget.cc) +[tutorial-06-parallel_wget.cc](/tutorial/tutorial-06-parallel_wget.cc) # 关于parallel_wget diff --git a/docs/tutorial-07-sort_task.md b/docs/tutorial-07-sort_task.md index dcd3c7a9..a0d5303e 100644 --- a/docs/tutorial-07-sort_task.md +++ b/docs/tutorial-07-sort_task.md @@ -1,7 +1,7 @@ # 使用内置算法工厂:sort_task # 示例代码 -[tutorial-07-sort_task.cc](../tutorial/tutorial-07-sort_task.cc) +[tutorial-07-sort_task.cc](/tutorial/tutorial-07-sort_task.cc) # 关于sort_task diff --git a/docs/tutorial-08-matrix_multiply.md b/docs/tutorial-08-matrix_multiply.md index 9ea712f0..5a1e08d7 100644 --- a/docs/tutorial-08-matrix_multiply.md +++ b/docs/tutorial-08-matrix_multiply.md @@ -1,7 +1,7 @@ # 自定义计算任务:matrix_multiply # 示例代码 -[tutorial-08-matrix_multiply.cc](../tutorial/tutorial-08-matrix_multiply.cc) +[tutorial-08-matrix_multiply.cc](/tutorial/tutorial-08-matrix_multiply.cc) # 关于matrix_multiply diff --git a/docs/tutorial-09-http_file_server.md b/docs/tutorial-09-http_file_server.md index 0c7e5260..7672a64f 100644 --- a/docs/tutorial-09-http_file_server.md +++ b/docs/tutorial-09-http_file_server.md @@ -1,7 +1,7 @@ # 异步IO的http server:http_file_server # 示例代码 -[tutorial-09-http_file_server.cc](../tutorial/tutorial-09-http_file_server.cc) +[tutorial-09-http_file_server.cc](/tutorial/tutorial-09-http_file_server.cc) # 关于http_file_server diff --git a/docs/tutorial-10-user_defined_protocol.md b/docs/tutorial-10-user_defined_protocol.md index f54d318a..cc93f743 100644 --- a/docs/tutorial-10-user_defined_protocol.md +++ b/docs/tutorial-10-user_defined_protocol.md @@ -1,10 +1,10 @@ # 简单的用户自定义协议client/server # 示例代码 -[message.h](../tutorial/tutorial-10-user_defined_protocol/message.h) -[message.cc](../tutorial/tutorial-10-user_defined_protocol/message.cc) -[server.cc](../tutorial/tutorial-10-user_defined_protocol/server.cc) -[client.cc](../tutorial/tutorial-10-user_defined_protocol/client.cc) +[message.h](/tutorial/tutorial-10-user_defined_protocol/message.h) +[message.cc](/tutorial/tutorial-10-user_defined_protocol/message.cc) +[server.cc](/tutorial/tutorial-10-user_defined_protocol/server.cc) +[client.cc](/tutorial/tutorial-10-user_defined_protocol/client.cc) # 关于user_defined_protocol diff --git a/docs/tutorial-12-mysql_cli.md b/docs/tutorial-12-mysql_cli.md index 3198a42c..91d61c11 100644 --- a/docs/tutorial-12-mysql_cli.md +++ b/docs/tutorial-12-mysql_cli.md @@ -1,7 +1,7 @@ # 异步MySQL客户端:mysql_cli # 示例代码 -[tutorial-12-mysql_cli.cc](../tutorial/tutorial-12-mysql_cli.cc) +[tutorial-12-mysql_cli.cc](/tutorial/tutorial-12-mysql_cli.cc) # 关于mysql_cli diff --git a/docs/tutorial-13-kafka_cli.md b/docs/tutorial-13-kafka_cli.md index 41c5c4a9..d50ecc8a 100644 --- a/docs/tutorial-13-kafka_cli.md +++ b/docs/tutorial-13-kafka_cli.md @@ -1,7 +1,7 @@ # 异步Kafka客户端:kafka_cli # 示例代码 -[tutorial-13-kafka_cli.cc](../tutorial/tutorial-13-kafka_cli.cc) +[tutorial-13-kafka_cli.cc](/tutorial/tutorial-13-kafka_cli.cc) # 关于编译选项 From d101edb5a7d593faca435005ffb6259855dbe349 Mon Sep 17 00:00:00 2001 From: XieHan Date: Mon, 17 May 2021 02:32:35 +0800 Subject: [PATCH 52/53] add server's SSL wrapper --- src/protocol/SSLWrapper.cc | 47 ++++++++++++++++++++++++++------------ src/protocol/SSLWrapper.h | 21 ++++++++++++++++- 2 files changed, 52 insertions(+), 16 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index 96140cba..e1876c72 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -154,31 +154,25 @@ int SSLWrapper::encode(struct iovec vectors[], int max) #define BUFSIZE 8192 -int SSLWrapper::append(const void *buf, size_t *size) +int SSLWrapper::append_message() { - BIO *rbio = SSL_get_rbio(this->ssl); - char rbuf[BUFSIZE]; - size_t nleft; - size_t n; + char buf[BUFSIZE]; int ret; - ret = BIO_write(rbio, buf, *size); - if (ret <= 0) - return -1; - - *size = ret; - while ((ret = SSL_read(this->ssl, rbuf, BUFSIZE)) > 0) + while ((ret = SSL_read(this->ssl, buf, BUFSIZE)) > 0) { - buf = rbuf; - nleft = ret; + size_t nleft = ret; + char *p = buf; + size_t n; + do { n = nleft; - ret = this->msg->append(buf, &n); + ret = this->msg->append(p, &n); if (ret == 0) { - buf = (char *)buf + n; nleft -= n; + p += n; } else return ret; @@ -201,5 +195,28 @@ int SSLWrapper::append(const void *buf, size_t *size) return 0; } +int SSLWrapper::append(const void *buf, size_t *size) +{ + BIO *rbio = SSL_get_rbio(this->ssl); + int ret; + + ret = BIO_write(rbio, buf, *size); + if (ret <= 0) + return -1; + + *size = ret; + return this->append_message(); +} + +int ServerSSLWrapper::append(const void *buf, size_t *size) +{ + int ret = this->handshaker.append(buf, size); + + if (ret > 0) + ret = this->append_message(); + + return ret; +} + } diff --git a/src/protocol/SSLWrapper.h b/src/protocol/SSLWrapper.h index 34774d9a..77dea65e 100644 --- a/src/protocol/SSLWrapper.h +++ b/src/protocol/SSLWrapper.h @@ -27,7 +27,7 @@ namespace protocol class SSLHandshaker : public ProtocolMessage { -protected: +public: virtual int encode(struct iovec vectors[], int max); virtual int append(const void *buf, size_t *size); @@ -44,6 +44,9 @@ protected: virtual int encode(struct iovec vectors[], int max); virtual int append(const void *buf, size_t *size); +protected: + int append_message(); + protected: ProtocolMessage *msg; SSL *ssl; @@ -56,6 +59,22 @@ public: } }; +class ServerSSLWrapper : public SSLWrapper +{ +protected: + virtual int append(const void *buf, size_t *size); + +protected: + SSLHandshaker handshaker; + +public: + ServerSSLWrapper(ProtocolMessage *msg, SSL *ssl) : + SSLWrapper(msg, ssl), + handshaker(ssl) + { + } +}; + } #endif From 047ed9858abbcf1c8ab5c555607c4519b9131df3 Mon Sep 17 00:00:00 2001 From: XieHan Date: Mon, 17 May 2021 02:46:46 +0800 Subject: [PATCH 53/53] Server->Service --- src/protocol/SSLWrapper.cc | 2 +- src/protocol/SSLWrapper.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/protocol/SSLWrapper.cc b/src/protocol/SSLWrapper.cc index e1876c72..edaaf7ea 100644 --- a/src/protocol/SSLWrapper.cc +++ b/src/protocol/SSLWrapper.cc @@ -208,7 +208,7 @@ int SSLWrapper::append(const void *buf, size_t *size) return this->append_message(); } -int ServerSSLWrapper::append(const void *buf, size_t *size) +int ServiceSSLWrapper::append(const void *buf, size_t *size) { int ret = this->handshaker.append(buf, size); diff --git a/src/protocol/SSLWrapper.h b/src/protocol/SSLWrapper.h index 77dea65e..768ca58d 100644 --- a/src/protocol/SSLWrapper.h +++ b/src/protocol/SSLWrapper.h @@ -59,7 +59,7 @@ public: } }; -class ServerSSLWrapper : public SSLWrapper +class ServiceSSLWrapper : public SSLWrapper { protected: virtual int append(const void *buf, size_t *size); @@ -68,7 +68,7 @@ protected: SSLHandshaker handshaker; public: - ServerSSLWrapper(ProtocolMessage *msg, SSL *ssl) : + ServiceSSLWrapper(ProtocolMessage *msg, SSL *ssl) : SSLWrapper(msg, ssl), handshaker(ssl) {