Skip to content
34 changes: 34 additions & 0 deletions example/http_c++/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/channel.h>
#include "bthread/countdown_event.h"

DEFINE_string(d, "", "POST this data to the http server");
DEFINE_bool(progressive, false, "whether or not progressive read data from server");
DEFINE_int32(progressive_read_timeout_ms, 5000, "progressive read data idle timeout in milliseconds");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 2000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
Expand All @@ -36,6 +39,25 @@ namespace brpc {
DECLARE_bool(http_verbose);
}

class PartDataReader: public brpc::ProgressiveReader {
public:
explicit PartDataReader(bthread::CountdownEvent* done): _done(done){}

butil::Status OnReadOnePart(const void* data, size_t length) {
memcpy(_buffer, data, length);
LOG(INFO) << "data : " << _buffer << " size : " << length;
return butil::Status::OK();
}

void OnEndOfMessage(const butil::Status& status) {
_done->signal();
LOG(INFO) << "progressive read data final status : " << status;
}
private:
char _buffer[1024];
bthread::CountdownEvent* _done;
};

int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
Expand Down Expand Up @@ -71,13 +93,25 @@ int main(int argc, char* argv[]) {
cntl.request_attachment().append(FLAGS_d);
}

if (FLAGS_progressive) {
cntl.set_progressive_read_timeout_ms(FLAGS_progressive_read_timeout_ms);
cntl.response_will_be_read_progressively();
}

// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
if (cntl.Failed()) {
std::cerr << cntl.ErrorText() << std::endl;
return -1;
}

if (FLAGS_progressive) {
bthread::CountdownEvent done(1);
cntl.ReadProgressiveAttachmentBy(new PartDataReader(&done));
done.wait();
LOG(INFO) << "wait client progressive read done safely";
}
// If -http_verbose is on, brpc already prints the response to stderr.
if (!brpc::FLAGS_http_verbose) {
std::cout << cntl.response_attachment() << std::endl;
Expand Down
7 changes: 7 additions & 0 deletions example/http_c++/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
DEFINE_string(certificate, "cert.pem", "Certificate file path to enable SSL");
DEFINE_string(private_key, "key.pem", "Private key file path to enable SSL");
DEFINE_string(ciphers, "", "Cipher suite used for SSL connections");
DEFINE_bool(enable_progressive_timeout, false, "whether or not trigger progressive write attachement data timeout");

namespace example {

Expand Down Expand Up @@ -104,6 +105,9 @@ class FileServiceImpl : public FileService {

// sleep a while to send another part.
bthread_usleep(10000);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down Expand Up @@ -194,6 +198,9 @@ class HttpSSEServiceImpl : public HttpSSEService {

// sleep a while to send another part.
bthread_usleep(10000 * 10);
if (FLAGS_enable_progressive_timeout && i > 50) {
bthread_usleep(100000000UL);
}
}
return NULL;
}
Expand Down
54 changes: 53 additions & 1 deletion src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
DEFINE_bool(graceful_quit_on_sighup, false,
"Register SIGHUP handle func to quit graceful");

DEFINE_bool(log_idle_progressive_read_close, false,
"Print log when an idle progressive read is closed");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
const IdlNames idl_multi_req_single_res = { "", "res" };
Expand Down Expand Up @@ -331,6 +332,15 @@ void Controller::Call::Reset() {
stream_user_data = NULL;
}

void Controller::set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms){
if(progressive_read_timeout_ms <= 0x7fffffff){
_progressive_read_timeout_ms = progressive_read_timeout_ms;
} else {
_progressive_read_timeout_ms = 0x7fffffff;
LOG(WARNING) << "progressive_read_timeout_seconds is limited to 0x7fffffff";
}
}

void Controller::set_timeout_ms(int64_t timeout_ms) {
if (timeout_ms <= 0x7fffffff) {
_timeout_ms = timeout_ms;
Expand Down Expand Up @@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() {
_span = NULL;
}

void* Controller::HandleIdleProgressiveReader(void* arg) {
auto* cntl = static_cast<Controller*>(arg);
const uint64_t CHECK_INTERVAL_US = 1000000UL;
Comment thread
zchuango marked this conversation as resolved.
Outdated
auto log_idle = FLAGS_log_idle_progressive_read_close;
std::vector<SocketId> remove_socket_ids;
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
// TODO: this is not efficient for a lot of connections(>100K)
Comment thread
zchuango marked this conversation as resolved.
Outdated
auto socketIds = cntl->_checking_progressive_read_fds;
Comment thread
zchuango marked this conversation as resolved.
Outdated
int64_t progressive_read_timeout_us = cntl->_progressive_read_timeout_ms * 1000;
for (auto socket_id : socketIds){
SocketUniquePtr s;
if (Socket::Address(socket_id, &s) == 0) {
auto cpuwide_time_us = butil::cpuwide_time_us();
const int64_t last_active_us = s->last_active_time_us();
if (cpuwide_time_us - last_active_us <= progressive_read_timeout_us) {
continue;
}
LOG_IF(INFO, log_idle) << "progressive read timeout socket id : " << socket_id
<< " progressive read timeout us : " << progressive_read_timeout_us
<< " progressive read idle duration : " << cpuwide_time_us - last_active_us;
if (s->parsing_context() != NULL) {
s->parsing_context()->Destroy();
}
s->ReleaseReferenceIfIdle(0xffffffff);
remove_socket_ids.push_back(socket_id);
} else {
LOG(ERROR) << "not found the socket id : " << socket_id;
remove_socket_ids.push_back(socket_id);
}
}
for (auto remove_socket_id : remove_socket_ids) {
socketIds.erase(remove_socket_id);
}
}
return NULL;
}

void Controller::HandleSendFailed() {
if (!FailedInline()) {
SetFailed("Must be SetFailed() before calling HandleSendFailed()");
Expand Down Expand Up @@ -1179,6 +1226,11 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
// Tag the socket so that when the response comes back, the parser will
// stop before reading all body.
_current_call.sending_sock->read_will_be_progressive(_connection_type);
auto socket_id = _current_call.sending_sock->id();
if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) {
_checking_progressive_read_fds.insert(socket_id);
LOG(INFO) << "insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size();
Comment thread
zchuango marked this conversation as resolved.
Outdated
}
}

// Handle authentication
Expand Down
25 changes: 22 additions & 3 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
#include "brpc/grpc.h"
#include "brpc/kvmap.h"
#include "brpc/rpc_dump.h"

// EAUTH is defined in MAC
#ifndef EAUTH
#define EAUTH ERPCAUTH
Expand Down Expand Up @@ -163,7 +162,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
uint64_t log_id;
std::string request_id;
};

static void* HandleIdleProgressiveReader(void* arg);
public:
Controller();
Controller(const Inheritable& parent_ctx);
Expand All @@ -177,6 +176,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Set/get timeout in milliseconds for the RPC call. Use
// ChannelOptions.timeout_ms on unset.
void set_progressive_read_timeout_ms(int32_t progressive_read_timeout_ms);
int32_t progressive_read_timeout_ms() const { return _progressive_read_timeout_ms; }

void set_timeout_ms(int64_t timeout_ms);
int64_t timeout_ms() const { return _timeout_ms; }

Expand Down Expand Up @@ -323,7 +325,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);

