Skip to content

Commit 2c1adac

Browse files
authored
Merge pull request #195 from aboseley/async
Add support for extensible async_ overloads
2 parents 5b3db93 + 85df422 commit 2c1adac

7 files changed

Lines changed: 368 additions & 2 deletions

File tree

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ Individual Contributors
55
Thomas W Rodgers <rodgert@twrodgers.com>
66
Andrey Upadyshev <oliora@gmail.com>
77
Tim Blechmann <tim@klingt.org>
8+
Adam Boseley <adam.boseley@gmail.com>

azmq/socket.hpp

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
#include <boost/asio/buffer.hpp>
2323
#include <boost/system/error_code.hpp>
2424

25+
#if BOOST_VERSION >= 106600
26+
#include <boost/asio/bind_executor.hpp>
27+
#endif
28+
2529
#include <type_traits>
2630
#include <utility>
2731

@@ -685,6 +689,85 @@ class socket :
685689
return stm;
686690
}
687691
};
692+
693+
#if BOOST_VERSION >= 107000
694+
/**
695+
* \brief Support async methods using completion tokens
696+
*/
697+
698+
template<typename ConstBufferSequence>
699+
struct async_send_initiation {
700+
azmq::socket &socket;
701+
ConstBufferSequence const &buffers;
702+
703+
template<typename CompletionHandler>
704+
void operator()(CompletionHandler &&completion_handler) {
705+
auto executor = boost::asio::get_associated_executor(
706+
completion_handler, socket.get_executor());
707+
socket.async_send(buffers, boost::asio::bind_executor(executor,
708+
std::bind(std::forward<CompletionHandler>(completion_handler), std::placeholders::_1, std::placeholders::_2)));
709+
}
710+
};
711+
712+
template<typename MutableBufferSequence>
713+
struct async_receive_initiation {
714+
azmq::socket &socket;
715+
MutableBufferSequence const &buffers;
716+
717+
template<typename CompletionHandler>
718+
void operator()(CompletionHandler &&completion_handler) {
719+
auto executor = boost::asio::get_associated_executor(
720+
completion_handler, socket.get_executor());
721+
socket.async_receive(buffers, boost::asio::bind_executor(executor,
722+
std::bind(std::forward<CompletionHandler>(completion_handler), std::placeholders::_1, std::placeholders::_2)));
723+
}
724+
};
725+
726+
template<typename MutableBufferSequence>
727+
struct async_receive_more_initiation {
728+
azmq::socket &socket;
729+
MutableBufferSequence const &buffers;
730+
731+
template<typename CompletionHandler>
732+
void operator()(CompletionHandler &&completion_handler) {
733+
auto executor = boost::asio::get_associated_executor(
734+
completion_handler, socket.get_executor());
735+
socket.async_receive_more(buffers, boost::asio::bind_executor(executor,
736+
std::bind(std::forward<CompletionHandler>(completion_handler), std::placeholders::_1, std::placeholders::_2)));
737+
}
738+
};
739+
740+
template<class CompletionToken, class ConstBufferSequence>
741+
auto async_send(azmq::socket &socket, ConstBufferSequence const &buffers,
742+
CompletionToken &&token)
743+
-> BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
744+
void(boost::system::error_code, size_t)) {
745+
746+
return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, size_t)>(
747+
async_send_initiation<ConstBufferSequence>{socket, buffers}, token);
748+
}
749+
750+
template<class CompletionToken, class MutableBufferSequence>
751+
auto async_receive(azmq::socket &socket, MutableBufferSequence const &buffers,
752+
CompletionToken &&token)
753+
-> BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
754+
void(boost::system::error_code, size_t)) {
755+
756+
return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, size_t)>(
757+
async_receive_initiation<MutableBufferSequence>{socket, buffers}, token);
758+
}
759+
760+
template<class CompletionToken, class MutableBufferSequence>
761+
auto async_receive_more(azmq::socket &socket, MutableBufferSequence &buffers,
762+
CompletionToken &&token)
763+
-> BOOST_ASIO_INITFN_RESULT_TYPE(CompletionToken,
764+
void(boost::system::error_code, size_t)) {
765+
766+
return boost::asio::async_initiate<CompletionToken, void(boost::system::error_code, size_t)>(
767+
async_receive_more_initiation<MutableBufferSequence>{socket, buffers}, token);
768+
}
769+
#endif
770+
688771
AZMQ_V1_INLINE_NAMESPACE_END
689772

