Merge pull request #1049 from Barenboim/master

Update some codes
This commit is contained in:
xiehan
2022-09-18 03:05:11 +08:00
committed by GitHub
6 changed files with 73 additions and 80 deletions

View File

@@ -151,10 +151,8 @@ CommMessageOut *ComplexHttpTask::message_out()
if ((unsigned int)this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
this->keep_alive_timeo = HTTP_KEEPALIVE_MAX;
//if (this->keep_alive_timeo < 0 || this->keep_alive_timeo > HTTP_KEEPALIVE_MAX)
}
//req->set_header_pair("Accept", "*/*");
return this->WFComplexClientTask::message_out();
}
@@ -191,7 +189,6 @@ bool ComplexHttpTask::init_success()
std::string request_uri;
std::string header_host;
bool is_ssl;
bool is_unix = false;
if (uri_.scheme && strcasecmp(uri_.scheme, "http") == 0)
is_ssl = false;
@@ -221,13 +218,9 @@ bool ComplexHttpTask::init_success()
}
if (uri_.host && uri_.host[0])
{
header_host = uri_.host;
if (uri_.host[0] == '/')
is_unix = true;
}
if (!is_unix && uri_.port && uri_.port[0])
if (uri_.port && uri_.port[0])
{
int port = atoi(uri_.port);
@@ -252,7 +245,6 @@ bool ComplexHttpTask::init_success()
this->WFComplexClientTask::set_transport_type(is_ssl ? TT_TCP_SSL : TT_TCP);
client_req->set_request_uri(request_uri.c_str());
client_req->set_header_pair("Host", header_host.c_str());
return true;
}

View File

@@ -19,6 +19,7 @@
*/
#include <stdio.h>
#include <string.h>
#include <string>
#include "WFTaskError.h"
#include "WFTaskFactory.h"
@@ -153,11 +154,14 @@ bool ComplexRedisTask::init_success()
//https://stackoverflow.com/questions/26964595/whats-the-correct-way-to-use-a-unix-domain-socket-in-requests-framework
//https://stackoverflow.com/questions/27037990/connecting-to-postgres-via-database-url-and-unix-socket-in-rails
//todo userinfo=username:password
if (uri_.userinfo && uri_.userinfo[0] == ':' && uri_.userinfo[1])
if (uri_.userinfo)
{
password_.assign(uri_.userinfo + 1);
StringUtil::url_decode(password_);
char *p = strchr(uri_.userinfo, ':');
if (p)
{
password_.assign(p + 1);
StringUtil::url_decode(password_);
}
}
if (uri_.path && uri_.path[0] == '/' && uri_.path[1])
@@ -219,6 +223,7 @@ bool ComplexRedisTask::need_redirect()
return true;
}
return false;
}
@@ -243,6 +248,7 @@ bool ComplexRedisTask::finish_once()
this->error = WFT_ERR_REDIS_ACCESS_DENIED;
}
}
return false;
}

View File

@@ -93,40 +93,11 @@ public:
this->parser) == 0;
}
bool is_header_complete() const
{
return http_parser_header_complete(this->parser);
}
bool has_connection_header() const
{
return http_parser_has_connection(this->parser);
}
bool has_content_length_header() const
{
return http_parser_has_content_length(this->parser);
}
bool has_keep_alive_header() const
{
return http_parser_has_keep_alive(this->parser);
}
bool get_parsed_body(const void **body, size_t *size) const
{
return http_parser_get_body(body, size, this->parser) == 0;
}
/* Call when the message is incomplete, but you want the parsed body.
* If get_parse_body() still returns false after calling this function,
* even header is incomplete. In a success state task, messages are
* always complete. */
void end_parsing()
{
http_parser_close_message(this->parser);
}
/* Output body is for sending. Want to transfer a message received, maybe:
* msg->get_parsed_body(&body, &size);
* msg->append_output_body_nocopy(body, size); */
@@ -151,7 +122,7 @@ public:
return this->output_body_size;
}
/* std::string interface */
/* std::string interfaces */
public:
bool get_http_version(std::string& version) const
{
@@ -195,12 +166,34 @@ public:
return this->append_output_body_nocopy(buf.c_str(), buf.size());
}
protected:
http_parser_t *parser;
size_t cur_size;
/* for http task implementations. */
public:
/* for header visitors. */
bool is_header_complete() const
{
return http_parser_header_complete(this->parser);
}
bool has_connection_header() const
{
return http_parser_has_connection(this->parser);
}
bool has_content_length_header() const
{
return http_parser_has_content_length(this->parser);
}
bool has_keep_alive_header() const
{
return http_parser_has_keep_alive(this->parser);
}
void end_parsing()
{
http_parser_close_message(this->parser);
}
/* for header cursor implementations. */
const http_parser_t *get_parser() const
{
return this->parser;
@@ -210,6 +203,10 @@ protected:
virtual int encode(struct iovec vectors[], int max);
virtual int append(const void *buf, size_t *size);
protected:
http_parser_t *parser;
size_t cur_size;
private:
struct list_head *combine_from(struct list_head *pos, size_t size);
@@ -236,7 +233,6 @@ public:
}
}
/* for std::move() */
public:
HttpMessage(HttpMessage&& msg);
HttpMessage& operator = (HttpMessage&& msg);
@@ -265,7 +261,7 @@ public:
return http_parser_set_uri(uri, this->parser) == 0;
}
/* std::string interface */
/* std::string interfaces */
public:
bool get_method(std::string& method) const
{
@@ -312,7 +308,6 @@ private:
public:
HttpRequest() : HttpMessage(false) { }
/* for std::move() */
public:
HttpRequest(HttpRequest&& req) = default;
HttpRequest& operator = (HttpRequest&& req) = default;
@@ -341,13 +336,7 @@ public:
return http_parser_set_phrase(phrase, this->parser) == 0;
}
/* Tell the parser, it is a HEAD response. */
void parse_zero_body()
{
this->parser->transfer_length = 0;
}
/* std::string interface */
/* std::string interfaces */
public:
bool get_status_code(std::string& code) const
{
@@ -385,13 +374,19 @@ public:
return this->set_reason_phrase(phrase.c_str());
}
public:
/* Tell the parser, it is a HEAD response. For implementations. */
void parse_zero_body()
{
this->parser->transfer_length = 0;
}
protected:
virtual int append(const void *buf, size_t *size);
public:
HttpResponse() : HttpMessage(true) { }
/* for std::move() */
public:
HttpResponse(HttpResponse&& resp) = default;
HttpResponse& operator = (HttpResponse&& resp) = default;

