diff --git a/Makefile.am b/Makefile.am index 9f5b7bed..6310a45e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -65,6 +65,7 @@ src_libbitcoin_server_la_SOURCES = \ src/protocols/native/protocol_native.cpp \ src/protocols/native/protocol_native_address.cpp \ src/protocols/native/protocol_native_block.cpp \ + src/protocols/native/protocol_native_configuration.cpp \ src/protocols/native/protocol_native_input.cpp \ src/protocols/native/protocol_native_output.cpp \ src/protocols/native/protocol_native_tx.cpp \ diff --git a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj index feb21832..1cb33e91 100644 --- a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj @@ -147,6 +147,7 @@ + diff --git a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters index e1ca819f..61c5e7cb 100644 --- a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -141,6 +141,9 @@ src\protocols\native + + src\protocols\native + src\protocols\native diff --git a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj index bd7a8a8c..2146c850 100644 --- a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj @@ -147,6 +147,7 @@ + diff --git a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters index e1ca819f..61c5e7cb 100644 --- a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -141,6 +141,9 @@ src\protocols\native + + src\protocols\native + src\protocols\native diff --git a/include/bitcoin/server/protocols/protocol_electrum.hpp b/include/bitcoin/server/protocols/protocol_electrum.hpp index 05236111..622e63a3 100644 --- a/include/bitcoin/server/protocols/protocol_electrum.hpp +++ b/include/bitcoin/server/protocols/protocol_electrum.hpp @@ -314,6 +314,13 @@ class BCS_API protocol_electrum } private: + // Aliases. + using array_t = network::rpc::array_t; + using object_t = network::rpc::object_t; + using version_t = protocol_electrum_version; + static constexpr electrum::version minimum = version_t::minimum; + static constexpr electrum::version maximum = version_t::maximum; + // Post to notification strand. template inline auto notify(Method&& method, Args&&... args) NOEXCEPT @@ -322,13 +329,6 @@ class BCS_API protocol_electrum BIND_SAFE(BIND_SHARED(method, args))); } - // Aliases. - using array_t = network::rpc::array_t; - using object_t = network::rpc::object_t; - using version_t = protocol_electrum_version; - static constexpr electrum::version minimum = version_t::minimum; - static constexpr electrum::version maximum = version_t::maximum; - // Transformations. static array_t transform(const unspents& unspents) NOEXCEPT; static array_t transform(const histories& histories) NOEXCEPT; diff --git a/include/bitcoin/server/protocols/protocol_html.hpp b/include/bitcoin/server/protocols/protocol_html.hpp index 3e530b4d..747d2691 100644 --- a/include/bitcoin/server/protocols/protocol_html.hpp +++ b/include/bitcoin/server/protocols/protocol_html.hpp @@ -75,6 +75,14 @@ class BCS_API protocol_html network::http::media_type type, const network::http::request& request={}) NOEXCEPT; + /// Notifiers (websocket). + virtual void notify_json(boost::json::value&& model, size_t size_hint, + const network::http::request& request = {}) NOEXCEPT; + virtual void notify_text(std::string&& hexidecimal, + const network::http::request& request = {}) NOEXCEPT; + virtual void notify_chunk(system::data_chunk&& bytes, + const network::http::request& request = {}) NOEXCEPT; + /// Utilities. std::filesystem::path to_path( const std::string& target = "/") const NOEXCEPT; diff --git a/include/bitcoin/server/protocols/protocol_native.hpp b/include/bitcoin/server/protocols/protocol_native.hpp index 3b913402..5620466a 100644 --- a/include/bitcoin/server/protocols/protocol_native.hpp +++ b/include/bitcoin/server/protocols/protocol_native.hpp @@ -43,6 +43,7 @@ class BCS_API protocol_native const network::channel::ptr& channel, const options_t& options) NOEXCEPT : protocol_html(session, channel, options), + notification_strand_(channel->service().get_executor()), network::tracker(session->log) { } @@ -63,7 +64,14 @@ class BCS_API protocol_native void dispatch_websocket( const network::http::request& request) NOEXCEPT override; - /// REST interface handlers. + + /// Event handlers. + /// ----------------------------------------------------------------------- + + bool handle_event(const code&, node::chase event_, + node::event_value) NOEXCEPT; + + /// Interface handlers. /// ----------------------------------------------------------------------- bool handle_get_configuration(const code& ec, interface::configuration, @@ -174,12 +182,27 @@ class BCS_API protocol_native bool handle_get_event_subscribe(const code& ec, interface::event_subscribe, uint8_t version, uint8_t media, bool stop) NOEXCEPT; -private: +protected: using media_type = network::http::media_type; + + /// Notification event handlers. + /// ----------------------------------------------------------------------- + void do_top(node::header_t link, media_type media) NOEXCEPT; + void do_block(node::header_t link, media_type media) NOEXCEPT; + void do_transaction(node::transaction_t link, media_type media) NOEXCEPT; + +private: static constexpr uint8_t text = to_value(media_type::text_plain); static constexpr uint8_t json = to_value(media_type::application_json); - static constexpr uint8_t data = to_value( - media_type::application_octet_stream); + static constexpr uint8_t data = to_value(media_type::application_octet_stream); + + ////// Post to notification strand (use POST_NOTIFY()). + ////template + ////inline auto notify(Method&& method, Args&&... args) NOEXCEPT + ////{ + //// return boost::asio::post(notification_strand_, + //// BIND_SAFE(BIND_SHARED(method, args))); + ////} // Serializers. // ------------------------------------------------------------------------ @@ -230,13 +253,16 @@ class BCS_API protocol_native database::header_link to_header(const std::optional& height, const std::optional& hash) NOEXCEPT; + // This is thread safe, uses network threadpool. + network::asio::strand notification_strand_; + // These are thread safe. std::atomic_bool stopping_{}; // Unconditional (all). - std::atomic_bool top_subscribe_{}; - std::atomic_bool block_subscribe_{}; - std::atomic_bool tx_subscribe_{}; + std::atomic top_subscribe_{ media_type::unknown }; + std::atomic block_subscribe_{ media_type::unknown }; + std::atomic tx_subscribe_{ media_type::unknown }; // TODO: map of outpoints (notify on spenders). std::atomic_bool output_subscribe_{}; diff --git a/src/protocols/electrum/protocol_electrum.cpp b/src/protocols/electrum/protocol_electrum.cpp index 1cbefb15..6de22b33 100644 --- a/src/protocols/electrum/protocol_electrum.cpp +++ b/src/protocols/electrum/protocol_electrum.cpp @@ -170,9 +170,9 @@ bool protocol_electrum::handle_event(const code&, node::chase event_, } case node::chase::reorganized: { - // value is regression branch_point. - BC_ASSERT(std::holds_alternative(value)); - POST_NOTIFY(do_reorganized, std::get(value)); + // Value is regression branch_point. + BC_ASSERT(std::holds_alternative(value)); + POST_NOTIFY(do_reorganized, std::get(value)); break; } default: @@ -195,7 +195,7 @@ void protocol_electrum::do_reorganized(node::header_t) NOEXCEPT for (auto& [key, sub]: address_subscriptions_) { - // flush resets hash accumulator, sub.type remains unchanged. + // Flush resets hash accumulator, sub.type remains unchanged. sub.accumulator.flush(); sub.status = {}; sub.cursor = {}; diff --git a/src/protocols/native/protocol_native.cpp b/src/protocols/native/protocol_native.cpp index c82cc620..63f8d238 100644 --- a/src/protocols/native/protocol_native.cpp +++ b/src/protocols/native/protocol_native.cpp @@ -28,6 +28,7 @@ namespace server { using namespace system; using namespace network; using namespace std::placeholders; +constexpr auto relaxed = std::memory_order_relaxed; #define CLASS protocol_native #define SUBSCRIBE_NATIVE(method, ...) \ @@ -166,63 +167,140 @@ void protocol_native::dispatch_websocket(const http::request& request) NOEXCEPT } } -// Handlers. +// Event handlers. // ---------------------------------------------------------------------------- -bool protocol_native::handle_get_configuration(const code& ec, - interface::configuration, uint8_t, uint8_t media) NOEXCEPT +// capture chaser events +bool protocol_native::handle_event(const code&, node::chase event_, + node::event_value value) NOEXCEPT { - if (stopped(ec)) + // Do not pass ec to stopped as it is not a call status. + if (stopped()) return false; - if (media != json) + switch (event_) { - send_not_acceptable(); - return true; + case node::chase::organized: + { + auto media = top_subscribe_.load(relaxed); + if (media != media_type::unknown) + { + // Increments height above a fork point (start/reorg). + BC_ASSERT(std::holds_alternative(value)); + POST(do_top, std::get(value), media); + } + + media = block_subscribe_.load(relaxed); + if (media != media_type::unknown) + { + // No block emission for a fork point (start/reorg). + BC_ASSERT(std::holds_alternative(value)); + POST(do_block, std::get(value), media); + } + + break; + } + case node::chase::reorganized: + { + const auto media = top_subscribe_.load(relaxed); + if (media != media_type::unknown) + { + // Resets subscriber height to the fork point. + BC_ASSERT(std::holds_alternative(value)); + POST(do_top, std::get(value), media); + } + + break; + } + case node::chase::transaction: + { + const auto media = tx_subscribe_.load(relaxed); + if (media != media_type::unknown) + { + BC_ASSERT(std::holds_alternative(value)); + POST(do_transaction, std::get(value), media); + } + + break; + } + default: + { + break; + } } - BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) - boost::json::object object - { - { "address", archive().address_enabled() }, - { "filter", archive().filter_enabled() }, - { "turbo", database_settings().turbo }, - { "witness", network_settings().witness_node() }, - { "retarget", system_settings().forks.retarget }, - { "difficult", system_settings().forks.difficult }, - }; - BC_POP_WARNING() - - send_json(std::move(object), 64); return true; } -// TODO: add log level(s) param. -bool protocol_native::handle_get_log_subscribe(const code& ec, - interface::log_subscribe, uint8_t , uint8_t , - bool stop) NOEXCEPT +BC_PUSH_WARNING(NO_INCOMPLETE_SWITCH) + +void protocol_native::do_top(node::header_t link, media_type media) NOEXCEPT { - if (stopped(ec)) - return false; + BC_ASSERT(stranded()); - // TODO: return enumeration (on stop?). - log_subscribe_.store(stop); - return {}; + // TODO: notification. + const auto height = archive().get_height(link).value; + switch (to_value(media)) + { + case data: + notify_chunk(to_little_endian_size(height)); + return; + case text: + notify_text(encode_base16(to_little_endian_size(height))); + return; + case json: + notify_json(height, two * sizeof(height)); + return; + } } -// TODO: add event(s) param. -bool protocol_native::handle_get_event_subscribe(const code& ec, - interface::event_subscribe, uint8_t , uint8_t , - bool stop) NOEXCEPT +void protocol_native::do_block(node::header_t link, media_type media) NOEXCEPT { - if (stopped(ec)) - return false; + BC_ASSERT(stranded()); - // TODO: return enumeration (on stop?). - event_subscribe_.store(stop); - return {}; + // TODO: notification. + const auto hash = archive().get_header_key(link); + switch (to_value(media)) + { + case data: + notify_chunk(to_chunk(hash)); + return; + case text: + notify_text(encode_base16(hash)); + return; + case json: + BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) + notify_json(value_from(encode_base16(hash)), two * hash_size); + BC_POP_WARNING() + return; + } +} + +void protocol_native::do_transaction(node::transaction_t link, + media_type media) NOEXCEPT +{ + BC_ASSERT(stranded()); + + // TODO: notification. + const auto hash = archive().get_tx_key(link); + switch (to_value(media)) + { + case data: + notify_chunk(to_chunk(hash)); + return; + case text: + notify_text(encode_base16(hash)); + return; + case json: + BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) + notify_json(value_from(encode_base16(hash)), two * hash_size); + BC_POP_WARNING() + return; + } } +BC_POP_WARNING() + // Utilities. // ---------------------------------------------------------------------------- // private diff --git a/src/protocols/native/protocol_native_block.cpp b/src/protocols/native/protocol_native_block.cpp index bf9a5344..75251a9e 100644 --- a/src/protocols/native/protocol_native_block.cpp +++ b/src/protocols/native/protocol_native_block.cpp @@ -64,8 +64,8 @@ bool protocol_native::handle_get_top_subscribe(const code& ec, if (stopped(ec)) return false; - // TODO: return only bool (previous state) if stop. - top_subscribe_.store(stop); + // TODO: return only bool (previous state) if stop? + top_subscribe_.store(stop ? media_type::unknown : (media_type)media); return handle_get_top(ec, {}, version, media); } @@ -500,14 +500,13 @@ bool protocol_native::handle_get_block_tx(const code& ec, interface::block_tx, } bool protocol_native::handle_get_block_subscribe(const code& ec, - interface::block_subscribe, uint8_t version, uint8_t media, - bool stop) NOEXCEPT + interface::block_subscribe, uint8_t, uint8_t media, bool stop) NOEXCEPT { if (stopped(ec)) return false; - // TODO: return only bool (previous state) if stop. - block_subscribe_.store(stop); + // TODO: return only bool (previous state) if stop? + block_subscribe_.store(stop ? media_type::unknown : (media_type)media); // Return top block hash upon block subscription. const auto& query = archive(); diff --git a/src/protocols/native/protocol_native_configuration.cpp b/src/protocols/native/protocol_native_configuration.cpp new file mode 100644 index 00000000..a1496279 --- /dev/null +++ b/src/protocols/native/protocol_native_configuration.cpp @@ -0,0 +1,82 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include + +namespace libbitcoin { +namespace server { + +bool protocol_native::handle_get_configuration(const code& ec, + interface::configuration, uint8_t, uint8_t media) NOEXCEPT +{ + if (stopped(ec)) + return false; + + if (media != json) + { + send_not_acceptable(); + return true; + } + + BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) + boost::json::object object + { + { "address", archive().address_enabled() }, + { "filter", archive().filter_enabled() }, + { "turbo", database_settings().turbo }, + { "witness", network_settings().witness_node() }, + { "retarget", system_settings().forks.retarget }, + { "difficult", system_settings().forks.difficult }, + }; + BC_POP_WARNING() + + send_json(std::move(object), 64); + return true; +} + +// TODO: add log level(s) param. +bool protocol_native::handle_get_log_subscribe(const code& ec, + interface::log_subscribe, uint8_t , uint8_t , + bool stop) NOEXCEPT +{ + if (stopped(ec)) + return false; + + // TODO: return enumeration (on stop?). + log_subscribe_.store(stop); + return {}; +} + +// TODO: add event(s) param. +bool protocol_native::handle_get_event_subscribe(const code& ec, + interface::event_subscribe, uint8_t , uint8_t , + bool stop) NOEXCEPT +{ + if (stopped(ec)) + return false; + + // TODO: return enumeration (on stop?). + event_subscribe_.store(stop); + return {}; +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/protocols/native/protocol_native_tx.cpp b/src/protocols/native/protocol_native_tx.cpp index 006d78fb..1eef593a 100644 --- a/src/protocols/native/protocol_native_tx.cpp +++ b/src/protocols/native/protocol_native_tx.cpp @@ -25,6 +25,9 @@ namespace libbitcoin { namespace server { using namespace system; +using namespace std::placeholders; + +#define CLASS protocol_native BC_PUSH_WARNING(NO_INCOMPLETE_SWITCH) BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) @@ -171,16 +174,19 @@ bool protocol_native::handle_get_tx_details(const code& ec, } bool protocol_native::handle_get_tx_subscribe(const code& ec, - interface::tx_subscribe, uint8_t , uint8_t , - bool stop) NOEXCEPT + interface::tx_subscribe, uint8_t, uint8_t media, bool stop) NOEXCEPT { if (stopped(ec)) return false; - tx_subscribe_.store(stop); + // TODO: return bool (previous state) only? + tx_subscribe_.store(stop ? media_type::unknown : (media_type)media); - // TODO: return bool (previous state) only. - send_ok(); + // TODO: move to send/notify_empty(). + using namespace network::http; + response out{}; + out.body() = empty_value{}; + NOTIFY(std::move(out), handle_complete, _1, error::success); return true; } diff --git a/src/protocols/protocol_html.cpp b/src/protocols/protocol_html.cpp index 26e04e61..1dabca42 100644 --- a/src/protocols/protocol_html.cpp +++ b/src/protocols/protocol_html.cpp @@ -257,6 +257,40 @@ void protocol_html::send_buffer(buffer_body::value_type&& buffer, SEND(std::move(response), handle_complete, _1, error::success); } +// Notifiers (websocket). +// ---------------------------------------------------------------------------- + +void protocol_html::notify_json(boost::json::value&& model, size_t size_hint, + const request& request) NOEXCEPT +{ + BC_ASSERT(stranded()); + response response{ status::ok, request.version() }; + response.body() = json_value + { + .model = std::move(model), + .size_hint = size_hint + }; + NOTIFY(std::move(response), handle_complete, _1, error::success); +} + +void protocol_html::notify_text(std::string&& hexidecimal, + const request& request) NOEXCEPT +{ + BC_ASSERT(stranded()); + response response{ status::ok, request.version() }; + response.body() = std::move(hexidecimal); + NOTIFY(std::move(response), handle_complete, _1, error::success); +} + +void protocol_html::notify_chunk(system::data_chunk&& bytes, + const request& request) NOEXCEPT +{ + BC_ASSERT(stranded()); + response response{ status::ok, request.version() }; + response.body() = std::move(bytes); + NOTIFY(std::move(response), handle_complete, _1, error::success); +} + // Utilities. // ---------------------------------------------------------------------------- diff --git a/test/protocols/native/native_block.cpp b/test/protocols/native/native_block.cpp index c62ffcc1..61ea6a4b 100644 --- a/test/protocols/native/native_block.cpp +++ b/test/protocols/native/native_block.cpp @@ -118,4 +118,13 @@ BOOST_AUTO_TEST_CASE(native__ws_top_subscribe__json__expected) BOOST_REQUIRE_EQUAL(response.as_int64(), 9); } +BOOST_AUTO_TEST_CASE(native__ws_top_subscribe__stop__expected) +{ + BOOST_REQUIRE(!ws_upgrade()); + + const auto response = ws_get_json("/v1/top/subscribe?stop=true"); + BOOST_REQUIRE(response.is_int64()); + BOOST_REQUIRE_EQUAL(response.as_int64(), 9); +} + BOOST_AUTO_TEST_SUITE_END()