Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ src_libbitcoin_server_la_SOURCES = \
src/protocols/native/protocol_native.cpp \
src/protocols/native/protocol_native_address.cpp \
src/protocols/native/protocol_native_block.cpp \
src/protocols/native/protocol_native_configuration.cpp \
src/protocols/native/protocol_native_input.cpp \
src/protocols/native/protocol_native_output.cpp \
src/protocols/native/protocol_native_tx.cpp \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_address.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_block.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_configuration.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_input.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_output.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_tx.cpp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_block.cpp">
<Filter>src\protocols\native</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_configuration.cpp">
<Filter>src\protocols\native</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_input.cpp">
<Filter>src\protocols\native</Filter>
</ClCompile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_address.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_block.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_configuration.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_input.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_output.cpp" />
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_tx.cpp" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_block.cpp">
<Filter>src\protocols\native</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_configuration.cpp">
<Filter>src\protocols\native</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\protocols\native\protocol_native_input.cpp">
<Filter>src\protocols\native</Filter>
</ClCompile>
Expand Down
14 changes: 7 additions & 7 deletions include/bitcoin/server/protocols/protocol_electrum.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ class BCS_API protocol_electrum
}

private:
// Aliases.
using array_t = network::rpc::array_t;
using object_t = network::rpc::object_t;
using version_t = protocol_electrum_version;
static constexpr electrum::version minimum = version_t::minimum;
static constexpr electrum::version maximum = version_t::maximum;

// Post to notification strand.
template <class Derived, typename Method, typename... Args>
inline auto notify(Method&& method, Args&&... args) NOEXCEPT
Expand All @@ -322,13 +329,6 @@ class BCS_API protocol_electrum
BIND_SAFE(BIND_SHARED(method, args)));
}

// Aliases.
using array_t = network::rpc::array_t;
using object_t = network::rpc::object_t;
using version_t = protocol_electrum_version;
static constexpr electrum::version minimum = version_t::minimum;
static constexpr electrum::version maximum = version_t::maximum;

// Transformations.
static array_t transform(const unspents& unspents) NOEXCEPT;
static array_t transform(const histories& histories) NOEXCEPT;
Expand Down
8 changes: 8 additions & 0 deletions include/bitcoin/server/protocols/protocol_html.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ class BCS_API protocol_html
network::http::media_type type,
const network::http::request& request={}) NOEXCEPT;

/// Notifiers (websocket).
virtual void notify_json(boost::json::value&& model, size_t size_hint,
const network::http::request& request = {}) NOEXCEPT;
virtual void notify_text(std::string&& hexidecimal,
const network::http::request& request = {}) NOEXCEPT;
virtual void notify_chunk(system::data_chunk&& bytes,
const network::http::request& request = {}) NOEXCEPT;

/// Utilities.
std::filesystem::path to_path(
const std::string& target = "/") const NOEXCEPT;
Expand Down
40 changes: 33 additions & 7 deletions include/bitcoin/server/protocols/protocol_native.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class BCS_API protocol_native
const network::channel::ptr& channel,
const options_t& options) NOEXCEPT
: protocol_html(session, channel, options),
notification_strand_(channel->service().get_executor()),
network::tracker<protocol_native>(session->log)
{
}
Expand All @@ -63,7 +64,14 @@ class BCS_API protocol_native
void dispatch_websocket(
const network::http::request& request) NOEXCEPT override;

/// REST interface handlers.

/// Event handlers.
/// -----------------------------------------------------------------------

bool handle_event(const code&, node::chase event_,
node::event_value) NOEXCEPT;

/// Interface handlers.
/// -----------------------------------------------------------------------

