diff --git a/example/protocol.cc b/example/protocol.cc new file mode 100644 index 00000000..e7f2a387 --- /dev/null +++ b/example/protocol.cc @@ -0,0 +1,86 @@ +#include +#include +#include +#include + +namespace myprotocol { + using namespace msgpack::type; + using msgpack::define; + + struct Get : define< tuple > { + Get() { } + Get(uint32_t f, const std::string& k) : + define_type(msgpack_type(f, k)) { } + uint32_t& flags() { return get<0>(); } + std::string& key() { return get<1>(); } + }; + + struct Put : define< tuple > { + Put() { } + Put(uint32_t f, const std::string& k, const char* valref, size_t vallen) : + define_type(msgpack_type( f, k, raw_ref(valref,vallen) )) { } + uint32_t& flags() { return get<0>(); } + std::string& key() { return get<1>(); } + raw_ref& value() { return get<2>(); } + }; + + struct MultiGet : define< std::vector > { + }; +} + + +int main(void) +{ + // send Get request + std::stringstream stream; + { + myprotocol::Get req; + req.flags() = 0; + req.key() = "key0"; + msgpack::pack(stream, req); + } + + stream.seekg(0); + + // receive Get request + { + std::string buffer(stream.str()); + + msgpack::zone mempool; + msgpack::object o = + msgpack::unpack(buffer.data(), buffer.size(), mempool); + + myprotocol::Get req; + msgpack::convert(req, o); + std::cout << "received: " << o << std::endl; + } + + + stream.str(""); + + + // send MultiGet request + { + myprotocol::MultiGet req; + req.push_back( myprotocol::Get(1, "key1") ); + req.push_back( myprotocol::Get(2, "key2") ); + req.push_back( myprotocol::Get(3, "key3") ); + msgpack::pack(stream, req); + } + + stream.seekg(0); + + // receive MultiGet request + { + std::string buffer(stream.str()); + + msgpack::zone mempool; + msgpack::object o = + msgpack::unpack(buffer.data(), buffer.size(), mempool); + + myprotocol::MultiGet req; + msgpack::convert(req, o); + std::cout << "received: " << o << std::endl; + } +} + diff --git a/example/simple.cc b/example/simple.cc new file mode 100644 index 00000000..74beeaef --- /dev/null +++ b/example/simple.cc @@ -0,0 +1,31 @@ +#include +#include +#include +#include + +int main(void) +{ + // this is target object + msgpack::type::tuple src(1, true, "example"); + + // any classes that implements write(const char*,size_t) can be a buffer + std::stringstream buffer; + msgpack::pack(buffer, src); + + // send the buffer ... + buffer.seekg(0); + + // deserialize the buffer into msgpack::object type + msgpack::zone mempool; + std::string str(buffer.str()); + msgpack::object deserialized = + msgpack::unpack(str.data(), str.size(), mempool); + + // msgpack::object supports ostream + std::cout << deserialized << std::endl; + + // convert msgpack::object type into the original type + msgpack::type::tuple dst; + msgpack::convert(dst, deserialized); +} + diff --git a/example/simple.rb b/example/simple.rb new file mode 100644 index 00000000..90b46968 --- /dev/null +++ b/example/simple.rb @@ -0,0 +1,5 @@ +require 'msgpack' + +serialized = [1, -1, true, false, nil, {"key" => "value"}].to_msgpack +p MessagePack.unpack(serialized) + diff --git a/example/stream.cc b/example/stream.cc new file mode 100644 index 00000000..49927de2 --- /dev/null +++ b/example/stream.cc @@ -0,0 +1,131 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +class Server { +public: + Server(int sock) : m_sock(sock) { } + ~Server() { } + + typedef std::auto_ptr auto_zone; + + void socket_readable() + { + m_pac.reserve_buffer(1024); + + ssize_t count = + read(m_sock, m_pac.buffer(), m_pac.buffer_capacity()); + + if(count < 0) { + if(errno == EAGAIN || errno == EINTR) { + return; + } else { + throw std::runtime_error(strerror(errno)); + } + } else if(count == 0) { + throw std::runtime_error("connection closed"); + } + + m_pac.buffer_consumed(count); + + while(m_pac.execute()) { + msgpack::object msg = m_pac.data(); + + auto_zone life( m_pac.release_zone() ); + + m_pac.reset(); + + process_message(msg, life); + } + } + +private: + void process_message(msgpack::object msg, auto_zone& life) + { + std::cout << "message reached: " << msg << std::endl; + } + +private: + int m_sock; + msgpack::unpacker m_pac; +}; + + +static void* run_server(void* arg) +try { + Server* srv = reinterpret_cast(arg); + + while(true) { + srv->socket_readable(); + } + return NULL; + +} catch (std::exception& e) { + std::cerr << "error while processing client packet: " + << e.what() << std::endl; + return NULL; + +} catch (...) { + std::cerr << "error while processing client packet: " + << "unknown error" << std::endl; + return NULL; +} + + +struct fwriter { + fwriter(int fd) : m_fp( fdopen(fd, "w") ) { } + + void write(const char* buf, size_t buflen) + { + size_t count = fwrite(buf, buflen, 1, m_fp); + //if(fwrite(buf, buflen, 1, m_fp) < 1) { + if(count < 1) { + std::cout << buflen << std::endl; + std::cout << count << std::endl; + throw std::runtime_error(strerror(errno)); + } + } + + void flush() { fflush(m_fp); } + + void close() { fclose(m_fp); } + +private: + FILE* m_fp; +}; + + +int main(void) +{ + int pair[2]; + pipe(pair); + + // run server thread + Server srv(pair[0]); + pthread_t thread; + pthread_create(&thread, NULL, + run_server, reinterpret_cast(&srv)); + + // client thread: + fwriter writer(pair[1]); + + typedef msgpack::type::tuple put_t; + typedef msgpack::type::tuple get_t; + + put_t req1("put", "apple", "red"); + put_t req2("put", "lemon", "yellow"); + get_t req3("get", "apple"); + msgpack::pack(writer, req1); + msgpack::pack(writer, req2); + msgpack::pack(writer, req3); + writer.flush(); + writer.close(); + + pthread_join(thread, NULL); +} + diff --git a/example/stream.rb b/example/stream.rb new file mode 100644 index 00000000..e53ce82d --- /dev/null +++ b/example/stream.rb @@ -0,0 +1,67 @@ +require 'msgpack' + +class Server + def initialize(sock) + @sock = sock + @pk = MessagePack::Unpacker.new + @buffer = '' + @nread = 0 + end + + def run + while true + begin + data = @sock.sysread(1024) + rescue + puts "connection closed (#{$!})" + return + end + receive_data(data) + end + end + + private + def receive_data(data) + @buffer << data + + while true + @nread = @pk.execute(@buffer, @nread) + + if @pk.finished? + msg = @pk.data + process_message(msg) + + @pk.reset + @buffer.slice!(0, @nread) + @nread = 0 + next unless @buffer.empty? + end + + break + end + + rescue + puts "error while processing client packet: #{$!}" + end + + def process_message(msg) + puts "message reached: #{msg.inspect}" + end +end + + +rpipe, wpipe = IO.pipe + +# run server thread +thread = Thread.new(Server.new(rpipe)) {|srv| + srv.run +} + +# client thread: +wpipe.write ["put", "apple", "red"].to_msgpack +wpipe.write ["put", "lemon", "yellow"].to_msgpack +wpipe.write ["get", "apple"].to_msgpack +wpipe.close + +thread.join +