mirror of
				https://github.com/msgpack/msgpack-c.git
				synced 2025-10-25 10:09:38 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			249 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			249 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| // MessagePack for C++ example
 | |
| //
 | |
| // Copyright (C) 2017 KONDO Takatoshi
 | |
| //
 | |
| //    Distributed under the Boost Software License, Version 1.0.
 | |
| //    (See accompanying file LICENSE_1_0.txt or copy at
 | |
| //    http://www.boost.org/LICENSE_1_0.txt)
 | |
| //
 | |
| 
 | |
| #include <iostream>
 | |
| #include <sstream>
 | |
| #include <cassert>
 | |
| #include <thread>
 | |
| 
 | |
| // MSGPACK_USE_X3_PARSE should be defined before including msgpack.hpp
 | |
| // It usually defined as a compiler option as -DMSGPACK_USE_X3_PARSE.
 | |
| 
 | |
| //#define MSGPACK_USE_X3_PARSE
 | |
| 
 | |
| #include <msgpack.hpp>
 | |
| 
 | |
| #include <boost/asio.hpp>
 | |
| #include <boost/coroutine2/all.hpp>
 | |
| 
 | |
| #if defined(__clang__)
 | |
| #pragma GCC diagnostic push
 | |
| #pragma GCC diagnostic ignored "-Wunused-parameter"
 | |
| #endif // defined(__clang__)
 | |
| 
 | |
| #include <boost/spirit/home/support/multi_pass.hpp>
 | |
| 
 | |
| #if defined(__clang__)
 | |
| #pragma GCC diagnostic pop
 | |
| #endif // defined(__clang__)
 | |
| 
 | |
| namespace as = boost::asio;
 | |
| namespace x3 = boost::spirit::x3;
 | |
| namespace coro2 = boost::coroutines2;
 | |
| 
 | |
| using pull_type = coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::pull_type;
 | |
| 
 | |
| // iterator fetching data from coroutine2.
 | |