// Make the RPC end when the HTTP response has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
void response_will_be_read_progressively() {
if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) {
return;
}
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _bthread_tag;
if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){
Comment thread
zchuango marked this conversation as resolved.
Outdated
LOG(FATAL) << "Failed to start controller bthread id : " << _progressive_read_idle_tid;
}
LOG(INFO) << "Start Response progressive reader idle checker close idle_tid : " << _progressive_read_idle_tid
Comment thread
zchuango marked this conversation as resolved.
Outdated
<< " _bthread_tag : " << _bthread_tag;
add_flag(FLAGS_READ_PROGRESSIVELY);
}
// Make the RPC end when the HTTP request has complete headers and let
// user read the remaining body by using ReadProgressiveAttachmentBy().
void request_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
Expand Down Expand Up @@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
int32_t _timeout_ms;
int32_t _connect_timeout_ms;
int32_t _backup_request_ms;
int32_t _progressive_read_timeout_ms;
butil::FlatSet<SocketId> _checking_progressive_read_fds;
Comment thread
zchuango marked this conversation as resolved.
Outdated
bthread_t _progressive_read_idle_tid;
Comment thread
zchuango marked this conversation as resolved.
Outdated
// Controller belongs to this tag
bthread_tag_t _bthread_tag = bthread_self_tag();
Comment thread
zchuango marked this conversation as resolved.
Outdated
// Priority: `_backup_request_policy' > `_backup_request_ms'.
BackupRequestPolicy* _backup_request_policy;
// If this rpc call has retry/backup request,this var save the real timeout for current call
Expand Down
Loading