From 8d2932cd46f241bdca20be6b8e302ad881fe8b78 Mon Sep 17 00:00:00 2001 From: XieHan Date: Sun, 27 Nov 2022 17:54:49 +0800 Subject: [PATCH] Add back consul tutorial --- BUILD | 6 + tutorial/CMakeLists.txt | 12 ++ tutorial/GNUmakefile | 2 +- tutorial/tutorial-14-consul_cli.cc | 243 +++++++++++++++++++++++++++++ 4 files changed, 262 insertions(+), 1 deletion(-) create mode 100644 tutorial/tutorial-14-consul_cli.cc diff --git a/BUILD b/BUILD index f416de19..df40f4fd 100644 --- a/BUILD +++ b/BUILD @@ -366,3 +366,9 @@ cc_binary( srcs = ['tutorial/tutorial-13-kafka_cli.cc'], deps = [':kafka', ':workflow_hdrs'], ) + +cc_binary( + name = 'consul_cli', + srcs = ['tutorial/tutorial-14-consul_cli.cc'], + deps = [':consul'], +) diff --git a/tutorial/CMakeLists.txt b/tutorial/CMakeLists.txt index c9db7511..3a17c521 100644 --- a/tutorial/CMakeLists.txt +++ b/tutorial/CMakeLists.txt @@ -86,6 +86,18 @@ foreach(src ${TUTORIAL_LIST}) endforeach() endif() +if (NOT CONSUL STREQUAL "n") +set(TUTORIAL_LIST + tutorial-14-consul_cli +) +foreach(src ${TUTORIAL_LIST}) + string(REPLACE "-" ";" arr ${src}) + list(GET arr -1 bin_name) + add_executable(${bin_name} ${src}.cc) + target_link_libraries(${bin_name} ${WORKFLOW_LIB}) +endforeach() +endif() + if (KAFKA STREQUAL "y") add_executable("kafka_cli" "tutorial-13-kafka_cli.cc") target_link_libraries("kafka_cli" wfkafka ${WORKFLOW_LIB} z snappy lz4 zstd) diff --git a/tutorial/GNUmakefile b/tutorial/GNUmakefile index cb0ee1f2..3e711b30 100644 --- a/tutorial/GNUmakefile +++ b/tutorial/GNUmakefile @@ -13,7 +13,7 @@ all: rm -rf $(DEFAULT_BUILD_DIR)/CMakeCache.txt ifeq ($(DEBUG),y) - cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) $(ROOT_DIR) + cd $(BUILD_DIR) && $(CMAKE3) -D CMAKE_BUILD_TYPE=Debug -D KAFKA=$(KAFKA) -D CONSUL=$(CONSUL) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) $(ROOT_DIR) else cd $(BUILD_DIR) && $(CMAKE3) -D KAFKA=$(KAFKA) -D MYSQL=$(MYSQL) -D REDIS=$(REDIS) $(ROOT_DIR) endif diff --git a/tutorial/tutorial-14-consul_cli.cc b/tutorial/tutorial-14-consul_cli.cc new file mode 100644 index 00000000..b681f0cb --- /dev/null +++ b/tutorial/tutorial-14-consul_cli.cc @@ -0,0 +1,243 @@ +/* + Copyright (c) 2020 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Wang Zhenpeng (wangzhenpeng@sogou-inc.com) +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "workflow/WFConsulClient.h" +#include "workflow/ConsulDataTypes.h" +#include "workflow/WFTaskFactory.h" +#include "workflow/WFFacilities.h" +#include "workflow/HttpMessage.h" +#include "workflow/WFGlobal.h" + +using namespace protocol; + +static WFFacilities::WaitGroup wait_group(1); + +std::string url; +WFConsulClient client; + +void print_discover_result(std::vector& discover_result) +{ + for (const auto& instance : discover_result) + { + fprintf(stderr, "%s", "discover_instance\n"); + + fprintf(stderr, "node_id:%s\n", instance.node_id.c_str()); + fprintf(stderr, "node_name:%s\n", instance.node_name.c_str()); + fprintf(stderr, "node_address:%s\n", instance.node_address.c_str()); + fprintf(stderr, "dc:%s\n", instance.dc.c_str()); + const std::map& node_meta = instance.node_meta; + for (const auto& meta_kv : node_meta) + { + fprintf(stderr, "node_meta:%s = %s\n", meta_kv.first.c_str(), + meta_kv.second.c_str()); + } + fprintf(stderr, "create_index:%lld\n", instance.create_index); + fprintf(stderr, "modify_index:%lld\n", instance.modify_index); + + fprintf(stderr, "service_id:%s\n", instance.service.service_id.c_str()); + fprintf(stderr, "service_name:%s\n", instance.service.service_name.c_str()); + fprintf(stderr, "service_namespace:%s\n", instance.service.service_namespace.c_str()); + fprintf(stderr, "service_address:%s\n", instance.service.service_address.first.c_str()); + fprintf(stderr, "service_port:%d\n", instance.service.service_address.second); + fprintf(stderr, "service_tag_override:%d\n", instance.service.tag_override); + fprintf(stderr, "%s", "service_tags:"); + const std::vector& tags = instance.service.tags; + for (const auto& tag : tags) + { + fprintf(stderr, "%s,", tag.c_str()); + } + fprintf(stderr, "\n"); + const std::map& service_meta = instance.service.meta; + for (const auto& meta_kv : service_meta) + { + fprintf(stderr, "service_meta:%s = %s\n", meta_kv.first.c_str(), + meta_kv.second.c_str()); + } + fprintf(stderr, "lan:%s:%d\n", instance.service.lan.first.c_str(), + instance.service.lan.second); + fprintf(stderr, "lan_ipv4:%s:%d\n", + instance.service.lan_ipv4.first.c_str(), + instance.service.lan_ipv4.second); + fprintf(stderr, "lan_ipv6:%s:%d\n", + instance.service.lan_ipv6.first.c_str(), + instance.service.lan_ipv6.second); + fprintf(stderr, "wan:%s:%d\n", instance.service.wan.first.c_str(), + instance.service.wan.second); + fprintf(stderr, "wan_ipv4:%s:%d\n", + instance.service.wan_ipv4.first.c_str(), + instance.service.wan_ipv4.second); + fprintf(stderr, "wan_ipv6:%s:%d\n", + instance.service.wan_ipv6.first.c_str(), + instance.service.wan_ipv6.second); + + fprintf(stderr, "check_id:%s\n", instance.check_id.c_str()); + fprintf(stderr, "check_name:%s\n", instance.check_name.c_str()); + fprintf(stderr, "check_notes:%s\n", instance.check_notes.c_str()); + fprintf(stderr, "check_output:%s\n", instance.check_output.c_str()); + fprintf(stderr, "check_status:%s\n", instance.check_status.c_str()); + fprintf(stderr, "check_type:%s\n", instance.check_type.c_str()); + } +} + +void print_list_service_result( + std::vector& list_service_result) +{ + for (const auto& instance : list_service_result) + { + fprintf(stderr, "service name:%s tags:", instance.service_name.c_str()); + std::string tags_log; + for (const auto& tag : instance.tags) + { + tags_log += tag; + tags_log += ","; + } + if (tags_log.size() > 0) + tags_log.pop_back(); + + fprintf(stderr, "%s\n", tags_log.c_str()); + } +} + +void consul_callback(WFConsulTask *task) +{ + int state = task->get_state(); + int error = task->get_error(); + + if (state != WFT_STATE_SUCCESS) + { + fprintf(stderr, "error:%d, error msg:%s\n", + error, WFGlobal::get_error_string(state, error)); + fprintf(stderr, "Failed. Press Ctrl-C to exit.\n"); + wait_group.done(); + return; + } + + int api_type = task->get_api_type(); + std::vector dis_result; + std::vector list_service_result; + + switch (api_type) + { + case CONSUL_API_TYPE_DISCOVER: + fprintf(stderr, "discover ok\n"); + fprintf(stderr, "consul-index:%lld\n", task->get_consul_index()); + if (task->get_discover_result(dis_result)) + print_discover_result(dis_result); + else + fprintf(stderr, "error:%d\n", task->get_error()); + break; + + case CONSUL_API_TYPE_LIST_SERVICE: + fprintf(stderr, "list service ok\n"); + if (task->get_list_service_result(list_service_result)) + print_list_service_result(list_service_result); + else + fprintf(stderr, "error:%d\n", task->get_error()); + break; + + case CONSUL_API_TYPE_REGISTER: + fprintf(stderr, "register ok\n"); + break; + + case CONSUL_API_TYPE_DEREGISTER: + fprintf(stderr, "deregister ok\n"); + break; + + default: + break; + } + wait_group.done(); +} + +void sig_handler(int signo) { } + +int main(int argc, char *argv[]) +{ + if (argc < 3) + { + fprintf(stderr, "USAGE: %s url type(discover/register/deregister)

\n", argv[0]); + exit(1); + } + + signal(SIGINT, sig_handler); + + url = argv[1]; + if (strncmp(argv[1], "http://", 7) != 0) + url = "http://" + url; + + ConsulConfig config; + config.set_token("cd125427-3fd1-f326-bf46-fbce06cc9003"); + config.set_health_check(true); + + // http health check + config.set_check_http_url("http://127.0.0.1:8000/health_check/sd"); + // config.add_http_header("Accept", {"text/html", "application/xml"}); + + // tcp health check + //config.set_check_tcp("127.0.0.1:80"); + + client.init(url, config); + + WFConsulTask *task; + + if (0 == strcmp(argv[2], "discover")) + { + task = client.create_discover_task("", "dev-wf_test_service_1", 3, consul_callback); + config.set_blocking_query(true); + } + else if (0 == strcmp(argv[2], "list_service")) + { + task = client.create_list_service_task("", 3, consul_callback); + } + else if (0 == strcmp(argv[2], "register")) + { + task = client.create_register_task("", "dev-wf_test_service_1", "wf_test_service_id_2", 3, consul_callback); + protocol::ConsulService service; + service.tags.emplace_back("tag1"); + service.tags.emplace_back("tag2"); + service.service_address.first = "127.0.0.1"; + service.service_address.second = 8000; + service.meta["mk1"] = "mv1"; + service.meta["mk2"] = "mv2"; + service.tag_override = true; + task->set_service(&service); + } + else if (0 == strcmp(argv[2], "deregister")) + { + task = client.create_deregister_task("", "wf_test_service_id_2", 3, consul_callback); + } + else + { + fprintf(stderr, "USAGE: %s url

[compress_type/d]\n", argv[0]); + exit(1); + } + + task->start(); + + wait_group.wait(); + + return 0; +}