690773
namespace detail {
@@ -833,4 +916,3 @@ void attach(socket & s, Range r, bool serverish = true) {
833916
AZMQ_V1_INLINE_NAMESPACE_END
834917
} // namespace azmq
835918
#endif // AZMQ_SOCKET_HPP_
836-

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ add_subdirectory(socket_ops)
2828
add_subdirectory(socket)
2929
add_subdirectory(signal)
3030
add_subdirectory(actor)
31+
add_subdirectory(cpp20/socket)

test/cpp20/socket/CMakeLists.txt

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
project(test_socket_cpp20)
2+
cmake_minimum_required(VERSION 3.16)
3+
4+
include(CheckIncludeFileCXX)
5+
check_include_file_cxx(coroutine FOUND_COROUTINE_HEADER)
6+
set(REQUIRES_MSG "coroutine support")
7+
if (FOUND_COROUTINE_HEADER)
8+
message(STATUS "${PROJECT_NAME} found ${REQUIRES_MSG}")
9+
else()
10+
message(WARNING "${PROJECT_NAME} requires ${REQUIRES_MSG}")
11+
return()
12+
endif ()
13+
14+
set(REQUIRES_MSG "a c++20 compiler")
15+
if (cxx_std_20 IN_LIST CMAKE_CXX_COMPILE_FEATURES)
16+
message(STATUS "${PROJECT_NAME} found ${REQUIRES_MSG}")
17+
else()
18+
message(WARNING "${PROJECT_NAME} requires ${REQUIRES_MSG}")
19+
return()
20+
endif ()
21+
22+
set(REQUIRES_MSG "boost version >= 107000")
23+
if (Boost_VERSION_MACRO GREATER_EQUAL 107000)
24+
message(STATUS "${PROJECT_NAME} found ${REQUIRES_MSG}")
25+
else()
26+
message(WARNING "${PROJECT_NAME} requires ${REQUIRES_MSG}, but has version : ${Boost_VERSION_MACRO}")
27+
return()
28+
endif ()
29+
30+
add_executable(${PROJECT_NAME} main.cpp)
31+
target_link_libraries(${PROJECT_NAME}
32+
Azmq::azmq
33+
Boost::boost
34+
Boost::thread
35+
Boost::system
36+
${ZeroMQ_LIBRARIES}
37+
${CMAKE_THREAD_LIBS_INIT})
38+
39+
target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_20)
40+
41+
add_catch_test(${PROJECT_NAME})

test/cpp20/socket/main.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright (c) 2013-2014 Contributors as noted in the AUTHORS file
3+
4+
This file is part of azmq
5+
6+
Distributed under the Boost Software License, Version 1.0. (See accompanying
7+
file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
8+
*/
9+
#include <azmq/socket.hpp>
10+
11+
#include <boost/asio/io_context.hpp>
12+
#include <boost/asio/co_spawn.hpp>
13+
#include <boost/asio/detached.hpp>
14+
#include <boost/asio/awaitable.hpp>
15+
#include <boost/asio/use_awaitable.hpp>
16+
17+
#include <coroutine>
18+
#include <array>
19+
#include <string>
20+
21+
#define CATCH_CONFIG_MAIN
22+
#include <catch2/catch.hpp>
23+
24+
std::string subj(const char *name) {
25+
return std::string("inproc://") + name;
26+
}
27+
28+
TEST_CASE("coroutine send/receive message", "[socket_cpp20]") {
29+
boost::asio::io_context ioc;
30+
31+
azmq::socket sb(ioc, ZMQ_ROUTER);
32+
sb.bind(subj(BOOST_CURRENT_FUNCTION));
33+
34+
azmq::socket sc(ioc, ZMQ_DEALER);
35+
sc.connect(subj(BOOST_CURRENT_FUNCTION));
36+
37+
boost::optional<size_t> btc{};
38+
boost::optional<size_t> btb{};
39+
40+
//sending coroutine
41+
co_spawn(ioc, [&]() -> boost::asio::awaitable<void> {
42+
std::array<boost::asio::const_buffer, 2> snd_bufs = {{
43+
boost::asio::buffer("A"),
44+
boost::asio::buffer("B")
45+
}};
46+
47+
btc = co_await azmq::async_send(sc, snd_bufs, boost::asio::use_awaitable);
48+
co_return;
49+
}, boost::asio::detached);
50+
51+
//receiving coroutine
52+
co_spawn(ioc, [&]() -> boost::asio::awaitable<void> {
53+
std::array<char, 5> ident;
54+
std::array<char, 2> a;
55+
std::array<char, 2> b;
56+
57+
std::array<boost::asio::mutable_buffer, 3> rcv_bufs = {{
58+
boost::asio::buffer(ident),
59+
boost::asio::buffer(a),
60+
boost::asio::buffer(b)
61+
}};
62+
63+
btb = co_await azmq::async_receive(sb, rcv_bufs, boost::asio::use_awaitable);
64+
co_return;
65+
}, boost::asio::detached);
66+
67+
ioc.run();
68+
69+
REQUIRE(btb.has_value());
70+
REQUIRE(btb.value() == 9);
71+
72+
REQUIRE(btc.has_value());
73+
REQUIRE(btc.value() == 4);
74+
}
75+

test/socket/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ project(test_socket)
22

33
add_executable(${PROJECT_NAME} main.cpp)
44

5-
target_link_libraries(${PROJECT_NAME} Azmq::azmq)
5+
find_package(Boost COMPONENTS coroutine)
6+
7+
target_link_libraries(${PROJECT_NAME} Azmq::azmq Boost::coroutine)
68

79
add_catch_test(${PROJECT_NAME})

0 commit comments

Comments
 (0)