Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d9ff2e1a62 | ||
|
|
3396100bc4 | ||
|
|
c53693be69 | ||
|
|
855f602e2f | ||
|
|
fd9a5ae249 | ||
|
|
61d5243678 | ||
|
|
e75a4dda25 | ||
|
|
65bfe5c03c | ||
|
|
b7c3e1f80b |
@@ -30,9 +30,10 @@ the following table is detail compatible feature:
|
||||
|
||||
## Architecture
|
||||
|
||||

|
||||
|
||||
Architecture quote from [frp](https://github.com/fatedier/frp) project, replace frpc with xfrpc.
|
||||

|
||||
|
||||
|
||||
|
||||
## Sequence Diagram
|
||||
|
||||
@@ -184,10 +185,12 @@ xfrpc -c frpc_mini.ini -f -d 7
|
||||
xfrpc -c frpc_mini.ini -d 0
|
||||
```
|
||||
|
||||
## Openwrt UI
|
||||
## Openwrt luci configure ui
|
||||
|
||||
If running xfrpc in openwrt box, [luci-app-xfrpc](https://github.com/liudf0716/luci-app-xfrpc) is a good choice
|
||||
|
||||
luci-app-xfrpc was recruited by [luci project](https://github.com/openwrt/luci)
|
||||
|
||||
## How to contribute our project
|
||||
|
||||
See [CONTRIBUTING](https://github.com/liudf0716/xfrpc/blob/master/CONTRIBUTING.md) for details on submitting patches and the contribution workflow.
|
||||
|
||||
18
client.c
18
client.c
@@ -70,11 +70,10 @@ xfrp_proxy_event_cb(struct bufferevent *bev, short what, void *ctx)
|
||||
debug(LOG_DEBUG, "xfrpc proxy close connect server [%s:%d] stream_id %d: %s",
|
||||
client->ps->local_ip, client->ps->local_port,
|
||||
client->stream_id, strerror(errno));
|
||||
tcp_mux_send_win_update_fin(client->ctl_bev, client->stream_id);
|
||||
client->stream_state = LOCAL_CLOSE;
|
||||
tmux_stream_close(client->ctl_bev, &client->stream);
|
||||
} else if (what & BEV_EVENT_CONNECTED) {
|
||||
debug(LOG_DEBUG, "client [%d] connected", client->stream_id);
|
||||
//client->stream_state = ESTABLISHED;
|
||||
//client->stream.state = ESTABLISHED;
|
||||
if (client->data_tail_size > 0) {
|
||||
debug(LOG_DEBUG, "send client data ...");
|
||||
send_client_data_tail(client);
|
||||
@@ -178,6 +177,7 @@ send_client_data_tail(struct proxy_client *client)
|
||||
static void
|
||||
free_proxy_client(struct proxy_client *client)
|
||||
{
|
||||
debug(LOG_DEBUG, "free client %d", client->stream_id);
|
||||
if (client->local_proxy_bev) bufferevent_free(client->local_proxy_bev);
|
||||
free(client);
|
||||
}
|
||||
@@ -195,6 +195,15 @@ del_proxy_client(struct proxy_client *client)
|
||||
free_proxy_client(client);
|
||||
}
|
||||
|
||||
void
|
||||
del_proxy_client_by_stream_id(uint32_t sid)
|
||||
{
|
||||
del_stream(sid);
|
||||
|
||||
struct proxy_client *pc = get_proxy_client(sid);
|
||||
del_proxy_client(pc);
|
||||
}
|
||||
|
||||
struct proxy_client *
|
||||
get_proxy_client(uint32_t sid)
|
||||
{
|
||||
@@ -209,8 +218,7 @@ new_proxy_client()
|
||||
struct proxy_client *client = calloc(1, sizeof(struct proxy_client));
|
||||
assert(client);
|
||||
client->stream_id = get_next_session_id();
|
||||
client->send_window = 200*1024;
|
||||
client->stream_state = INIT;
|
||||
init_tmux_stream(&client->stream, client->stream_id, INIT);
|
||||
HASH_ADD_INT(all_pc, stream_id, client);
|
||||
|
||||
return client;
|
||||
|
||||
7
client.h
7
client.h
@@ -44,10 +44,9 @@ struct proxy_client {
|
||||
struct bufferevent *ctl_bev; // xfrpc proxy <---> frps
|
||||
struct bufferevent *local_proxy_bev; // xfrpc proxy <---> local service
|
||||
struct base_conf *bconf;
|
||||
|
||||
struct tmux_stream stream;
|
||||
|
||||
uint32_t stream_id;
|
||||
uint32_t send_window;
|
||||
enum tcp_mux_state stream_state;
|
||||
int connected;
|
||||
int work_started;
|
||||
struct proxy_service *ps;
|
||||
@@ -91,6 +90,8 @@ void start_xfrp_tunnel(struct proxy_client *client);
|
||||
|
||||
void del_proxy_client(struct proxy_client *client);
|
||||
|
||||
void del_proxy_client_by_stream_id(uint32_t sid);
|
||||
|
||||
struct proxy_client *get_proxy_client(uint32_t sid);
|
||||
|
||||
int send_client_data_tail(struct proxy_client *client);
|
||||
|
||||
@@ -282,11 +282,11 @@ function(from_hex HEX DEC)
|
||||
set(${DEC} ${_res} PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
||||
if (OPENSSL_INCLUDE_DIR)
|
||||
if(OPENSSL_INCLUDE_DIR AND EXISTS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h")
|
||||
file(STRINGS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h" openssl_version_str
|
||||
REGEX "^# *define[\t ]+OPENSSL_VERSION_NUMBER[\t ]+0x([0-9a-fA-F])+.*")
|
||||
if(OPENSSL_INCLUDE_DIR AND EXISTS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h")
|
||||
file(STRINGS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h" openssl_version_str
|
||||
REGEX "^#[\t ]*define[\t ]+OPENSSL_VERSION_NUMBER[\t ]+0x([0-9a-fA-F])+.*")
|
||||
|
||||
if(openssl_version_str)
|
||||
# The version number is encoded as 0xMNNFFPPS: major minor fix patch status
|
||||
# The status gives if this is a developer or prerelease and is ignored here.
|
||||
# Major, minor, and fix directly translate into the version numbers shown in
|
||||
@@ -315,6 +315,25 @@ if (OPENSSL_INCLUDE_DIR)
|
||||
endif ()
|
||||
|
||||
set(OPENSSL_VERSION "${OPENSSL_VERSION_MAJOR}.${OPENSSL_VERSION_MINOR}.${OPENSSL_VERSION_FIX}${OPENSSL_VERSION_PATCH_STRING}")
|
||||
else ()
|
||||
# Since OpenSSL 3.0.0, the new version format is MAJOR.MINOR.PATCH and
|
||||
# a new OPENSSL_VERSION_STR macro contains exactly that
|
||||
file(STRINGS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h" OPENSSL_VERSION_STR
|
||||
REGEX "^#[\t ]*define[\t ]+OPENSSL_VERSION_STR[\t ]+\"([0-9])+\\.([0-9])+\\.([0-9])+\".*")
|
||||
string(REGEX REPLACE "^.*OPENSSL_VERSION_STR[\t ]+\"([0-9]+\\.[0-9]+\\.[0-9]+)\".*$"
|
||||
"\\1" OPENSSL_VERSION_STR "${OPENSSL_VERSION_STR}")
|
||||
|
||||
set(OPENSSL_VERSION "${OPENSSL_VERSION_STR}")
|
||||
|
||||
# Setting OPENSSL_VERSION_MAJOR OPENSSL_VERSION_MINOR and OPENSSL_VERSION_FIX
|
||||
string(REGEX MATCHALL "([0-9])+" OPENSSL_VERSION_NUMBER "${OPENSSL_VERSION}")
|
||||
list(POP_FRONT OPENSSL_VERSION_NUMBER
|
||||
OPENSSL_VERSION_MAJOR
|
||||
OPENSSL_VERSION_MINOR
|
||||
OPENSSL_VERSION_FIX)
|
||||
|
||||
unset(OPENSSL_VERSION_NUMBER)
|
||||
unset(OPENSSL_VERSION_STR)
|
||||
endif ()
|
||||
endif ()
|
||||
|
||||
|
||||
78
config.c
78
config.c
@@ -56,33 +56,9 @@ void free_common_config()
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
|
||||
if (c_conf->server_addr) free(c_conf->server_addr);
|
||||
if (c_conf->http_proxy) free(c_conf->http_proxy);
|
||||
if (c_conf->log_file) free(c_conf->log_file);
|
||||
if (c_conf->log_way) free(c_conf->log_way);
|
||||
if (c_conf->log_level) free(c_conf->log_level);
|
||||
if (c_conf->auth_token) free(c_conf->auth_token);
|
||||
if (c_conf->privilege_token) free(c_conf->privilege_token);
|
||||
SAFE_FREE(c_conf->server_ip);
|
||||
};
|
||||
|
||||
void set_common_server_ip(const char *ip)
|
||||
{
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
c_conf->server_ip = strdup(ip);
|
||||
assert(c_conf->server_ip);
|
||||
|
||||
debug(LOG_DEBUG, "server IP address: [%s]", c_conf->server_ip);
|
||||
}
|
||||
|
||||
void free_base_config(struct base_conf *bconf)
|
||||
{
|
||||
if (bconf->name) free(bconf->name);
|
||||
if (bconf->auth_token) free(bconf->auth_token);
|
||||
if (bconf->privilege_token) free(bconf->privilege_token);
|
||||
if (bconf->host_header_rewrite) free(bconf->host_header_rewrite);
|
||||
if (bconf->subdomain) free(bconf->subdomain);
|
||||
}
|
||||
|
||||
static int is_true(const char *val)
|
||||
{
|
||||
if (val && (strcmp(val, "true") == 0 || strcmp(val, "1") == 0))
|
||||
@@ -114,8 +90,9 @@ static void dump_common_conf()
|
||||
return;
|
||||
}
|
||||
|
||||
debug(LOG_DEBUG, "Section[common]: {server_addr:%s, server_port:%d, auth_token:%s, privilege_token:%s, interval:%d, timeout:%d}",
|
||||
c_conf->server_addr, c_conf->server_port, c_conf->auth_token, c_conf->privilege_token, c_conf->heartbeat_interval, c_conf->heartbeat_timeout);
|
||||
debug(LOG_DEBUG, "Section[common]: {server_addr:%s, server_port:%d, auth_token:%s, interval:%d, timeout:%d}",
|
||||
c_conf->server_addr, c_conf->server_port, c_conf->auth_token,
|
||||
c_conf->heartbeat_interval, c_conf->heartbeat_timeout);
|
||||
}
|
||||
|
||||
static void dump_proxy_service(const int index, struct proxy_service *ps)
|
||||
@@ -262,22 +239,17 @@ proxy_service_handler(void *user, const char *sect, const char *nm, const char *
|
||||
ps->remote_data_port = atoi(value);
|
||||
} else if (MATCH_NAME("http_user")) {
|
||||
ps->http_user = strdup(value);
|
||||
assert(ps->http_user);
|
||||
} else if (MATCH_NAME("http_pwd")) {
|
||||
ps->http_pwd = strdup(value);
|
||||
assert(ps->http_pwd);
|
||||
} else if (MATCH_NAME("subdomain")) {
|
||||
ps->subdomain = strdup(value);
|
||||
assert(ps->http_pwd);
|
||||
} else if (MATCH_NAME("custom_domains")) {
|
||||
ps->custom_domains = strdup(value);
|
||||
assert(ps->custom_domains);
|
||||
} else if (MATCH_NAME("locations")) {
|
||||
ps->locations = strdup(value);
|
||||
assert(ps->locations);
|
||||
} else if (MATCH_NAME("host_header_rewrite")) {
|
||||
ps->host_header_rewrite = strdup(value);
|
||||
assert(ps->host_header_rewrite);
|
||||
} else if (MATCH_NAME("use_encryption")) {
|
||||
ps->use_encryption = TO_BOOL(value);
|
||||
} else if (MATCH_NAME("use_compression")) {
|
||||
@@ -295,39 +267,10 @@ static int common_handler(void *user, const char *section, const char *name, con
|
||||
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
|
||||
if (MATCH("common", "server_addr")) {
|
||||
SAFE_FREE(config->server_addr);
|
||||
int addr_len = strlen(value) + 1;
|
||||
config->server_addr = (char *)calloc(1, addr_len);
|
||||
config->server_addr = strdup(value);
|
||||
assert(config->server_addr);
|
||||
if(dns_unified(value, config->server_addr, addr_len)) {
|
||||
debug(LOG_ERR, "error: server_addr [%s] is invalid!", value);
|
||||
exit(0);
|
||||
}
|
||||
if (is_valid_ip_address(value))
|
||||
set_common_server_ip(value);
|
||||
} else if (MATCH("common", "server_port")) {
|
||||
config->server_port = atoi(value);
|
||||
} else if (MATCH("common", "http_proxy")) {
|
||||
SAFE_FREE(config->http_proxy);
|
||||
config->http_proxy = strdup(value);
|
||||
assert(config->http_proxy);
|
||||
} else if (MATCH("common", "log_file")) {
|
||||
SAFE_FREE(config->log_file);
|
||||
config->log_file = strdup(value);
|
||||
assert(config->log_file);
|
||||
} else if (MATCH("common", "log_way")) {
|
||||
SAFE_FREE(config->log_way);
|
||||
config->log_way = strdup(value);
|
||||
assert(config->log_way);
|
||||
} else if (MATCH("common", "log_level")) {
|
||||
SAFE_FREE(config->log_level);
|
||||
config->log_level = strdup(value);
|
||||
assert(config->log_level);
|
||||
} else if (MATCH("common", "log_max_days")) {
|
||||
config->log_max_days = atoi(value);
|
||||
} else if (MATCH("common", "privilege_token")) {
|
||||
SAFE_FREE(config->privilege_token);
|
||||
config->privilege_token = strdup(value);
|
||||
assert(config->privilege_token);
|
||||
} else if (MATCH("common", "heartbeat_interval")) {
|
||||
config->heartbeat_interval = atoi(value);
|
||||
} else if (MATCH("common", "heartbeat_timeout")) {
|
||||
@@ -336,10 +279,6 @@ static int common_handler(void *user, const char *section, const char *name, con
|
||||
SAFE_FREE(config->auth_token);
|
||||
config->auth_token = strdup(value);
|
||||
assert(config->auth_token);
|
||||
} else if (MATCH("common", "user")) {
|
||||
SAFE_FREE(config->user);
|
||||
config->user = strdup(value);
|
||||
assert(config->user);
|
||||
} else if (MATCH("common", "tcp_mux")) {
|
||||
config->tcp_mux = atoi(value);
|
||||
config->tcp_mux = !!config->tcp_mux;
|
||||
@@ -355,18 +294,9 @@ static void init_common_conf(struct common_conf *config)
|
||||
config->server_addr = strdup("0.0.0.0");
|
||||
assert(config->server_addr);
|
||||
config->server_port = 7000;
|
||||
config->log_file = strdup("console");
|
||||
assert(config->log_file);
|
||||
config->log_way = strdup("console");
|
||||
assert(config->log_way);
|
||||
config->log_level = strdup("info");
|
||||
assert(config->log_level);
|
||||
config->log_max_days = 3;
|
||||
config->heartbeat_interval = 30;
|
||||
config->heartbeat_timeout = 90;
|
||||
config->tcp_mux = 1;
|
||||
config->user = NULL;
|
||||
config->server_ip = NULL;
|
||||
config->is_router = 0;
|
||||
}
|
||||
|
||||
|
||||
26
config.h
26
config.h
@@ -31,34 +31,14 @@
|
||||
|
||||
#define FTP_RMT_CTL_PROXY_SUFFIX "_ftp_remote_ctl_proxy"
|
||||
|
||||
struct base_conf{
|
||||
char *name;
|
||||
char *auth_token;
|
||||
int use_encryption;
|
||||
int use_gzip;
|
||||
int privilege_mode;
|
||||
char *privilege_token;
|
||||
int pool_count;
|
||||
char *host_header_rewrite;
|
||||
char *subdomain;
|
||||
};
|
||||
|
||||
// common config
|
||||
//client common config
|
||||
struct common_conf {
|
||||
char *server_addr; /* default 0.0.0.0 */
|
||||
char *server_ip;
|
||||
int server_port; /* default 7000 */
|
||||
char *http_proxy;
|
||||
char *log_file; /* default consol */
|
||||
char *log_way; /* default console */
|
||||
char *log_level; /* default info */
|
||||
int log_max_days; /* default 3 */
|
||||
char *privilege_token;
|
||||
char *auth_token;
|
||||
int heartbeat_interval; /* default 10 */
|
||||
int heartbeat_timeout; /* default 30 */
|
||||
int tcp_mux; /* default 0 */
|
||||
char *user;
|
||||
|
||||
/* private fields */
|
||||
int is_router; // to sign router (Openwrt/LEDE) or not
|
||||
@@ -68,14 +48,10 @@ struct common_conf *get_common_config();
|
||||
|
||||
void free_common_config();
|
||||
|
||||
void free_base_config(struct base_conf *bconf);
|
||||
|
||||
void load_config(const char *confile);
|
||||
|
||||
char *get_ftp_data_proxy_name(const char *ftp_proxy_name);
|
||||
|
||||
void set_common_server_ip(const char *ip);
|
||||
|
||||
int is_running_in_router();
|
||||
|
||||
struct proxy_service *get_proxy_service(const char *proxy_name);
|
||||
|
||||
265
control.c
265
control.c
@@ -52,11 +52,11 @@
|
||||
#include "tcpmux.h"
|
||||
|
||||
static struct control *main_ctl;
|
||||
static int clients_conn_signel = 0;
|
||||
static int client_connected = 0;
|
||||
static int is_login = 0;
|
||||
static time_t pong_time = 0;
|
||||
|
||||
static void sync_new_work_connection(struct bufferevent *bev, uint32_t sid);
|
||||
static void new_work_connection(struct bufferevent *bev, struct tmux_stream *stream);
|
||||
static void recv_cb(struct bufferevent *bev, void *ctx);
|
||||
static void clear_main_control();
|
||||
static void start_base_connect();
|
||||
@@ -65,25 +65,25 @@ static void keep_control_alive();
|
||||
static int
|
||||
is_client_connected()
|
||||
{
|
||||
return clients_conn_signel;
|
||||
return client_connected;
|
||||
}
|
||||
|
||||
static int
|
||||
client_connected(int is_connected)
|
||||
set_client_status(int is_connected)
|
||||
{
|
||||
if (is_connected)
|
||||
clients_conn_signel = 1;
|
||||
client_connected = 1;
|
||||
else
|
||||
clients_conn_signel = 0;
|
||||
client_connected = 0;
|
||||
|
||||
return clients_conn_signel;
|
||||
return client_connected;
|
||||
}
|
||||
|
||||
static int
|
||||
set_client_work_start(struct proxy_client *client, int is_start_work)
|
||||
{
|
||||
assert(client->ps);
|
||||
if (is_start_work) {
|
||||
assert(client->ps);
|
||||
client->work_started = 1;
|
||||
}else
|
||||
client->work_started = 0;
|
||||
@@ -110,8 +110,8 @@ client_start_event_cb(struct bufferevent *bev, short what, void *ctx)
|
||||
} else if (what & BEV_EVENT_CONNECTED) {
|
||||
bufferevent_setcb(bev, recv_cb, NULL, client_start_event_cb, client);
|
||||
bufferevent_enable(bev, EV_READ|EV_WRITE);
|
||||
sync_new_work_connection(bev, 0);
|
||||
client_connected(1);
|
||||
new_work_connection(bev, &main_ctl->stream);
|
||||
set_client_status(1);
|
||||
debug(LOG_INFO, "proxy service start");
|
||||
}
|
||||
}
|
||||
@@ -127,7 +127,8 @@ new_client_connect()
|
||||
if (c_conf->tcp_mux) {
|
||||
debug(LOG_DEBUG, "new client through tcp mux: %d", client->stream_id);
|
||||
client->ctl_bev = main_ctl->connect_bev;
|
||||
sync_new_work_connection(client->ctl_bev, client->stream_id);
|
||||
send_window_update(client->ctl_bev, &client->stream, 0);
|
||||
new_work_connection(client->ctl_bev, &client->stream);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -177,11 +178,11 @@ ping()
|
||||
}
|
||||
|
||||
char *ping_msg = "{}";
|
||||
send_enc_msg_frp_server(bout, TypePing, ping_msg, strlen(ping_msg), main_ctl->stream_id);
|
||||
send_enc_msg_frp_server(bout, TypePing, ping_msg, strlen(ping_msg), &main_ctl->stream);
|
||||
}
|
||||
|
||||
static void
|
||||
sync_new_work_connection(struct bufferevent *bev, uint32_t sid)
|
||||
new_work_connection(struct bufferevent *bev, struct tmux_stream *stream)
|
||||
{
|
||||
assert(bev);
|
||||
|
||||
@@ -199,10 +200,8 @@ sync_new_work_connection(struct bufferevent *bev, uint32_t sid)
|
||||
debug(LOG_ERR, "new work connection request run_id marshal failed!");
|
||||
return;
|
||||
}
|
||||
|
||||
tcp_mux_send_win_update_syn(bev, sid);
|
||||
|
||||
send_msg_frp_server(bev, TypeNewWorkConn, new_work_conn_request_message, nret, sid);
|
||||
send_msg_frp_server(bev, TypeNewWorkConn, new_work_conn_request_message, nret, stream);
|
||||
|
||||
SAFE_FREE(new_work_conn_request_message);
|
||||
SAFE_FREE(work_c);
|
||||
@@ -237,7 +236,7 @@ static void
|
||||
hb_sender_cb(evutil_socket_t fd, short event, void *arg)
|
||||
{
|
||||
if (is_client_connected()) {
|
||||
debug(LOG_DEBUG, "ping frps");
|
||||
//debug(LOG_DEBUG, "ping frps");
|
||||
ping(NULL);
|
||||
}
|
||||
|
||||
@@ -312,7 +311,6 @@ handle_enc_msg(const uint8_t *enc_msg, int ilen, uint8_t **out)
|
||||
init_main_decoder(buf);
|
||||
buf += get_block_size();
|
||||
ilen -= get_block_size();
|
||||
debug(LOG_DEBUG, "first recv stream message, init decoder iv succeed! %d", ilen);
|
||||
if (!ilen) {
|
||||
// recv only iv
|
||||
debug(LOG_DEBUG, "recv eas1238 iv data");
|
||||
@@ -323,7 +321,6 @@ handle_enc_msg(const uint8_t *enc_msg, int ilen, uint8_t **out)
|
||||
uint8_t *dec_msg = NULL;
|
||||
size_t len = decrypt_data(buf, ilen, get_main_decoder(), &dec_msg);
|
||||
*out = dec_msg;
|
||||
//debug(LOG_DEBUG, "dec out len %d ", len);
|
||||
|
||||
return len;
|
||||
}
|
||||
@@ -336,10 +333,10 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
|
||||
const uint8_t *enc_msg = buf;
|
||||
|
||||
if (!ctx) {
|
||||
debug(LOG_DEBUG, "main control message");
|
||||
//debug(LOG_DEBUG, "main control message");
|
||||
handle_enc_msg(enc_msg, len, &frps_cmd);
|
||||
} else {
|
||||
debug(LOG_DEBUG, "worker message");
|
||||
//debug(LOG_DEBUG, "worker message");
|
||||
frps_cmd = (uint8_t *)buf;
|
||||
}
|
||||
|
||||
@@ -349,18 +346,18 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
|
||||
struct msg_hdr *msg = (struct msg_hdr *)frps_cmd;
|
||||
|
||||
cmd_type = msg->type;
|
||||
debug(LOG_DEBUG, "cmd_type is %c data is %s", cmd_type, msg->data);
|
||||
switch(cmd_type) {
|
||||
case TypeReqWorkConn:
|
||||
//debug(LOG_DEBUG, "TypeReqWorkConn cmd");
|
||||
{
|
||||
if (! is_client_connected()) {
|
||||
start_proxy_services();
|
||||
client_connected(1);
|
||||
set_client_status(1);
|
||||
}
|
||||
new_client_connect();
|
||||
break;
|
||||
}
|
||||
case TypeNewProxyResp:
|
||||
debug(LOG_DEBUG, "TypeNewProxyResp cmd ");
|
||||
{
|
||||
struct new_proxy_response *npr = new_proxy_resp_unmarshal((const char *)msg->data);
|
||||
if (npr == NULL) {
|
||||
debug(LOG_ERR, "new proxy response buffer unmarshal faild!");
|
||||
@@ -370,8 +367,9 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
|
||||
proxy_service_resp_raw(npr);
|
||||
SAFE_FREE(npr);
|
||||
break;
|
||||
}
|
||||
case TypeStartWorkConn:
|
||||
debug(LOG_DEBUG, "TypeStartWorkConn cmd");
|
||||
{
|
||||
struct start_work_conn_resp *sr = start_work_conn_resp_unmarshal((const char *)msg->data);
|
||||
if (! sr) {
|
||||
debug(LOG_ERR,
|
||||
@@ -406,8 +404,8 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
|
||||
set_client_work_start(client, 1);
|
||||
|
||||
break;
|
||||
}
|
||||
case TypePong:
|
||||
//debug(LOG_DEBUG, "receive pong from frps");
|
||||
pong_time = time(NULL);
|
||||
break;
|
||||
default:
|
||||
@@ -457,7 +455,7 @@ handle_login_response(const uint8_t *buf, int len)
|
||||
assert(nret > 0);
|
||||
// start proxy services must first send
|
||||
start_proxy_services();
|
||||
client_connected(1);
|
||||
set_client_status(1);
|
||||
debug(LOG_DEBUG, "TypeReqWorkConn cmd, msg :%s", &frps_cmd[8]);
|
||||
assert (frps_cmd[0] == TypeReqWorkConn);
|
||||
new_client_connect();
|
||||
@@ -476,6 +474,8 @@ handle_frps_msg(uint8_t *buf, int len, void *ctx)
|
||||
}
|
||||
}
|
||||
|
||||
static struct tmux_stream abandon_stream;
|
||||
|
||||
// ctx: if recv_cb was called by common control, ctx == NULL
|
||||
// else ctx == client struct
|
||||
static void
|
||||
@@ -484,22 +484,109 @@ recv_cb(struct bufferevent *bev, void *ctx)
|
||||
struct evbuffer *input = bufferevent_get_input(bev);
|
||||
int len = evbuffer_get_length(input);
|
||||
if (len <= 0) {
|
||||
return;
|
||||
return;
|
||||
}
|
||||
|
||||
uint8_t *buf = calloc(len+1, 1);
|
||||
assert(buf);
|
||||
evbuffer_remove(input, buf, len);
|
||||
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
if (c_conf->tcp_mux) {
|
||||
handle_tcp_mux_frps_msg(buf, len, handle_frps_msg);
|
||||
} else {
|
||||
handle_frps_msg(buf, len, ctx);
|
||||
}
|
||||
static struct tcp_mux_header tmux_hdr;
|
||||
static uint32_t stream_len = 0;
|
||||
while (len > 0) {
|
||||
struct tmux_stream *cur = get_cur_stream();
|
||||
size_t nr = 0;
|
||||
if (!cur) {
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
uint8_t *data = (uint8_t *)&tmux_hdr;
|
||||
if (len < sizeof(tmux_hdr)) {
|
||||
debug(LOG_INFO, "len [%d] < sizeof tmux_hdr", len);
|
||||
break;
|
||||
}
|
||||
nr = bufferevent_read(bev, data, sizeof(tmux_hdr));
|
||||
assert(nr == sizeof(tmux_hdr));
|
||||
assert(validate_tcp_mux_protocol(&tmux_hdr) > 0);
|
||||
len -= nr;
|
||||
if (tmux_hdr.type == DATA) {
|
||||
uint32_t stream_id = ntohl(tmux_hdr.stream_id);
|
||||
stream_len = ntohl(tmux_hdr.length);
|
||||
cur = get_stream_by_id(stream_id);
|
||||
if (!cur) {
|
||||
debug(LOG_INFO, "cur is NULL stream_id is %d, stream_len is %d len is %d",
|
||||
stream_id, stream_len, len);
|
||||
if (stream_len > 0)
|
||||
cur = &abandon_stream;
|
||||
else
|
||||
continue;
|
||||
}
|
||||
|
||||
if (len == 0) {
|
||||
set_cur_stream(cur);
|
||||
break;
|
||||
}
|
||||
if (len >= stream_len) {
|
||||
nr = tmux_stream_read(bev, cur, stream_len);
|
||||
assert(nr == stream_len);
|
||||
len -= stream_len;
|
||||
} else {
|
||||
nr = tmux_stream_read(bev, cur, len);
|
||||
stream_len -= len;
|
||||
assert(nr == len);
|
||||
set_cur_stream(cur);
|
||||
len -= nr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
assert(tmux_hdr.type == DATA);
|
||||
if (len >= stream_len ) {
|
||||
nr = tmux_stream_read(bev, cur, stream_len);
|
||||
assert(nr == stream_len);
|
||||
len -= stream_len;
|
||||
} else {
|
||||
nr = tmux_stream_read(bev, cur, len);
|
||||
stream_len -= len;
|
||||
assert(nr == len);
|
||||
len -= nr;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (cur == &abandon_stream) {
|
||||
debug(LOG_INFO, "abandon stream data ...");
|
||||
memset(cur , 0, sizeof(abandon_stream));
|
||||
set_cur_stream(NULL);
|
||||
continue;
|
||||
}
|
||||
|
||||
switch(tmux_hdr.type) {
|
||||
case DATA:
|
||||
case WINDOW_UPDATE:
|
||||
{
|
||||
handle_tcp_mux_stream(&tmux_hdr, handle_frps_msg);
|
||||
break;
|
||||
}
|
||||
case PING:
|
||||
handle_tcp_mux_ping(&tmux_hdr);
|
||||
break;
|
||||
case GO_AWAY:
|
||||
handle_tcp_mux_go_away(&tmux_hdr);
|
||||
break;
|
||||
default:
|
||||
debug(LOG_ERR, "impossible here!!!!");
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
set_cur_stream(NULL);
|
||||
}
|
||||
} else {
|
||||
uint8_t *buf = calloc(len, 1);
|
||||
assert(buf);
|
||||
evbuffer_remove(input, buf, len);
|
||||
|
||||
handle_frps_msg(buf, len, ctx);
|
||||
SAFE_FREE(buf);
|
||||
}
|
||||
|
||||
|
||||
SAFE_FREE(buf);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -520,11 +607,12 @@ connect_event_cb (struct bufferevent *bev, short what, void *ctx)
|
||||
c_conf->server_addr,
|
||||
c_conf->server_port,
|
||||
strerror(errno));
|
||||
reset_session_id();
|
||||
clear_main_control();
|
||||
run_control();
|
||||
} else if (what & BEV_EVENT_CONNECTED) {
|
||||
retry_times = 0;
|
||||
tcp_mux_send_win_update_syn(bev, main_ctl->stream_id);
|
||||
send_window_update(bev, &main_ctl->stream, 0);
|
||||
login();
|
||||
|
||||
keep_control_alive();
|
||||
@@ -544,32 +632,6 @@ keep_control_alive()
|
||||
set_ticker_ping_timer(main_ctl->ticker_ping);
|
||||
}
|
||||
|
||||
static void
|
||||
server_dns_cb(int event_code, struct evutil_addrinfo *addr, void *ctx)
|
||||
{
|
||||
if (event_code) {
|
||||
set_common_server_ip((const char *)evutil_gai_strerror(event_code));
|
||||
} else {
|
||||
struct evutil_addrinfo *ai;
|
||||
if (addr->ai_canonname)
|
||||
debug(LOG_DEBUG, "addr->ai_canonname [%s]", addr->ai_canonname);
|
||||
for (ai = addr; ai; ai = ai->ai_next) {
|
||||
char buf[128];
|
||||
const char *s = NULL;
|
||||
if (ai->ai_family == AF_INET) {
|
||||
struct sockaddr_in *sin = (struct sockaddr_in *)ai->ai_addr;
|
||||
s = evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, 128);
|
||||
} else if (ai->ai_family == AF_INET6) {
|
||||
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ai->ai_addr;
|
||||
s = evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, 128);
|
||||
}
|
||||
|
||||
if (s) set_common_server_ip(s);
|
||||
}
|
||||
if (addr) evutil_freeaddrinfo(addr);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
start_base_connect()
|
||||
{
|
||||
@@ -602,7 +664,7 @@ login()
|
||||
exit(0);
|
||||
}
|
||||
|
||||
send_msg_frp_server(NULL, TypeLogin, lg_msg, len, 1);
|
||||
send_msg_frp_server(NULL, TypeLogin, lg_msg, len, &main_ctl->stream);
|
||||
SAFE_FREE(lg_msg);
|
||||
}
|
||||
|
||||
@@ -611,7 +673,7 @@ send_msg_frp_server(struct bufferevent *bev,
|
||||
const enum msg_type type,
|
||||
const char *msg,
|
||||
const size_t msg_len,
|
||||
uint32_t sid)
|
||||
struct tmux_stream *stream)
|
||||
{
|
||||
struct bufferevent *bout = NULL;
|
||||
if (bev) {
|
||||
@@ -632,9 +694,11 @@ send_msg_frp_server(struct bufferevent *bev,
|
||||
req_msg->length = msg_hton((uint64_t)msg_len);
|
||||
memcpy(req_msg->data, msg, msg_len);
|
||||
|
||||
tcp_mux_send_data(bout, sid, len);
|
||||
|
||||
bufferevent_write(bout, (uint8_t *)req_msg, len);
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
if (c_conf->tcp_mux)
|
||||
tmux_stream_write(bout, (uint8_t *)req_msg, len, stream);
|
||||
else
|
||||
bufferevent_write(bout, (uint8_t *)req_msg, len);
|
||||
|
||||
free(req_msg);
|
||||
}
|
||||
@@ -644,7 +708,7 @@ send_enc_msg_frp_server(struct bufferevent *bev,
|
||||
const enum msg_type type,
|
||||
const char *msg,
|
||||
const size_t msg_len,
|
||||
uint32_t sid)
|
||||
struct tmux_stream *stream)
|
||||
{
|
||||
struct bufferevent *bout = NULL;
|
||||
if (bev) {
|
||||
@@ -654,31 +718,28 @@ send_enc_msg_frp_server(struct bufferevent *bev,
|
||||
}
|
||||
assert(bout);
|
||||
|
||||
debug(LOG_DEBUG, "send enc msg ----> [%c: %s]", type, msg);
|
||||
|
||||
struct msg_hdr *req_msg = calloc(msg_len+sizeof(struct msg_hdr), 1);
|
||||
assert(req_msg);
|
||||
req_msg->type = type;
|
||||
req_msg->length = msg_hton((uint64_t)msg_len);
|
||||
memcpy(req_msg->data, msg, msg_len);
|
||||
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
if (get_main_encoder() == NULL) {
|
||||
debug(LOG_DEBUG, "init_main_encoder .......");
|
||||
|
||||
tcp_mux_send_data(bout, sid, 16);
|
||||
|
||||
struct frp_coder *coder = init_main_encoder();
|
||||
bufferevent_write(bout, coder->iv, 16);
|
||||
if (c_conf->tcp_mux)
|
||||
tmux_stream_write(bout, coder->iv, 16, stream);
|
||||
else
|
||||
bufferevent_write(bout, coder->iv, 16);
|
||||
}
|
||||
|
||||
uint8_t *enc_msg = NULL;
|
||||
size_t olen = encrypt_data((uint8_t *)req_msg, msg_len+sizeof(struct msg_hdr), get_main_encoder(), &enc_msg);
|
||||
assert(olen > 0);
|
||||
//debug(LOG_DEBUG, "encrypt_data length %d", olen);
|
||||
|
||||
tcp_mux_send_data(bout, sid, olen);
|
||||
|
||||
bufferevent_write(bout, enc_msg, olen);
|
||||
if (c_conf->tcp_mux)
|
||||
tmux_stream_write(bout, enc_msg, olen, stream);
|
||||
else
|
||||
bufferevent_write(bout, enc_msg, olen);
|
||||
|
||||
free(enc_msg);
|
||||
free(req_msg);
|
||||
@@ -726,7 +787,7 @@ send_new_proxy(struct proxy_service *ps)
|
||||
|
||||
debug(LOG_DEBUG, "control proxy client: [Type %d : proxy_name %s : msg_len %d]", TypeNewProxy, ps->proxy_name, len);
|
||||
|
||||
send_enc_msg_frp_server(NULL, TypeNewProxy, new_proxy_msg, len, main_ctl->stream_id);
|
||||
send_enc_msg_frp_server(NULL, TypeNewProxy, new_proxy_msg, len, &main_ctl->stream);
|
||||
SAFE_FREE(new_proxy_msg);
|
||||
}
|
||||
|
||||
@@ -751,9 +812,10 @@ init_main_control()
|
||||
}
|
||||
main_ctl->connect_base = base;
|
||||
|
||||
if (c_conf->tcp_mux)
|
||||
main_ctl->stream_id = get_next_session_id();
|
||||
|
||||
if (c_conf->tcp_mux) {
|
||||
init_tmux_stream(&main_ctl->stream, get_next_session_id(), INIT);
|
||||
}
|
||||
|
||||
// if server_addr is ip, done control init.
|
||||
if (is_valid_ip_address((const char *)c_conf->server_addr))
|
||||
return;
|
||||
@@ -774,29 +836,6 @@ init_main_control()
|
||||
evdns_base_nameserver_ip_add(dnsbase, "223.5.5.5"); //AliDNS
|
||||
evdns_base_nameserver_ip_add(dnsbase, "223.6.6.6"); //AliDNS
|
||||
evdns_base_nameserver_ip_add(dnsbase, "114.114.114.114"); //114DNS
|
||||
|
||||
|
||||
// if server_addr is domain, analyze it to ip for server_ip
|
||||
debug(LOG_DEBUG, "Get ip address of [%s] from DNServer", c_conf->server_addr);
|
||||
|
||||
struct evutil_addrinfo hints;
|
||||
struct evdns_getaddrinfo_request *dns_req;
|
||||
memset(&hints, 0, sizeof(hints));
|
||||
hints.ai_family = AF_UNSPEC;
|
||||
hints.ai_flags = EVUTIL_AI_CANONNAME;
|
||||
hints.ai_socktype = SOCK_STREAM;
|
||||
hints.ai_protocol = IPPROTO_TCP;
|
||||
|
||||
dns_req = evdns_getaddrinfo(dnsbase,
|
||||
c_conf->server_addr,
|
||||
NULL /* no service name given */,
|
||||
&hints,
|
||||
server_dns_cb,
|
||||
NULL);
|
||||
if (!dns_req) {
|
||||
debug(LOG_ERR, "error: can not analyse the dns of [%s]", c_conf->server_addr);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -814,7 +853,7 @@ clear_main_control()
|
||||
if (main_ctl->tcp_mux_ping_event) evtimer_del(main_ctl->tcp_mux_ping_event);
|
||||
clear_all_proxy_client();
|
||||
free_evp_cipher_ctx();
|
||||
client_connected(0);
|
||||
set_client_status(0);
|
||||
pong_time = 0;
|
||||
is_login = 0;
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ struct control {
|
||||
|
||||
struct event *tcp_mux_ping_event;
|
||||
uint32_t tcp_mux_ping_id;
|
||||
uint32_t stream_id;
|
||||
struct tmux_stream stream;
|
||||
};
|
||||
|
||||
void connect_eventcb(struct bufferevent *bev, short events, void *ptr);
|
||||
@@ -67,13 +67,13 @@ void send_msg_frp_server(struct bufferevent *bev,
|
||||
const enum msg_type type,
|
||||
const char *msg,
|
||||
const size_t msg_len,
|
||||
uint32_t sid);
|
||||
struct tmux_stream *stream);
|
||||
|
||||
void send_enc_msg_frp_server(struct bufferevent *bev,
|
||||
const enum msg_type type,
|
||||
const char *msg,
|
||||
const size_t msg_len,
|
||||
uint32_t sid);
|
||||
struct tmux_stream *stream);
|
||||
|
||||
void control_process(struct proxy_client *client);
|
||||
|
||||
|
||||
12
crypto.c
12
crypto.c
@@ -50,7 +50,7 @@ static void
|
||||
free_frp_coder(struct frp_coder *coder)
|
||||
{
|
||||
free(coder->salt);
|
||||
free(coder->privilege_token);
|
||||
free(coder->token);
|
||||
free(coder);
|
||||
}
|
||||
|
||||
@@ -91,14 +91,14 @@ get_block_size()
|
||||
}
|
||||
|
||||
struct frp_coder *
|
||||
new_coder(const char *privilege_token, const char *salt)
|
||||
new_coder(const char *token, const char *salt)
|
||||
{
|
||||
struct frp_coder *enc = calloc(sizeof(struct frp_coder), 1);
|
||||
assert(enc);
|
||||
|
||||
enc->privilege_token = privilege_token ? strdup(privilege_token):strdup("\0");
|
||||
enc->token = token ? strdup(token):strdup("\0");
|
||||
enc->salt = strdup(salt);
|
||||
encrypt_key(enc->privilege_token, strlen(enc->privilege_token), enc->salt, enc->key, block_size);
|
||||
encrypt_key(enc->token, strlen(enc->token), enc->salt, enc->key, block_size);
|
||||
encrypt_iv(enc->iv, block_size);
|
||||
return enc;
|
||||
}
|
||||
@@ -109,7 +109,7 @@ clone_coder(const struct frp_coder *coder)
|
||||
assert(coder);
|
||||
struct frp_coder *enc = calloc(sizeof(struct frp_coder), 1);
|
||||
memcpy(enc, coder, sizeof(*coder));
|
||||
enc->privilege_token = strdup(coder->privilege_token);
|
||||
enc->token = strdup(coder->token);
|
||||
enc->salt = strdup(coder->salt);
|
||||
|
||||
return enc;
|
||||
@@ -273,7 +273,7 @@ D_END:
|
||||
void
|
||||
free_encoder(struct frp_coder *encoder) {
|
||||
if (encoder) {
|
||||
SAFE_FREE(encoder->privilege_token);
|
||||
SAFE_FREE(encoder->token);
|
||||
SAFE_FREE(encoder->salt);
|
||||
free(encoder);
|
||||
}
|
||||
|
||||
4
crypto.h
4
crypto.h
@@ -38,7 +38,7 @@ struct frp_coder {
|
||||
uint8_t key[16];
|
||||
char *salt;
|
||||
uint8_t iv[16];
|
||||
char *privilege_token;
|
||||
char *token;
|
||||
};
|
||||
|
||||
size_t get_encrypt_block_size();
|
||||
@@ -47,7 +47,7 @@ int is_encoder_inited();
|
||||
int is_decoder_inited();
|
||||
struct frp_coder *init_main_encoder();
|
||||
struct frp_coder *init_main_decoder(const uint8_t *iv);
|
||||
struct frp_coder *new_coder(const char *privilege_token, const char *salt);
|
||||
struct frp_coder *new_coder(const char *token, const char *salt);
|
||||
uint8_t *encrypt_key(const char *token, size_t token_len, const char *salt, uint8_t *key, size_t key_len);
|
||||
uint8_t *encrypt_iv(uint8_t *iv_buf, size_t iv_len);
|
||||
size_t encrypt_data(const uint8_t *src_data, size_t srclen, struct frp_coder *encoder, uint8_t **ret);
|
||||
|
||||
1
login.c
1
login.c
@@ -89,7 +89,6 @@ void init_login()
|
||||
c_login->metas = NULL;
|
||||
c_login->pool_count = 1;
|
||||
c_login->privilege_key = NULL;
|
||||
c_login->user = c_conf->user;
|
||||
|
||||
c_login->logged = 0;
|
||||
|
||||
|
||||
@@ -101,12 +101,12 @@ void ftp_proxy_c2s_cb(struct bufferevent *bev, void *ctx)
|
||||
struct ftp_pasv *r_fp = new_ftp_pasv();
|
||||
r_fp->code = local_fp->code;
|
||||
|
||||
if (! c_conf->server_ip) {
|
||||
if (! c_conf->server_addr) {
|
||||
debug(LOG_ERR, "error: FTP proxy without server ip!");
|
||||
exit(0);
|
||||
}
|
||||
|
||||
strncpy(r_fp->ftp_server_ip, c_conf->server_ip, IP_LEN);
|
||||
strncpy(r_fp->ftp_server_ip, c_conf->server_addr, IP_LEN);
|
||||
r_fp->ftp_server_port = p->remote_data_port;
|
||||
|
||||
if (r_fp->ftp_server_port <= 0) {
|
||||
@@ -264,4 +264,4 @@ static void free_ftp_pasv(struct ftp_pasv *fp)
|
||||
|
||||
SAFE_FREE(fp);
|
||||
fp = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
26
proxy_tcp.c
26
proxy_tcp.c
@@ -47,7 +47,7 @@
|
||||
#include "config.h"
|
||||
#include "tcpmux.h"
|
||||
|
||||
#define BUF_LEN 4096
|
||||
#define BUF_LEN 2*1024
|
||||
|
||||
// read data from local service
|
||||
void tcp_proxy_c2s_cb(struct bufferevent *bev, void *ctx)
|
||||
@@ -66,23 +66,15 @@ void tcp_proxy_c2s_cb(struct bufferevent *bev, void *ctx)
|
||||
return;
|
||||
}
|
||||
|
||||
if (client->send_window == 0) {
|
||||
debug(LOG_DEBUG, "client %d recv len %d exceed send windows: %d", client->stream_id, len, client->send_window);
|
||||
uint8_t *buf = (uint8_t *)malloc(len);
|
||||
assert(buf != NULL);
|
||||
memset(buf, 0, len);
|
||||
uint32_t nr = bufferevent_read(bev, buf, len);
|
||||
assert(nr == len);
|
||||
nr = tmux_stream_write(partner, buf, len, &client->stream);
|
||||
if (nr < len) {
|
||||
debug(LOG_DEBUG, "stream_id [%d] len is %d tmux_stream_write %d data, disable read", client->stream.id, len, nr);
|
||||
bufferevent_disable(bev, EV_READ);
|
||||
return;
|
||||
} else {
|
||||
len = client->send_window>=len?len:client->send_window;
|
||||
client->send_window -= len;
|
||||
}
|
||||
|
||||
tcp_mux_send_data(partner, client->stream_id, len);
|
||||
uint8_t buf[BUF_LEN];
|
||||
while(len > 0) {
|
||||
memset(buf, 0, BUF_LEN);
|
||||
int nread = bufferevent_read(bev, buf, len>BUF_LEN?BUF_LEN:len);
|
||||
assert(nread >= 0);
|
||||
bufferevent_write(partner, buf, nread);
|
||||
len -= nread;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
718
tcpmux.c
718
tcpmux.c
@@ -24,6 +24,9 @@
|
||||
@author Copyright (C) 2016 Dengfeng Liu <liu_df@qq.com>
|
||||
*/
|
||||
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "common.h"
|
||||
#include "tcpmux.h"
|
||||
#include "client.h"
|
||||
@@ -32,63 +35,77 @@
|
||||
#include "control.h"
|
||||
|
||||
static uint8_t proto_version = 0;
|
||||
static uint8_t remote_go_away;
|
||||
static uint8_t local_go_away;
|
||||
static uint32_t g_session_id = 1;
|
||||
static struct tmux_stream *cur_stream = NULL;
|
||||
static struct tmux_stream *all_stream;
|
||||
|
||||
static struct tcp_mux_type_desc type_desc[] = {
|
||||
{DATA, "data"},
|
||||
{WINDOW_UPDATE, "window update"},
|
||||
{PING, "ping"},
|
||||
{GO_AWAY, "go away"},
|
||||
static uint32_t ring_buffer_read(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len);
|
||||
static uint32_t ring_buffer_write(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len);
|
||||
|
||||
void
|
||||
add_stream(struct tmux_stream *stream)
|
||||
{
|
||||
HASH_ADD_INT(all_stream, id, stream);
|
||||
}
|
||||
|
||||
void
|
||||
del_stream(uint32_t id)
|
||||
{
|
||||
assert(all_stream != NULL);
|
||||
|
||||
struct tmux_stream *stream = get_stream_by_id(id);
|
||||
if (stream)
|
||||
HASH_DEL(all_stream, stream);
|
||||
}
|
||||
|
||||
struct tmux_stream *
|
||||
get_stream_by_id(uint32_t id)
|
||||
{
|
||||
if (!all_stream) return NULL;
|
||||
|
||||
struct tmux_stream *stream = NULL;
|
||||
HASH_FIND_INT(all_stream, &id, stream);
|
||||
return stream;
|
||||
}
|
||||
|
||||
struct tmux_stream *
|
||||
get_cur_stream()
|
||||
{
|
||||
return cur_stream;
|
||||
}
|
||||
|
||||
void
|
||||
set_cur_stream(struct tmux_stream *stream)
|
||||
{
|
||||
cur_stream = stream;
|
||||
}
|
||||
|
||||
void
|
||||
init_tmux_stream(struct tmux_stream *stream, uint32_t id, enum tcp_mux_state state)
|
||||
{
|
||||
stream->id = id;
|
||||
stream->state = state;
|
||||
stream->recv_window = MAX_STREAM_WINDOW_SIZE;
|
||||
stream->send_window = MAX_STREAM_WINDOW_SIZE;
|
||||
|
||||
memset(&stream->tx_ring, 0, sizeof(struct ring_buffer));
|
||||
memset(&stream->rx_ring, 0, sizeof(struct ring_buffer));
|
||||
|
||||
add_stream(stream);
|
||||
};
|
||||
|
||||
static struct tcp_mux_flag_desc flag_desc[] = {
|
||||
{ZERO, "zero"},
|
||||
{SYN, "syn"},
|
||||
{ACK, "ack"},
|
||||
{FIN, "fin"},
|
||||
{RST, "rst"},
|
||||
};
|
||||
|
||||
static const char *
|
||||
type_2_desc(enum tcp_mux_type type)
|
||||
int
|
||||
validate_tcp_mux_protocol(struct tcp_mux_header *tmux_hdr)
|
||||
{
|
||||
for(int i = 0; i < sizeof(type_desc)/sizeof(struct tcp_mux_type_desc); i++){
|
||||
if (type == type_desc[i].type)
|
||||
return type_desc[i].desc;
|
||||
}
|
||||
|
||||
return "unkown_type";
|
||||
}
|
||||
|
||||
static const char *
|
||||
flag_2_desc(enum tcp_mux_flag flag)
|
||||
{
|
||||
for(int i = 0; i < sizeof(flag_desc)/sizeof(struct tcp_mux_flag_desc); i++){
|
||||
if (flag == flag_desc[i].flag)
|
||||
return flag_desc[i].desc;
|
||||
}
|
||||
if (tmux_hdr->version != proto_version) return 0;
|
||||
|
||||
return "unkown_flag";
|
||||
if (tmux_hdr->type > GO_AWAY) return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int
|
||||
valid_tcp_mux_type(uint8_t type)
|
||||
{
|
||||
if (type >= DATA && type <= GO_AWAY)
|
||||
return 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
valid_tcp_mux_sid(uint32_t sid)
|
||||
{
|
||||
if (sid == 1)
|
||||
return 1;
|
||||
|
||||
return get_proxy_client(sid)?1:0;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
tcp_mux_encode(enum tcp_mux_type type, enum tcp_mux_flag flags, uint32_t stream_id, uint32_t length, struct tcp_mux_header *tmux_hdr)
|
||||
{
|
||||
@@ -107,65 +124,33 @@ tcp_mux_flag()
|
||||
return c_conf->tcp_mux;
|
||||
}
|
||||
|
||||
static void
|
||||
dump_tcp_mux_header(uint8_t *data, int len)
|
||||
{
|
||||
if (len != 12)
|
||||
return;
|
||||
|
||||
printf("tcp mux header is : \n");
|
||||
for (int i = 0; i < len; i++)
|
||||
printf("%2x", data[i]);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
static uint32_t
|
||||
parse_tcp_mux_proto(uint8_t *data, int len, uint32_t *flag, uint32_t *type, uint32_t *stream_id, uint32_t *dlen)
|
||||
{
|
||||
struct common_conf *c_conf = get_common_config();
|
||||
if (!c_conf->tcp_mux)
|
||||
return 0;
|
||||
|
||||
if (len < sizeof(struct tcp_mux_header))
|
||||
return 0;
|
||||
|
||||
struct tcp_mux_header *hdr = (struct tcp_mux_header *)data;
|
||||
if(hdr->version == proto_version &&
|
||||
valid_tcp_mux_type(hdr->type)) {
|
||||
if (hdr->type == DATA && !valid_tcp_mux_sid(htonl(hdr->stream_id))) {
|
||||
debug(LOG_INFO, "!!!!!type is DATA but cant find stream_id : type [%s] flag [%s] stream_id[%d]",
|
||||
type_2_desc(hdr->type), flag_2_desc(htons(hdr->flags)), htonl(hdr->stream_id));
|
||||
dump_tcp_mux_header(data, len);
|
||||
exit(-1);
|
||||
}
|
||||
*type = hdr->type;
|
||||
*flag = htons(hdr->flags);
|
||||
*stream_id = htonl(hdr->stream_id);
|
||||
*dlen = htonl(hdr->length);
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
void
|
||||
reset_session_id() {
|
||||
g_session_id = 1;
|
||||
}
|
||||
|
||||
uint32_t
|
||||
get_next_session_id() {
|
||||
static uint32_t next_session_id = 1;
|
||||
uint32_t id = next_session_id;
|
||||
next_session_id += 2;
|
||||
uint32_t id = g_session_id;
|
||||
g_session_id += 2;
|
||||
return id;
|
||||
}
|
||||
|
||||
static void
|
||||
tcp_mux_send_win_update(struct bufferevent *bout, enum tcp_mux_flag flags, uint32_t stream_id, uint32_t delta)
|
||||
{
|
||||
struct tcp_mux_header tmux_hdr;
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
tcp_mux_encode(WINDOW_UPDATE, flags, stream_id, delta, &tmux_hdr);
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
}
|
||||
|
||||
void
|
||||
tcp_mux_send_win_update_syn(struct bufferevent *bout, uint32_t stream_id)
|
||||
{
|
||||
if (!tcp_mux_flag()) return;
|
||||
|
||||
struct tcp_mux_header tmux_hdr;
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
tcp_mux_encode(WINDOW_UPDATE, SYN, stream_id, 0, &tmux_hdr);
|
||||
debug(LOG_DEBUG, "tcp mux [%d] send wind update syn", stream_id);
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
|
||||
tcp_mux_send_win_update(bout, SYN, stream_id, 0);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -173,11 +158,7 @@ tcp_mux_send_win_update_ack(struct bufferevent *bout, uint32_t stream_id, uint32
|
||||
{
|
||||
if (!tcp_mux_flag()) return;
|
||||
|
||||
struct tcp_mux_header tmux_hdr;
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
tcp_mux_encode(WINDOW_UPDATE, ZERO, stream_id, delta, &tmux_hdr);
|
||||
debug(LOG_DEBUG, "tcp mux [%d] send wind update ZERO [%d]", stream_id, delta);
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
tcp_mux_send_win_update(bout, ZERO, stream_id, 0);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -185,21 +166,24 @@ tcp_mux_send_win_update_fin(struct bufferevent *bout, uint32_t stream_id)
|
||||
{
|
||||
if (!tcp_mux_flag()) return;
|
||||
|
||||
struct tcp_mux_header tmux_hdr;
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
tcp_mux_encode(WINDOW_UPDATE, FIN, stream_id, 0, &tmux_hdr);
|
||||
debug(LOG_DEBUG, "tcp mux [%d] send wind update FIN", stream_id);
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
tcp_mux_send_win_update(bout, FIN, stream_id, 0);
|
||||
}
|
||||
|
||||
void
|
||||
tcp_mux_send_data(struct bufferevent *bout, uint32_t stream_id, uint32_t length)
|
||||
tcp_mux_send_win_update_rst(struct bufferevent *bout, uint32_t stream_id)
|
||||
{
|
||||
if (!tcp_mux_flag()) return;
|
||||
|
||||
tcp_mux_send_win_update(bout, RST, stream_id, 0);
|
||||
}
|
||||
void
|
||||
tcp_mux_send_data(struct bufferevent *bout, uint16_t flags, uint32_t stream_id, uint32_t length)
|
||||
{
|
||||
if (!tcp_mux_flag()) return;
|
||||
|
||||
struct tcp_mux_header tmux_hdr;
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
tcp_mux_encode(DATA, ZERO, stream_id, length, &tmux_hdr);
|
||||
tcp_mux_encode(DATA, flags, stream_id, length, &tmux_hdr);
|
||||
//debug(LOG_DEBUG, "tcp mux [%d] send data len : %d", stream_id, length);
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
}
|
||||
@@ -228,141 +212,399 @@ tcp_mux_handle_ping(struct bufferevent *bout, uint32_t ping_id)
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
}
|
||||
|
||||
void
|
||||
handle_tcp_mux_frps_msg(uint8_t *buf, int ilen, void (*fn)(uint8_t *, int, void *))
|
||||
static void
|
||||
tcp_mux_send_go_away(struct bufferevent *bout, uint32_t reason)
|
||||
{
|
||||
static uint32_t l_stream_id = 0;
|
||||
static uint32_t l_dlen = 0;
|
||||
static uint32_t l_type = 0;
|
||||
static uint32_t l_flag = 0;
|
||||
uint8_t *data = buf;
|
||||
while (ilen > 0) {
|
||||
uint32_t type = 0, stream_id = 0, dlen = 0, flag = 0;
|
||||
uint32_t is_tmux = parse_tcp_mux_proto(data, ilen, &flag, &type, &stream_id, &dlen);
|
||||
if (!is_tmux) {
|
||||
struct proxy_client *pc = get_proxy_client(l_stream_id);
|
||||
debug(LOG_DEBUG, "receive only %s data : l_stream_id %d l_type %s l_flag %s l_dlen %d ilen %d",
|
||||
!pc?"main control ":"worker ",
|
||||
l_stream_id, type_2_desc(l_type),
|
||||
flag_2_desc(l_flag), l_dlen, ilen);
|
||||
assert(ilen);
|
||||
if (ilen == 12)
|
||||
dump_tcp_mux_header(data, ilen);
|
||||
if (!tcp_mux_flag()) return;
|
||||
|
||||
if (!pc || (pc && !pc->local_proxy_bev)) {
|
||||
assert(ilen >= l_dlen);
|
||||
assert(l_dlen > 0);
|
||||
fn(data, l_dlen, pc);
|
||||
data += l_dlen;
|
||||
ilen -= l_dlen;
|
||||
l_dlen = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pc->stream_state != ESTABLISHED) {
|
||||
debug(LOG_INFO, "client [%d] state is [%d]", pc->stream_id, pc->stream_state);
|
||||
break;
|
||||
}
|
||||
struct tcp_mux_header tmux_hdr;
|
||||
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
|
||||
tcp_mux_encode(GO_AWAY, 0, 0, reason, &tmux_hdr);
|
||||
//debug(LOG_DEBUG, "tcp mux send ping ack : %d", ping_id);
|
||||
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
|
||||
|
||||
if ( ilen >= l_dlen) {
|
||||
assert(pc->local_proxy_bev);
|
||||
bufferevent_write(pc->local_proxy_bev, data, l_dlen);
|
||||
data += l_dlen;
|
||||
ilen -= l_dlen;
|
||||
l_dlen = 0;
|
||||
} else {
|
||||
assert(pc->local_proxy_bev);
|
||||
bufferevent_write(pc->local_proxy_bev, data, ilen);
|
||||
l_dlen -= ilen;
|
||||
ilen = 0;
|
||||
}
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
struct proxy_client *pc = get_proxy_client(stream_id);
|
||||
debug(LOG_DEBUG, "[%s] receive tcp mux type [%s] flag [%s] stream_id [%d] dlen [%d] ilen [%d]",
|
||||
pc?"worker":"main control",
|
||||
type_2_desc(type), flag_2_desc(flag), stream_id, dlen, ilen);
|
||||
data += sizeof(struct tcp_mux_header);
|
||||
ilen -= sizeof(struct tcp_mux_header);
|
||||
l_stream_id = stream_id;
|
||||
l_type = type;
|
||||
l_flag = flag;
|
||||
l_dlen = type==PING?0:dlen;
|
||||
assert(ilen >= 0);
|
||||
|
||||
switch(type) {
|
||||
case DATA:
|
||||
{
|
||||
if (ilen == 0)
|
||||
break;
|
||||
|
||||
if (!pc || (pc && !pc->local_proxy_bev)) {
|
||||
assert(ilen >= dlen);
|
||||
fn(data, dlen, pc);
|
||||
data += dlen;
|
||||
ilen -= dlen;
|
||||
l_dlen = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pc->stream_state != ESTABLISHED) {
|
||||
debug(LOG_INFO, "client [%d] state is [%d]", pc->stream_id, pc->stream_state);
|
||||
break;
|
||||
}
|
||||
|
||||
if (ilen >= dlen){
|
||||
assert(pc->local_proxy_bev);
|
||||
bufferevent_write(pc->local_proxy_bev, data, dlen);
|
||||
data += dlen;
|
||||
ilen -= dlen;
|
||||
l_dlen = 0;
|
||||
} else {
|
||||
assert(pc->local_proxy_bev);
|
||||
bufferevent_write(pc->local_proxy_bev, data, ilen);
|
||||
l_dlen -= ilen;
|
||||
ilen = 0;
|
||||
}
|
||||
static int
|
||||
process_flags(uint16_t flags, struct tmux_stream *stream)
|
||||
{
|
||||
uint32_t close_stream = 0;
|
||||
if ( (flags&ACK) == ACK ) {
|
||||
if (stream->state == SYN_SEND)
|
||||
stream->state = ESTABLISHED;
|
||||
} else if ( (flags&FIN) == FIN ) {
|
||||
switch(stream->state) {
|
||||
case SYN_SEND:
|
||||
case SYN_RECEIVED:
|
||||
case ESTABLISHED:
|
||||
stream->state = REMOTE_CLOSE;
|
||||
break;
|
||||
}
|
||||
case PING:
|
||||
{
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
uint32_t seq = dlen;
|
||||
assert(bout);
|
||||
if (flag == SYN)
|
||||
tcp_mux_handle_ping(bout, seq);
|
||||
case LOCAL_CLOSE:
|
||||
stream->state = CLOSED;
|
||||
close_stream = 1;
|
||||
break;
|
||||
}
|
||||
case WINDOW_UPDATE:
|
||||
{
|
||||
switch(flag) {
|
||||
case RST:
|
||||
case FIN:
|
||||
del_proxy_client(pc);
|
||||
break;
|
||||
case ZERO:
|
||||
case ACK:
|
||||
if (!pc)
|
||||
break;
|
||||
|
||||
if (dlen > 0) {
|
||||
pc->send_window += dlen;
|
||||
bufferevent_enable(pc->local_proxy_bev, EV_READ|EV_WRITE);
|
||||
}
|
||||
pc->stream_state = ESTABLISHED;
|
||||
break;
|
||||
default:
|
||||
debug(LOG_INFO, "window update no need process : flag %2x %s dlen %d stream_id %d",
|
||||
flag, flag_2_desc(flag), dlen, stream_id);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
default:
|
||||
debug(LOG_INFO, "no need unhandle tcp mux msg : type %s flag %s stream_id %d dlen %d ilen %d",
|
||||
type_2_desc(type), flag_2_desc(flag), stream_id, dlen, ilen);
|
||||
debug(LOG_ERR, "unexpected FIN flag in state %d", stream->state);
|
||||
assert(0);
|
||||
return 0;
|
||||
}
|
||||
} else if ( (flags&RST) == RST ) {
|
||||
stream->state = RESET;
|
||||
close_stream = 1;
|
||||
}
|
||||
|
||||
if (close_stream) {
|
||||
debug(LOG_DEBUG, "free stream %d", stream->id);
|
||||
del_proxy_client_by_stream_id(stream->id);
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static uint16_t
|
||||
get_send_flags(struct tmux_stream *stream)
|
||||
{
|
||||
uint16_t flags = 0;
|
||||
|
||||
switch (stream->state) {
|
||||
case INIT:
|
||||
flags |= SYN;
|
||||
stream->state = SYN_SEND;
|
||||
break;
|
||||
case SYN_RECEIVED:
|
||||
flags |= ACK;
|
||||
stream->state = ESTABLISHED;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return flags;
|
||||
}
|
||||
|
||||
void
|
||||
send_window_update(struct bufferevent *bout, struct tmux_stream *stream, uint32_t length)
|
||||
{
|
||||
uint32_t max = MAX_STREAM_WINDOW_SIZE;
|
||||
uint32_t delta = (max - length) - stream->recv_window;
|
||||
|
||||
uint16_t flags = get_send_flags(stream);
|
||||
|
||||
if (delta < max/2 && flags == 0)
|
||||
return;
|
||||
|
||||
stream->recv_window += delta;
|
||||
tcp_mux_send_win_update(bout, flags, stream->id, delta);
|
||||
//debug(LOG_DEBUG, "send window update: flags %d, stream_id %d delta %d, recv_window %u length %u",
|
||||
// flags, stream->id, delta, stream->recv_window, length);
|
||||
}
|
||||
|
||||
static int
|
||||
ring_buffer_pop(struct ring_buffer *ring, uint8_t *data, uint32_t len)
|
||||
{
|
||||
assert(ring->sz >= len);
|
||||
assert(data != NULL);
|
||||
|
||||
uint32_t i = 0;
|
||||
while(i < len) {
|
||||
data[i] = ring->data[ring->cur++];
|
||||
if (ring->cur == RBUF_SIZE)
|
||||
ring->cur = 0;
|
||||
i++;
|
||||
ring->sz--;
|
||||
}
|
||||
|
||||
assert(i == len);
|
||||
return len;
|
||||
}
|
||||
|
||||
static int
|
||||
process_data(struct tmux_stream *stream, uint32_t length, uint16_t flags,
|
||||
void (*fn)(uint8_t *, int, void *), void *param)
|
||||
{
|
||||
if (!process_flags(flags, stream)) return 0;
|
||||
|
||||
|
||||
if (length > stream->recv_window) {
|
||||
debug(LOG_ERR, "receive window exceed (remain %d, recv %d)", stream->recv_window, length);
|
||||
return 0;
|
||||
}
|
||||
|
||||
stream->recv_window -= length;
|
||||
|
||||
struct proxy_client *pc = (struct proxy_client *)param;
|
||||
if (!pc || (pc && !pc->local_proxy_bev)) {
|
||||
uint8_t *data = (uint8_t *)calloc(length, 1);
|
||||
ring_buffer_pop(&stream->rx_ring, data, length);
|
||||
fn(data, length, pc);
|
||||
free(data);
|
||||
} else {
|
||||
ring_buffer_write(pc->local_proxy_bev, &stream->rx_ring, length);
|
||||
}
|
||||
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
send_window_update(bout, stream, length);
|
||||
|
||||
return length;
|
||||
}
|
||||
|
||||
static int
|
||||
incr_send_window(struct bufferevent *bev, struct tcp_mux_header *tmux_hdr, uint16_t flags, struct tmux_stream *stream)
|
||||
{
|
||||
if (!process_flags(flags, stream))
|
||||
return 0;
|
||||
|
||||
uint32_t length = ntohl(tmux_hdr->length);
|
||||
if (stream->send_window == 0) bufferevent_enable(bev, EV_READ);
|
||||
stream->send_window += length;
|
||||
//debug(LOG_DEBUG, "incr_send_window : stream_id %d length %d send_window %d",
|
||||
// stream->id, length, stream->send_window);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int
|
||||
incoming_stream(uint32_t stream_id)
|
||||
{
|
||||
if (local_go_away) {
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
tcp_mux_send_win_update_rst(bout, stream_id);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO
|
||||
// create new stream
|
||||
return 1;
|
||||
}
|
||||
|
||||
void
|
||||
handle_tcp_mux_ping(struct tcp_mux_header *tmux_hdr)
|
||||
{
|
||||
uint16_t flags = ntohs(tmux_hdr->flags);
|
||||
uint32_t ping_id = ntohl(tmux_hdr->length);
|
||||
|
||||
if ( (flags&SYN) == SYN) {
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
tcp_mux_handle_ping(bout, ping_id);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
handle_tcp_mux_go_away(struct tcp_mux_header *tmux_hdr)
|
||||
{
|
||||
uint32_t code = ntohl(tmux_hdr->length);
|
||||
switch(code) {
|
||||
case NORMAL:
|
||||
remote_go_away = 1;
|
||||
break;
|
||||
case PROTO_ERR:
|
||||
debug(LOG_ERR, "receive protocol error go away");
|
||||
break;
|
||||
case INTERNAL_ERR:
|
||||
debug(LOG_ERR, "receive internal error go away");
|
||||
break;
|
||||
default:
|
||||
debug(LOG_ERR, "receive unexpected go away");
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t
|
||||
tmux_stream_read(struct bufferevent *bev, struct tmux_stream *stream, uint32_t len)
|
||||
{
|
||||
assert(stream != NULL);
|
||||
|
||||
return ring_buffer_read(bev, &stream->rx_ring, len);
|
||||
}
|
||||
|
||||
int
|
||||
handle_tcp_mux_stream(struct tcp_mux_header *tmux_hdr, handle_data_fn_t fn)
|
||||
{
|
||||
uint32_t stream_id = ntohl(tmux_hdr->stream_id);
|
||||
uint16_t flags = ntohs(tmux_hdr->flags);
|
||||
|
||||
//debug(LOG_DEBUG, "handle_tcp_mux_stream stream_id %d type %d flags %d", stream_id, tmux_hdr->type, flags);
|
||||
|
||||
if ( (flags&SYN) == SYN) {
|
||||
debug(LOG_INFO, "!!!! as xfrpc, it should not be here %d", stream_id);
|
||||
if (!incoming_stream(stream_id))
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct tmux_stream *stream = get_stream_by_id(stream_id);
|
||||
struct proxy_client *pc = get_proxy_client(stream_id);
|
||||
assert(stream != NULL);
|
||||
if (tmux_hdr->type == WINDOW_UPDATE) {
|
||||
struct bufferevent *bev = pc?pc->local_proxy_bev: get_main_control()->connect_bev;
|
||||
if (!incr_send_window(bev, tmux_hdr, flags, stream)) {
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
tcp_mux_send_go_away(bout, PROTO_ERR);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t length = ntohl(tmux_hdr->length);
|
||||
if (!process_data(stream, length, flags, fn, (void *)pc)) {
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
tcp_mux_send_go_away(bout, PROTO_ERR);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return length;
|
||||
}
|
||||
|
||||
static int
|
||||
ring_buffer_append(struct ring_buffer *ring, uint8_t *data, uint32_t len)
|
||||
{
|
||||
uint32_t left = RBUF_SIZE - ring->sz;
|
||||
assert(left >= len);
|
||||
int i = 0;
|
||||
for (; i < len; i++) {
|
||||
ring->data[ring->end++] = data[i];
|
||||
if (ring->end == RBUF_SIZE) ring->end = 0;
|
||||
ring->sz++;
|
||||
if (ring->cur == ring->end) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
static uint32_t
|
||||
ring_buffer_read(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len)
|
||||
{
|
||||
if (ring->sz == RBUF_SIZE) {
|
||||
debug(LOG_ERR, "ring buffer is full");
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t cap = RBUF_SIZE - ring->sz;
|
||||
if (len > cap) {
|
||||
debug(LOG_INFO, "prepare read data [%d] out size ring capacity [%d]", len, cap);
|
||||
len = cap;
|
||||
}
|
||||
|
||||
for (int i = 0; i < len; i++) {
|
||||
bufferevent_read(bev, &ring->data[ring->end++], 1);
|
||||
if (ring->end == RBUF_SIZE) ring->end = 0;
|
||||
ring->sz++;
|
||||
if (ring->cur == ring->end) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
|
||||
}
|
||||
|
||||
static uint32_t
|
||||
ring_buffer_write(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len)
|
||||
{
|
||||
if (ring->sz == 0) {
|
||||
debug(LOG_ERR, "ring buffer is empty");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (len > ring->sz) {
|
||||
debug(LOG_INFO, "prepare write data [%d] out size ring data [%d]", len, ring->sz);
|
||||
len = ring->sz;
|
||||
}
|
||||
|
||||
while(len > 0) {
|
||||
bufferevent_write(bev, &ring->data[ring->cur++], 1);
|
||||
len--;
|
||||
ring->sz--;;
|
||||
if (ring->cur == RBUF_SIZE) ring->cur = 0;
|
||||
if (ring->cur == ring->end) {
|
||||
assert(ring->sz == 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
uint32_t
|
||||
tmux_stream_write(struct bufferevent *bev, uint8_t *data, uint32_t length, struct tmux_stream *stream)
|
||||
{
|
||||
switch(stream->state) {
|
||||
case LOCAL_CLOSE:
|
||||
case CLOSED:
|
||||
case RESET:
|
||||
debug(LOG_INFO, "stream %d state is closed", stream->id);
|
||||
return 0;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
struct ring_buffer *tx_ring = &stream->tx_ring;
|
||||
uint32_t left = RBUF_SIZE - tx_ring->sz;
|
||||
if (stream->send_window == 0) {
|
||||
debug(LOG_INFO, "stream %d send_window is zero, length %d left %d", stream->id, length, left);
|
||||
ring_buffer_append(tx_ring, data, length);
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint16_t flags = get_send_flags(stream);
|
||||
uint32_t max = length;
|
||||
struct bufferevent *bout = get_main_control()->connect_bev;
|
||||
//debug(LOG_DEBUG, "tmux_stream_write stream id %u: send_window %u tx_ring sz %u length %u",
|
||||
// stream->id, stream->send_window, tx_ring->sz, length);
|
||||
if (stream->send_window < tx_ring->sz) {
|
||||
debug(LOG_INFO, " send_window %u less than tx_ring size %u", stream->send_window, tx_ring->sz);
|
||||
max = stream->send_window;
|
||||
tcp_mux_send_data(bout, flags, stream->id, max);
|
||||
ring_buffer_write(bev, tx_ring, max);
|
||||
ring_buffer_append(tx_ring, data, length);
|
||||
} else if (stream->send_window < tx_ring->sz + length) {
|
||||
debug(LOG_INFO, " send_window %u less than %u", stream->send_window, tx_ring->sz+length);
|
||||
max = stream->send_window;
|
||||
tcp_mux_send_data(bout, flags, stream->id, max);
|
||||
if (tx_ring->sz > 0)
|
||||
ring_buffer_write(bev, tx_ring, tx_ring->sz);
|
||||
bufferevent_write(bev, data, max - tx_ring->sz);
|
||||
ring_buffer_append(tx_ring, data + max - tx_ring->sz, length + tx_ring->sz - max);
|
||||
} else {
|
||||
max = tx_ring->sz + length;
|
||||
tcp_mux_send_data(bout, flags, stream->id, max);
|
||||
if (tx_ring->sz > 0)
|
||||
ring_buffer_write(bev, tx_ring, tx_ring->sz);
|
||||
bufferevent_write(bev, data, length);
|
||||
}
|
||||
|
||||
stream->send_window -= max;
|
||||
|
||||
return max;
|
||||
}
|
||||
|
||||
void
|
||||
tmux_stream_close(struct bufferevent *bout, struct tmux_stream *stream)
|
||||
{
|
||||
int closed = 0;
|
||||
switch(stream->state) {
|
||||
case SYN_SEND:
|
||||
case SYN_RECEIVED:
|
||||
case ESTABLISHED:
|
||||
stream->state = LOCAL_CLOSE;
|
||||
break;
|
||||
case LOCAL_CLOSE:
|
||||
case REMOTE_CLOSE:
|
||||
stream->state = CLOSED;
|
||||
closed = 1;
|
||||
case CLOSED:
|
||||
case RESET:
|
||||
default:
|
||||
return;
|
||||
}
|
||||
|
||||
uint16_t flags = get_send_flags(stream);
|
||||
flags |= FIN;
|
||||
tcp_mux_send_win_update(bout, flags, stream->id, 0);
|
||||
if (closed) {
|
||||
debug(LOG_DEBUG, "del proxy client %d", stream->id);
|
||||
del_proxy_client_by_stream_id(stream->id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
67
tcpmux.h
67
tcpmux.h
@@ -29,6 +29,22 @@
|
||||
|
||||
#include "uthash.h"
|
||||
|
||||
#define MAX_STREAM_WINDOW_SIZE 256*1024
|
||||
#define RBUF_SIZE 32*1024
|
||||
|
||||
struct ring_buffer {
|
||||
uint32_t cur;
|
||||
uint32_t end;
|
||||
uint32_t sz;
|
||||
uint8_t data[RBUF_SIZE];
|
||||
};
|
||||
|
||||
enum go_away_type {
|
||||
NORMAL,
|
||||
PROTO_ERR,
|
||||
INTERNAL_ERR,
|
||||
};
|
||||
|
||||
enum tcp_mux_type {
|
||||
DATA,
|
||||
WINDOW_UPDATE,
|
||||
@@ -73,20 +89,65 @@ enum tcp_mux_state {
|
||||
RESET
|
||||
};
|
||||
|
||||
struct tmux_stream {
|
||||
uint32_t id;
|
||||
uint32_t recv_window;
|
||||
uint32_t send_window;
|
||||
enum tcp_mux_state state;
|
||||
struct ring_buffer tx_ring;
|
||||
struct ring_buffer rx_ring;
|
||||
|
||||
// private arguments
|
||||
UT_hash_handle hh;
|
||||
};
|
||||
|
||||
typedef void (*handle_data_fn_t)(uint8_t *, int, void *);
|
||||
|
||||
void init_tmux_stream(struct tmux_stream *stream, uint32_t id, enum tcp_mux_state state);
|
||||
|
||||
int validate_tcp_mux_protocol(struct tcp_mux_header *tmux_hdr);
|
||||
|
||||
void send_window_update(struct bufferevent *bout, struct tmux_stream *stream, uint32_t length);
|
||||
|
||||
void tcp_mux_send_win_update_syn(struct bufferevent *bout, uint32_t stream_id);
|
||||
|
||||
void tcp_mux_send_win_update_ack(struct bufferevent *bout, uint32_t stream_id, uint32_t delta);
|
||||
|
||||
void tcp_mux_send_win_update_fin(struct bufferevent *bout, uint32_t stream_id);
|
||||
|
||||
void tcp_mux_send_data(struct bufferevent *bout, uint32_t stream_id, uint32_t length);
|
||||
void tcp_mux_send_win_update_rst(struct bufferevent *bout, uint32_t stream_id);
|
||||
|
||||
void tcp_mux_send_data(struct bufferevent *bout, uint16_t flags, uint32_t stream_id, uint32_t length);
|
||||
|
||||
void tcp_mux_send_ping(struct bufferevent *bout, uint32_t ping_id);
|
||||
|
||||
uint32_t get_next_session_id();
|
||||
|
||||
void tcp_mux_encode(enum tcp_mux_type type, enum tcp_mux_flag flags, uint32_t stream_id, uint32_t length, struct tcp_mux_header *tmux_hdr);
|
||||
void tcp_mux_encode(enum tcp_mux_type type, enum tcp_mux_flag flags,
|
||||
uint32_t stream_id, uint32_t length, struct tcp_mux_header *tmux_hdr);
|
||||
|
||||
void handle_tcp_mux_frps_msg(uint8_t *data, int len, void (*fn)(uint8_t *, int, void *));
|
||||
int handle_tcp_mux_stream(struct tcp_mux_header *tmux_hdr, handle_data_fn_t fn);
|
||||
|
||||
void handle_tcp_mux_ping(struct tcp_mux_header *tmux_hdr);
|
||||
|
||||
void handle_tcp_mux_go_away(struct tcp_mux_header *tmux_hdr);
|
||||
|
||||
uint32_t tmux_stream_write(struct bufferevent *bev, uint8_t *data, uint32_t length, struct tmux_stream *stream);
|
||||
|
||||
uint32_t tmux_stream_read(struct bufferevent *bev, struct tmux_stream *stream, uint32_t len);
|
||||
|
||||
void reset_session_id();
|
||||
|
||||
struct tmux_stream *get_cur_stream();
|
||||
|
||||
void set_cur_stream(struct tmux_stream *stream);
|
||||
|
||||
void add_stream(struct tmux_stream *stream);
|
||||
|
||||
void del_stream(uint32_t stream_id);
|
||||
|
||||
struct tmux_stream* get_stream_by_id(uint32_t id);
|
||||
|
||||
void tmux_stream_close(struct bufferevent *bout, struct tmux_stream *stream);
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user