9 Commits

Author SHA1 Message Date
staylightblow8
d9ff2e1a62 release version 1.11.587 2022-11-16 10:22:03 +08:00
Dengfeng Liu
3396100bc4 clean: clean unused function
Signed-off-by: Dengfeng Liu <liudf0716@gmail.com>
2022-11-14 08:42:17 +00:00
DengfengLiu
c53693be69 refactor: refactor tcp mux
Signed-off-by: DengfengLiu <liu_df@qq.com>
2022-11-13 11:09:36 +00:00
staylightblow8
855f602e2f support openssl > 3.0 2022-10-20 10:45:37 +08:00
staylightblow8
fd9a5ae249 Update README.md 2022-10-18 11:37:02 +08:00
Dengfeng Liu
61d5243678 clean code of config
Signed-off-by: Dengfeng Liu <liudf0716@gmail.com>
2022-07-25 14:37:16 +08:00
staylightblow8
e75a4dda25 update version 2022-07-24 21:20:01 +08:00
staylightblow8
65bfe5c03c Update README.md 2022-07-12 10:06:53 +08:00
Dengfeng Liu
b7c3e1f80b fix: check whether tcp mux header or not mor strictly
Signed-off-by: Dengfeng Liu <liudf0716@gmail.com>
2022-07-11 16:10:08 +08:00
16 changed files with 771 additions and 501 deletions

View File

