Skip to content

Commit e10e86f

Browse files
authored
Merge pull request #1219 from cleaton/connection-pool-feature
Add connection pool option (V3)
2 parents 83ba905 + d10cbfe commit e10e86f

11 files changed

Lines changed: 266 additions & 4 deletions

cpr/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ add_library(cpr
66
auth.cpp
77
callback.cpp
88
cert_info.cpp
9+
connection_pool.cpp
910
cookies.cpp
1011
cprtypes.cpp
1112
curl_container.cpp

cpr/connection_pool.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#include "cpr/connection_pool.h"
2+
#include <curl/curl.h>
3+
#include <memory>
4+
#include <mutex>
5+
6+
namespace cpr {
7+
ConnectionPool::ConnectionPool() {
8+
CURLSH* curl_share = curl_share_init();
9+
this->connection_mutex_ = std::make_shared<std::mutex>();
10+
11+
auto lock_f = +[](CURL* /*handle*/, curl_lock_data /*data*/, curl_lock_access /*access*/, void* userptr) {
12+
std::mutex* lock = static_cast<std::mutex*>(userptr);
13+
lock->lock(); // cppcheck-suppress localMutex // False positive: mutex is used as callback for libcurl, not local scope
14+
};
15+
16+
auto unlock_f = +[](CURL* /*handle*/, curl_lock_data /*data*/, void* userptr) {
17+
std::mutex* lock = static_cast<std::mutex*>(userptr);
18+
lock->unlock();
19+
};
20+
21+
curl_share_setopt(curl_share, CURLSHOPT_SHARE, CURL_LOCK_DATA_CONNECT);
22+
curl_share_setopt(curl_share, CURLSHOPT_USERDATA, this->connection_mutex_.get());
23+
curl_share_setopt(curl_share, CURLSHOPT_LOCKFUNC, lock_f);
24+
curl_share_setopt(curl_share, CURLSHOPT_UNLOCKFUNC, unlock_f);
25+
26+
this->curl_sh_ = std::shared_ptr<CURLSH>(curl_share,
27+
[](CURLSH* ptr) {
28+
// Make sure to reset callbacks before cleanup to avoid deadlocks
29+
curl_share_setopt(ptr, CURLSHOPT_LOCKFUNC, nullptr);
30+
curl_share_setopt(ptr, CURLSHOPT_UNLOCKFUNC, nullptr);
31+
curl_share_cleanup(ptr);
32+
});
33+
}
34+
35+
void ConnectionPool::SetupHandler(CURL* easy_handler) const {
36+
curl_easy_setopt(easy_handler, CURLOPT_SHARE, this->curl_sh_.get());
37+
}
38+
39+
} // namespace cpr

cpr/session.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "cpr/body_view.h"
3232
#include "cpr/callback.h"
3333
#include "cpr/connect_timeout.h"
34+
#include "cpr/connection_pool.h"
3435
#include "cpr/cookies.h"
3536
#include "cpr/cprtypes.h"
3637
#include "cpr/curlholder.h"
@@ -397,6 +398,11 @@ void Session::SetConnectTimeout(const ConnectTimeout& timeout) {
397398
curl_easy_setopt(curl_->handle, CURLOPT_CONNECTTIMEOUT_MS, timeout.Milliseconds());
398399
}
399400

401+
void Session::SetConnectionPool(const ConnectionPool& pool) {
402+
CURL* curl = curl_->handle;
403+
pool.SetupHandler(curl);
404+
}
405+
400406
void Session::SetAuth(const Authentication& auth) {
401407
// Ignore here since this has been defined by libcurl.
402408
switch (auth.GetAuthMode()) {
@@ -1091,6 +1097,7 @@ void Session::SetOption(const MultiRange& multi_range) { SetMultiRange(multi_ran
10911097
void Session::SetOption(const ReserveSize& reserve_size) { SetReserveSize(reserve_size.size); }
10921098
void Session::SetOption(const AcceptEncoding& accept_encoding) { SetAcceptEncoding(accept_encoding); }
10931099
void Session::SetOption(AcceptEncoding&& accept_encoding) { SetAcceptEncoding(std::move(accept_encoding)); }
1100+
void Session::SetOption(const ConnectionPool& pool) { SetConnectionPool(pool); }
10941101
// clang-format on
10951102

10961103
void Session::SetCancellationParam(std::shared_ptr<std::atomic_bool> param) {

include/cpr/connection_pool.h

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#ifndef CPR_CONNECTION_POOL_H
2+
#define CPR_CONNECTION_POOL_H
3+
4+
#include <curl/curl.h>
5+
#include <memory>
6+
#include <mutex>
7+
8+
namespace cpr {
9+
/**
10+
* cpr connection pool implementation for sharing connections between HTTP requests.
11+
*
12+
* The ConnectionPool enables connection reuse across multiple HTTP requests to the same host,
13+
* which can significantly improve performance by avoiding the overhead of establishing new
14+
* connections for each request. It uses libcurl's CURLSH (share) interface to manage
15+
* connection sharing in a thread-safe manner.
16+
*
17+
* Example:
18+
* ```cpp
19+
* // Create a connection pool
20+
* cpr::ConnectionPool pool;
21+
*
22+
* // Use the pool with requests to reuse connections
23+
* cpr::Response r1 = cpr::Get(cpr::Url{"http://example.com/api/data"}, pool);
24+
* cpr::Response r2 = cpr::Get(cpr::Url{"http://example.com/api/more"}, pool);
25+
*
26+
* // Or with async requests
27+
* auto future1 = cpr::GetAsync(cpr::Url{"http://example.com/api/data"}, pool);
28+
* auto future2 = cpr::GetAsync(cpr::Url{"http://example.com/api/more"}, pool);
29+
* ```
30+
**/
31+
class ConnectionPool {
32+
public:
33+
/**
34+
* Creates a new connection pool with shared connection state.
35+
* Initializes the underlying CURLSH handle and sets up thread-safe locking mechanisms.
36+
**/
37+
ConnectionPool();
38+
39+
/**
40+
* Copy constructor - creates a new connection pool sharing the same connection state.
41+
* Multiple ConnectionPool instances can share the same underlying connection pool.
42+
**/
43+
ConnectionPool(const ConnectionPool&) = default;
44+
45+
/**
46+
* Copy assignment operator is deleted to prevent accidental copying.
47+
* Use the copy constructor if you need to share the connection pool.
48+
**/
49+
ConnectionPool& operator=(const ConnectionPool&) = delete;
50+
51+
/**
52+
* Configures a CURL easy handle to use this connection pool.
53+
* This method sets up the easy handle to participate in connection sharing
54+
* managed by this pool.
55+
*
56+
* @param easy_handler The CURL easy handle to configure for connection sharing.
57+
**/
58+
void SetupHandler(CURL* easy_handler) const;
59+
60+
private:
61+
/**
62+
* Thread-safe mutex used for synchronizing access to shared connections.
63+
* This mutex is passed to libcurl's locking callbacks to ensure thread safety
64+
* when multiple threads access the same connection pool. It's declared first
65+
* to ensure it's destroyed last, after the CURLSH handle that references it.
66+
**/
67+
std::shared_ptr<std::mutex> connection_mutex_;
68+
69+
/**
70+
* Shared CURL handle (CURLSH) that manages the actual connection sharing.
71+
* This handle maintains the pool of reusable connections and is configured
72+
* with appropriate locking callbacks for thread safety. The shared_ptr uses
73+
* a custom deleter that safely resets the lock/unlock callbacks before
74+
* calling curl_share_cleanup() to prevent use-after-free issues during destruction.
75+
* Declared last to ensure it's destroyed first, before the mutex it references.
76+
**/
77+
std::shared_ptr<CURLSH> curl_sh_;
78+
};
79+
} // namespace cpr
80+
#endif

include/cpr/cpr.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "cpr/callback.h"
88
#include "cpr/cert_info.h"
99
#include "cpr/connect_timeout.h"
10+
#include "cpr/connection_pool.h"
1011
#include "cpr/cookies.h"
1112
#include "cpr/cprtypes.h"
1213
#include "cpr/cprver.h"

include/cpr/curlholder.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include <array>
55
#include <curl/curl.h>
66
#include <mutex>
7-
#include <string>
87

98
#include "cpr/secure_string.h"
109

include/cpr/session.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "cpr/body_view.h"
1919
#include "cpr/callback.h"
2020
#include "cpr/connect_timeout.h"
21+
#include "cpr/connection_pool.h"
2122
#include "cpr/cookies.h"
2223
#include "cpr/cprtypes.h"
2324
#include "cpr/curlholder.h"
@@ -72,6 +73,7 @@ class Session : public std::enable_shared_from_this<Session> {
7273
[[nodiscard]] const Header& GetHeader() const;
7374
void SetTimeout(const Timeout& timeout);
7475
void SetConnectTimeout(const ConnectTimeout& timeout);
76+
void SetConnectionPool(const ConnectionPool& pool);
7577
void SetAuth(const Authentication& auth);
7678
// Only supported with libcurl >= 7.61.0.
7779
// As an alternative use SetHeader and add the token manually.
@@ -137,6 +139,7 @@ class Session : public std::enable_shared_from_this<Session> {
137139
void SetOption(const Timeout& timeout);
138140
void SetOption(const ConnectTimeout& timeout);
139141
void SetOption(const Authentication& auth);
142+
void SetOption(const ConnectionPool& pool);
140143
// Only supported with libcurl >= 7.61.0.
141144
// As an alternative use SetHeader and add the token manually.
142145
#if LIBCURL_VERSION_NUM >= 0x073D00

test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ add_cpr_test(file_upload)
7070
add_cpr_test(singleton)
7171
add_cpr_test(threadpool)
7272
add_cpr_test(testUtils)
73+
add_cpr_test(connection_pool)
7374

7475
if (ENABLE_SSL_TESTS)
7576
add_cpr_test(ssl)

test/abstractServer.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ static void EventHandler(mg_connection* conn, int event, void* event_data, void*
4949

5050
case MG_EV_HTTP_MSG: {
5151
AbstractServer* server = static_cast<AbstractServer*>(context);
52+
// Use the connection address as unique identifier instead
53+
int port = AbstractServer::GetRemotePort(conn);
54+
server->AddConnection(port);
5255
server->OnRequest(conn, static_cast<mg_http_message*>(event_data));
5356
} break;
5457

@@ -79,6 +82,18 @@ void AbstractServer::Run() {
7982
server_stop_cv.notify_all();
8083
}
8184

85+
void AbstractServer::AddConnection(int remote_port) {
86+
unique_connections.insert(remote_port);
87+
}
88+
89+
size_t AbstractServer::GetConnectionCount() {
90+
return unique_connections.size();
91+
}
92+
93+
void AbstractServer::ResetConnectionCount() {
94+
unique_connections.clear();
95+
}
96+
8297
static const std::string base64_chars =
8398
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
8499
"abcdefghijklmnopqrstuvwxyz"

test/abstractServer.hpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <memory>
88
#include <mutex>
99
#include <string>
10+
#include <set>
1011

1112
#include "cpr/cpr.h"
1213
#include "mongoose.h"
@@ -38,18 +39,26 @@ class AbstractServer : public testing::Environment {
3839
void Start();
3940
void Stop();
4041

42+
size_t GetConnectionCount();
43+
void ResetConnectionCount();
44+
void AddConnection(int remote_port);
45+
4146
virtual std::string GetBaseUrl() = 0;
4247
virtual uint16_t GetPort() = 0;
4348

4449
virtual void acceptConnection(mg_connection* conn) = 0;
4550
virtual void OnRequest(mg_connection* conn, mg_http_message* msg) = 0;
4651

52+
static uint16_t GetRemotePort(const mg_connection* conn);
53+
static uint16_t GetLocalPort(const mg_connection* conn);
54+
4755
private:
4856
std::shared_ptr<std::thread> serverThread{nullptr};
4957
std::mutex server_mutex;
5058
std::condition_variable server_start_cv;
5159
std::condition_variable server_stop_cv;
5260
std::atomic<bool> should_run{false};
61+
std::set<int> unique_connections;
5362

5463
void Run();
5564

@@ -61,9 +70,6 @@ class AbstractServer : public testing::Environment {
6170
static std::string Base64Decode(const std::string& in);
6271
static void SendError(mg_connection* conn, int code, std::string& reason);
6372
static bool IsConnectionActive(mg_mgr* mgr, mg_connection* conn);
64-
65-
static uint16_t GetRemotePort(const mg_connection* conn);
66-
static uint16_t GetLocalPort(const mg_connection* conn);
6773
};
6874
} // namespace cpr
6975

0 commit comments

Comments
 (0)