diff --git a/Makefile.am b/Makefile.am
index 9f5b7bed..6310a45e 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -65,6 +65,7 @@ src_libbitcoin_server_la_SOURCES = \
src/protocols/native/protocol_native.cpp \
src/protocols/native/protocol_native_address.cpp \
src/protocols/native/protocol_native_block.cpp \
+ src/protocols/native/protocol_native_configuration.cpp \
src/protocols/native/protocol_native_input.cpp \
src/protocols/native/protocol_native_output.cpp \
src/protocols/native/protocol_native_tx.cpp \
diff --git a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj
index feb21832..1cb33e91 100644
--- a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj
+++ b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj
@@ -147,6 +147,7 @@
+
diff --git a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters
index e1ca819f..61c5e7cb 100644
--- a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters
+++ b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters
@@ -141,6 +141,9 @@
src\protocols\native
+
+ src\protocols\native
+
src\protocols\native
diff --git a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj
index bd7a8a8c..2146c850 100644
--- a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj
+++ b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj
@@ -147,6 +147,7 @@
+
diff --git a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters
index e1ca819f..61c5e7cb 100644
--- a/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters
+++ b/builds/msvc/vs2026/libbitcoin-server/libbitcoin-server.vcxproj.filters
@@ -141,6 +141,9 @@
src\protocols\native
+
+ src\protocols\native
+
src\protocols\native
diff --git a/include/bitcoin/server/protocols/protocol_electrum.hpp b/include/bitcoin/server/protocols/protocol_electrum.hpp
index 05236111..622e63a3 100644
--- a/include/bitcoin/server/protocols/protocol_electrum.hpp
+++ b/include/bitcoin/server/protocols/protocol_electrum.hpp
@@ -314,6 +314,13 @@ class BCS_API protocol_electrum
}
private:
+ // Aliases.
+ using array_t = network::rpc::array_t;
+ using object_t = network::rpc::object_t;
+ using version_t = protocol_electrum_version;
+ static constexpr electrum::version minimum = version_t::minimum;
+ static constexpr electrum::version maximum = version_t::maximum;
+
// Post to notification strand.
template
inline auto notify(Method&& method, Args&&... args) NOEXCEPT
@@ -322,13 +329,6 @@ class BCS_API protocol_electrum
BIND_SAFE(BIND_SHARED(method, args)));
}
- // Aliases.
- using array_t = network::rpc::array_t;
- using object_t = network::rpc::object_t;
- using version_t = protocol_electrum_version;
- static constexpr electrum::version minimum = version_t::minimum;
- static constexpr electrum::version maximum = version_t::maximum;
-
// Transformations.
static array_t transform(const unspents& unspents) NOEXCEPT;
static array_t transform(const histories& histories) NOEXCEPT;
diff --git a/include/bitcoin/server/protocols/protocol_html.hpp b/include/bitcoin/server/protocols/protocol_html.hpp
index 3e530b4d..747d2691 100644
--- a/include/bitcoin/server/protocols/protocol_html.hpp
+++ b/include/bitcoin/server/protocols/protocol_html.hpp
@@ -75,6 +75,14 @@ class BCS_API protocol_html
network::http::media_type type,
const network::http::request& request={}) NOEXCEPT;
+ /// Notifiers (websocket).
+ virtual void notify_json(boost::json::value&& model, size_t size_hint,
+ const network::http::request& request = {}) NOEXCEPT;
+ virtual void notify_text(std::string&& hexidecimal,
+ const network::http::request& request = {}) NOEXCEPT;
+ virtual void notify_chunk(system::data_chunk&& bytes,
+ const network::http::request& request = {}) NOEXCEPT;
+
/// Utilities.
std::filesystem::path to_path(
const std::string& target = "/") const NOEXCEPT;
diff --git a/include/bitcoin/server/protocols/protocol_native.hpp b/include/bitcoin/server/protocols/protocol_native.hpp
index 3b913402..5620466a 100644
--- a/include/bitcoin/server/protocols/protocol_native.hpp
+++ b/include/bitcoin/server/protocols/protocol_native.hpp
@@ -43,6 +43,7 @@ class BCS_API protocol_native
const network::channel::ptr& channel,
const options_t& options) NOEXCEPT
: protocol_html(session, channel, options),
+ notification_strand_(channel->service().get_executor()),
network::tracker(session->log)
{
}
@@ -63,7 +64,14 @@ class BCS_API protocol_native
void dispatch_websocket(
const network::http::request& request) NOEXCEPT override;
- /// REST interface handlers.
+
+ /// Event handlers.
+ /// -----------------------------------------------------------------------
+
+ bool handle_event(const code&, node::chase event_,
+ node::event_value) NOEXCEPT;
+
+ /// Interface handlers.
/// -----------------------------------------------------------------------
bool handle_get_configuration(const code& ec, interface::configuration,
@@ -174,12 +182,27 @@ class BCS_API protocol_native
bool handle_get_event_subscribe(const code& ec, interface::event_subscribe,
uint8_t version, uint8_t media, bool stop) NOEXCEPT;
-private:
+protected:
using media_type = network::http::media_type;
+
+ /// Notification event handlers.
+ /// -----------------------------------------------------------------------
+ void do_top(node::header_t link, media_type media) NOEXCEPT;
+ void do_block(node::header_t link, media_type media) NOEXCEPT;
+ void do_transaction(node::transaction_t link, media_type media) NOEXCEPT;
+
+private:
static constexpr uint8_t text = to_value(media_type::text_plain);
static constexpr uint8_t json = to_value(media_type::application_json);
- static constexpr uint8_t data = to_value(
- media_type::application_octet_stream);
+ static constexpr uint8_t data = to_value(media_type::application_octet_stream);
+
+ ////// Post to notification strand (use POST_NOTIFY()).
+ ////template
+ ////inline auto notify(Method&& method, Args&&... args) NOEXCEPT
+ ////{
+ //// return boost::asio::post(notification_strand_,
+ //// BIND_SAFE(BIND_SHARED(method, args)));
+ ////}
// Serializers.
// ------------------------------------------------------------------------
@@ -230,13 +253,16 @@ class BCS_API protocol_native
database::header_link to_header(const std::optional& height,
const std::optional& hash) NOEXCEPT;
+ // This is thread safe, uses network threadpool.
+ network::asio::strand notification_strand_;
+
// These are thread safe.
std::atomic_bool stopping_{};
// Unconditional (all).
- std::atomic_bool top_subscribe_{};
- std::atomic_bool block_subscribe_{};
- std::atomic_bool tx_subscribe_{};
+ std::atomic top_subscribe_{ media_type::unknown };
+ std::atomic block_subscribe_{ media_type::unknown };
+ std::atomic tx_subscribe_{ media_type::unknown };
// TODO: map of outpoints (notify on spenders).
std::atomic_bool output_subscribe_{};
diff --git a/src/protocols/electrum/protocol_electrum.cpp b/src/protocols/electrum/protocol_electrum.cpp
index 1cbefb15..6de22b33 100644
--- a/src/protocols/electrum/protocol_electrum.cpp
+++ b/src/protocols/electrum/protocol_electrum.cpp
@@ -170,9 +170,9 @@ bool protocol_electrum::handle_event(const code&, node::chase event_,
}
case node::chase::reorganized:
{
- // value is regression branch_point.
- BC_ASSERT(std::holds_alternative(value));
- POST_NOTIFY(do_reorganized, std::get(value));
+ // Value is regression branch_point.
+ BC_ASSERT(std::holds_alternative(value));
+ POST_NOTIFY(do_reorganized, std::get(value));
break;
}
default:
@@ -195,7 +195,7 @@ void protocol_electrum::do_reorganized(node::header_t) NOEXCEPT
for (auto& [key, sub]: address_subscriptions_)
{
- // flush resets hash accumulator, sub.type remains unchanged.
+ // Flush resets hash accumulator, sub.type remains unchanged.
sub.accumulator.flush();
sub.status = {};
sub.cursor = {};
diff --git a/src/protocols/native/protocol_native.cpp b/src/protocols/native/protocol_native.cpp
index c82cc620..63f8d238 100644
--- a/src/protocols/native/protocol_native.cpp
+++ b/src/protocols/native/protocol_native.cpp
@@ -28,6 +28,7 @@ namespace server {
using namespace system;
using namespace network;
using namespace std::placeholders;
+constexpr auto relaxed = std::memory_order_relaxed;
#define CLASS protocol_native
#define SUBSCRIBE_NATIVE(method, ...) \
@@ -166,63 +167,140 @@ void protocol_native::dispatch_websocket(const http::request& request) NOEXCEPT
}
}
-// Handlers.
+// Event handlers.
// ----------------------------------------------------------------------------
-bool protocol_native::handle_get_configuration(const code& ec,
- interface::configuration, uint8_t, uint8_t media) NOEXCEPT
+// capture chaser events
+bool protocol_native::handle_event(const code&, node::chase event_,
+ node::event_value value) NOEXCEPT
{
- if (stopped(ec))
+ // Do not pass ec to stopped as it is not a call status.
+ if (stopped())
return false;
- if (media != json)
+ switch (event_)
{
- send_not_acceptable();
- return true;
+ case node::chase::organized:
+ {
+ auto media = top_subscribe_.load(relaxed);
+ if (media != media_type::unknown)
+ {
+ // Increments height above a fork point (start/reorg).
+ BC_ASSERT(std::holds_alternative(value));
+ POST(do_top, std::get(value), media);
+ }
+
+ media = block_subscribe_.load(relaxed);
+ if (media != media_type::unknown)
+ {
+ // No block emission for a fork point (start/reorg).
+ BC_ASSERT(std::holds_alternative(value));
+ POST(do_block, std::get(value), media);
+ }
+
+ break;
+ }
+ case node::chase::reorganized:
+ {
+ const auto media = top_subscribe_.load(relaxed);
+ if (media != media_type::unknown)
+ {
+ // Resets subscriber height to the fork point.
+ BC_ASSERT(std::holds_alternative(value));
+ POST(do_top, std::get(value), media);
+ }
+
+ break;
+ }
+ case node::chase::transaction:
+ {
+ const auto media = tx_subscribe_.load(relaxed);
+ if (media != media_type::unknown)
+ {
+ BC_ASSERT(std::holds_alternative(value));
+ POST(do_transaction, std::get(value), media);
+ }
+
+ break;
+ }
+ default:
+ {
+ break;
+ }
}
- BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
- boost::json::object object
- {
- { "address", archive().address_enabled() },
- { "filter", archive().filter_enabled() },
- { "turbo", database_settings().turbo },
- { "witness", network_settings().witness_node() },
- { "retarget", system_settings().forks.retarget },
- { "difficult", system_settings().forks.difficult },
- };
- BC_POP_WARNING()
-
- send_json(std::move(object), 64);
return true;
}
-// TODO: add log level(s) param.
-bool protocol_native::handle_get_log_subscribe(const code& ec,
- interface::log_subscribe, uint8_t , uint8_t ,
- bool stop) NOEXCEPT
+BC_PUSH_WARNING(NO_INCOMPLETE_SWITCH)
+
+void protocol_native::do_top(node::header_t link, media_type media) NOEXCEPT
{
- if (stopped(ec))
- return false;
+ BC_ASSERT(stranded());
- // TODO: return enumeration (on stop?).
- log_subscribe_.store(stop);
- return {};
+ // TODO: notification.
+ const auto height = archive().get_height(link).value;
+ switch (to_value(media))
+ {
+ case data:
+ notify_chunk(to_little_endian_size(height));
+ return;
+ case text:
+ notify_text(encode_base16(to_little_endian_size(height)));
+ return;
+ case json:
+ notify_json(height, two * sizeof(height));
+ return;
+ }
}
-// TODO: add event(s) param.
-bool protocol_native::handle_get_event_subscribe(const code& ec,
- interface::event_subscribe, uint8_t , uint8_t ,
- bool stop) NOEXCEPT
+void protocol_native::do_block(node::header_t link, media_type media) NOEXCEPT
{
- if (stopped(ec))
- return false;
+ BC_ASSERT(stranded());
- // TODO: return enumeration (on stop?).
- event_subscribe_.store(stop);
- return {};
+ // TODO: notification.
+ const auto hash = archive().get_header_key(link);
+ switch (to_value(media))
+ {
+ case data:
+ notify_chunk(to_chunk(hash));
+ return;
+ case text:
+ notify_text(encode_base16(hash));
+ return;
+ case json:
+ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
+ notify_json(value_from(encode_base16(hash)), two * hash_size);
+ BC_POP_WARNING()
+ return;
+ }
+}
+
+void protocol_native::do_transaction(node::transaction_t link,
+ media_type media) NOEXCEPT
+{
+ BC_ASSERT(stranded());
+
+ // TODO: notification.
+ const auto hash = archive().get_tx_key(link);
+ switch (to_value(media))
+ {
+ case data:
+ notify_chunk(to_chunk(hash));
+ return;
+ case text:
+ notify_text(encode_base16(hash));
+ return;
+ case json:
+ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
+ notify_json(value_from(encode_base16(hash)), two * hash_size);
+ BC_POP_WARNING()
+ return;
+ }
}
+BC_POP_WARNING()
+
// Utilities.
// ----------------------------------------------------------------------------
// private
diff --git a/src/protocols/native/protocol_native_block.cpp b/src/protocols/native/protocol_native_block.cpp
index bf9a5344..75251a9e 100644
--- a/src/protocols/native/protocol_native_block.cpp
+++ b/src/protocols/native/protocol_native_block.cpp
@@ -64,8 +64,8 @@ bool protocol_native::handle_get_top_subscribe(const code& ec,
if (stopped(ec))
return false;
- // TODO: return only bool (previous state) if stop.
- top_subscribe_.store(stop);
+ // TODO: return only bool (previous state) if stop?
+ top_subscribe_.store(stop ? media_type::unknown : (media_type)media);
return handle_get_top(ec, {}, version, media);
}
@@ -500,14 +500,13 @@ bool protocol_native::handle_get_block_tx(const code& ec, interface::block_tx,
}
bool protocol_native::handle_get_block_subscribe(const code& ec,
- interface::block_subscribe, uint8_t version, uint8_t media,
- bool stop) NOEXCEPT
+ interface::block_subscribe, uint8_t, uint8_t media, bool stop) NOEXCEPT
{
if (stopped(ec))
return false;
- // TODO: return only bool (previous state) if stop.
- block_subscribe_.store(stop);
+ // TODO: return only bool (previous state) if stop?
+ block_subscribe_.store(stop ? media_type::unknown : (media_type)media);
// Return top block hash upon block subscription.
const auto& query = archive();
diff --git a/src/protocols/native/protocol_native_configuration.cpp b/src/protocols/native/protocol_native_configuration.cpp
new file mode 100644
index 00000000..a1496279
--- /dev/null
+++ b/src/protocols/native/protocol_native_configuration.cpp
@@ -0,0 +1,82 @@
+/**
+ * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include
+
+#include
+#include
+
+namespace libbitcoin {
+namespace server {
+
+bool protocol_native::handle_get_configuration(const code& ec,
+ interface::configuration, uint8_t, uint8_t media) NOEXCEPT
+{
+ if (stopped(ec))
+ return false;
+
+ if (media != json)
+ {
+ send_not_acceptable();
+ return true;
+ }
+
+ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
+ boost::json::object object
+ {
+ { "address", archive().address_enabled() },
+ { "filter", archive().filter_enabled() },
+ { "turbo", database_settings().turbo },
+ { "witness", network_settings().witness_node() },
+ { "retarget", system_settings().forks.retarget },
+ { "difficult", system_settings().forks.difficult },
+ };
+ BC_POP_WARNING()
+
+ send_json(std::move(object), 64);
+ return true;
+}
+
+// TODO: add log level(s) param.
+bool protocol_native::handle_get_log_subscribe(const code& ec,
+ interface::log_subscribe, uint8_t , uint8_t ,
+ bool stop) NOEXCEPT
+{
+ if (stopped(ec))
+ return false;
+
+ // TODO: return enumeration (on stop?).
+ log_subscribe_.store(stop);
+ return {};
+}
+
+// TODO: add event(s) param.
+bool protocol_native::handle_get_event_subscribe(const code& ec,
+ interface::event_subscribe, uint8_t , uint8_t ,
+ bool stop) NOEXCEPT
+{
+ if (stopped(ec))
+ return false;
+
+ // TODO: return enumeration (on stop?).
+ event_subscribe_.store(stop);
+ return {};
+}
+
+} // namespace server
+} // namespace libbitcoin
diff --git a/src/protocols/native/protocol_native_tx.cpp b/src/protocols/native/protocol_native_tx.cpp
index 006d78fb..1eef593a 100644
--- a/src/protocols/native/protocol_native_tx.cpp
+++ b/src/protocols/native/protocol_native_tx.cpp
@@ -25,6 +25,9 @@ namespace libbitcoin {
namespace server {
using namespace system;
+using namespace std::placeholders;
+
+#define CLASS protocol_native
BC_PUSH_WARNING(NO_INCOMPLETE_SWITCH)
BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
@@ -171,16 +174,19 @@ bool protocol_native::handle_get_tx_details(const code& ec,
}
bool protocol_native::handle_get_tx_subscribe(const code& ec,
- interface::tx_subscribe, uint8_t , uint8_t ,
- bool stop) NOEXCEPT
+ interface::tx_subscribe, uint8_t, uint8_t media, bool stop) NOEXCEPT
{
if (stopped(ec))
return false;
- tx_subscribe_.store(stop);
+ // TODO: return bool (previous state) only?
+ tx_subscribe_.store(stop ? media_type::unknown : (media_type)media);
- // TODO: return bool (previous state) only.
- send_ok();
+ // TODO: move to send/notify_empty().
+ using namespace network::http;
+ response out{};
+ out.body() = empty_value{};
+ NOTIFY(std::move(out), handle_complete, _1, error::success);
return true;
}
diff --git a/src/protocols/protocol_html.cpp b/src/protocols/protocol_html.cpp
index 26e04e61..1dabca42 100644
--- a/src/protocols/protocol_html.cpp
+++ b/src/protocols/protocol_html.cpp
@@ -257,6 +257,40 @@ void protocol_html::send_buffer(buffer_body::value_type&& buffer,
SEND(std::move(response), handle_complete, _1, error::success);
}
+// Notifiers (websocket).
+// ----------------------------------------------------------------------------
+
+void protocol_html::notify_json(boost::json::value&& model, size_t size_hint,
+ const request& request) NOEXCEPT
+{
+ BC_ASSERT(stranded());
+ response response{ status::ok, request.version() };
+ response.body() = json_value
+ {
+ .model = std::move(model),
+ .size_hint = size_hint
+ };
+ NOTIFY(std::move(response), handle_complete, _1, error::success);
+}
+
+void protocol_html::notify_text(std::string&& hexidecimal,
+ const request& request) NOEXCEPT
+{
+ BC_ASSERT(stranded());
+ response response{ status::ok, request.version() };
+ response.body() = std::move(hexidecimal);
+ NOTIFY(std::move(response), handle_complete, _1, error::success);
+}
+
+void protocol_html::notify_chunk(system::data_chunk&& bytes,
+ const request& request) NOEXCEPT
+{
+ BC_ASSERT(stranded());
+ response response{ status::ok, request.version() };
+ response.body() = std::move(bytes);
+ NOTIFY(std::move(response), handle_complete, _1, error::success);
+}
+
// Utilities.
// ----------------------------------------------------------------------------
diff --git a/test/protocols/native/native_block.cpp b/test/protocols/native/native_block.cpp
index c62ffcc1..61ea6a4b 100644
--- a/test/protocols/native/native_block.cpp
+++ b/test/protocols/native/native_block.cpp
@@ -118,4 +118,13 @@ BOOST_AUTO_TEST_CASE(native__ws_top_subscribe__json__expected)
BOOST_REQUIRE_EQUAL(response.as_int64(), 9);
}
+BOOST_AUTO_TEST_CASE(native__ws_top_subscribe__stop__expected)
+{
+ BOOST_REQUIRE(!ws_upgrade());
+
+ const auto response = ws_get_json("/v1/top/subscribe?stop=true");
+ BOOST_REQUIRE(response.is_int64());
+ BOOST_REQUIRE_EQUAL(response.as_int64(), 9);
+}
+
BOOST_AUTO_TEST_SUITE_END()