add code for windows platform

This commit is contained in:
wujiaxu
2020-08-01 14:23:17 +08:00
parent 52d2154d9f
commit 6f6dcd2f8d
48 changed files with 485 additions and 210 deletions

View File

@@ -67,7 +67,7 @@ message("CMAKE_CXX_FLAGS_MINSIZEREL is ${CMAKE_CXX_FLAGS_MINSIZEREL}")
if (WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /MP /wd4200")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /Zc:__cplusplus /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions")

View File

@@ -18,7 +18,18 @@ set(COMMOM_KERNEL_HEADERS
if (WIN32)
set(INCLUDE_KERNEL_HEADERS
src/kernel_win/CommRequest.h
src/kernel_win/CommScheduler.h
src/kernel_win/Communicator.h
src/kernel_win/SleepRequest.h
src/kernel_win/ExecRequest.h
src/kernel_win/IORequest.h
src/kernel_win/Executor.h
src/kernel_win/list.h
src/kernel_win/rbtree.h
src/kernel_win/SubTask.h
src/kernel_win/thrdpool.h
src/kernel_win/WinPoller.h
)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux")
set(INCLUDE_KERNEL_HEADERS
@@ -35,6 +46,7 @@ else ()
endif ()
set(INCLUDE_HEADERS
src/PlatformSocket.h
src/algorithm/DNSRoutine.h
src/algorithm/MapReduce.h
src/algorithm/MapReduce.inl
@@ -70,6 +82,7 @@ set(INCLUDE_HEADERS
src/util/StringUtil.h
src/util/URIParser.h
src/util/MD5Util.h
src/util/RWLock.h
src/factory/WFConnection.h
src/factory/WFTask.h
src/factory/WFTask.inl

View File

@@ -38,6 +38,9 @@ add_library(
$<TARGET_OBJECTS:client>
)
#target_link_libraries(${LIB_SHARED} ws2_32 wsock32 OpenSSL::SSL OpenSSL::Crypto)
#if (NOT WIN32)
install(
TARGETS ${PROJECT_NAME}
ARCHIVE
@@ -47,7 +50,7 @@ install(
if (APPLE)
set(LIBSO ${LIB_DIR}/libworkflow.a)
else ()
elseif (NOT WIN32)
set(LIBSO ${LIB_DIR}/libworkflow.so)
add_custom_target(
SCRIPT_SHARED_LIB ALL

49
src/PlatformSocket.h Normal file
View File

@@ -0,0 +1,49 @@
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
*/
#ifndef _PLATFORMSOCKET_H_
#define _PLATFORMSOCKET_H_
#include <sys/types.h>
#ifdef _WIN32
# include <Ws2tcpip.h>
# include <Ws2def.h>
/*
typedef struct _WSABUF {
ULONG len;
CHAR *buf;
} WSABUF, *LPWSABUF;
*/
struct iovec
{
void *iov_base;
size_t iov_len;
};
#else
# include <arpa/inet.h>
# include <sys/socket.h>
# include <sys/un.h>
# include <sys/uio.h>
# include <netdb.h>
#endif
#endif

View File

@@ -17,7 +17,8 @@
*/
#include <stdio.h>
#include <sys/un.h>
#include <string.h>
#include "PlatformSocket.h"
#include "DNSRoutine.h"
#define PORT_STR_MAX 5
@@ -50,6 +51,12 @@ DNSOutput& DNSOutput::operator= (DNSOutput&& move)
void DNSRoutine::run(const DNSInput *in, DNSOutput *out)
{
struct addrinfo hints;
char port_str[PORT_STR_MAX + 1];
memset(&hints, 0, sizeof (hints));
#ifdef __linux__
if (!in->host_.empty() && in->host_[0] == '/')
{
out->error_ = 0;
@@ -69,15 +76,13 @@ void DNSRoutine::run(const DNSInput *in, DNSOutput *out)
strcpy(sun->sun_path, in->host_.c_str());
return;
}
struct addrinfo hints = {
#ifdef AI_ADDRCONFIG
.ai_flags = AI_ADDRCONFIG,
#endif
.ai_family = AF_UNSPEC,
.ai_socktype = SOCK_STREAM
};
char port_str[PORT_STR_MAX + 1];
#ifdef AI_ADDRCONFIG
hints.ai_flags = AI_ADDRCONFIG;
#endif
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
snprintf(port_str, PORT_STR_MAX + 1, "%u", in->port_);
out->error_ = getaddrinfo(in->host_.c_str(),

View File

@@ -18,10 +18,8 @@
#ifndef _DNSROUTINE_H_
#define _DNSROUTINE_H_
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string>
#include "PlatformSocket.h"
class DNSInput
{

View File

@@ -6,4 +6,12 @@ set(SRC
)
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (WIN32)
target_compile_definitions(
${PROJECT_NAME} PRIVATE
strdup=_strdup
strcasecmp=_stricmp
strncasecmp=_strnicmp
)
endif ()

View File

@@ -10,4 +10,12 @@ set(SRC
)
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (WIN32)
target_compile_definitions(
${PROJECT_NAME} PRIVATE
strdup=_strdup
strcasecmp=_stricmp
strncasecmp=_strnicmp
)
endif ()

View File

@@ -18,11 +18,9 @@
*/
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <string>
#include <mutex>
#include "PlatformSocket.h"
#include "list.h"
#include "rbtree.h"
#include "DNSRoutine.h"
@@ -546,7 +544,7 @@ void WFRouterTask::dispatch()
ret = inet_pton(AF_INET6, host_.c_str(), &addr);
else if (isdigit(back) && isdigit(front))
ret = inet_pton(AF_INET, host_.c_str(), &addr);
#ifdef AF_UNIX
#ifdef __linux__
else if (front == '/')
ret = 1;
#endif

View File

@@ -21,6 +21,7 @@
#define _WFTASKFACTORY_H_
#include <functional>
#include "PlatformSocket.h"
#include "URIParser.h"
#include "RedisMessage.h"
#include "HttpMessage.h"

View File

@@ -18,16 +18,15 @@
Li Yingxin (liyingxin@sogou-inc.com)
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <time.h>
#include <netdb.h>
#include <stdio.h>
#include <time.h>
#include <errno.h>
#include <string>
#include <new>
#include <string>
#include <functional>
#include <utility>
#include "PlatformSocket.h"
#include "WFGlobal.h"
#include "Workflow.h"
#include "WFTask.h"
@@ -62,9 +61,10 @@ inline WFTimerTask *WFTaskFactory::create_timer_task(unsigned int microseconds,
timer_callback_t callback)
{
struct timespec value = {
.tv_sec = (time_t)microseconds / 1000000,
.tv_nsec = (long)microseconds % 1000000 * 1000
/* .tv_sec = */ (time_t)microseconds / 1000000,
/* .tv_nsec = */ (long)microseconds % 1000000 * 1000
};
return new __WFTimerTask(&value, WFGlobal::get_scheduler(),
std::move(callback));
}

View File

@@ -9,4 +9,12 @@ set(SRC
)
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (WIN32)
target_compile_definitions(
${PROJECT_NAME} PRIVATE
strdup=_strdup
strcasecmp=_stricmp
strncasecmp=_strnicmp
)
endif ()

View File

@@ -19,13 +19,11 @@
#ifndef _DNSCACHE_H_
#define _DNSCACHE_H_
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdint.h>
#include <string>
#include <mutex>
#include <utility>
#include "PlatformSocket.h"
#include "LRUCache.h"
#define GET_TYPE_TTL 0

View File

@@ -45,10 +45,10 @@ struct EndpointParams
static constexpr struct EndpointParams ENDPOINT_PARAMS_DEFAULT =
{
.max_connections = 200,
.connect_timeout = 10 * 1000,
.response_timeout = 10 * 1000,
.ssl_connect_timeout = 10 * 1000,
/* .max_connections = */ 200,
/* .connect_timeout = */ 10 * 1000,
/* .response_timeout = */ 10 * 1000,
/* .ssl_connect_timeout = */ 10 * 1000
};
#endif

View File

@@ -17,9 +17,6 @@
*/
#include <openssl/ssl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
@@ -28,6 +25,7 @@
#include <vector>
#include <string>
#include <algorithm>
#include "PlatformSocket.h"
#include "list.h"
#include "rbtree.h"
#include "WFGlobal.h"
@@ -52,7 +50,7 @@ private:
socklen_t addrlen;
this->get_addr(&addr, &addrlen);
return socket(addr->sa_family, SOCK_DGRAM, 0);
return (int)socket(addr->sa_family, SOCK_DGRAM, 0);
}
};
@@ -65,7 +63,7 @@ private:
socklen_t addrlen;
this->get_addr(&addr, &addrlen);
return socket(addr->sa_family, SOCK_STREAM, IPPROTO_SCTP);
return (int)socket(addr->sa_family, SOCK_STREAM, IPPROTO_SCTP);
}
};
@@ -151,7 +149,7 @@ CommSchedTarget *Router::create_target(const struct RouterParams *params,
return NULL;
}
if (target->init(addr->ai_addr, addr->ai_addrlen, params->ssl_ctx,
if (target->init(addr->ai_addr, (socklen_t)addr->ai_addrlen, params->ssl_ctx,
params->connect_timeout, params->ssl_connect_timeout,
params->response_timeout, params->max_connections) < 0)
{
@@ -441,17 +439,17 @@ int RouteManager::get(TransportType type,
ssl_connect_timeout = endpoint_params->ssl_connect_timeout;
}
struct RouterParams params = {
.transport_type = type,
.addrinfo = addrinfo,
.md5_16 = md5_16,
.ssl_ctx = ssl_ctx,
.connect_timeout = endpoint_params->connect_timeout,
.ssl_connect_timeout = ssl_connect_timeout,
.response_timeout = endpoint_params->response_timeout,
.max_connections = endpoint_params->max_connections
struct RouterParams params =
{
/* .transport_type = */ type,
/* .addrinfo = */ addrinfo,
/* .md5_16 = */ md5_16,
/* .ssl_ctx = */ ssl_ctx,
/* .connect_timeout = */ endpoint_params->connect_timeout,
/* .ssl_connect_timeout = */ ssl_connect_timeout,
/* .response_timeout = */ endpoint_params->response_timeout,
/* .max_connections = */ endpoint_params->max_connections
};
if (StringUtil::start_with(other_info, "?maxconn="))
{
int maxconn = atoi(other_info.c_str() + 9);

View File

@@ -19,11 +19,9 @@
#ifndef _ROUTEMANAGER_H_
#define _ROUTEMANAGER_H_
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string>
#include <mutex>
#include "PlatformSocket.h"
#include "rbtree.h"
#include "WFConnection.h"
#include "EndpointParams.h"

View File

@@ -16,7 +16,6 @@
Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
*/
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <vector>
@@ -31,6 +30,7 @@
#include "rbtree.h"
#include "URIParser.h"
#include "StringUtil.h"
#include "RWLock.h"
#include "EndpointParams.h"
#include "UpstreamManager.h"
@@ -38,34 +38,6 @@
#define MTTR_SECOND 30
#define VIRTUAL_GROUP_SIZE 16
namespace //anoymous namespace, for safe, avoid conflict
{
// RAII: YES
class ReadLock
{
public:
ReadLock(pthread_rwlock_t& rwlock): rwlock_(&rwlock) { pthread_rwlock_rdlock(rwlock_); }
ReadLock(pthread_rwlock_t *rwlock): rwlock_(rwlock) { pthread_rwlock_rdlock(rwlock_); }
~ReadLock() { pthread_rwlock_unlock(rwlock_); }
private:
pthread_rwlock_t *rwlock_;
};
// RAII: YES
class WriteLock
{
public:
WriteLock(pthread_rwlock_t& rwlock): rwlock_(&rwlock) { pthread_rwlock_wrlock(rwlock_); }
WriteLock(pthread_rwlock_t *rwlock): rwlock_(rwlock) { pthread_rwlock_wrlock(rwlock_); }
~WriteLock() { pthread_rwlock_unlock(rwlock_); }
private:
pthread_rwlock_t *rwlock_;
};
}
class UpstreamAddress;
class UpstreamGroup;
class Upstream;
@@ -140,7 +112,7 @@ public:
static void notify_available(UpstreamAddress *ua);
protected:
pthread_rwlock_t rwlock_;
RWLock rwlock_;
int total_weight_;
int available_weight_;
std::vector<UpstreamAddress *> masters_;
@@ -209,7 +181,7 @@ static unsigned int __default_consistent_hash(const char *path,
str += query;
str += fragment;
return std_hash(str);
return (unsigned int)std_hash(str);
}
UpstreamAddress::UpstreamAddress(const std::string& address,
@@ -223,7 +195,7 @@ UpstreamAddress::UpstreamAddress(const std::string& address,
this->params = *address_params;
this->address = address;
for (int i = 0; i < VIRTUAL_GROUP_SIZE; i++)
this->consistent_hash[i] = std_hash(address + "|v" + std::to_string(i));
this->consistent_hash[i] = (unsigned int)std_hash(address + "|v" + std::to_string(i));
if (this->params.weight == 0)
this->params.weight = 1;
@@ -252,7 +224,6 @@ UpstreamAddress::UpstreamAddress(const std::string& address,
}
Upstream::Upstream():
rwlock_(PTHREAD_RWLOCK_INITIALIZER),
total_weight_(0),
available_weight_(0),
select_callback_(nullptr),
@@ -1013,10 +984,6 @@ public:
}
private:
__UpstreamManager():
rwlock_(PTHREAD_RWLOCK_INITIALIZER)
{}
~__UpstreamManager()
{
for (auto *ua : addresses_)
@@ -1024,7 +991,7 @@ private:
}
private:
pthread_rwlock_t rwlock_;
RWLock rwlock_;
std::unordered_map<std::string, Upstream> upstream_map_;
std::vector<UpstreamAddress *> addresses_;
};

View File

@@ -70,13 +70,13 @@ struct AddressParams
*/
static constexpr struct AddressParams ADDRESS_PARAMS_DEFAULT =
{
.endpoint_params = ENDPOINT_PARAMS_DEFAULT,
.dns_ttl_default = 12 * 3600,
.dns_ttl_min = 180,
.max_fails = 200,
.weight = 1,
.server_type = SERVER_TYPE_MASTER,
.group_id = -1,
/* .endpoint_params = */ ENDPOINT_PARAMS_DEFAULT,
/* .dns_ttl_default = */ 12 * 3600,
/* .dns_ttl_min = */ 180,
/* .max_fails = */ 200,
/* .weight = */ 1,
/* .server_type = */ SERVER_TYPE_MASTER,
/* .group_id = */ -1
};
/**

View File

@@ -21,10 +21,8 @@
#include <openssl/engine.h>
#include <openssl/conf.h>
#include <openssl/crypto.h>
#include <assert.h>
#include <unistd.h>
#include <signal.h>
#include <pthread.h>
#include <thread>
#include <string>
#include <unordered_map>
#include <atomic>
@@ -36,6 +34,7 @@
#include "DNSCache.h"
#include "RouteManager.h"
#include "Executor.h"
#include "RWLock.h"
#include "WFTask.h"
#include "WFTaskError.h"
@@ -115,8 +114,27 @@ private:
static_scheme_port_["kafka"] = "9092";
sync_count_ = 0;
sync_max_ = 0;
#ifdef _WIN32
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
int err = WSAStartup(wVersionRequested, &wsaData);
if (err != 0)
abort();
if (LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
abort();
#endif
}
#ifdef _WIN32
~__WFGlobal()
{
WSACleanup();
}
#endif
private:
struct WFGlobalSettings settings_;
std::unordered_map<std::string, const char *> static_scheme_port_;
@@ -127,6 +145,8 @@ private:
int sync_max_;
};
static __WFGlobal *_g_global = __WFGlobal::get_instance();
#if OPENSSL_VERSION_NUMBER < 0x10100000L
static std::mutex *__ssl_mutex;
@@ -425,55 +445,52 @@ public:
ExecQueue *get_exec_queue(const std::string& queue_name)
{
ExecQueue *queue = NULL;
ExecQueue *queue;
ExecQueueMap::iterator iter;
pthread_rwlock_rdlock(&rwlock_);
const auto iter = queue_map_.find(queue_name);
if (iter != queue_map_.cend())
queue = iter->second;
pthread_rwlock_unlock(&rwlock_);
if (!queue)
{
queue = new ExecQueue();
if (queue->init() < 0)
{
delete queue;
queue = NULL;
}
else
{
pthread_rwlock_wrlock(&rwlock_);
const auto ret = queue_map_.emplace(queue_name, queue);
ReadLock lock(rwlock_);
if (!ret.second)
{
queue->deinit();
delete queue;
queue = ret.first->second;
}
pthread_rwlock_unlock(&rwlock_);
}
iter = queue_map_.find(queue_name);
if (iter != queue_map_.end())
return iter->second;
}
return queue;
queue = new ExecQueue();
if (queue->init() >= 0)
{
WriteLock lock(rwlock_);
auto ret = queue_map_.emplace(queue_name, queue);
if (!ret.second)
{
queue->deinit();
delete queue;
queue = ret.first->second;
}
return queue;
}
delete queue;
return NULL;
}
Executor *get_compute_executor() { return &compute_executor_; }
private:
__ExecManager():
rwlock_(PTHREAD_RWLOCK_INITIALIZER)
__ExecManager()
{
int compute_threads = __WFGlobal::get_instance()->
get_global_settings()->
compute_threads;
if (compute_threads <= 0)
compute_threads = sysconf(_SC_NPROCESSORS_ONLN);
{
compute_threads = std::thread::hardware_concurrency();
//if is 0, use something
//compute_threads = sysconf(_SC_NPROCESSORS_ONLN);
}
if (compute_executor_.init(compute_threads) < 0)
abort();
@@ -491,7 +508,7 @@ private:
}
private:
pthread_rwlock_t rwlock_;
RWLock rwlock_;
ExecQueueMap queue_map_;
Executor compute_executor_;
};

View File

@@ -58,13 +58,13 @@ struct WFGlobalSettings
*/
static constexpr struct WFGlobalSettings GLOBAL_SETTINGS_DEFAULT =
{
.endpoint_params = ENDPOINT_PARAMS_DEFAULT,
.dns_ttl_default = 12 * 3600,
.dns_ttl_min = 180,
.dns_threads = 8,
.poller_threads = 2,
.handler_threads = 20,
.compute_threads = -1,
/* .endpoint_params = */ ENDPOINT_PARAMS_DEFAULT,
/* .dns_ttl_default = */ 12 * 3600,
/* .dns_ttl_min = */ 180,
/* .dns_threads = */ 8,
/* .poller_threads = */ 2,
/* .handler_threads = */ 20,
/* .compute_threads = */ -1
};
/**

View File

@@ -15,4 +15,12 @@ set(SRC
)
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (WIN32)
target_compile_definitions(
${PROJECT_NAME} PRIVATE
strdup=_strdup
strcasecmp=_stricmp
strncasecmp=_strnicmp
)
endif ()

View File

@@ -407,8 +407,8 @@ bool HttpHeaderCursor::next(std::string& name, std::string& value)
bool HttpHeaderCursor::find(const std::string& name, std::string& value)
{
struct HttpMessageHeader header = {
.name = name.c_str(),
.name_len = name.size()
/* .name = */ name.c_str(),
/* .name_len = */ name.size()
};
if (this->find(&header))

View File

@@ -16,12 +16,12 @@
Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
*/
#include <openssl/sha.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <string>
#include <openssl/sha.h>
#include "PlatformSocket.h"
#include "MySQLMessage.h"
#include "mysql_types.h"

View File

@@ -7,4 +7,10 @@ set(SRC
)
add_library(${PROJECT_NAME} OBJECT ${SRC})
if (WIN32)
target_compile_definitions(
${PROJECT_NAME} PRIVATE
dup=_dup
)
endif ()

View File

@@ -30,12 +30,12 @@ using WFHttpServer = WFServer<protocol::HttpRequest,
static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT =
{
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 60 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
/* .max_connections = */ 2000,
/* .peer_response_timeout = */ 10 * 1000,
/* .receive_timeout = */ -1,
/* .keep_alive_timeout = */ 60 * 1000,
/* .request_size_limit = */ (size_t)-1,
/* .ssl_accept_timeout = */ 10 * 1000
};
template<>

View File

@@ -18,6 +18,10 @@
#include "WFMySQLServer.h"
#ifdef _WIN32
#include <io.h>
#endif
CommConnection *WFMySQLServer::new_connection(int accept_fd)
{
CommConnection *conn = this->WFServer::new_connection(accept_fd);
@@ -33,8 +37,15 @@ CommConnection *WFMySQLServer::new_connection(int accept_fd)
count = resp.encode(vec, 8);
if (count >= 0)
{
#ifdef _WIN32
for (int i = 0; i < count; i++)
_write(accept_fd, vec[i].iov_base, (unsigned int)vec[i].iov_len);
return conn;
#else
if (writev(accept_fd, vec, count) >= 0)
return conn;
#endif
}
delete conn;

View File

@@ -30,12 +30,12 @@ class MySQLServer;
static constexpr struct WFServerParams MYSQL_SERVER_PARAMS_DEFAULT =
{
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 28800 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
/* .max_connections = */ 2000,
/* .peer_response_timeout = */ 10 * 1000,
/* .receive_timeout = */ -1,
/* .keep_alive_timeout = */ 28800 * 1000,
/* .request_size_limit = */ (size_t)-1,
/* .ssl_accept_timeout = */ 10 * 1000
};
class WFMySQLServer : public WFServer<protocol::MySQLRequest,

View File

@@ -29,12 +29,12 @@ using WFRedisServer = WFServer<protocol::RedisRequest,
static constexpr struct WFServerParams REDIS_SERVER_PARAMS_DEFAULT =
{
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 300 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 5000,
/* .max_connections = */ 2000,
/* .peer_response_timeout = */ 10 * 1000,
/* .receive_timeout = */ -1,
/* .keep_alive_timeout = */ 300 * 1000,
/* .request_size_limit = */ (size_t)-1,
/* .ssl_accept_timeout = */ 10 * 1000
};
template<>

View File

@@ -17,14 +17,10 @@
Wu Jiaxu (wujiaxu@sogou-inc.com)
*/
#include <openssl/ssl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdio.h>
#include <openssl/ssl.h>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include "PlatformSocket.h"
#include "CommScheduler.h"
#include "WFConnection.h"
#include "WFGlobal.h"
@@ -93,11 +89,11 @@ int WFServerBase::create_listen_fd()
int reuse = 1;
this->get_addr(&bind_addr, &addrlen);
listen_fd = socket(bind_addr->sa_family, SOCK_STREAM, 0);
listen_fd = (int)socket(bind_addr->sa_family, SOCK_STREAM, 0);
if (listen_fd >= 0)
{
setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,
&reuse, sizeof (int));
(const char *)(&reuse), sizeof (int));
}
}
@@ -111,7 +107,7 @@ CommConnection *WFServerBase::new_connection(int accept_fd)
{
int reuse = 1;
setsockopt(accept_fd, SOL_SOCKET, SO_REUSEADDR,
&reuse, sizeof (int));
(const char *)(&reuse), sizeof (int));
return new WFServerConnection(&this->conn_count);
}
@@ -145,15 +141,15 @@ int WFServerBase::start(const struct sockaddr *bind_addr, socklen_t addrlen,
int WFServerBase::start(int family, const char *host, unsigned short port,
const char *cert_file, const char *key_file)
{
struct addrinfo hints = {
.ai_flags = AI_PASSIVE,
.ai_family = family,
.ai_socktype = SOCK_STREAM,
};
struct addrinfo hints;
struct addrinfo *addrinfo;
char port_str[PORT_STR_MAX + 1];
int ret;
memset(&hints, 0, sizeof (hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = family;
hints.ai_socktype = SOCK_STREAM;
snprintf(port_str, PORT_STR_MAX + 1, "%d", port);
ret = getaddrinfo(host, port_str, &hints, &addrinfo);
if (ret == 0)
@@ -164,8 +160,10 @@ int WFServerBase::start(int family, const char *host, unsigned short port,
}
else
{
#ifdef EAI_SYSTEM
if (ret != EAI_SYSTEM)
errno = EINVAL;
#endif
ret = -1;
}
@@ -198,6 +196,12 @@ static int __get_addr_bound(int sockfd, struct sockaddr *addr, socklen_t *len)
return 0;
}
#ifdef _WIN32
#include <io.h>
#else
#include <unistd.h>
#endif
int WFServerBase::serve(int listen_fd,
const char *cert_file, const char *key_file)
{

View File

@@ -20,12 +20,11 @@
#ifndef _WFSERVER_H_
#define _WFSERVER_H_
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include "PlatformSocket.h"
#include "CommScheduler.h"
#include "WFTaskFactory.h"
@@ -41,12 +40,12 @@ struct WFServerParams
static constexpr struct WFServerParams SERVER_PARAMS_DEFAULT =
{
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 60 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
/* .max_connections = */ 2000,
/* .peer_response_timeout = */ 10 * 1000,
/* .receive_timeout = */ -1,
/* .keep_alive_timeout = */ 60 * 1000,
/* .request_size_limit = */ (size_t)-1,
/* .ssl_accept_timeout = */ 10 * 1000,
};
class WFServerBase : protected CommService

View File

@@ -19,11 +19,11 @@
#ifndef _ENCODESTREAM_H_
#define _ENCODESTREAM_H_
#include <sys/uio.h>
#include <stdint.h>
#include <string.h>
#include <utility>
#include <string>
#include "PlatformSocket.h"
#include "list.h"
/**

View File

@@ -19,6 +19,7 @@
#ifndef _LRUCACHE_H_
#define _LRUCACHE_H_
#include <assert.h>
#include <mutex>
#include <map>
#include <mutex>
//#include <unordered_map>

117
src/util/RWLock.h Normal file
View File

@@ -0,0 +1,117 @@
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
*/
#ifndef _RWLOCK_H_
#define _RWLOCK_H_
#include <mutex>
#include <condition_variable>
class RWLock
{
public:
RWLock() : status_(0), waiting_readers_(0), waiting_writers_(0) {}
RWLock(const RWLock&) = delete;
RWLock(RWLock&&) = delete;
RWLock& operator= (const RWLock&) = delete;
RWLock& operator= (RWLock&&) = delete;
void rlock()
{
std::unique_lock<std::mutex> lock(mutex_);
waiting_readers_++;
//while (status_ < 0)
while (status_ < 0 || waiting_writers_ > 0)
read_cond_.wait(lock);
waiting_readers_--;
status_++;
}
void wlock()
{
std::unique_lock<std::mutex> lock(mutex_);
waiting_writers_++;
while (status_ != 0)
write_cond_.wait(lock);
waiting_writers_--;
status_--;
}
void unlock()
{
std::lock_guard<std::mutex> lock(mutex_);
if (status_ < 0)// status must be -1
status_++;
else if (status_ > 0)
status_--;
if (waiting_writers_ > 0)
{
if (status_ == 0)
write_cond_.notify_one();
}
else if (waiting_readers_ > 0)
{
if (waiting_readers_ == 1)
read_cond_.notify_one();
else
read_cond_.notify_all();
}
}
private:
// -1 : one writer
// 0 : no reader and no writer
// n > 0 : n reader
int32_t status_;
int32_t waiting_readers_;
int32_t waiting_writers_;
std::mutex mutex_;
std::condition_variable read_cond_;
std::condition_variable write_cond_;
};
// RAII: YES
class ReadLock
{
public:
ReadLock(RWLock& rwlock): rwlock_(&rwlock) { rwlock_->rlock(); }
~ReadLock() { rwlock_->unlock(); }
private:
RWLock *rwlock_;
};
// RAII: YES
class WriteLock
{
public:
WriteLock(RWLock& rwlock): rwlock_(&rwlock) { rwlock_->wlock(); }
~WriteLock() { rwlock_->unlock(); }
private:
RWLock *rwlock_;
};
#endif

View File

@@ -18,11 +18,11 @@ set(memcheck_command ${CMAKE_MEMORYCHECK_COMMAND} ${CMAKE_MEMORYCHECK_COMMAND_OP
add_custom_target(check COMMAND ${CMAKE_CTEST_COMMAND})
enable_testing()
find_package(GTest REQUIRED)
find_package(GTest CONFIG REQUIRED)
if (WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /MP /wd4200")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /Zc:__cplusplus /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions")
@@ -37,15 +37,28 @@ set(TEST_LIST
facilities_unittest
)
set(GTEST_LIB GTest::GTest GTest::Main)
if (APPLE)
set(WORKFLOW_LIB workflow pthread OpenSSL::SSL OpenSSL::Crypto)
elseif (WIN32)
set(WORKFLOW_LIB workflow ws2_32 wsock32 OpenSSL::SSL OpenSSL::Crypto)
set(GTEST_LIB GTest::gtest GTest::gtest_main)
else ()
set(WORKFLOW_LIB workflow ${LIBRT})
endif ()
foreach(src ${TEST_LIST})
add_executable(${src} EXCLUDE_FROM_ALL ${src}.cc)
target_link_libraries(${src} ${WORKFLOW_LIB} GTest::GTest GTest::Main)
target_link_libraries(${src} ${WORKFLOW_LIB} ${GTEST_LIB})
if (WIN32)
target_compile_definitions(
${src} PRIVATE
strdup=_strdup
strcasecmp=_stricmp
strncasecmp=_strnicmp
)
endif ()
add_test(${src} ${src})
add_dependencies(check ${src})
endforeach()

View File

@@ -20,7 +20,6 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <string>
#include <mutex>

View File

@@ -16,7 +16,7 @@ link_directories(${WORKFLOW_LIB_DIR})
if (WIN32)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} /MP /wd4200")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /std:c++14")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP /wd4200 /Zc:__cplusplus /std:c++14")
else ()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall -fPIC -pipe -std=gnu90")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -fPIC -pipe -std=c++11 -fno-exceptions")
@@ -37,6 +37,8 @@ set(TUTORIAL_LIST
if (APPLE)
set(WORKFLOW_LIB workflow pthread OpenSSL::SSL OpenSSL::Crypto)
elseif (WIN32)
set(WORKFLOW_LIB workflow ws2_32 wsock32 OpenSSL::SSL OpenSSL::Crypto)
else ()
set(WORKFLOW_LIB workflow ${LIBRT})
endif ()
@@ -46,6 +48,14 @@ foreach(src ${TUTORIAL_LIST})
list(GET arr -1 bin_name)
add_executable(${bin_name} ${src}.cc)
target_link_libraries(${bin_name} ${WORKFLOW_LIB})
if (WIN32)
target_compile_definitions(
${bin_name} PRIVATE
strdup=_strdup
strcasecmp=_stricmp
strncasecmp=_strnicmp
)
endif ()
endforeach()
set(DIR10 tutorial-10-user_defined_protocol)
@@ -58,3 +68,4 @@ set_target_properties(server PROPERTIES
RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/${DIR10})
set_target_properties(client PROPERTIES
RUNTIME_OUTPUT_DIRECTORY ${PROJECT_SOURCE_DIR}/${DIR10})

View File

@@ -16,8 +16,6 @@
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
*/
#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
@@ -27,6 +25,10 @@
#include "workflow/HttpUtil.h"
#include "workflow/WFTaskFactory.h"
#ifndef _WIN32
#include <unistd.h>
#endif
#define REDIRECT_MAX 5
#define RETRY_MAX 2
@@ -125,7 +127,11 @@ int main(int argc, char *argv[])
req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
req->add_header_pair("Connection", "close");
task->start();
#ifndef _WIN32
pause();
#else
getchar();
#endif
return 0;
}

View File

@@ -16,8 +16,6 @@
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
*/
#include <netdb.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
@@ -26,6 +24,10 @@
#include "workflow/RedisMessage.h"
#include "workflow/WFTaskFactory.h"
#ifndef _WIN32
#include <unistd.h>
#endif
#define RETRY_MAX 2
struct tutorial_task_data
@@ -145,7 +147,11 @@ int main(int argc, char *argv[])
* Workflow::start_series_work(task, nullptr) or
* Workflow::create_series_work(task, nullptr)->start() */
task->start();
#ifndef _WIN32
pause();
#else
getchar();
#endif
return 0;
}

View File

@@ -17,8 +17,6 @@
*/
/* Tuturial-03. Store wget result in redis: key=URL, value=Http Body*/
#include <netdb.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -178,6 +176,7 @@ int main(int argc, char *argv[])
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
return 0;
}

View File

@@ -16,12 +16,7 @@
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -31,6 +26,10 @@
#include "workflow/WFServer.h"
#include "workflow/WFHttpServer.h"
#ifndef _WIN32
#include <unistd.h>
#endif
void process(WFHttpTask *server_task)
{
protocol::HttpRequest *req = server_task->get_req();
@@ -110,7 +109,11 @@ int main(int argc, char *argv[])
port = atoi(argv[1]);
if (server.start(port) == 0)
{
#ifndef _WIN32
pause();
#else
getchar();
#endif
server.stop();
}
else

View File

@@ -17,7 +17,6 @@
*/
#include <signal.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <utility>
@@ -26,6 +25,10 @@
#include "workflow/HttpUtil.h"
#include "workflow/WFHttpServer.h"
#ifndef _WIN32
#include <unistd.h>
#endif
struct tutorial_series_context
{
std::string url;
@@ -160,7 +163,11 @@ int main(int argc, char *argv[])
if (server.start(port) == 0)
{
#ifndef _WIN32
pause();
#else
getchar();
#endif
server.stop();
}
else

View File

@@ -122,6 +122,7 @@ int main(int argc, char *argv[])
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
return 0;
}

View File

@@ -115,8 +115,8 @@ int main(int argc, char *argv[])
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
lock.unlock();
free(array);
return 0;
}

View File

@@ -162,6 +162,7 @@ int main()
std::unique_lock<std::mutex> lock(mutex);
while (!finished)
cond.wait(lock);
lock.unlock();
return 0;
}

View File

@@ -18,7 +18,6 @@
#include <signal.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <utility>
@@ -28,6 +27,18 @@
#include "workflow/WFTaskFactory.h"
#include "workflow/Workflow.h"
#ifndef _WIN32
#include <unistd.h>
#endif
#ifdef _WIN32
#include <io.h>
# define open _open
# define O_RDONLY _O_RDONLY
# define close _close
# define lseek _lseek
#endif
using namespace protocol;
void pread_callback(WFFileIOTask *task)
@@ -113,7 +124,11 @@ int main(int argc, char *argv[])
if (ret == 0)
{
#ifndef _WIN32
pause();
#else
getchar();
#endif
server.stop();
}
else

View File

@@ -20,7 +20,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <arpa/inet.h>
#include "workflow/PlatformSocket.h"
#include "message.h"
namespace protocol

View File

@@ -20,7 +20,6 @@
#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#include <unistd.h>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFServer.h"
@@ -70,7 +69,7 @@ int main(int argc, char *argv[])
if (server.start(AF_INET6, port) == 0 ||
server.start(AF_INET, port) == 0)
{
pause();
getchar();
server.stop();
}
else

View File

@@ -20,12 +20,11 @@
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <signal.h>
#include <vector>
#include <map>
#include <mutex>
#include <condition_variable>
#include <signal.h>
#include "workflow/Workflow.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/MySQLResult.h"
@@ -259,8 +258,9 @@ int main(int argc, char *argv[])
series->start();
std::unique_lock<std::mutex> lock(mutex);
while (task_finished == false)
while (!task_finished)
cond.wait(lock);
lock.unlock();
return 0;
}