@@ -30,9 +30,10 @@ the following table is detail compatible feature:
## Architecture
![Architecture](https://github.com/fatedier/frp/blob/dev/doc/pic/architecture.png?raw=true)
Architecture quote from [frp](https://github.com/fatedier/frp) project, replace frpc with xfrpc.
![Architecture](https://user-images.githubusercontent.com/1182593/196329678-1781b4e9-2355-4863-be3f-e128b31cc82c.png)
## Sequence Diagram
@@ -184,10 +185,12 @@ xfrpc -c frpc_mini.ini -f -d 7
xfrpc -c frpc_mini.ini -d 0
```
## Openwrt UI
## Openwrt luci configure ui
If running xfrpc in openwrt box, [luci-app-xfrpc](https://github.com/liudf0716/luci-app-xfrpc) is a good choice
luci-app-xfrpc was recruited by [luci project](https://github.com/openwrt/luci)
## How to contribute our project
See [CONTRIBUTING](https://github.com/liudf0716/xfrpc/blob/master/CONTRIBUTING.md) for details on submitting patches and the contribution workflow.

View File

@@ -70,11 +70,10 @@ xfrp_proxy_event_cb(struct bufferevent *bev, short what, void *ctx)
debug(LOG_DEBUG, "xfrpc proxy close connect server [%s:%d] stream_id %d: %s",
client->ps->local_ip, client->ps->local_port,
client->stream_id, strerror(errno));
tcp_mux_send_win_update_fin(client->ctl_bev, client->stream_id);
client->stream_state = LOCAL_CLOSE;
tmux_stream_close(client->ctl_bev, &client->stream);
} else if (what & BEV_EVENT_CONNECTED) {
debug(LOG_DEBUG, "client [%d] connected", client->stream_id);
//client->stream_state = ESTABLISHED;
//client->stream.state = ESTABLISHED;
if (client->data_tail_size > 0) {
debug(LOG_DEBUG, "send client data ...");
send_client_data_tail(client);
@@ -178,6 +177,7 @@ send_client_data_tail(struct proxy_client *client)
static void
free_proxy_client(struct proxy_client *client)
{
debug(LOG_DEBUG, "free client %d", client->stream_id);
if (client->local_proxy_bev) bufferevent_free(client->local_proxy_bev);
free(client);
}
@@ -195,6 +195,15 @@ del_proxy_client(struct proxy_client *client)
free_proxy_client(client);
}
void
del_proxy_client_by_stream_id(uint32_t sid)
{
del_stream(sid);
struct proxy_client *pc = get_proxy_client(sid);
del_proxy_client(pc);
}
struct proxy_client *
get_proxy_client(uint32_t sid)
{
@@ -209,8 +218,7 @@ new_proxy_client()
struct proxy_client *client = calloc(1, sizeof(struct proxy_client));
assert(client);
client->stream_id = get_next_session_id();
client->send_window = 200*1024;
client->stream_state = INIT;
init_tmux_stream(&client->stream, client->stream_id, INIT);
HASH_ADD_INT(all_pc, stream_id, client);
return client;

View File

@@ -44,10 +44,9 @@ struct proxy_client {
struct bufferevent *ctl_bev; // xfrpc proxy <---> frps
struct bufferevent *local_proxy_bev; // xfrpc proxy <---> local service
struct base_conf *bconf;
struct tmux_stream stream;
uint32_t stream_id;
uint32_t send_window;
enum tcp_mux_state stream_state;
int connected;
int work_started;
struct proxy_service *ps;
@@ -91,6 +90,8 @@ void start_xfrp_tunnel(struct proxy_client *client);
void del_proxy_client(struct proxy_client *client);
void del_proxy_client_by_stream_id(uint32_t sid);
struct proxy_client *get_proxy_client(uint32_t sid);
int send_client_data_tail(struct proxy_client *client);

View File

@@ -282,11 +282,11 @@ function(from_hex HEX DEC)
set(${DEC} ${_res} PARENT_SCOPE)
endfunction()
if (OPENSSL_INCLUDE_DIR)
if(OPENSSL_INCLUDE_DIR AND EXISTS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h")
file(STRINGS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h" openssl_version_str
REGEX "^# *define[\t ]+OPENSSL_VERSION_NUMBER[\t ]+0x([0-9a-fA-F])+.*")
if(OPENSSL_INCLUDE_DIR AND EXISTS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h")
file(STRINGS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h" openssl_version_str
REGEX "^#[\t ]*define[\t ]+OPENSSL_VERSION_NUMBER[\t ]+0x([0-9a-fA-F])+.*")
if(openssl_version_str)
# The version number is encoded as 0xMNNFFPPS: major minor fix patch status
# The status gives if this is a developer or prerelease and is ignored here.
# Major, minor, and fix directly translate into the version numbers shown in
@@ -315,6 +315,25 @@ if (OPENSSL_INCLUDE_DIR)
endif ()
set(OPENSSL_VERSION "${OPENSSL_VERSION_MAJOR}.${OPENSSL_VERSION_MINOR}.${OPENSSL_VERSION_FIX}${OPENSSL_VERSION_PATCH_STRING}")
else ()
# Since OpenSSL 3.0.0, the new version format is MAJOR.MINOR.PATCH and
# a new OPENSSL_VERSION_STR macro contains exactly that
file(STRINGS "${OPENSSL_INCLUDE_DIR}/openssl/opensslv.h" OPENSSL_VERSION_STR
REGEX "^#[\t ]*define[\t ]+OPENSSL_VERSION_STR[\t ]+\"([0-9])+\\.([0-9])+\\.([0-9])+\".*")
string(REGEX REPLACE "^.*OPENSSL_VERSION_STR[\t ]+\"([0-9]+\\.[0-9]+\\.[0-9]+)\".*$"
"\\1" OPENSSL_VERSION_STR "${OPENSSL_VERSION_STR}")
set(OPENSSL_VERSION "${OPENSSL_VERSION_STR}")
# Setting OPENSSL_VERSION_MAJOR OPENSSL_VERSION_MINOR and OPENSSL_VERSION_FIX
string(REGEX MATCHALL "([0-9])+" OPENSSL_VERSION_NUMBER "${OPENSSL_VERSION}")
list(POP_FRONT OPENSSL_VERSION_NUMBER
OPENSSL_VERSION_MAJOR
OPENSSL_VERSION_MINOR
OPENSSL_VERSION_FIX)
unset(OPENSSL_VERSION_NUMBER)
unset(OPENSSL_VERSION_STR)
endif ()
endif ()

View File

@@ -56,33 +56,9 @@ void free_common_config()
struct common_conf *c_conf = get_common_config();
if (c_conf->server_addr) free(c_conf->server_addr);
if (c_conf->http_proxy) free(c_conf->http_proxy);
if (c_conf->log_file) free(c_conf->log_file);
if (c_conf->log_way) free(c_conf->log_way);
if (c_conf->log_level) free(c_conf->log_level);
if (c_conf->auth_token) free(c_conf->auth_token);
if (c_conf->privilege_token) free(c_conf->privilege_token);
SAFE_FREE(c_conf->server_ip);
};
void set_common_server_ip(const char *ip)
{
struct common_conf *c_conf = get_common_config();
c_conf->server_ip = strdup(ip);
assert(c_conf->server_ip);
debug(LOG_DEBUG, "server IP address: [%s]", c_conf->server_ip);
}
void free_base_config(struct base_conf *bconf)
{
if (bconf->name) free(bconf->name);
if (bconf->auth_token) free(bconf->auth_token);
if (bconf->privilege_token) free(bconf->privilege_token);
if (bconf->host_header_rewrite) free(bconf->host_header_rewrite);
if (bconf->subdomain) free(bconf->subdomain);
}
static int is_true(const char *val)
{
if (val && (strcmp(val, "true") == 0 || strcmp(val, "1") == 0))
@@ -114,8 +90,9 @@ static void dump_common_conf()
return;
}
debug(LOG_DEBUG, "Section[common]: {server_addr:%s, server_port:%d, auth_token:%s, privilege_token:%s, interval:%d, timeout:%d}",
c_conf->server_addr, c_conf->server_port, c_conf->auth_token, c_conf->privilege_token, c_conf->heartbeat_interval, c_conf->heartbeat_timeout);
debug(LOG_DEBUG, "Section[common]: {server_addr:%s, server_port:%d, auth_token:%s, interval:%d, timeout:%d}",
c_conf->server_addr, c_conf->server_port, c_conf->auth_token,
c_conf->heartbeat_interval, c_conf->heartbeat_timeout);
}
static void dump_proxy_service(const int index, struct proxy_service *ps)
@@ -262,22 +239,17 @@ proxy_service_handler(void *user, const char *sect, const char *nm, const char *
ps->remote_data_port = atoi(value);
} else if (MATCH_NAME("http_user")) {
ps->http_user = strdup(value);
assert(ps->http_user);
} else if (MATCH_NAME("http_pwd")) {
ps->http_pwd = strdup(value);
assert(ps->http_pwd);
} else if (MATCH_NAME("subdomain")) {
ps->subdomain = strdup(value);
assert(ps->http_pwd);
} else if (MATCH_NAME("custom_domains")) {
ps->custom_domains = strdup(value);
assert(ps->custom_domains);
} else if (MATCH_NAME("locations")) {
ps->locations = strdup(value);
assert(ps->locations);
} else if (MATCH_NAME("host_header_rewrite")) {
ps->host_header_rewrite = strdup(value);
assert(ps->host_header_rewrite);
} else if (MATCH_NAME("use_encryption")) {
ps->use_encryption = TO_BOOL(value);
} else if (MATCH_NAME("use_compression")) {
@@ -295,39 +267,10 @@ static int common_handler(void *user, const char *section, const char *name, con
#define MATCH(s, n) strcmp(section, s) == 0 && strcmp(name, n) == 0
if (MATCH("common", "server_addr")) {
SAFE_FREE(config->server_addr);
int addr_len = strlen(value) + 1;
config->server_addr = (char *)calloc(1, addr_len);
config->server_addr = strdup(value);
assert(config->server_addr);
if(dns_unified(value, config->server_addr, addr_len)) {
debug(LOG_ERR, "error: server_addr [%s] is invalid!", value);
exit(0);
}
if (is_valid_ip_address(value))
set_common_server_ip(value);
} else if (MATCH("common", "server_port")) {
config->server_port = atoi(value);
} else if (MATCH("common", "http_proxy")) {
SAFE_FREE(config->http_proxy);
config->http_proxy = strdup(value);
assert(config->http_proxy);
} else if (MATCH("common", "log_file")) {
SAFE_FREE(config->log_file);
config->log_file = strdup(value);
assert(config->log_file);
} else if (MATCH("common", "log_way")) {
SAFE_FREE(config->log_way);
config->log_way = strdup(value);
assert(config->log_way);
} else if (MATCH("common", "log_level")) {
SAFE_FREE(config->log_level);
config->log_level = strdup(value);
assert(config->log_level);
} else if (MATCH("common", "log_max_days")) {
config->log_max_days = atoi(value);
} else if (MATCH("common", "privilege_token")) {
SAFE_FREE(config->privilege_token);
config->privilege_token = strdup(value);
assert(config->privilege_token);
} else if (MATCH("common", "heartbeat_interval")) {
config->heartbeat_interval = atoi(value);
} else if (MATCH("common", "heartbeat_timeout")) {
@@ -336,10 +279,6 @@ static int common_handler(void *user, const char *section, const char *name, con
SAFE_FREE(config->auth_token);
config->auth_token = strdup(value);
assert(config->auth_token);
} else if (MATCH("common", "user")) {
SAFE_FREE(config->user);
config->user = strdup(value);
assert(config->user);
} else if (MATCH("common", "tcp_mux")) {
config->tcp_mux = atoi(value);
config->tcp_mux = !!config->tcp_mux;
@@ -355,18 +294,9 @@ static void init_common_conf(struct common_conf *config)
config->server_addr = strdup("0.0.0.0");
assert(config->server_addr);
config->server_port = 7000;
config->log_file = strdup("console");
assert(config->log_file);
config->log_way = strdup("console");
assert(config->log_way);
config->log_level = strdup("info");
assert(config->log_level);
config->log_max_days = 3;
config->heartbeat_interval = 30;
config->heartbeat_timeout = 90;
config->tcp_mux = 1;
config->user = NULL;
config->server_ip = NULL;
config->is_router = 0;
}

View File

@@ -31,34 +31,14 @@
#define FTP_RMT_CTL_PROXY_SUFFIX "_ftp_remote_ctl_proxy"
struct base_conf{
char *name;
char *auth_token;
int use_encryption;
int use_gzip;
int privilege_mode;
char *privilege_token;
int pool_count;
char *host_header_rewrite;
char *subdomain;
};
// common config
//client common config
struct common_conf {
char *server_addr; /* default 0.0.0.0 */
char *server_ip;
int server_port; /* default 7000 */
char *http_proxy;
char *log_file; /* default consol */
char *log_way; /* default console */
char *log_level; /* default info */
int log_max_days; /* default 3 */
char *privilege_token;
char *auth_token;
int heartbeat_interval; /* default 10 */
int heartbeat_timeout; /* default 30 */
int tcp_mux; /* default 0 */
char *user;
/* private fields */
int is_router; // to sign router (Openwrt/LEDE) or not
@@ -68,14 +48,10 @@ struct common_conf *get_common_config();
void free_common_config();
void free_base_config(struct base_conf *bconf);
void load_config(const char *confile);
char *get_ftp_data_proxy_name(const char *ftp_proxy_name);
void set_common_server_ip(const char *ip);
int is_running_in_router();
struct proxy_service *get_proxy_service(const char *proxy_name);

265
control.c
View File

@@ -52,11 +52,11 @@
#include "tcpmux.h"
static struct control *main_ctl;
static int clients_conn_signel = 0;
static int client_connected = 0;
static int is_login = 0;
static time_t pong_time = 0;
static void sync_new_work_connection(struct bufferevent *bev, uint32_t sid);
static void new_work_connection(struct bufferevent *bev, struct tmux_stream *stream);
static void recv_cb(struct bufferevent *bev, void *ctx);
static void clear_main_control();
static void start_base_connect();
@@ -65,25 +65,25 @@ static void keep_control_alive();
static int
is_client_connected()
{
return clients_conn_signel;
return client_connected;
}
static int
client_connected(int is_connected)
set_client_status(int is_connected)
{
if (is_connected)
clients_conn_signel = 1;
client_connected = 1;
else
clients_conn_signel = 0;
client_connected = 0;
return clients_conn_signel;
return client_connected;
}
static int
set_client_work_start(struct proxy_client *client, int is_start_work)
{
assert(client->ps);
if (is_start_work) {
assert(client->ps);
client->work_started = 1;
}else
client->work_started = 0;
@@ -110,8 +110,8 @@ client_start_event_cb(struct bufferevent *bev, short what, void *ctx)
} else if (what & BEV_EVENT_CONNECTED) {
bufferevent_setcb(bev, recv_cb, NULL, client_start_event_cb, client);
bufferevent_enable(bev, EV_READ|EV_WRITE);
sync_new_work_connection(bev, 0);
client_connected(1);
new_work_connection(bev, &main_ctl->stream);
set_client_status(1);
debug(LOG_INFO, "proxy service start");
}
}
@@ -127,7 +127,8 @@ new_client_connect()
if (c_conf->tcp_mux) {
debug(LOG_DEBUG, "new client through tcp mux: %d", client->stream_id);
client->ctl_bev = main_ctl->connect_bev;
sync_new_work_connection(client->ctl_bev, client->stream_id);
send_window_update(client->ctl_bev, &client->stream, 0);
new_work_connection(client->ctl_bev, &client->stream);
return;
}
@@ -177,11 +178,11 @@ ping()
}
char *ping_msg = "{}";
send_enc_msg_frp_server(bout, TypePing, ping_msg, strlen(ping_msg), main_ctl->stream_id);
send_enc_msg_frp_server(bout, TypePing, ping_msg, strlen(ping_msg), &main_ctl->stream);
}
static void
sync_new_work_connection(struct bufferevent *bev, uint32_t sid)
new_work_connection(struct bufferevent *bev, struct tmux_stream *stream)
{
assert(bev);
@@ -199,10 +200,8 @@ sync_new_work_connection(struct bufferevent *bev, uint32_t sid)
debug(LOG_ERR, "new work connection request run_id marshal failed!");
return;
}
tcp_mux_send_win_update_syn(bev, sid);
send_msg_frp_server(bev, TypeNewWorkConn, new_work_conn_request_message, nret, sid);
send_msg_frp_server(bev, TypeNewWorkConn, new_work_conn_request_message, nret, stream);
SAFE_FREE(new_work_conn_request_message);
SAFE_FREE(work_c);
@@ -237,7 +236,7 @@ static void
hb_sender_cb(evutil_socket_t fd, short event, void *arg)
{
if (is_client_connected()) {
debug(LOG_DEBUG, "ping frps");
//debug(LOG_DEBUG, "ping frps");
ping(NULL);
}
@@ -312,7 +311,6 @@ handle_enc_msg(const uint8_t *enc_msg, int ilen, uint8_t **out)
init_main_decoder(buf);
buf += get_block_size();
ilen -= get_block_size();
debug(LOG_DEBUG, "first recv stream message, init decoder iv succeed! %d", ilen);
if (!ilen) {
// recv only iv
debug(LOG_DEBUG, "recv eas1238 iv data");
@@ -323,7 +321,6 @@ handle_enc_msg(const uint8_t *enc_msg, int ilen, uint8_t **out)
uint8_t *dec_msg = NULL;
size_t len = decrypt_data(buf, ilen, get_main_decoder(), &dec_msg);
*out = dec_msg;
//debug(LOG_DEBUG, "dec out len %d ", len);
return len;
}
@@ -336,10 +333,10 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
const uint8_t *enc_msg = buf;
if (!ctx) {
debug(LOG_DEBUG, "main control message");
//debug(LOG_DEBUG, "main control message");
handle_enc_msg(enc_msg, len, &frps_cmd);
} else {
debug(LOG_DEBUG, "worker message");
//debug(LOG_DEBUG, "worker message");
frps_cmd = (uint8_t *)buf;
}
@@ -349,18 +346,18 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
struct msg_hdr *msg = (struct msg_hdr *)frps_cmd;
cmd_type = msg->type;
debug(LOG_DEBUG, "cmd_type is %c data is %s", cmd_type, msg->data);
switch(cmd_type) {
case TypeReqWorkConn:
//debug(LOG_DEBUG, "TypeReqWorkConn cmd");
{
if (! is_client_connected()) {
start_proxy_services();
client_connected(1);
set_client_status(1);
}
new_client_connect();
break;
}
case TypeNewProxyResp:
debug(LOG_DEBUG, "TypeNewProxyResp cmd ");
{
struct new_proxy_response *npr = new_proxy_resp_unmarshal((const char *)msg->data);
if (npr == NULL) {
debug(LOG_ERR, "new proxy response buffer unmarshal faild!");
@@ -370,8 +367,9 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
proxy_service_resp_raw(npr);
SAFE_FREE(npr);
break;
}
case TypeStartWorkConn:
debug(LOG_DEBUG, "TypeStartWorkConn cmd");
{
struct start_work_conn_resp *sr = start_work_conn_resp_unmarshal((const char *)msg->data);
if (! sr) {
debug(LOG_ERR,
@@ -406,8 +404,8 @@ handle_control_work(const uint8_t *buf, int len, void *ctx)
set_client_work_start(client, 1);
break;
}
case TypePong:
//debug(LOG_DEBUG, "receive pong from frps");
pong_time = time(NULL);
break;
default:
@@ -457,7 +455,7 @@ handle_login_response(const uint8_t *buf, int len)
assert(nret > 0);
// start proxy services must first send
start_proxy_services();
client_connected(1);
set_client_status(1);
debug(LOG_DEBUG, "TypeReqWorkConn cmd, msg :%s", &frps_cmd[8]);
assert (frps_cmd[0] == TypeReqWorkConn);
new_client_connect();
@@ -476,6 +474,8 @@ handle_frps_msg(uint8_t *buf, int len, void *ctx)
}
}
static struct tmux_stream abandon_stream;
// ctx: if recv_cb was called by common control, ctx == NULL
// else ctx == client struct
static void
@@ -484,22 +484,109 @@ recv_cb(struct bufferevent *bev, void *ctx)
struct evbuffer *input = bufferevent_get_input(bev);
int len = evbuffer_get_length(input);
if (len <= 0) {
return;
return;
}
uint8_t *buf = calloc(len+1, 1);
assert(buf);
evbuffer_remove(input, buf, len);
struct common_conf *c_conf = get_common_config();
struct common_conf *c_conf = get_common_config();
if (c_conf->tcp_mux) {
handle_tcp_mux_frps_msg(buf, len, handle_frps_msg);
} else {
handle_frps_msg(buf, len, ctx);
}
static struct tcp_mux_header tmux_hdr;
static uint32_t stream_len = 0;
while (len > 0) {
struct tmux_stream *cur = get_cur_stream();
size_t nr = 0;
if (!cur) {
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
uint8_t *data = (uint8_t *)&tmux_hdr;
if (len < sizeof(tmux_hdr)) {
debug(LOG_INFO, "len [%d] < sizeof tmux_hdr", len);
break;
}
nr = bufferevent_read(bev, data, sizeof(tmux_hdr));
assert(nr == sizeof(tmux_hdr));
assert(validate_tcp_mux_protocol(&tmux_hdr) > 0);
len -= nr;
if (tmux_hdr.type == DATA) {
uint32_t stream_id = ntohl(tmux_hdr.stream_id);
stream_len = ntohl(tmux_hdr.length);
cur = get_stream_by_id(stream_id);
if (!cur) {
debug(LOG_INFO, "cur is NULL stream_id is %d, stream_len is %d len is %d",
stream_id, stream_len, len);
if (stream_len > 0)
cur = &abandon_stream;
else
continue;
}
if (len == 0) {
set_cur_stream(cur);
break;
}
if (len >= stream_len) {
nr = tmux_stream_read(bev, cur, stream_len);
assert(nr == stream_len);
len -= stream_len;
} else {
nr = tmux_stream_read(bev, cur, len);
stream_len -= len;
assert(nr == len);
set_cur_stream(cur);
len -= nr;
break;
}
}
} else {
assert(tmux_hdr.type == DATA);
if (len >= stream_len ) {
nr = tmux_stream_read(bev, cur, stream_len);
assert(nr == stream_len);
len -= stream_len;
} else {
nr = tmux_stream_read(bev, cur, len);
stream_len -= len;
assert(nr == len);
len -= nr;
break;
}
}
if (cur == &abandon_stream) {
debug(LOG_INFO, "abandon stream data ...");
memset(cur , 0, sizeof(abandon_stream));
set_cur_stream(NULL);
continue;
}
switch(tmux_hdr.type) {
case DATA:
case WINDOW_UPDATE:
{
handle_tcp_mux_stream(&tmux_hdr, handle_frps_msg);
break;
}
case PING:
handle_tcp_mux_ping(&tmux_hdr);
break;
case GO_AWAY:
handle_tcp_mux_go_away(&tmux_hdr);
break;
default:
debug(LOG_ERR, "impossible here!!!!");
exit(-1);
}
set_cur_stream(NULL);
}
} else {
uint8_t *buf = calloc(len, 1);
assert(buf);
evbuffer_remove(input, buf, len);
handle_frps_msg(buf, len, ctx);
SAFE_FREE(buf);
}
SAFE_FREE(buf);
return;
}
@@ -520,11 +607,12 @@ connect_event_cb (struct bufferevent *bev, short what, void *ctx)
c_conf->server_addr,
c_conf->server_port,
strerror(errno));
reset_session_id();
clear_main_control();
run_control();
} else if (what & BEV_EVENT_CONNECTED) {
retry_times = 0;
tcp_mux_send_win_update_syn(bev, main_ctl->stream_id);
send_window_update(bev, &main_ctl->stream, 0);
login();
keep_control_alive();
@@ -544,32 +632,6 @@ keep_control_alive()
set_ticker_ping_timer(main_ctl->ticker_ping);
}
static void
server_dns_cb(int event_code, struct evutil_addrinfo *addr, void *ctx)
{
if (event_code) {
set_common_server_ip((const char *)evutil_gai_strerror(event_code));
} else {
struct evutil_addrinfo *ai;
if (addr->ai_canonname)
debug(LOG_DEBUG, "addr->ai_canonname [%s]", addr->ai_canonname);
for (ai = addr; ai; ai = ai->ai_next) {
char buf[128];
const char *s = NULL;
if (ai->ai_family == AF_INET) {
struct sockaddr_in *sin = (struct sockaddr_in *)ai->ai_addr;
s = evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, 128);
} else if (ai->ai_family == AF_INET6) {
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ai->ai_addr;
s = evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, 128);
}
if (s) set_common_server_ip(s);
}
if (addr) evutil_freeaddrinfo(addr);
}
}
static void
start_base_connect()
{
@@ -602,7 +664,7 @@ login()
exit(0);
}
send_msg_frp_server(NULL, TypeLogin, lg_msg, len, 1);
send_msg_frp_server(NULL, TypeLogin, lg_msg, len, &main_ctl->stream);
SAFE_FREE(lg_msg);
}
@@ -611,7 +673,7 @@ send_msg_frp_server(struct bufferevent *bev,
const enum msg_type type,
const char *msg,
const size_t msg_len,
uint32_t sid)
struct tmux_stream *stream)
{
struct bufferevent *bout = NULL;
if (bev) {
@@ -632,9 +694,11 @@ send_msg_frp_server(struct bufferevent *bev,
req_msg->length = msg_hton((uint64_t)msg_len);
memcpy(req_msg->data, msg, msg_len);
tcp_mux_send_data(bout, sid, len);
bufferevent_write(bout, (uint8_t *)req_msg, len);
struct common_conf *c_conf = get_common_config();
if (c_conf->tcp_mux)
tmux_stream_write(bout, (uint8_t *)req_msg, len, stream);
else
bufferevent_write(bout, (uint8_t *)req_msg, len);
free(req_msg);
}
@@ -644,7 +708,7 @@ send_enc_msg_frp_server(struct bufferevent *bev,
const enum msg_type type,
const char *msg,
const size_t msg_len,
uint32_t sid)
struct tmux_stream *stream)
{
struct bufferevent *bout = NULL;
if (bev) {
@@ -654,31 +718,28 @@ send_enc_msg_frp_server(struct bufferevent *bev,
}
assert(bout);
debug(LOG_DEBUG, "send enc msg ----> [%c: %s]", type, msg);
struct msg_hdr *req_msg = calloc(msg_len+sizeof(struct msg_hdr), 1);
assert(req_msg);
req_msg->type = type;
req_msg->length = msg_hton((uint64_t)msg_len);
memcpy(req_msg->data, msg, msg_len);
struct common_conf *c_conf = get_common_config();
if (get_main_encoder() == NULL) {
debug(LOG_DEBUG, "init_main_encoder .......");
tcp_mux_send_data(bout, sid, 16);
struct frp_coder *coder = init_main_encoder();
bufferevent_write(bout, coder->iv, 16);
if (c_conf->tcp_mux)
tmux_stream_write(bout, coder->iv, 16, stream);
else
bufferevent_write(bout, coder->iv, 16);
}
uint8_t *enc_msg = NULL;
size_t olen = encrypt_data((uint8_t *)req_msg, msg_len+sizeof(struct msg_hdr), get_main_encoder(), &enc_msg);
assert(olen > 0);
//debug(LOG_DEBUG, "encrypt_data length %d", olen);
tcp_mux_send_data(bout, sid, olen);
bufferevent_write(bout, enc_msg, olen);
if (c_conf->tcp_mux)
tmux_stream_write(bout, enc_msg, olen, stream);
else
bufferevent_write(bout, enc_msg, olen);
free(enc_msg);
free(req_msg);
@@ -726,7 +787,7 @@ send_new_proxy(struct proxy_service *ps)
debug(LOG_DEBUG, "control proxy client: [Type %d : proxy_name %s : msg_len %d]", TypeNewProxy, ps->proxy_name, len);
send_enc_msg_frp_server(NULL, TypeNewProxy, new_proxy_msg, len, main_ctl->stream_id);
send_enc_msg_frp_server(NULL, TypeNewProxy, new_proxy_msg, len, &main_ctl->stream);
SAFE_FREE(new_proxy_msg);
}
@@ -751,9 +812,10 @@ init_main_control()
}
main_ctl->connect_base = base;
if (c_conf->tcp_mux)
main_ctl->stream_id = get_next_session_id();
if (c_conf->tcp_mux) {
init_tmux_stream(&main_ctl->stream, get_next_session_id(), INIT);
}
// if server_addr is ip, done control init.
if (is_valid_ip_address((const char *)c_conf->server_addr))
return;
@@ -774,29 +836,6 @@ init_main_control()
evdns_base_nameserver_ip_add(dnsbase, "223.5.5.5"); //AliDNS
evdns_base_nameserver_ip_add(dnsbase, "223.6.6.6"); //AliDNS
evdns_base_nameserver_ip_add(dnsbase, "114.114.114.114"); //114DNS
// if server_addr is domain, analyze it to ip for server_ip
debug(LOG_DEBUG, "Get ip address of [%s] from DNServer", c_conf->server_addr);
struct evutil_addrinfo hints;
struct evdns_getaddrinfo_request *dns_req;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_flags = EVUTIL_AI_CANONNAME;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
dns_req = evdns_getaddrinfo(dnsbase,
c_conf->server_addr,
NULL /* no service name given */,
&hints,
server_dns_cb,
NULL);
if (!dns_req) {
debug(LOG_ERR, "error: can not analyse the dns of [%s]", c_conf->server_addr);
exit(0);
}
}
static void
@@ -814,7 +853,7 @@ clear_main_control()
if (main_ctl->tcp_mux_ping_event) evtimer_del(main_ctl->tcp_mux_ping_event);
clear_all_proxy_client();
free_evp_cipher_ctx();
client_connected(0);
set_client_status(0);
pong_time = 0;
is_login = 0;
}

View File

@@ -44,7 +44,7 @@ struct control {
struct event *tcp_mux_ping_event;
uint32_t tcp_mux_ping_id;
uint32_t stream_id;
struct tmux_stream stream;
};
void connect_eventcb(struct bufferevent *bev, short events, void *ptr);
@@ -67,13 +67,13 @@ void send_msg_frp_server(struct bufferevent *bev,
const enum msg_type type,
const char *msg,
const size_t msg_len,
uint32_t sid);
struct tmux_stream *stream);
void send_enc_msg_frp_server(struct bufferevent *bev,
const enum msg_type type,
const char *msg,
const size_t msg_len,
uint32_t sid);
struct tmux_stream *stream);
void control_process(struct proxy_client *client);

View File

@@ -50,7 +50,7 @@ static void
free_frp_coder(struct frp_coder *coder)
{
free(coder->salt);
free(coder->privilege_token);
free(coder->token);
free(coder);
}
@@ -91,14 +91,14 @@ get_block_size()
}
struct frp_coder *
new_coder(const char *privilege_token, const char *salt)
new_coder(const char *token, const char *salt)
{
struct frp_coder *enc = calloc(sizeof(struct frp_coder), 1);
assert(enc);
enc->privilege_token = privilege_token ? strdup(privilege_token):strdup("\0");
enc->token = token ? strdup(token):strdup("\0");
enc->salt = strdup(salt);
encrypt_key(enc->privilege_token, strlen(enc->privilege_token), enc->salt, enc->key, block_size);
encrypt_key(enc->token, strlen(enc->token), enc->salt, enc->key, block_size);
encrypt_iv(enc->iv, block_size);
return enc;
}
@@ -109,7 +109,7 @@ clone_coder(const struct frp_coder *coder)
assert(coder);
struct frp_coder *enc = calloc(sizeof(struct frp_coder), 1);
memcpy(enc, coder, sizeof(*coder));
enc->privilege_token = strdup(coder->privilege_token);
enc->token = strdup(coder->token);
enc->salt = strdup(coder->salt);
return enc;
@@ -273,7 +273,7 @@ D_END:
void
free_encoder(struct frp_coder *encoder) {
if (encoder) {
SAFE_FREE(encoder->privilege_token);
SAFE_FREE(encoder->token);
SAFE_FREE(encoder->salt);
free(encoder);
}

View File

@@ -38,7 +38,7 @@ struct frp_coder {
uint8_t key[16];
char *salt;
uint8_t iv[16];
char *privilege_token;
char *token;
};
size_t get_encrypt_block_size();
@@ -47,7 +47,7 @@ int is_encoder_inited();
int is_decoder_inited();
struct frp_coder *init_main_encoder();
struct frp_coder *init_main_decoder(const uint8_t *iv);
struct frp_coder *new_coder(const char *privilege_token, const char *salt);
struct frp_coder *new_coder(const char *token, const char *salt);
uint8_t *encrypt_key(const char *token, size_t token_len, const char *salt, uint8_t *key, size_t key_len);
uint8_t *encrypt_iv(uint8_t *iv_buf, size_t iv_len);
size_t encrypt_data(const uint8_t *src_data, size_t srclen, struct frp_coder *encoder, uint8_t **ret);

View File

@@ -89,7 +89,6 @@ void init_login()
c_login->metas = NULL;
c_login->pool_count = 1;
c_login->privilege_key = NULL;
c_login->user = c_conf->user;
c_login->logged = 0;

View File

@@ -101,12 +101,12 @@ void ftp_proxy_c2s_cb(struct bufferevent *bev, void *ctx)
struct ftp_pasv *r_fp = new_ftp_pasv();
r_fp->code = local_fp->code;
if (! c_conf->server_ip) {
if (! c_conf->server_addr) {
debug(LOG_ERR, "error: FTP proxy without server ip!");
exit(0);
}
strncpy(r_fp->ftp_server_ip, c_conf->server_ip, IP_LEN);
strncpy(r_fp->ftp_server_ip, c_conf->server_addr, IP_LEN);
r_fp->ftp_server_port = p->remote_data_port;
if (r_fp->ftp_server_port <= 0) {
@@ -264,4 +264,4 @@ static void free_ftp_pasv(struct ftp_pasv *fp)
SAFE_FREE(fp);
fp = NULL;
}
}

View File

@@ -47,7 +47,7 @@
#include "config.h"
#include "tcpmux.h"
#define BUF_LEN 4096
#define BUF_LEN 2*1024
// read data from local service
void tcp_proxy_c2s_cb(struct bufferevent *bev, void *ctx)
@@ -66,23 +66,15 @@ void tcp_proxy_c2s_cb(struct bufferevent *bev, void *ctx)
return;
}
if (client->send_window == 0) {
debug(LOG_DEBUG, "client %d recv len %d exceed send windows: %d", client->stream_id, len, client->send_window);
uint8_t *buf = (uint8_t *)malloc(len);
assert(buf != NULL);
memset(buf, 0, len);
uint32_t nr = bufferevent_read(bev, buf, len);
assert(nr == len);
nr = tmux_stream_write(partner, buf, len, &client->stream);
if (nr < len) {
debug(LOG_DEBUG, "stream_id [%d] len is %d tmux_stream_write %d data, disable read", client->stream.id, len, nr);
bufferevent_disable(bev, EV_READ);
return;
} else {
len = client->send_window>=len?len:client->send_window;
client->send_window -= len;
}
tcp_mux_send_data(partner, client->stream_id, len);
uint8_t buf[BUF_LEN];
while(len > 0) {
memset(buf, 0, BUF_LEN);
int nread = bufferevent_read(bev, buf, len>BUF_LEN?BUF_LEN:len);
assert(nread >= 0);
bufferevent_write(partner, buf, nread);
len -= nread;
}
}

718
tcpmux.c
View File

@@ -24,6 +24,9 @@
@author Copyright (C) 2016 Dengfeng Liu <liu_df@qq.com>
*/
#include <unistd.h>
#include <stdlib.h>
#include "common.h"
#include "tcpmux.h"
#include "client.h"
@@ -32,63 +35,77 @@
#include "control.h"
static uint8_t proto_version = 0;
static uint8_t remote_go_away;
static uint8_t local_go_away;
static uint32_t g_session_id = 1;
static struct tmux_stream *cur_stream = NULL;
static struct tmux_stream *all_stream;
static struct tcp_mux_type_desc type_desc[] = {
{DATA, "data"},
{WINDOW_UPDATE, "window update"},
{PING, "ping"},
{GO_AWAY, "go away"},
static uint32_t ring_buffer_read(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len);
static uint32_t ring_buffer_write(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len);
void
add_stream(struct tmux_stream *stream)
{
HASH_ADD_INT(all_stream, id, stream);
}
void
del_stream(uint32_t id)
{
assert(all_stream != NULL);
struct tmux_stream *stream = get_stream_by_id(id);
if (stream)
HASH_DEL(all_stream, stream);
}
struct tmux_stream *
get_stream_by_id(uint32_t id)
{
if (!all_stream) return NULL;
struct tmux_stream *stream = NULL;
HASH_FIND_INT(all_stream, &id, stream);
return stream;
}
struct tmux_stream *
get_cur_stream()
{
return cur_stream;
}
void
set_cur_stream(struct tmux_stream *stream)
{
cur_stream = stream;
}
void
init_tmux_stream(struct tmux_stream *stream, uint32_t id, enum tcp_mux_state state)
{
stream->id = id;
stream->state = state;
stream->recv_window = MAX_STREAM_WINDOW_SIZE;
stream->send_window = MAX_STREAM_WINDOW_SIZE;
memset(&stream->tx_ring, 0, sizeof(struct ring_buffer));
memset(&stream->rx_ring, 0, sizeof(struct ring_buffer));
add_stream(stream);
};
static struct tcp_mux_flag_desc flag_desc[] = {
{ZERO, "zero"},
{SYN, "syn"},
{ACK, "ack"},
{FIN, "fin"},
{RST, "rst"},
};
static const char *
type_2_desc(enum tcp_mux_type type)
int
validate_tcp_mux_protocol(struct tcp_mux_header *tmux_hdr)
{
for(int i = 0; i < sizeof(type_desc)/sizeof(struct tcp_mux_type_desc); i++){
if (type == type_desc[i].type)
return type_desc[i].desc;
}
return "unkown_type";
}
static const char *
flag_2_desc(enum tcp_mux_flag flag)
{
for(int i = 0; i < sizeof(flag_desc)/sizeof(struct tcp_mux_flag_desc); i++){
if (flag == flag_desc[i].flag)
return flag_desc[i].desc;
}
if (tmux_hdr->version != proto_version) return 0;
return "unkown_flag";
if (tmux_hdr->type > GO_AWAY) return 0;
return 1;
}
static int
valid_tcp_mux_type(uint8_t type)
{
if (type >= DATA && type <= GO_AWAY)
return 1;
return 0;
}
static int
valid_tcp_mux_sid(uint32_t sid)
{
if (sid == 1)
return 1;
return get_proxy_client(sid)?1:0;
}
void
tcp_mux_encode(enum tcp_mux_type type, enum tcp_mux_flag flags, uint32_t stream_id, uint32_t length, struct tcp_mux_header *tmux_hdr)
{
@@ -107,65 +124,33 @@ tcp_mux_flag()
return c_conf->tcp_mux;
}
static void
dump_tcp_mux_header(uint8_t *data, int len)
{
if (len != 12)
return;
printf("tcp mux header is : \n");
for (int i = 0; i < len; i++)
printf("%2x", data[i]);
printf("\n");
}
static uint32_t
parse_tcp_mux_proto(uint8_t *data, int len, uint32_t *flag, uint32_t *type, uint32_t *stream_id, uint32_t *dlen)
{
struct common_conf *c_conf = get_common_config();
if (!c_conf->tcp_mux)
return 0;
if (len < sizeof(struct tcp_mux_header))
return 0;
struct tcp_mux_header *hdr = (struct tcp_mux_header *)data;
if(hdr->version == proto_version &&
valid_tcp_mux_type(hdr->type)) {
if (hdr->type == DATA && !valid_tcp_mux_sid(htonl(hdr->stream_id))) {
debug(LOG_INFO, "!!!!!type is DATA but cant find stream_id : type [%s] flag [%s] stream_id[%d]",
type_2_desc(hdr->type), flag_2_desc(htons(hdr->flags)), htonl(hdr->stream_id));
dump_tcp_mux_header(data, len);
exit(-1);
}
*type = hdr->type;
*flag = htons(hdr->flags);
*stream_id = htonl(hdr->stream_id);
*dlen = htonl(hdr->length);
return 1;
}
return 0;
void
reset_session_id() {
g_session_id = 1;
}
uint32_t
get_next_session_id() {
static uint32_t next_session_id = 1;
uint32_t id = next_session_id;
next_session_id += 2;
uint32_t id = g_session_id;
g_session_id += 2;
return id;
}
static void
tcp_mux_send_win_update(struct bufferevent *bout, enum tcp_mux_flag flags, uint32_t stream_id, uint32_t delta)
{
struct tcp_mux_header tmux_hdr;
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
tcp_mux_encode(WINDOW_UPDATE, flags, stream_id, delta, &tmux_hdr);
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
}
void
tcp_mux_send_win_update_syn(struct bufferevent *bout, uint32_t stream_id)
{
if (!tcp_mux_flag()) return;
struct tcp_mux_header tmux_hdr;
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
tcp_mux_encode(WINDOW_UPDATE, SYN, stream_id, 0, &tmux_hdr);
debug(LOG_DEBUG, "tcp mux [%d] send wind update syn", stream_id);
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
tcp_mux_send_win_update(bout, SYN, stream_id, 0);
}
void
@@ -173,11 +158,7 @@ tcp_mux_send_win_update_ack(struct bufferevent *bout, uint32_t stream_id, uint32
{
if (!tcp_mux_flag()) return;
struct tcp_mux_header tmux_hdr;
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
tcp_mux_encode(WINDOW_UPDATE, ZERO, stream_id, delta, &tmux_hdr);
debug(LOG_DEBUG, "tcp mux [%d] send wind update ZERO [%d]", stream_id, delta);
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
tcp_mux_send_win_update(bout, ZERO, stream_id, 0);
}
void
@@ -185,21 +166,24 @@ tcp_mux_send_win_update_fin(struct bufferevent *bout, uint32_t stream_id)
{
if (!tcp_mux_flag()) return;
struct tcp_mux_header tmux_hdr;
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
tcp_mux_encode(WINDOW_UPDATE, FIN, stream_id, 0, &tmux_hdr);
debug(LOG_DEBUG, "tcp mux [%d] send wind update FIN", stream_id);
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
tcp_mux_send_win_update(bout, FIN, stream_id, 0);
}
void
tcp_mux_send_data(struct bufferevent *bout, uint32_t stream_id, uint32_t length)
tcp_mux_send_win_update_rst(struct bufferevent *bout, uint32_t stream_id)
{
if (!tcp_mux_flag()) return;
tcp_mux_send_win_update(bout, RST, stream_id, 0);
}
void
tcp_mux_send_data(struct bufferevent *bout, uint16_t flags, uint32_t stream_id, uint32_t length)
{
if (!tcp_mux_flag()) return;
struct tcp_mux_header tmux_hdr;
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
tcp_mux_encode(DATA, ZERO, stream_id, length, &tmux_hdr);
tcp_mux_encode(DATA, flags, stream_id, length, &tmux_hdr);
//debug(LOG_DEBUG, "tcp mux [%d] send data len : %d", stream_id, length);
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
}
@@ -228,141 +212,399 @@ tcp_mux_handle_ping(struct bufferevent *bout, uint32_t ping_id)
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
}
void
handle_tcp_mux_frps_msg(uint8_t *buf, int ilen, void (*fn)(uint8_t *, int, void *))
static void
tcp_mux_send_go_away(struct bufferevent *bout, uint32_t reason)
{
static uint32_t l_stream_id = 0;
static uint32_t l_dlen = 0;
static uint32_t l_type = 0;
static uint32_t l_flag = 0;
uint8_t *data = buf;
while (ilen > 0) {
uint32_t type = 0, stream_id = 0, dlen = 0, flag = 0;
uint32_t is_tmux = parse_tcp_mux_proto(data, ilen, &flag, &type, &stream_id, &dlen);
if (!is_tmux) {
struct proxy_client *pc = get_proxy_client(l_stream_id);
debug(LOG_DEBUG, "receive only %s data : l_stream_id %d l_type %s l_flag %s l_dlen %d ilen %d",
!pc?"main control ":"worker ",
l_stream_id, type_2_desc(l_type),
flag_2_desc(l_flag), l_dlen, ilen);
assert(ilen);
if (ilen == 12)
dump_tcp_mux_header(data, ilen);
if (!tcp_mux_flag()) return;
if (!pc || (pc && !pc->local_proxy_bev)) {
assert(ilen >= l_dlen);
assert(l_dlen > 0);
fn(data, l_dlen, pc);
data += l_dlen;
ilen -= l_dlen;
l_dlen = 0;
continue;
}
if (pc->stream_state != ESTABLISHED) {
debug(LOG_INFO, "client [%d] state is [%d]", pc->stream_id, pc->stream_state);
break;
}
struct tcp_mux_header tmux_hdr;
memset(&tmux_hdr, 0, sizeof(tmux_hdr));
tcp_mux_encode(GO_AWAY, 0, 0, reason, &tmux_hdr);
//debug(LOG_DEBUG, "tcp mux send ping ack : %d", ping_id);
bufferevent_write(bout, (uint8_t *)&tmux_hdr, sizeof(tmux_hdr));
if ( ilen >= l_dlen) {
assert(pc->local_proxy_bev);
bufferevent_write(pc->local_proxy_bev, data, l_dlen);
data += l_dlen;
ilen -= l_dlen;
l_dlen = 0;
} else {
assert(pc->local_proxy_bev);
bufferevent_write(pc->local_proxy_bev, data, ilen);
l_dlen -= ilen;
ilen = 0;
}
}
continue;
}
struct proxy_client *pc = get_proxy_client(stream_id);
debug(LOG_DEBUG, "[%s] receive tcp mux type [%s] flag [%s] stream_id [%d] dlen [%d] ilen [%d]",
pc?"worker":"main control",
type_2_desc(type), flag_2_desc(flag), stream_id, dlen, ilen);
data += sizeof(struct tcp_mux_header);
ilen -= sizeof(struct tcp_mux_header);
l_stream_id = stream_id;
l_type = type;
l_flag = flag;
l_dlen = type==PING?0:dlen;
assert(ilen >= 0);
switch(type) {
case DATA:
{
if (ilen == 0)
break;
if (!pc || (pc && !pc->local_proxy_bev)) {
assert(ilen >= dlen);
fn(data, dlen, pc);
data += dlen;
ilen -= dlen;
l_dlen = 0;
continue;
}
if (pc->stream_state != ESTABLISHED) {
debug(LOG_INFO, "client [%d] state is [%d]", pc->stream_id, pc->stream_state);
break;
}
if (ilen >= dlen){
assert(pc->local_proxy_bev);
bufferevent_write(pc->local_proxy_bev, data, dlen);
data += dlen;
ilen -= dlen;
l_dlen = 0;
} else {
assert(pc->local_proxy_bev);
bufferevent_write(pc->local_proxy_bev, data, ilen);
l_dlen -= ilen;
ilen = 0;
}
static int
process_flags(uint16_t flags, struct tmux_stream *stream)
{
uint32_t close_stream = 0;
if ( (flags&ACK) == ACK ) {
if (stream->state == SYN_SEND)
stream->state = ESTABLISHED;
} else if ( (flags&FIN) == FIN ) {
switch(stream->state) {
case SYN_SEND:
case SYN_RECEIVED:
case ESTABLISHED:
stream->state = REMOTE_CLOSE;
break;
}
case PING:
{
struct bufferevent *bout = get_main_control()->connect_bev;
uint32_t seq = dlen;
assert(bout);
if (flag == SYN)
tcp_mux_handle_ping(bout, seq);
case LOCAL_CLOSE:
stream->state = CLOSED;
close_stream = 1;
break;
}
case WINDOW_UPDATE:
{
switch(flag) {
case RST:
case FIN:
del_proxy_client(pc);
break;
case ZERO:
case ACK:
if (!pc)
break;
if (dlen > 0) {
pc->send_window += dlen;
bufferevent_enable(pc->local_proxy_bev, EV_READ|EV_WRITE);
}
pc->stream_state = ESTABLISHED;
break;
default:
debug(LOG_INFO, "window update no need process : flag %2x %s dlen %d stream_id %d",
flag, flag_2_desc(flag), dlen, stream_id);
}
break;
}
default:
debug(LOG_INFO, "no need unhandle tcp mux msg : type %s flag %s stream_id %d dlen %d ilen %d",
type_2_desc(type), flag_2_desc(flag), stream_id, dlen, ilen);
debug(LOG_ERR, "unexpected FIN flag in state %d", stream->state);
assert(0);
return 0;
}
} else if ( (flags&RST) == RST ) {
stream->state = RESET;
close_stream = 1;
}
if (close_stream) {
debug(LOG_DEBUG, "free stream %d", stream->id);
del_proxy_client_by_stream_id(stream->id);
}
return 1;
}
static uint16_t
get_send_flags(struct tmux_stream *stream)
{
uint16_t flags = 0;
switch (stream->state) {
case INIT:
flags |= SYN;
stream->state = SYN_SEND;
break;
case SYN_RECEIVED:
flags |= ACK;
stream->state = ESTABLISHED;
break;
default:
break;
}
return flags;
}
void
send_window_update(struct bufferevent *bout, struct tmux_stream *stream, uint32_t length)
{
uint32_t max = MAX_STREAM_WINDOW_SIZE;
uint32_t delta = (max - length) - stream->recv_window;
uint16_t flags = get_send_flags(stream);
if (delta < max/2 && flags == 0)
return;
stream->recv_window += delta;
tcp_mux_send_win_update(bout, flags, stream->id, delta);
//debug(LOG_DEBUG, "send window update: flags %d, stream_id %d delta %d, recv_window %u length %u",
// flags, stream->id, delta, stream->recv_window, length);
}
static int
ring_buffer_pop(struct ring_buffer *ring, uint8_t *data, uint32_t len)
{
assert(ring->sz >= len);
assert(data != NULL);
uint32_t i = 0;
while(i < len) {
data[i] = ring->data[ring->cur++];
if (ring->cur == RBUF_SIZE)
ring->cur = 0;
i++;
ring->sz--;
}
assert(i == len);
return len;
}
static int
process_data(struct tmux_stream *stream, uint32_t length, uint16_t flags,
void (*fn)(uint8_t *, int, void *), void *param)
{
if (!process_flags(flags, stream)) return 0;
if (length > stream->recv_window) {
debug(LOG_ERR, "receive window exceed (remain %d, recv %d)", stream->recv_window, length);
return 0;
}
stream->recv_window -= length;
struct proxy_client *pc = (struct proxy_client *)param;
if (!pc || (pc && !pc->local_proxy_bev)) {
uint8_t *data = (uint8_t *)calloc(length, 1);
ring_buffer_pop(&stream->rx_ring, data, length);
fn(data, length, pc);
free(data);
} else {
ring_buffer_write(pc->local_proxy_bev, &stream->rx_ring, length);
}
struct bufferevent *bout = get_main_control()->connect_bev;
send_window_update(bout, stream, length);
return length;
}
static int
incr_send_window(struct bufferevent *bev, struct tcp_mux_header *tmux_hdr, uint16_t flags, struct tmux_stream *stream)
{
if (!process_flags(flags, stream))
return 0;
uint32_t length = ntohl(tmux_hdr->length);
if (stream->send_window == 0) bufferevent_enable(bev, EV_READ);
stream->send_window += length;
//debug(LOG_DEBUG, "incr_send_window : stream_id %d length %d send_window %d",
// stream->id, length, stream->send_window);
return 1;
}
static int
incoming_stream(uint32_t stream_id)
{
if (local_go_away) {
struct bufferevent *bout = get_main_control()->connect_bev;
tcp_mux_send_win_update_rst(bout, stream_id);
return 0;
}
// TODO
// create new stream
return 1;
}
void
handle_tcp_mux_ping(struct tcp_mux_header *tmux_hdr)
{
uint16_t flags = ntohs(tmux_hdr->flags);
uint32_t ping_id = ntohl(tmux_hdr->length);
if ( (flags&SYN) == SYN) {
struct bufferevent *bout = get_main_control()->connect_bev;
tcp_mux_handle_ping(bout, ping_id);
}
}
void
handle_tcp_mux_go_away(struct tcp_mux_header *tmux_hdr)
{
uint32_t code = ntohl(tmux_hdr->length);
switch(code) {
case NORMAL:
remote_go_away = 1;
break;
case PROTO_ERR:
debug(LOG_ERR, "receive protocol error go away");
break;
case INTERNAL_ERR:
debug(LOG_ERR, "receive internal error go away");
break;
default:
debug(LOG_ERR, "receive unexpected go away");
}
}
uint32_t
tmux_stream_read(struct bufferevent *bev, struct tmux_stream *stream, uint32_t len)
{
assert(stream != NULL);
return ring_buffer_read(bev, &stream->rx_ring, len);
}
int
handle_tcp_mux_stream(struct tcp_mux_header *tmux_hdr, handle_data_fn_t fn)
{
uint32_t stream_id = ntohl(tmux_hdr->stream_id);
uint16_t flags = ntohs(tmux_hdr->flags);
//debug(LOG_DEBUG, "handle_tcp_mux_stream stream_id %d type %d flags %d", stream_id, tmux_hdr->type, flags);
if ( (flags&SYN) == SYN) {
debug(LOG_INFO, "!!!! as xfrpc, it should not be here %d", stream_id);
if (!incoming_stream(stream_id))
return 0;
}
struct tmux_stream *stream = get_stream_by_id(stream_id);
struct proxy_client *pc = get_proxy_client(stream_id);
assert(stream != NULL);
if (tmux_hdr->type == WINDOW_UPDATE) {
struct bufferevent *bev = pc?pc->local_proxy_bev: get_main_control()->connect_bev;
if (!incr_send_window(bev, tmux_hdr, flags, stream)) {
struct bufferevent *bout = get_main_control()->connect_bev;
tcp_mux_send_go_away(bout, PROTO_ERR);
}
return 0;
}
int32_t length = ntohl(tmux_hdr->length);
if (!process_data(stream, length, flags, fn, (void *)pc)) {
struct bufferevent *bout = get_main_control()->connect_bev;
tcp_mux_send_go_away(bout, PROTO_ERR);
return 0;
}
return length;
}
static int
ring_buffer_append(struct ring_buffer *ring, uint8_t *data, uint32_t len)
{
uint32_t left = RBUF_SIZE - ring->sz;
assert(left >= len);
int i = 0;
for (; i < len; i++) {
ring->data[ring->end++] = data[i];
if (ring->end == RBUF_SIZE) ring->end = 0;
ring->sz++;
if (ring->cur == ring->end) {
break;
}
}
return i;
}
static uint32_t
ring_buffer_read(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len)
{
if (ring->sz == RBUF_SIZE) {
debug(LOG_ERR, "ring buffer is full");
return 0;
}
uint32_t cap = RBUF_SIZE - ring->sz;
if (len > cap) {
debug(LOG_INFO, "prepare read data [%d] out size ring capacity [%d]", len, cap);
len = cap;
}
for (int i = 0; i < len; i++) {
bufferevent_read(bev, &ring->data[ring->end++], 1);
if (ring->end == RBUF_SIZE) ring->end = 0;
ring->sz++;
if (ring->cur == ring->end) {
break;
}
}
return len;
}
static uint32_t
ring_buffer_write(struct bufferevent *bev, struct ring_buffer *ring, uint32_t len)
{
if (ring->sz == 0) {
debug(LOG_ERR, "ring buffer is empty");
return 0;
}
if (len > ring->sz) {
debug(LOG_INFO, "prepare write data [%d] out size ring data [%d]", len, ring->sz);
len = ring->sz;
}
while(len > 0) {
bufferevent_write(bev, &ring->data[ring->cur++], 1);
len--;
ring->sz--;;
if (ring->cur == RBUF_SIZE) ring->cur = 0;
if (ring->cur == ring->end) {
assert(ring->sz == 0);
break;
}
}
return len;
}
uint32_t
tmux_stream_write(struct bufferevent *bev, uint8_t *data, uint32_t length, struct tmux_stream *stream)
{
switch(stream->state) {
case LOCAL_CLOSE:
case CLOSED:
case RESET:
debug(LOG_INFO, "stream %d state is closed", stream->id);
return 0;
default:
break;
}
struct ring_buffer *tx_ring = &stream->tx_ring;
uint32_t left = RBUF_SIZE - tx_ring->sz;
if (stream->send_window == 0) {
debug(LOG_INFO, "stream %d send_window is zero, length %d left %d", stream->id, length, left);
ring_buffer_append(tx_ring, data, length);
return 0;
}
uint16_t flags = get_send_flags(stream);
uint32_t max = length;
struct bufferevent *bout = get_main_control()->connect_bev;
//debug(LOG_DEBUG, "tmux_stream_write stream id %u: send_window %u tx_ring sz %u length %u",
// stream->id, stream->send_window, tx_ring->sz, length);
if (stream->send_window < tx_ring->sz) {
debug(LOG_INFO, " send_window %u less than tx_ring size %u", stream->send_window, tx_ring->sz);
max = stream->send_window;
tcp_mux_send_data(bout, flags, stream->id, max);
ring_buffer_write(bev, tx_ring, max);
ring_buffer_append(tx_ring, data, length);
} else if (stream->send_window < tx_ring->sz + length) {
debug(LOG_INFO, " send_window %u less than %u", stream->send_window, tx_ring->sz+length);
max = stream->send_window;
tcp_mux_send_data(bout, flags, stream->id, max);
if (tx_ring->sz > 0)
ring_buffer_write(bev, tx_ring, tx_ring->sz);
bufferevent_write(bev, data, max - tx_ring->sz);
ring_buffer_append(tx_ring, data + max - tx_ring->sz, length + tx_ring->sz - max);
} else {
max = tx_ring->sz + length;
tcp_mux_send_data(bout, flags, stream->id, max);
if (tx_ring->sz > 0)
ring_buffer_write(bev, tx_ring, tx_ring->sz);
bufferevent_write(bev, data, length);
}
stream->send_window -= max;
return max;
}
void
tmux_stream_close(struct bufferevent *bout, struct tmux_stream *stream)
{
int closed = 0;
switch(stream->state) {
case SYN_SEND:
case SYN_RECEIVED:
case ESTABLISHED:
stream->state = LOCAL_CLOSE;
break;
case LOCAL_CLOSE:
case REMOTE_CLOSE:
stream->state = CLOSED;
closed = 1;
case CLOSED:
case RESET:
default:
return;
}
uint16_t flags = get_send_flags(stream);
flags |= FIN;
tcp_mux_send_win_update(bout, flags, stream->id, 0);
if (closed) {
debug(LOG_DEBUG, "del proxy client %d", stream->id);
del_proxy_client_by_stream_id(stream->id);
}
}

View File

@@ -29,6 +29,22 @@
#include "uthash.h"
#define MAX_STREAM_WINDOW_SIZE 256*1024
#define RBUF_SIZE 32*1024
struct ring_buffer {
uint32_t cur;
uint32_t end;
uint32_t sz;
uint8_t data[RBUF_SIZE];
};
enum go_away_type {
NORMAL,
PROTO_ERR,
INTERNAL_ERR,
};
enum tcp_mux_type {
DATA,
WINDOW_UPDATE,
@@ -73,20 +89,65 @@ enum tcp_mux_state {
RESET
};
struct tmux_stream {
uint32_t id;
uint32_t recv_window;
uint32_t send_window;
enum tcp_mux_state state;
struct ring_buffer tx_ring;
struct ring_buffer rx_ring;
// private arguments
UT_hash_handle hh;
};
typedef void (*handle_data_fn_t)(uint8_t *, int, void *);
void init_tmux_stream(struct tmux_stream *stream, uint32_t id, enum tcp_mux_state state);
int validate_tcp_mux_protocol(struct tcp_mux_header *tmux_hdr);
void send_window_update(struct bufferevent *bout, struct tmux_stream *stream, uint32_t length);
void tcp_mux_send_win_update_syn(struct bufferevent *bout, uint32_t stream_id);
void tcp_mux_send_win_update_ack(struct bufferevent *bout, uint32_t stream_id, uint32_t delta);
void tcp_mux_send_win_update_fin(struct bufferevent *bout, uint32_t stream_id);
void tcp_mux_send_data(struct bufferevent *bout, uint32_t stream_id, uint32_t length);
void tcp_mux_send_win_update_rst(struct bufferevent *bout, uint32_t stream_id);
void tcp_mux_send_data(struct bufferevent *bout, uint16_t flags, uint32_t stream_id, uint32_t length);
void tcp_mux_send_ping(struct bufferevent *bout, uint32_t ping_id);
uint32_t get_next_session_id();
void tcp_mux_encode(enum tcp_mux_type type, enum tcp_mux_flag flags, uint32_t stream_id, uint32_t length, struct tcp_mux_header *tmux_hdr);
void tcp_mux_encode(enum tcp_mux_type type, enum tcp_mux_flag flags,
uint32_t stream_id, uint32_t length, struct tcp_mux_header *tmux_hdr);
void handle_tcp_mux_frps_msg(uint8_t *data, int len, void (*fn)(uint8_t *, int, void *));
int handle_tcp_mux_stream(struct tcp_mux_header *tmux_hdr, handle_data_fn_t fn);
void handle_tcp_mux_ping(struct tcp_mux_header *tmux_hdr);
void handle_tcp_mux_go_away(struct tcp_mux_header *tmux_hdr);
uint32_t tmux_stream_write(struct bufferevent *bev, uint8_t *data, uint32_t length, struct tmux_stream *stream);
uint32_t tmux_stream_read(struct bufferevent *bev, struct tmux_stream *stream, uint32_t len);
void reset_session_id();
struct tmux_stream *get_cur_stream();
void set_cur_stream(struct tmux_stream *stream);
void add_stream(struct tmux_stream *stream);
void del_stream(uint32_t stream_id);
struct tmux_stream* get_stream_by_id(uint32_t id);
void tmux_stream_close(struct bufferevent *bout, struct tmux_stream *stream);
#endif

View File

@@ -1,7 +1,7 @@
#ifndef _VERSION_H_
#define _VERSION_H_
#define VERSION "1.05.579"
#define VERSION "1.11.587"
#define PROTOCOL_VERESION "0.43.0"
#define CLIENT_V 1