View File

@@ -1222,8 +1222,8 @@ int KafkaMessage::parse_message_set(void **buf, size_t *size,
record->timestamp = timestamp;
record->offset = offset;
record->toppar = toppar->get_raw_ptr();
record->key_is_move = 1;
record->value_is_move = 1;
record->key_is_moved = 1;
record->value_is_moved = 1;
record->value = payload;
record->value_len = payload_len;
list_add_tail(kafka_record->get_list(), record_list);
@@ -1375,8 +1375,8 @@ int KafkaMessage::parse_message_record(void **buf, size_t *size,
return -1;
}
header->key_is_move = 1;
header->value_is_move = 1;
header->key_is_moved = 1;
header->value_is_moved = 1;
list_add_tail(&header->list, &record->header_list);
}
@@ -1474,8 +1474,8 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size,
KafkaRecord *record = new KafkaRecord;
record->set_offset(hdr.base_offset);
record->set_timestamp(hdr.base_timestamp);
record->get_raw_ptr()->key_is_move = 1;
record->get_raw_ptr()->value_is_move = 1;
record->get_raw_ptr()->key_is_moved = 1;
record->get_raw_ptr()->value_is_moved = 1;
record->get_raw_ptr()->toppar = toppar->get_raw_ptr();
switch (parse_message_record(&p, &n, record->get_raw_ptr()))

View File

@@ -461,18 +461,18 @@ void kafka_record_header_init(kafka_record_header_t *header)
{
header->key = NULL;
header->key_len = 0;
header->key_is_move = 0;
header->key_is_moved = 0;
header->value = NULL;
header->value_len = 0;
header->value_is_move = 0;
header->value_is_moved = 0;
}
void kafka_record_header_deinit(kafka_record_header_t *header)
{
if (!header->key_is_move)
if (!header->key_is_moved)
free(header->key);
if (!header->value_is_move)
if (!header->value_is_moved)
free(header->value);
}
@@ -480,10 +480,10 @@ void kafka_record_init(kafka_record_t *record)
{
record->key = NULL;
record->key_len = 0;
record->key_is_move = 0;
record->key_is_moved = 0;
record->value = NULL;
record->value_len = 0;
record->value_is_move = 0;
record->value_is_moved = 0;
record->timestamp = 0;
record->offset = 0;
INIT_LIST_HEAD(&record->header_list);
@@ -496,10 +496,10 @@ void kafka_record_deinit(kafka_record_t *record)
struct list_head *tmp, *pos;
kafka_record_header_t *header;
if (!record->key_is_move)
if (!record->key_is_moved)
free(record->key);
if (!record->value_is_move)
if (!record->value_is_moved)
free(record->value);
list_for_each_safe(pos, tmp, &record->header_list)
@@ -569,12 +569,12 @@ void kafka_block_init(kafka_block_t *block)
{
block->buf = NULL;
block->len = 0;
block->is_move = 0;
block->is_moved = 0;
}
void kafka_block_deinit(kafka_block_t *block)
{
if (!block->is_move)
if (!block->is_moved)
free(block->buf);
}

View File

@@ -354,20 +354,20 @@ typedef struct __kafka_record_header
struct list_head list;
void *key;
size_t key_len;
int key_is_move;
int key_is_moved;
void *value;
size_t value_len;
int value_is_move;
int value_is_moved;
} kafka_record_header_t;
typedef struct __kafka_record
{
void *key;
size_t key_len;
int key_is_move;
int key_is_moved;
void *value;
size_t value_len;
int value_is_move;
int value_is_moved;
long long timestamp;
long long offset;
struct list_head header_list;
@@ -417,7 +417,7 @@ typedef struct __kafka_block
{
void *buf;
size_t len;
int is_move;
int is_moved;
} kafka_block_t;