| class buffered_iterator : public std::iterator<std::input_iterator_tag, char> {
 | |
| public:
 | |
|     using pointer_t = typename iterator::pointer;
 | |
|     using reference_t = typename iterator::reference;
 | |
| 
 | |
|     explicit buffered_iterator(pull_type& source) noexcept
 | |
|         : source_{ &source } {
 | |
|         fetch_();
 | |
|     }
 | |
|     buffered_iterator() = default;
 | |
| 
 | |
|     bool operator==(buffered_iterator const& other) const noexcept {
 | |
|         if (!other.source_ && !source_ && !other.buf_ && !buf_) return true;
 | |
|         return other.it_ == it_;
 | |
|     }
 | |
| 
 | |
|     bool operator!=(buffered_iterator const& other) const noexcept {
 | |
|         return !(other == *this);
 | |
|     }
 | |
| 
 | |
|     buffered_iterator & operator++() {
 | |
|         increment_();
 | |
|         return * this;
 | |
|     }
 | |
| 
 | |
|     buffered_iterator operator++(int) = delete;
 | |
| 
 | |
|     reference_t operator*() noexcept {
 | |
|         return *it_;
 | |
|     }
 | |
| 
 | |
|     pointer_t operator->() noexcept {
 | |
|         return std::addressof(*it_);
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     void fetch_() noexcept {
 | |
|         BOOST_ASSERT( nullptr != source_);
 | |
|         if (*source_) {
 | |
|             buf_ = source_->get();
 | |
|             it_ = buf_->begin();
 | |
|         }
 | |
|         else {
 | |
|             source_ = nullptr;
 | |
|             buf_.reset();
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     void increment_() {
 | |
|         BOOST_ASSERT( nullptr != source_);
 | |
|         BOOST_ASSERT(*source_);
 | |
|         if (++it_ == buf_->end()) {
 | |
|             (*source_)();
 | |
|             fetch_();
 | |
|         }
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     pull_type* source_{ nullptr };
 | |
|     std::shared_ptr<std::vector<char>> buf_;
 | |
|     std::vector<char>::iterator it_;
 | |
| };
 | |
| 
 | |
| // session class that corresponding to each client
 | |
| class session : public std::enable_shared_from_this<session> {
 | |
| public:
 | |
|     session(as::ip::tcp::socket socket)
 | |
|         : socket_(std::move(socket)) {
 | |
|     }
 | |
| 
 | |
|     void start() {
 | |
|         sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
 | |
|             [&, this](pull_type& source) {
 | |
|                 // *1 is started when the first sink is called.
 | |
| 
 | |
|                 std::cout << "session started" << std::endl;
 | |
|                 do_read();
 | |
|                 source();
 | |
| 
 | |
|                 // use buffered_iterator here
 | |
|                 // b is incremented in msgpack::unpack() and fetch data from sink
 | |
|                 // via coroutine2 mechanism
 | |
|                 auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
 | |
|                 auto e = boost::spirit::make_default_multi_pass(buffered_iterator());
 | |
| 
 | |
|                 // This is usually an infinity look, but for test, loop is finished when
 | |
|                 // two message pack data is processed.
 | |
|                 for (int i = 0; i != 2; ++i) {
 | |
|                     auto oh = msgpack::unpack(b, e);
 | |
|                     std::cout << oh.get() << std::endl;
 | |
|                 }
 | |
|             }
 | |
|         );
 | |
|         // send dummy data to start *1
 | |
|         (*sink_)({});
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     void do_read() {
 | |
|         std::cout << "session do_read() is called" << std::endl;
 | |
|         auto self(shared_from_this());
 | |
|         auto data = std::make_shared<std::vector<char>>(static_cast<std::size_t>(max_length));
 | |
|         socket_.async_read_some(
 | |
|             boost::asio::buffer(*data),
 | |
|             [this, self, data]
 | |
|             (boost::system::error_code ec, std::size_t length) {
 | |
|                 if (!ec) {
 | |
|                     data->resize(length);
 | |
|                     (*sink_)(data);
 | |
|                     do_read();
 | |
|                 }
 | |
|             }
 | |
|         );
 | |
|     }
 | |
| 
 | |
|     as::ip::tcp::socket socket_;
 | |
|     static constexpr std::size_t const max_length = 1024;
 | |
|     std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
 | |
| };
 | |
| 
 | |
| class server {
 | |
| public:
 | |
|     server(
 | |
|         as::io_service& ios,
 | |
|         std::uint16_t port)
 | |
|         : acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
 | |
|           socket_(ios) {
 | |
|         do_accept();
 | |
|         std::cout << "server start accept" << std::endl;
 | |
|         ios.run();
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     void do_accept() {
 | |
|         acceptor_.async_accept(
 | |
|             socket_,
 | |
|             [this](boost::system::error_code ec) {
 | |
|                 if (!ec) {
 | |
|                     std::make_shared<session>(std::move(socket_))->start();
 | |
|                 }
 | |
|                 // for test, only one session is accepted.
 | |
|                 // do_accept();
 | |
|             }
 | |
|         );
 | |
|     }
 | |
| 
 | |
|     as::ip::tcp::acceptor acceptor_;
 | |
|     as::ip::tcp::socket socket_;
 | |
| };
 | |
| 
 | |
| int main() {
 | |
|     std::thread srv(
 | |
|         []{
 | |
|             boost::asio::io_service ios;
 | |
|             server s(ios, 12345);
 | |
|         }
 | |
|     );
 | |
| 
 | |
|     std::thread cli(
 | |
|         []{
 | |
|             std::this_thread::sleep_for(std::chrono::seconds(1));
 | |
|             std::cout << "client start" << std::endl;
 | |
| 
 | |
|             std::stringstream ss;
 | |
|             std::map<std::string, std::vector<int>> v1 {
 | |
|                 { "ABC", { 1, 2, 3 } },
 | |
|                 { "DEFG", { 4, 5 } }
 | |
|             };
 | |
|             std::vector<std::string> v2 {
 | |
|                 "HIJ", "KLM", "NOP"
 | |
|                     };
 | |
|             msgpack::pack(ss, v1);
 | |
|             msgpack::pack(ss, v2);
 | |
| 
 | |
|             auto send_data = ss.str();
 | |
| 
 | |
|             boost::asio::io_service ios;
 | |
|             as::ip::tcp::resolver::query q("127.0.0.1", "12345");
 | |
|             as::ip::tcp::resolver r(ios);
 | |
|             auto it = r.resolve(q);
 | |
| 
 | |
|             std::cout << "client connect" << std::endl;
 | |
|             as::ip::tcp::socket s(ios);
 | |
|             as::connect(s, it);
 | |
| 
 | |
| 
 | |
|             std::size_t const size = 5;
 | |
|             std::size_t rest = send_data.size();
 | |
|             std::size_t index = 0;
 | |
|             while (rest != 0) {
 | |
|                 std::cout << "client send data" << std::endl;
 | |
|                 auto send_size = size < rest ? size : rest;
 | |
|                 as::write(s, as::buffer(&send_data[index], send_size));
 | |
|                 rest -= send_size;
 | |
|                 index += send_size;
 | |
|                 std::cout << "client wait" << std::endl;
 | |
|                 std::this_thread::sleep_for(std::chrono::seconds(1));
 | |
|             }
 | |
|         }
 | |
|     );
 | |
| 
 | |
|     cli.join();
 | |
|     std::cout << "client joinded" << std::endl;
 | |
|     srv.join();
 | |
|     std::cout << "server joinded" << std::endl;
 | |
| }
 | 
