Merge pull request #1780 from sogou/chunk-win

Add WFHttpChunkedClient.
This commit is contained in:
xiehan
2025-09-17 22:25:27 +08:00
committed by GitHub
13 changed files with 715 additions and 0 deletions

View File

@@ -71,6 +71,7 @@ set(INCLUDE_HEADERS
src/protocol/DnsUtil.h
src/protocol/TLVMessage.h
src/protocol/SSLWrapper.h
src/protocol/PackageWrapper.h
src/server/WFServer.h
src/server/WFHttpServer.h
src/server/WFRedisServer.h
@@ -78,6 +79,7 @@ set(INCLUDE_HEADERS
src/server/WFDnsServer.h
src/client/WFMySQLConnection.h
src/client/WFDnsClient.h
src/client/WFHttpChunkedClient.h
src/manager/DnsCache.h
src/manager/WFGlobal.h
src/manager/UpstreamManager.h
@@ -106,6 +108,7 @@ set(INCLUDE_HEADERS
src/factory/WFResourcePool.h
src/factory/WFMessageQueue.h
src/factory/WFHttpServerTask.h
src/factory/HttpTaskImpl.inl
src/nameservice/WFNameService.h
src/nameservice/WFDnsResolver.h
src/nameservice/WFServiceGovernance.h

View File

@@ -4,6 +4,7 @@ project(client)
set(SRC
WFMySQLConnection.cc
WFDnsClient.cc
WFHttpChunkedClient.cc
)
add_library(${PROJECT_NAME} OBJECT ${SRC})

View File

@@ -0,0 +1,74 @@
/*
Copyright (c) 2025 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include "HttpTaskImpl.inl"
#include "WFHttpChunkedClient.h"
void WFHttpChunkedTask::task_extract(protocol::HttpMessageChunk *chunk,
WFHttpTask *task)
{
auto *t = (WFHttpChunkedTask *)task->user_data;
t->chunk = chunk;
if (t->extract)
{
if (chunk || t->extract_flag)
t->extract(t);
}
}
void WFHttpChunkedTask::task_callback(WFHttpTask *task)
{
auto *t = (WFHttpChunkedTask *)task->user_data;
t->state = task->get_state();
t->error = task->get_error();
t->chunk = NULL;
if (t->callback)
t->callback(t);
t->task = NULL;
delete t;
}
WFHttpChunkedTask *
WFHttpChunkedClient::create_chunked_task(const std::string& url,
int redirect_max,
extract_t extract,
callback_t callback)
{
WFHttpTask *task = __WFHttpTaskFactory::create_chunked_task(url,
redirect_max,
WFHttpChunkedTask::task_extract,
WFHttpChunkedTask::task_callback);
return new WFHttpChunkedTask(task, std::move(extract), std::move(callback));
}
WFHttpChunkedTask *
WFHttpChunkedClient::create_chunked_task(const ParsedURI& uri,
int redirect_max,
extract_t extract,
callback_t callback)
{
WFHttpTask *task = __WFHttpTaskFactory::create_chunked_task(uri,
redirect_max,
WFHttpChunkedTask::task_extract,
WFHttpChunkedTask::task_callback);
return new WFHttpChunkedTask(task, std::move(extract), std::move(callback));
}

View File

@@ -0,0 +1,179 @@
/*
Copyright (c) 2025 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _WFHTTPCHUNKEDCLIENT_H_
#define _WFHTTPCHUNKEDCLIENT_H_
#include <utility>
#include <functional>
#include <openssl/ssl.h>
#include "HttpMessage.h"
#include "WFTask.h"
#include "WFTaskFactory.h"
class WFHttpChunkedTask : public WFGenericTask
{
public:
protocol::HttpMessageChunk *get_chunk()
{
return this->chunk;
}
const protocol::HttpMessageChunk *get_chunk() const
{
return this->chunk;
}
public:
protocol::HttpRequest *get_req()
{
return this->task->get_req();
}
protocol::HttpResponse *get_resp()
{
return this->task->get_resp();
}
const protocol::HttpRequest *get_req() const
{
return this->task->get_req();
}
const protocol::HttpResponse *get_resp() const
{
return this->task->get_resp();
}
public:
void set_watch_timeout(int timeout)
{
this->task->set_watch_timeout(timeout);
}
void set_recv_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}
void set_send_timeout(int timeout)
{
this->task->set_send_timeout(timeout);
}
void set_keep_alive(int timeout)
{
this->task->set_keep_alive(timeout);
}
public:
void set_ssl_ctx(SSL_CTX *ctx)
{
using HttpRequest = protocol::HttpRequest;
using HttpResponse = protocol::HttpResponse;
auto *t = (WFComplexClientTask<HttpRequest, HttpResponse> *)this->task;
t->set_ssl_ctx(ctx);
}
void extract_on_header(bool on)
{
this->extract_flag = on;
}
public:
void set_extract(std::function<void (WFHttpChunkedTask *)> ex)
{
this->extract = std::move(ex);
}
void set_callback(std::function<void (WFHttpChunkedTask *)> cb)
{
this->callback = std::move(cb);
}
public:
const WFHttpTask *get_http_task() const
{
return this->task;
}
protected:
virtual void dispatch()
{
series_of(this)->push_front(this->task);
this->subtask_done();
}
virtual SubTask *done()
{
return series_of(this)->pop();
}
protected:
static void task_extract(protocol::HttpMessageChunk *chunk,
WFHttpTask *task);
static void task_callback(WFHttpTask *task);
protected:
WFHttpTask *task;
protocol::HttpMessageChunk *chunk;
bool extract_flag;
std::function<void (WFHttpChunkedTask *)> extract;
std::function<void (WFHttpChunkedTask *)> callback;
protected:
WFHttpChunkedTask(WFHttpTask *task,
std::function<void (WFHttpChunkedTask *)>&& ex,
std::function<void (WFHttpChunkedTask *)>&& cb) :
extract(std::move(ex)),
callback(std::move(cb))
{
task->user_data = this;
this->task = task;
this->extract_flag = false;
}
virtual ~WFHttpChunkedTask()
{
if (this->task)
this->task->dismiss();
}
friend class WFHttpChunkedClient;
};
class WFHttpChunkedClient
{
public:
using extract_t = std::function<void (WFHttpChunkedTask *)>;
using callback_t = std::function<void (WFHttpChunkedTask *)>;
public:
static WFHttpChunkedTask *create_chunked_task(const std::string& url,
int redirect_max,
extract_t extract,
callback_t callback);
static WFHttpChunkedTask *create_chunked_task(const ParsedURI& uri,
int redirect_max,
extract_t extract,
callback_t callback);
};
#endif

View File

@@ -0,0 +1,41 @@
/*
Copyright (c) 2025 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Xie Han (xiehan@sogou-inc.com)
*/
#include "HttpMessage.h"
#include "WFTaskFactory.h"
// Internal, for WFHttpChunkedTask only.
class __WFHttpTaskFactory
{
private:
using extract_t = std::function<void (protocol::HttpMessageChunk *,
WFHttpTask *)>;
public:
static WFHttpTask *create_chunked_task(const std::string& url,
int redirect_max,
extract_t extract,
http_callback_t callback);
static WFHttpTask *create_chunked_task(const ParsedURI& uri,
int redirect_max,
extract_t extract,
http_callback_t callback);
};

View File

@@ -68,6 +68,9 @@ public:
INPUT *get_input() { return &this->input; }
OUTPUT *get_output() { return &this->output; }
const INPUT *get_input() const { return &this->input; }
const OUTPUT *get_output() const { return &this->output; }
public:
void *user_data;
@@ -134,6 +137,9 @@ public:
REQ *get_req() { return &this->req; }
RESP *get_resp() { return &this->resp; }
const REQ *get_req() const { return &this->req; }
const RESP *get_resp() const { return &this->resp; }
public:
void *user_data;
@@ -325,6 +331,8 @@ public:
public:
ARGS *get_args() { return &this->args; }
const ARGS *get_args() const { return &this->args; }
long get_retval() const
{
if (this->state == WFT_STATE_SUCCESS)

View File

@@ -180,6 +180,13 @@ static int __ssl_connect(SSL_CTX *ssl_ctx, CommConnEntry *entry)
return -1;
}
void CommMessageIn::renew()
{
CommSession *session = this->entry->session;
session->timeout = -1;
session->begin_time = -1;
}
int CommTarget::init(const struct sockaddr *addr, socklen_t addrlen,
int connect_timeout, int response_timeout)
{

View File

@@ -118,6 +118,9 @@ protected:
/* Send small packet while receiving. Call only in append(). */
virtual int feedback(const void *buf, size_t size);
/* In append(), reset the begin time of receiving to current time. */
virtual void renew();
private:
struct CommConnEntry *entry;
@@ -164,6 +167,7 @@ private:
public:
CommSession() { this->passive = 0; }
virtual ~CommSession();
friend class CommMessageIn;
friend class Communicator;
};

View File

@@ -18,6 +18,7 @@ set(SRC
TLVMessage.cc
HttpUtil.cc
SSLWrapper.cc
PackageWrapper.cc
)
add_library(${PROJECT_NAME} OBJECT ${SRC})

View File

@@ -18,6 +18,7 @@
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <utility>
#include "HttpMessage.h"
@@ -398,5 +399,234 @@ int HttpResponse::append(const void *buf, size_t *size)
return ret;
}
bool HttpMessageChunk::get_chunk_data(const void **data, size_t *size) const
{
if (this->chunk_data && this->nreceived == this->chunk_size + 2)
{
*data = this->chunk_data;
*size = this->chunk_size;
return true;
}
else
return false;
}
bool HttpMessageChunk::move_chunk_data(void **data, size_t *size)
{
if (this->chunk_data && this->nreceived == this->chunk_size + 2)
{
*data = this->chunk_data;
*size = this->chunk_size;
this->chunk_data = NULL;
this->nreceived = 0;
return true;
}
else
return false;
}
bool HttpMessageChunk::set_chunk_data(const void *data, size_t size)
{
char *p = (char *)malloc(size + 3);
if (p)
{
memcpy(p, data, size);
p[size] = '\r';
p[size + 1] = '\n';
p[size + 2] = '\0';
free(this->chunk_data);
this->chunk_data = p;
this->chunk_size = size;
this->nreceived = size + 2;
return true;
}
else
return false;
}
int HttpMessageChunk::encode(struct iovec vectors[], int max)
{
int len = sprintf(this->chunk_line, "%zx\r\n", this->chunk_size);
vectors[0].iov_base = this->chunk_line;
vectors[0].iov_len = len;
vectors[1].iov_base = this->chunk_data;
vectors[1].iov_len = this->chunk_size + 2;
return 2;
}
#define MIN(x, y) ((x) <= (y) ? (x) : (y))
int HttpMessageChunk::append_chunk_line(const void *buf, size_t size)
{
char *end;
size_t i;
size = MIN(size, sizeof this->chunk_line - this->nreceived);
memcpy(this->chunk_line + this->nreceived, buf, size);
for (i = 0; i + 1 < this->nreceived + size; i++)
{
if (this->chunk_line[i] == '\r')
{
if (this->chunk_line[i + 1] != '\n')
{
errno = EBADMSG;
return -1;
}
this->chunk_line[i] = '\0';
this->chunk_size = strtoul(this->chunk_line, &end, 16);
if (end == this->chunk_line)
{
errno = EBADMSG;
return -1;
}
if (this->chunk_size > 64 * 1024 * 1024 ||
this->chunk_size > this->size_limit)
{
errno = EMSGSIZE;
return -1;
}
this->chunk_data = malloc(this->chunk_size + 3);
if (!this->chunk_data)
return -1;
this->nreceived = i + 2;
return 1;
}
}
if (i == sizeof this->chunk_line - 1)
{
errno = EBADMSG;
return -1;
}
this->nreceived += size;
return 0;
}
int HttpMessageChunk::append(const void *buf, size_t *size)
{
size_t nleft;
size_t n;
int ret;
if (!this->chunk_data)
{
n = this->nreceived;
ret = this->append_chunk_line(buf, *size);
if (ret <= 0)
return ret;
n = this->nreceived - n;
this->nreceived = 0;
}
else
n = 0;
if (this->chunk_size != 0)
{
nleft = this->chunk_size + 2 - this->nreceived;
if (*size - n > nleft)
*size = n + nleft;
buf = (const char *)buf + n;
n = *size - n;
memcpy((char *)this->chunk_data + this->nreceived, buf, n);
this->nreceived += n;
if (this->nreceived == this->chunk_size + 2)
{
((char *)this->chunk_data)[this->nreceived] = '\0';
return 1;
}
}
else
{
while (n < *size)
{
char c = ((const char *)buf)[n];
if (this->nreceived == 0)
{
if (c == '\r')
this->nreceived = 1;
else
this->nreceived = (size_t)-2;
}
else if (this->nreceived == 1)
{
if (c == '\n')
{
*size = n + 1;
this->nreceived = 2;
((char *)this->chunk_data)[0] = '\r';
((char *)this->chunk_data)[1] = '\n';
((char *)this->chunk_data)[2] = '\0';
return 1;
}
else
break;
}
else if (this->nreceived == (size_t)-2)
{
if (c == '\r')
this->nreceived = (size_t)-1;
}
else /* if (this->nreceived == (size_t)-1) */
{
if (c == '\n')
this->nreceived = 0;
else
break;
}
n++;
}
if (n < *size)
{
errno = EBADMSG;
return -1;
}
}
return 0;
}
HttpMessageChunk::HttpMessageChunk(HttpMessageChunk&& msg) :
ProtocolMessage(std::move(msg))
{
memcpy(this->chunk_line, msg.chunk_line, sizeof this->chunk_line);
this->chunk_data = msg.chunk_data;
msg.chunk_data = NULL;
this->chunk_size = msg.chunk_size;
this->nreceived = msg.nreceived;
msg.nreceived = 0;
}
HttpMessageChunk& HttpMessageChunk::operator = (HttpMessageChunk&& msg)
{
if (&msg != this)
{
*(ProtocolMessage *)this = std::move(msg);
memcpy(this->chunk_line, msg.chunk_line, sizeof this->chunk_line);
free(this->chunk_data);
this->chunk_data = msg.chunk_data;
msg.chunk_data = NULL;
this->chunk_size = msg.chunk_size;
this->nreceived = msg.nreceived;
msg.nreceived = 0;
}
return *this;
}
}

View File

@@ -19,6 +19,7 @@
#ifndef _HTTPMESSAGE_H_
#define _HTTPMESSAGE_H_
#include <stdlib.h>
#include <string.h>
#include <utility>
#include <string>
@@ -404,6 +405,43 @@ public:
HttpResponse& operator = (HttpResponse&& resp) = default;
};
class HttpMessageChunk : public ProtocolMessage
{
public:
bool get_chunk_data(const void **chunk_data, size_t *size) const;
bool move_chunk_data(void **chunk_data, size_t *size);
bool set_chunk_data(const void *chunk_data, size_t size);
protected:
virtual int encode(struct iovec vectors[], int max);
virtual int append(const void *buf, size_t *size);
private:
int append_chunk_line(const void *buf, size_t size);
private:
char chunk_line[32];
void *chunk_data;
size_t chunk_size;
size_t nreceived;
public:
HttpMessageChunk()
{
this->chunk_data = NULL;
this->nreceived = 0;
}
virtual ~HttpMessageChunk()
{
free(this->chunk_data);
}
public:
HttpMessageChunk(HttpMessageChunk&& msg);
HttpMessageChunk& operator = (HttpMessageChunk&& msg);
};
}
#endif

