mirror of
https://github.com/sogou/workflow.git
synced 2026-02-08 01:33:17 +08:00
initialization
This commit is contained in:
123
tutorial/tutorial-10-user_defined_protocol/client.cc
Normal file
123
tutorial/tutorial-10-user_defined_protocol/client.cc
Normal file
@@ -0,0 +1,123 @@
|
||||
/*
|
||||
Copyright (c) 2020 Sogou, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
#include <stdio.h>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include "workflow/Workflow.h"
|
||||
#include "workflow/WFTaskFactory.h"
|
||||
#include "message.h"
|
||||
|
||||
using WFTutorialTask = WFNetworkTask<protocol::TutorialRequest,
|
||||
protocol::TutorialResponse>;
|
||||
using tutorial_callback_t = std::function<void (WFTutorialTask *)>;
|
||||
|
||||
using namespace protocol;
|
||||
|
||||
class MyFactory : public WFTaskFactory
|
||||
{
|
||||
public:
|
||||
static WFTutorialTask *create_tutorial_task(const std::string& host,
|
||||
unsigned short port,
|
||||
int retry_max,
|
||||
tutorial_callback_t callback)
|
||||
{
|
||||
using NTF = WFNetworkTaskFactory<TutorialRequest, TutorialResponse>;
|
||||
WFTutorialTask *task = NTF::create_client_task(TT_TCP, host, port,
|
||||
retry_max,
|
||||
std::move(callback));
|
||||
task->set_keep_alive(30 * 1000);
|
||||
return task;
|
||||
}
|
||||
};
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
std::mutex mutex;
|
||||
std::condition_variable cond;
|
||||
bool finished = false;
|
||||
unsigned short port;
|
||||
std::string host;
|
||||
|
||||
if (argc != 3)
|
||||
{
|
||||
fprintf(stderr, "USAGE: %s <host> <port>\n", argv[0]);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
host = argv[1];
|
||||
port = atoi(argv[2]);
|
||||
std::function<void (WFTutorialTask *task)> callback =
|
||||
[&host, port, &callback](WFTutorialTask *task) {
|
||||
int state = task->get_state();
|
||||
int error = task->get_error();
|
||||
TutorialResponse *resp = task->get_resp();
|
||||
char buf[1024];
|
||||
void *body;
|
||||
size_t body_size;
|
||||
|
||||
if (state != WFT_STATE_SUCCESS)
|
||||
{
|
||||
if (state == WFT_STATE_SYS_ERROR)
|
||||
fprintf(stderr, "SYS error: %s\n", strerror(error));
|
||||
else if (state == WFT_STATE_DNS_ERROR)
|
||||
fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
|
||||
else
|
||||
fprintf(stderr, "other error.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
resp->get_message_body_nocopy(&body, &body_size);
|
||||
if (body_size != 0)
|
||||
printf("Server Response: %.*s\n", (int)body_size, (char *)body);
|
||||
|
||||
printf("Input next request string (Ctrl-D to exit): ");
|
||||
*buf = '\0';
|
||||
scanf("%1024s", buf);
|
||||
body_size = strlen(buf);
|
||||
if (body_size > 0)
|
||||
{
|
||||
WFTutorialTask *next;
|
||||
next = MyFactory::create_tutorial_task(host, port, 0, callback);
|
||||
next->get_req()->set_message_body(buf, body_size);
|
||||
next->get_resp()->set_size_limit(4 * 1024);
|
||||
**task << next; /* equal to: series_of(task)->push_back(next) */
|
||||
}
|
||||
else
|
||||
printf("\n");
|
||||
};
|
||||
|
||||
/* First request is emtpy. We will ignore the server response. */
|
||||
WFTutorialTask *task = MyFactory::create_tutorial_task(host, port, 0, callback);
|
||||
task->get_resp()->set_size_limit(4 * 1024);
|
||||
Workflow::start_series_work(task, [&mutex, &cond, &finished](const SeriesWork *)
|
||||
{
|
||||
mutex.lock();
|
||||
finished = true;
|
||||
cond.notify_one();
|
||||
mutex.unlock();
|
||||
});
|
||||
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
while (!finished)
|
||||
cond.wait(lock);
|
||||
lock.unlock();
|
||||
return 0;
|
||||
}
|
||||
|
||||
147
tutorial/tutorial-10-user_defined_protocol/message.cc
Normal file
147
tutorial/tutorial-10-user_defined_protocol/message.cc
Normal file
@@ -0,0 +1,147 @@
|
||||
/*
|
||||
Copyright (c) 2020 Sogou, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
|
||||
*/
|
||||
|
||||
#include <errno.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <arpa/inet.h>
|
||||
#include "message.h"
|
||||
|
||||
namespace protocol
|
||||
{
|
||||
|
||||
int TutorialMessage::encode(struct iovec vectors[], int max/*max==8192*/)
|
||||
{
|
||||
uint32_t n = htonl(this->body_size);
|
||||
|
||||
memcpy(this->head, &n, 4);
|
||||
vectors[0].iov_base = this->head;
|
||||
vectors[0].iov_len = 4;
|
||||
vectors[1].iov_base = this->body;
|
||||
vectors[1].iov_len = this->body_size;
|
||||
|
||||
return 2; /* return the number of vectors used, no more then max. */
|
||||
}
|
||||
|
||||
int TutorialMessage::append(const void *buf, size_t size)
|
||||
{
|
||||
if (this->head_received < 4)
|
||||
{
|
||||
size_t head_left;
|
||||
void *p;
|
||||
|
||||
p = &this->head[head_received];
|
||||
head_left = 4 - this->head_received;
|
||||
if (size < 4 - this->head_received)
|
||||
{
|
||||
memcpy(p, buf, size);
|
||||
this->head_received += size;
|
||||
return 0;
|
||||
}
|
||||
|
||||
memcpy(p, buf, head_left);
|
||||
size -= head_left;
|
||||
buf = (const char *)buf + head_left;
|
||||
|
||||
p = this->head;
|
||||
this->body_size = ntohl(*(uint32_t *)p);
|
||||
if (this->body_size > this->size_limit)
|
||||
{
|
||||
errno = EMSGSIZE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
this->body = (char *)malloc(this->body_size);
|
||||
if (!this->body)
|
||||
return -1;
|
||||
|
||||
this->body_received = 0;
|
||||
}
|
||||
|
||||
size_t body_left = this->body_size - this->body_received;
|
||||
|
||||
if (size > body_left)
|
||||
{
|
||||
errno = EBADMSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(this->body, buf, body_left);
|
||||
if (size < body_left)
|
||||
return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int TutorialMessage::set_message_body(const void *body, size_t size)
|
||||
{
|
||||
void *p = malloc(size);
|
||||
|
||||
if (!p)
|
||||
return -1;
|
||||
|
||||
memcpy(p, body, size);
|
||||
free(this->body);
|
||||
this->body = (char *)p;
|
||||
this->body_size = size;
|
||||
|
||||
this->head_received = 4;
|
||||
this->body_received = size;
|
||||
return 0;
|
||||
}
|
||||
|
||||
TutorialMessage::TutorialMessage(TutorialMessage&& msg)
|
||||
{
|
||||
this->size_limit = msg.size_limit;
|
||||
msg.size_limit = (size_t)-1;
|
||||
|
||||
memcpy(this->head, msg.head, 4);
|
||||
this->head_received = msg.head_received;
|
||||
this->body = msg.body;
|
||||
this->body_received = msg.body_received;
|
||||
this->body_size = msg.body_size;
|
||||
|
||||
msg.head_received = 0;
|
||||
msg.body = NULL;
|
||||
msg.body_size = 0;
|
||||
}
|
||||
|
||||
TutorialMessage& TutorialMessage::operator = (TutorialMessage&& msg)
|
||||
{
|
||||
if (&msg != this)
|
||||
{
|
||||
this->size_limit = msg.size_limit;
|
||||
msg.size_limit = (size_t)-1;
|
||||
|
||||
memcpy(this->head, msg.head, 4);
|
||||
this->head_received = msg.head_received;
|
||||
this->body = msg.body;
|
||||
this->body_received = msg.body_received;
|
||||
this->body_size = msg.body_size;
|
||||
|
||||
msg.head_received = 0;
|
||||
msg.body = NULL;
|
||||
msg.body_size = 0;
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
73
tutorial/tutorial-10-user_defined_protocol/message.h
Normal file
73
tutorial/tutorial-10-user_defined_protocol/message.h
Normal file
@@ -0,0 +1,73 @@
|
||||
/*
|
||||
Copyright (c) 2020 Sogou, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
|
||||
*/
|
||||
|
||||
#ifndef _TUTORIALMESSAGE_H_
|
||||
#define _TUTORIALMESSAGE_H_
|
||||
|
||||
#include <stdlib.h>
|
||||
#include "workflow/ProtocolMessage.h"
|
||||
|
||||
namespace protocol
|
||||
{
|
||||
|
||||
class TutorialMessage : public ProtocolMessage
|
||||
{
|
||||
private:
|
||||
virtual int encode(struct iovec vectors[], int max);
|
||||
virtual int append(const void *buf, size_t size);
|
||||
|
||||
public:
|
||||
int set_message_body(const void *body, size_t size);
|
||||
|
||||
void get_message_body_nocopy(void **body, size_t *size)
|
||||
{
|
||||
*body = this->body;
|
||||
*size = this->body_size;
|
||||
}
|
||||
|
||||
protected:
|
||||
char head[4];
|
||||
size_t head_received;
|
||||
char *body;
|
||||
size_t body_received;
|
||||
size_t body_size;
|
||||
|
||||
public:
|
||||
TutorialMessage()
|
||||
{
|
||||
this->head_received = 0;
|
||||
this->body = NULL;
|
||||
this->body_size = 0;
|
||||
}
|
||||
|
||||
TutorialMessage(TutorialMessage&& msg);
|
||||
TutorialMessage& operator = (TutorialMessage&& msg);
|
||||
|
||||
virtual ~TutorialMessage()
|
||||
{
|
||||
free(this->body);
|
||||
}
|
||||
};
|
||||
|
||||
using TutorialRequest = TutorialMessage;
|
||||
using TutorialResponse = TutorialMessage;
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
84
tutorial/tutorial-10-user_defined_protocol/server.cc
Normal file
84
tutorial/tutorial-10-user_defined_protocol/server.cc
Normal file
@@ -0,0 +1,84 @@
|
||||
/*
|
||||
Copyright (c) 2020 Sogou, Inc.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
Author: Xie Han (xiehan@sogou-inc.com;63350856@qq.com)
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <ctype.h>
|
||||
#include <signal.h>
|
||||
#include <unistd.h>
|
||||
#include "workflow/Workflow.h"
|
||||
#include "workflow/WFTaskFactory.h"
|
||||
#include "workflow/WFServer.h"
|
||||
#include "message.h"
|
||||
|
||||
using WFTutorialTask = WFNetworkTask<protocol::TutorialRequest,
|
||||
protocol::TutorialResponse>;
|
||||
using WFTutorialServer = WFServer<protocol::TutorialRequest,
|
||||
protocol::TutorialResponse>;
|
||||
|
||||
using namespace protocol;
|
||||
|
||||
void process(WFTutorialTask *task)
|
||||
{
|
||||
TutorialRequest *req = task->get_req();
|
||||
TutorialResponse *resp = task->get_resp();
|
||||
void *body;
|
||||
size_t size;
|
||||
size_t i;
|
||||
|
||||
req->get_message_body_nocopy(&body, &size);
|
||||
for (i = 0; i < size; i++)
|
||||
((char *)body)[i] = toupper(((char *)body)[i]);
|
||||
|
||||
resp->set_message_body(body, size);
|
||||
}
|
||||
|
||||
void sig_handler(int signo) { }
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
unsigned short port;
|
||||
|
||||
if (argc != 2)
|
||||
{
|
||||
fprintf(stderr, "USAGE %s <port>\n", argv[0]);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
port = atoi(argv[1]);
|
||||
signal(SIGINT, sig_handler);
|
||||
|
||||
struct WFServerParams params = SERVER_PARAMS_DEFAULT;
|
||||
params.request_size_limit = 4 * 1024;
|
||||
|
||||
WFTutorialServer server(¶ms, process);
|
||||
if (server.start(AF_INET6, port) == 0 ||
|
||||
server.start(AF_INET, port) == 0)
|
||||
{
|
||||
pause();
|
||||
server.stop();
|
||||
}
|
||||
else
|
||||
{
|
||||
perror("server.start");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user