bool handle_get_configuration(const code& ec, interface::configuration,
Expand Down Expand Up @@ -174,12 +182,27 @@ class BCS_API protocol_native
bool handle_get_event_subscribe(const code& ec, interface::event_subscribe,
uint8_t version, uint8_t media, bool stop) NOEXCEPT;

private:
protected:
using media_type = network::http::media_type;

/// Notification event handlers.
/// -----------------------------------------------------------------------
void do_top(node::header_t link, media_type media) NOEXCEPT;
void do_block(node::header_t link, media_type media) NOEXCEPT;
void do_transaction(node::transaction_t link, media_type media) NOEXCEPT;

private:
static constexpr uint8_t text = to_value(media_type::text_plain);
static constexpr uint8_t json = to_value(media_type::application_json);
static constexpr uint8_t data = to_value(
media_type::application_octet_stream);
static constexpr uint8_t data = to_value(media_type::application_octet_stream);

////// Post to notification strand (use POST_NOTIFY()).
////template <class Derived, typename Method, typename... Args>
////inline auto notify(Method&& method, Args&&... args) NOEXCEPT
////{
//// return boost::asio::post(notification_strand_,
//// BIND_SAFE(BIND_SHARED(method, args)));
////}

// Serializers.
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -230,13 +253,16 @@ class BCS_API protocol_native
database::header_link to_header(const std::optional<uint32_t>& height,
const std::optional<system::hash_cptr>& hash) NOEXCEPT;

// This is thread safe, uses network threadpool.
network::asio::strand notification_strand_;

// These are thread safe.
std::atomic_bool stopping_{};

// Unconditional (all).
std::atomic_bool top_subscribe_{};
std::atomic_bool block_subscribe_{};
std::atomic_bool tx_subscribe_{};
std::atomic<media_type> top_subscribe_{ media_type::unknown };
std::atomic<media_type> block_subscribe_{ media_type::unknown };
std::atomic<media_type> tx_subscribe_{ media_type::unknown };

// TODO: map of outpoints (notify on spenders).
std::atomic_bool output_subscribe_{};
Expand Down
8 changes: 4 additions & 4 deletions src/protocols/electrum/protocol_electrum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,9 @@ bool protocol_electrum::handle_event(const code&, node::chase event_,
}
case node::chase::reorganized:
{
// value is regression branch_point.
BC_ASSERT(std::holds_alternative<node::height_t>(value));
POST_NOTIFY(do_reorganized, std::get<node::height_t>(value));
// Value is regression branch_point.
BC_ASSERT(std::holds_alternative<node::header_t>(value));
POST_NOTIFY(do_reorganized, std::get<node::header_t>(value));
break;
}
default:
Expand All @@ -195,7 +195,7 @@ void protocol_electrum::do_reorganized(node::header_t) NOEXCEPT

for (auto& [key, sub]: address_subscriptions_)
{
// flush resets hash accumulator, sub.type remains unchanged.
// Flush resets hash accumulator, sub.type remains unchanged.
sub.accumulator.flush();
sub.status = {};
sub.cursor = {};
Expand Down
154 changes: 116 additions & 38 deletions src/protocols/native/protocol_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace server {
using namespace system;
using namespace network;
using namespace std::placeholders;
constexpr auto relaxed = std::memory_order_relaxed;

#define CLASS protocol_native
#define SUBSCRIBE_NATIVE(method, ...) \
Expand Down Expand Up @@ -166,63 +167,140 @@ void protocol_native::dispatch_websocket(const http::request& request) NOEXCEPT
}
}

// Handlers.
// Event handlers.
// ----------------------------------------------------------------------------

bool protocol_native::handle_get_configuration(const code& ec,
interface::configuration, uint8_t, uint8_t media) NOEXCEPT
// capture chaser events
bool protocol_native::handle_event(const code&, node::chase event_,
node::event_value value) NOEXCEPT
{
if (stopped(ec))
// Do not pass ec to stopped as it is not a call status.
if (stopped())
return false;

if (media != json)
switch (event_)
{
send_not_acceptable();
return true;
case node::chase::organized:
{
auto media = top_subscribe_.load(relaxed);
if (media != media_type::unknown)
{
// Increments height above a fork point (start/reorg).
BC_ASSERT(std::holds_alternative<node::header_t>(value));
POST(do_top, std::get<node::header_t>(value), media);
}

media = block_subscribe_.load(relaxed);
if (media != media_type::unknown)
{
// No block emission for a fork point (start/reorg).
BC_ASSERT(std::holds_alternative<node::header_t>(value));
POST(do_block, std::get<node::header_t>(value), media);
}

break;
}
case node::chase::reorganized:
{
const auto media = top_subscribe_.load(relaxed);
if (media != media_type::unknown)
{
// Resets subscriber height to the fork point.
BC_ASSERT(std::holds_alternative<node::height_t>(value));
POST(do_top, std::get<node::height_t>(value), media);
}

break;
}
case node::chase::transaction:
{
const auto media = tx_subscribe_.load(relaxed);
if (media != media_type::unknown)
{
BC_ASSERT(std::holds_alternative<node::transaction_t>(value));
POST(do_transaction, std::get<node::transaction_t>(value), media);
}

break;
}
default:
{
break;
}
}

BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
boost::json::object object
{
{ "address", archive().address_enabled() },
{ "filter", archive().filter_enabled() },
{ "turbo", database_settings().turbo },
{ "witness", network_settings().witness_node() },
{ "retarget", system_settings().forks.retarget },
{ "difficult", system_settings().forks.difficult },
};
BC_POP_WARNING()

send_json(std::move(object), 64);
return true;
}

// TODO: add log level(s) param.
bool protocol_native::handle_get_log_subscribe(const code& ec,
interface::log_subscribe, uint8_t , uint8_t ,
bool stop) NOEXCEPT
BC_PUSH_WARNING(NO_INCOMPLETE_SWITCH)

void protocol_native::do_top(node::header_t link, media_type media) NOEXCEPT
{
if (stopped(ec))
return false;
BC_ASSERT(stranded());

// TODO: return enumeration (on stop?).
log_subscribe_.store(stop);
return {};
// TODO: notification.
const auto height = archive().get_height(link).value;
switch (to_value(media))
{
case data:
notify_chunk(to_little_endian_size(height));
return;
case text:
notify_text(encode_base16(to_little_endian_size(height)));
return;
case json:
notify_json(height, two * sizeof(height));
return;
}
}

// TODO: add event(s) param.
bool protocol_native::handle_get_event_subscribe(const code& ec,
interface::event_subscribe, uint8_t , uint8_t ,
bool stop) NOEXCEPT
void protocol_native::do_block(node::header_t link, media_type media) NOEXCEPT
{
if (stopped(ec))
return false;
BC_ASSERT(stranded());

// TODO: return enumeration (on stop?).
event_subscribe_.store(stop);
return {};
// TODO: notification.
const auto hash = archive().get_header_key(link);
switch (to_value(media))
{
case data:
notify_chunk(to_chunk(hash));
return;
case text:
notify_text(encode_base16(hash));
return;
case json:
BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
notify_json(value_from(encode_base16(hash)), two * hash_size);
BC_POP_WARNING()
return;
}
}

void protocol_native::do_transaction(node::transaction_t link,
media_type media) NOEXCEPT
{
BC_ASSERT(stranded());

// TODO: notification.
const auto hash = archive().get_tx_key(link);
switch (to_value(media))
{
case data:
notify_chunk(to_chunk(hash));
return;
case text:
notify_text(encode_base16(hash));
return;
case json:
BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT)
notify_json(value_from(encode_base16(hash)), two * hash_size);
BC_POP_WARNING()
return;
}
}