View File

@@ -0,0 +1,72 @@
/*
Copyright (c) 2022 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <errno.h>
#include "PackageWrapper.h"
namespace protocol
{
int PackageWrapper::encode(struct iovec vectors[], int max)
{
int cnt = 0;
int ret;
while (max >= 8)
{
ret = this->ProtocolWrapper::encode(vectors, max);
if ((unsigned int)ret > (unsigned int)max)
{
if (ret < 0)
return ret;
break;
}
cnt += ret;
this->set_message(this->next_out(this->message));
if (!this->message)
return cnt;
vectors += ret;
max -= ret;
}
errno = EOVERFLOW;
return -1;
}
int PackageWrapper::append(const void *buf, size_t *size)
{
int ret = this->ProtocolWrapper::append(buf, size);
if (ret > 0)
{
this->set_message(this->next_in(this->message));
if (this->message)
{
this->renew();
ret = 0;
}
}
return ret;
}
}

View File

@@ -0,0 +1,57 @@
/*
Copyright (c) 2022 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _PACKAGEWRAPPER_H_
#define _PACKAGEWRAPPER_H_
#include "ProtocolMessage.h"
namespace protocol
{
class PackageWrapper : public ProtocolWrapper
{
private:
virtual ProtocolMessage *next_out(ProtocolMessage *message)
{
return NULL;
}
virtual ProtocolMessage *next_in(ProtocolMessage *message)
{
return NULL;
}
protected:
virtual int encode(struct iovec vectors[], int max);
virtual int append(const void *buf, size_t *size);
public:
PackageWrapper(ProtocolMessage *message) : ProtocolWrapper(message)
{
}
public:
PackageWrapper(PackageWrapper&& wrapper) = default;
PackageWrapper& operator = (PackageWrapper&& wrapper) = default;
};
}
#endif