mirror of
https://github.com/msgpack/msgpack-c.git
synced 2025-04-21 16:28:19 +02:00
249 lines
7.0 KiB
C++
249 lines
7.0 KiB
C++
// MessagePack for C++ example
|
|
//
|
|
// Copyright (C) 2017 KONDO Takatoshi
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0.
|
|
// (See accompanying file LICENSE_1_0.txt or copy at
|
|
// http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
|
|
#include <iostream>
|
|
#include <sstream>
|
|
#include <cassert>
|
|
#include <thread>
|
|
|
|
// MSGPACK_USE_X3_PARSE should be defined before including msgpack.hpp
|
|
// It usually defined as a compiler option as -DMSGPACK_USE_X3_PARSE.
|
|
|
|
//#define MSGPACK_USE_X3_PARSE
|
|
|
|
#include <msgpack.hpp>
|
|
|
|
#include <boost/asio.hpp>
|
|
#include <boost/coroutine2/all.hpp>
|
|
|
|
#if defined(__clang__)
|
|
#pragma GCC diagnostic push
|
|
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
|
#endif // defined(__clang__)
|
|
|
|
#include <boost/spirit/home/support/multi_pass.hpp>
|
|
|
|
#if defined(__clang__)
|
|
#pragma GCC diagnostic pop
|
|
#endif // defined(__clang__)
|
|
|
|
namespace as = boost::asio;
|
|
namespace x3 = boost::spirit::x3;
|
|
namespace coro2 = boost::coroutines2;
|
|
|
|
using pull_type = coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::pull_type;
|
|
|
|
// iterator fetching data from coroutine2.
|
|
class buffered_iterator : public std::iterator<std::input_iterator_tag, char> {
|
|
public:
|
|
using pointer_t = typename iterator::pointer;
|
|
using reference_t = typename iterator::reference;
|
|
|
|
explicit buffered_iterator(pull_type& source) noexcept
|
|
: source_{ &source } {
|
|
fetch_();
|
|
}
|
|
buffered_iterator() = default;
|
|
|
|
bool operator==(buffered_iterator const& other) const noexcept {
|
|
if (!other.source_ && !source_ && !other.buf_ && !buf_) return true;
|
|
return other.it_ == it_;
|
|
}
|
|
|
|
bool operator!=(buffered_iterator const& other) const noexcept {
|
|
return !(other == *this);
|
|
}
|
|
|
|
buffered_iterator & operator++() {
|
|
increment_();
|
|
return * this;
|
|
}
|
|
|
|
buffered_iterator operator++(int) = delete;
|
|
|
|
reference_t operator*() noexcept {
|
|
return *it_;
|
|
}
|
|
|
|
pointer_t operator->() noexcept {
|
|
return std::addressof(*it_);
|
|
}
|
|
|
|
private:
|
|
void fetch_() noexcept {
|
|
BOOST_ASSERT( nullptr != source_);
|
|
if (*source_) {
|
|
buf_ = source_->get();
|
|
it_ = buf_->begin();
|
|
}
|
|
else {
|
|
source_ = nullptr;
|
|
buf_.reset();
|
|
}
|
|
}
|
|
|
|
void increment_() {
|
|
BOOST_ASSERT( nullptr != source_);
|
|
BOOST_ASSERT(*source_);
|
|
if (++it_ == buf_->end()) {
|
|
(*source_)();
|
|
fetch_();
|
|
}
|
|
}
|
|
|
|
private:
|
|
pull_type* source_{ nullptr };
|
|
std::shared_ptr<std::vector<char>> buf_;
|
|
std::vector<char>::iterator it_;
|
|
};
|
|
|
|
// session class that corresponding to each client
|
|
class session : public std::enable_shared_from_this<session> {
|
|
public:
|
|
session(as::ip::tcp::socket socket)
|
|
: socket_(std::move(socket)) {
|
|
}
|
|
|
|
void start() {
|
|
sink_ = std::make_shared<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type>(
|
|
[&, this](pull_type& source) {
|
|
// *1 is started when the first sink is called.
|
|
|
|
std::cout << "session started" << std::endl;
|
|
do_read();
|
|
source();
|
|
|
|
// use buffered_iterator here
|
|
// b is incremented in msgpack::unpack() and fetch data from sink
|
|
// via coroutine2 mechanism
|
|
auto b = boost::spirit::make_default_multi_pass(buffered_iterator(source));
|
|
auto e = boost::spirit::make_default_multi_pass(buffered_iterator());
|
|
|
|
// This is usually an infinity look, but for test, loop is finished when
|
|
// two message pack data is processed.
|
|
for (int i = 0; i != 2; ++i) {
|
|
auto oh = msgpack::unpack(b, e);
|
|
std::cout << oh.get() << std::endl;
|
|
}
|
|
}
|
|
);
|
|
// send dummy data to start *1
|
|
(*sink_)({});
|
|
}
|
|
|
|
private:
|
|
void do_read() {
|
|
std::cout << "session do_read() is called" << std::endl;
|
|
auto self(shared_from_this());
|
|
auto data = std::make_shared<std::vector<char>>(static_cast<std::size_t>(max_length));
|
|
socket_.async_read_some(
|
|
boost::asio::buffer(*data),
|
|
[this, self, data]
|
|
(boost::system::error_code ec, std::size_t length) {
|
|
if (!ec) {
|
|
data->resize(length);
|
|
(*sink_)(data);
|
|
do_read();
|
|
}
|
|
}
|
|
);
|
|
}
|
|
|
|
as::ip::tcp::socket socket_;
|
|
static constexpr std::size_t const max_length = 1024;
|
|
std::shared_ptr<coro2::asymmetric_coroutine<std::shared_ptr<std::vector<char>>>::push_type> sink_;
|
|
};
|
|
|
|
class server {
|
|
public:
|
|
server(
|
|
as::io_service& ios,
|
|
std::uint16_t port)
|
|
: acceptor_(ios, as::ip::tcp::endpoint(as::ip::tcp::v4(), port)),
|
|
socket_(ios) {
|
|
do_accept();
|
|
std::cout << "server start accept" << std::endl;
|
|
ios.run();
|
|
}
|
|
|
|
private:
|
|
void do_accept() {
|
|
acceptor_.async_accept(
|
|
socket_,
|
|
[this](boost::system::error_code ec) {
|
|
if (!ec) {
|
|
std::make_shared<session>(std::move(socket_))->start();
|
|
}
|
|
// for test, only one session is accepted.
|
|
// do_accept();
|
|
}
|
|
);
|
|
}
|
|
|
|
as::ip::tcp::acceptor acceptor_;
|
|
as::ip::tcp::socket socket_;
|
|
};
|
|
|
|
int main() {
|
|
std::thread srv(
|
|
[]{
|
|
boost::asio::io_service ios;
|
|
server s(ios, 12345);
|
|
}
|
|
);
|
|
|
|
std::thread cli(
|
|
[]{
|
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
|
std::cout << "client start" << std::endl;
|
|
|
|
std::stringstream ss;
|
|
std::map<std::string, std::vector<int>> v1 {
|
|
{ "ABC", { 1, 2, 3 } },
|
|
{ "DEFG", { 4, 5 } }
|
|
};
|
|
std::vector<std::string> v2 {
|
|
"HIJ", "KLM", "NOP"
|
|
};
|
|
msgpack::pack(ss, v1);
|
|
msgpack::pack(ss, v2);
|
|
|
|
auto send_data = ss.str();
|
|
|
|
boost::asio::io_service ios;
|
|
as::ip::tcp::resolver::query q("127.0.0.1", "12345");
|
|
as::ip::tcp::resolver r(ios);
|
|
auto it = r.resolve(q);
|
|
|
|
std::cout << "client connect" << std::endl;
|
|
as::ip::tcp::socket s(ios);
|
|
as::connect(s, it);
|
|
|
|
|
|
std::size_t const size = 5;
|
|
std::size_t rest = send_data.size();
|
|
std::size_t index = 0;
|
|
while (rest != 0) {
|
|
std::cout << "client send data" << std::endl;
|
|
auto send_size = size < rest ? size : rest;
|
|
as::write(s, as::buffer(&send_data[index], send_size));
|
|
rest -= send_size;
|
|
index += send_size;
|
|
std::cout << "client wait" << std::endl;
|
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
|
}
|
|
}
|
|
);
|
|
|
|
cli.join();
|
|
std::cout << "client joinded" << std::endl;
|
|
srv.join();
|
|
std::cout << "server joinded" << std::endl;
|
|
}
|