Skip to content

Commit 0b7fad9

Browse files
committed
Plausible improvements
1 parent 4f1df95 commit 0b7fad9

4 files changed

Lines changed: 35 additions & 15 deletions

File tree

include/ccf/endpoint.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ namespace ccf::endpoints
264264
// committed (ie - assigned a transaction ID)
265265
LocallyCommittedEndpointFunction locally_committed_func;
266266

267-
// Functor which is invoked to modify the response after it is reaches a
267+
// Functor which is invoked to modify the response after it reaches a
268268
// terminal consensus state (ie - it is either globally committed, or
269269
// invalidated)
270270
ConsensusCommittedEndpointFunction consensus_committed_func;

src/consensus/aft/raft.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2607,7 +2607,7 @@ namespace aft
26072607
if (commit_callbacks != nullptr)
26082608
{
26092609
const auto term = get_term_internal(idx);
2610-
commit_callbacks->execute_callbacks({term, idx});
2610+
commit_callbacks->trigger_callbacks({term, idx});
26112611
}
26122612

26132613
RAFT_DEBUG_FMT("Commit on {}: {}", state->node_id, idx);

src/http/http_session.h

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,12 @@ namespace http
3838
std::unique_ptr<ccf::tls::Context> ctx,
3939
const ccf::http::ParserConfiguration& configuration,
4040
const std::shared_ptr<ErrorReporter>& error_reporter_,
41-
const std::shared_ptr<ccf::CommitCallbackSubsystem>& commit_callbacks =
42-
nullptr) :
41+
const std::shared_ptr<ccf::CommitCallbackSubsystem>& commit_callbacks_) :
4342
HTTPSession(session_id_, writer_factory, std::move(ctx)),
4443
request_parser(*this, configuration),
4544
rpc_map(std::move(rpc_map_)),
4645
error_reporter(error_reporter_),
47-
commit_callbacks(commit_callbacks),
46+
commit_callbacks(commit_callbacks_),
4847
interface_id(std::move(interface_id_))
4948
{}
5049

@@ -186,25 +185,29 @@ namespace http
186185
// maintain session consistency
187186
ccf::tasks::Resumable paused_task = ccf::tasks::pause_current_task();
188187

188+
// shared_from_this returns a base session type
189+
std::shared_ptr<ccf::ThreadedSession> self = shared_from_this();
190+
189191
// Register for a callback when this TxID is committed (or
190192
// invalidated)
191193
commit_callbacks->add_callback(
192194
tx_id,
193-
[this, rpc_ctx, paused_task, committed_func](
195+
[self, rpc_ctx, paused_task, committed_func](
194196
ccf::TxID transaction_id, ccf::FinalTxStatus status) {
195197
// Let the handler modify the response
196198
committed_func(rpc_ctx, transaction_id, status);
197199

198200
// Write the response
199-
this->send_response(
201+
send_response_impl(
202+
*self,
200203
rpc_ctx->get_response_http_status(),
201204
rpc_ctx->get_response_headers(),
202205
rpc_ctx->get_response_trailers(),
203206
std::move(rpc_ctx->take_response_body()));
204207

205208
if (rpc_ctx->terminate_session)
206209
{
207-
close_session();
210+
self->close_session();
208211
}
209212

210213
// Resume processing work for this session
@@ -240,11 +243,12 @@ namespace http
240243
}
241244
}
242245

243-
bool send_response(
246+
static bool send_response_impl(
247+
ccf::ThreadedSession& session,
244248
ccf::http_status status_code,
245249
ccf::http::HeaderMap&& headers,
246250
ccf::http::HeaderMap&& trailers,
247-
std::vector<uint8_t>&& body) override
251+
std::vector<uint8_t>&& body)
248252
{
249253
if (!trailers.empty())
250254
{
@@ -263,9 +267,23 @@ namespace http
263267
false /* Don't overwrite any existing content-length header */
264268
);
265269

266-
send_data(response.build_response());
270+
session.send_data(response.build_response());
267271
return true;
268272
}
273+
274+
bool send_response(
275+
ccf::http_status status_code,
276+
ccf::http::HeaderMap&& headers,
277+
ccf::http::HeaderMap&& trailers,
278+
std::vector<uint8_t>&& body) override
279+
{
280+
return send_response_impl(
281+
*this,
282+
status_code,
283+
std::move(headers),
284+
std::move(trailers),
285+
std::move(body));
286+
}
269287
};
270288

271289
class HTTPClientSession : public HTTPSession,

src/node/commit_callback_subsystem.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@ namespace ccf
1717

1818
std::optional<ccf::TxID> known_commit = std::nullopt;
1919

20-
std::mutex callbacks_mutex;
20+
// Use a recursive mutex so that `add_callback` may safely be called while a
21+
// callback is executing (and the mutex is locked)
22+
std::recursive_mutex callbacks_mutex;
2123

2224
public:
2325
CommitCallbackSubsystem() = default;
2426

2527
void add_callback(ccf::TxID tx_id, CommitCallback&& callback) override
2628
{
27-
std::lock_guard<std::mutex> guard(callbacks_mutex);
29+
std::lock_guard<std::recursive_mutex> guard(callbacks_mutex);
2830

2931
if (known_commit.has_value())
3032
{
@@ -49,9 +51,9 @@ namespace ccf
4951
std::make_pair(tx_id, std::move(callback)));
5052
}
5153

52-
void execute_callbacks(ccf::TxID committed)
54+
void trigger_callbacks(ccf::TxID committed)
5355
{
54-
std::lock_guard<std::mutex> guard(callbacks_mutex);
56+
std::lock_guard<std::recursive_mutex> guard(callbacks_mutex);
5557

5658
known_commit = committed;
5759

0 commit comments

Comments
 (0)