BC_POP_WARNING()

// Utilities.
// ----------------------------------------------------------------------------
// private
Expand Down
11 changes: 5 additions & 6 deletions src/protocols/native/protocol_native_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ bool protocol_native::handle_get_top_subscribe(const code& ec,
if (stopped(ec))
return false;

// TODO: return only bool (previous state) if stop.
top_subscribe_.store(stop);
// TODO: return only bool (previous state) if stop?
top_subscribe_.store(stop ? media_type::unknown : (media_type)media);
return handle_get_top(ec, {}, version, media);
}

Expand Down Expand Up @@ -500,14 +500,13 @@ bool protocol_native::handle_get_block_tx(const code& ec, interface::block_tx,
}

bool protocol_native::handle_get_block_subscribe(const code& ec,
interface::block_subscribe, uint8_t version, uint8_t media,
bool stop) NOEXCEPT
interface::block_subscribe, uint8_t, uint8_t media, bool stop) NOEXCEPT
{
if (stopped(ec))
return false;

// TODO: return only bool (previous state) if stop.
block_subscribe_.store(stop);
// TODO: return only bool (previous state) if stop?
block_subscribe_.store(stop ? media_type::unknown : (media_type)media);

// Return top block hash upon block subscription.
const auto& query = archive();
Expand Down
Loading
Loading