Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/rdma_performance/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class PerformanceTest {

int Init() {
brpc::ChannelOptions options;
options.use_rdma = FLAGS_use_rdma;
options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP;
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_rpc_timeout_ms;
Expand Down
2 changes: 1 addition & 1 deletion example/rdma_performance/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ int main(int argc, char* argv[]) {
g_last_time.store(0, butil::memory_order_relaxed);

brpc::ServerOptions options;
options.use_rdma = FLAGS_use_rdma;
options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
Expand Down
16 changes: 4 additions & 12 deletions src/brpc/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "butil/fd_guard.h" // fd_guard
#include "butil/fd_utility.h" // make_close_on_exec
#include "butil/time.h" // gettimeofday_us
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/acceptor.h"
#include "brpc/transport_factory.h"


namespace brpc {
Expand All @@ -40,7 +40,7 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
, _empty_cond(&_map_mutex)
, _force_ssl(false)
, _ssl_ctx(NULL)
, _use_rdma(false)
, socket_mode(SOCKET_MODE_TCP)
, _bthread_tag(BTHREAD_TAG_DEFAULT) {
}

Expand Down Expand Up @@ -282,18 +282,10 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
options.fd = in_fd;
butil::sockaddr2endpoint(&in_addr, in_len, &options.remote_side);
options.user = acception->user();
options.need_on_edge_trigger = true;
options.force_ssl = am->_force_ssl;
options.initial_ssl_ctx = am->_ssl_ctx;
#if BRPC_WITH_RDMA
if (am->_use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
} else {
#else
{
#endif
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
}
options.use_rdma = am->_use_rdma;
options.socket_mode = am->socket_mode;
options.bthread_tag = am->_bthread_tag;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
Expand Down
5 changes: 3 additions & 2 deletions src/brpc/acceptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "butil/synchronization/condition_variable.h"
#include "butil/containers/flat_map.h"
#include "brpc/input_messenger.h"
#include "brpc/socket_mode.h"


namespace brpc {
Expand Down Expand Up @@ -110,8 +111,8 @@ friend class Server;
bool _force_ssl;
std::shared_ptr<SocketSSLContext> _ssl_ctx;

// Whether to use rdma or not
bool _use_rdma;
// Choose to use a certain socket: 0 TCP, 1 RDMA
SocketMode socket_mode;

// Acceptor belongs to this tag
bthread_tag_t _bthread_tag;
Expand Down
38 changes: 8 additions & 30 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/rdma/rdma_helper.h"
#include "brpc/policy/esp_authenticator.h"
#include "brpc/transport_factory.h"

namespace brpc {

Expand All @@ -60,7 +61,7 @@ ChannelOptions::ChannelOptions()
, connection_type(CONNECTION_TYPE_UNKNOWN)
, succeed_without_server(true)
, log_succeed_without_server(true)
, use_rdma(false)
, socket_mode(SOCKET_MODE_TCP)
, auth(NULL)
, backup_request_policy(NULL)
, retry_policy(NULL)
Expand Down Expand Up @@ -120,7 +121,7 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
} else {
// All disabled ChannelSSLOptions are the same
}
if (opt.use_rdma) {
if (opt.socket_mode == SOCKET_MODE_RDMA) {
buf.append("|rdma");
}
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
Expand Down Expand Up @@ -163,20 +164,6 @@ Channel::~Channel() {
}
}

#if BRPC_WITH_RDMA
static bool OptionsAvailableForRdma(const ChannelOptions* opt) {
if (opt->has_ssl_options()) {
LOG(WARNING) << "Cannot use SSL and RDMA at the same time";
return false;
}
if (!rdma::SupportedByRdma(opt->protocol.name())) {
LOG(WARNING) << "Cannot use " << opt->protocol.name()
<< " over RDMA";
return false;
}
return true;
}
#endif

int Channel::InitChannelOptions(const ChannelOptions* options) {
if (options) { // Override default options if user provided one.
Expand All @@ -191,19 +178,10 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
_options.hc_option.health_check_path = FLAGS_health_check_path;
_options.hc_option.health_check_timeout_ms = FLAGS_health_check_timeout_ms;
}
if (_options.use_rdma) {
#if BRPC_WITH_RDMA
if (!OptionsAvailableForRdma(&_options)) {
return -1;
}
rdma::GlobalRdmaInitializeOrDie();
if (!rdma::InitPollingModeWithTag(bthread_self_tag())) {
return -1;
}
#else
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
auto ret = TransportFactory::ContextInitOrDie(_options.socket_mode, false, &_options);
if (ret != 0) {
LOG(ERROR) << "Fail to initialize transport context for channel, ret=" << ret;
return -1;
#endif
}

_serialize_request = protocol->serialize_request;
Expand Down Expand Up @@ -369,7 +347,7 @@ int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
return -1;
}
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
&_server_id, ssl_ctx, _options.use_rdma, _options.hc_option) != 0) {
&_server_id, ssl_ctx, _options.socket_mode, _options.hc_option) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
Expand Down Expand Up @@ -406,7 +384,7 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
ns_opt.use_rdma = _options.use_rdma;
ns_opt.socket_mode = _options.socket_mode;
ns_opt.channel_signature = ComputeChannelSignature(_options);
ns_opt.hc_option = _options.hc_option;
if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
Expand Down
7 changes: 4 additions & 3 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "brpc/backup_request_policy.h"
#include "brpc/naming_service_filter.h"
#include "brpc/health_check_option.h"
#include "brpc/socket_mode.h"

namespace brpc {

Expand Down Expand Up @@ -105,9 +106,9 @@ struct ChannelOptions {
const ChannelSSLOptions& ssl_options() const { return *_ssl_options; }
ChannelSSLOptions* mutable_ssl_options();

// Let this channel use rdma rather than tcp.
// Default: false
bool use_rdma;
// Let this channel Choose to use a certain socket: 0 SOCKET_MODE_TCP, 1 SOCKET_MODE_RDMA.
// Default: SOCKET_MODE_TCP
SocketMode socket_mode;

// Turn on authentication for this channel if `auth' is not NULL.
// Note `auth' will not be deleted by channel and must remain valid when
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/details/naming_service_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void NamingServiceThread::Actions::ResetServers(
// to pick those Sockets with the right settings during OnAddedServers
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx,
_owner->_options.use_rdma, _owner->_options.hc_option));
_owner->_options.socket_mode, _owner->_options.hc_option));
_added_sockets.push_back(tagged_id);
}

Expand Down
7 changes: 4 additions & 3 deletions src/brpc/details/naming_service_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "brpc/naming_service.h" // NamingService
#include "brpc/naming_service_filter.h" // NamingServiceFilter
#include "brpc/socket_map.h"
#include "brpc/socket_mode.h"

namespace brpc {

Expand All @@ -45,11 +46,11 @@ struct GetNamingServiceThreadOptions {
GetNamingServiceThreadOptions()
: succeed_without_server(false)
, log_succeed_without_server(true)
, use_rdma(false) {}
, socket_mode(SOCKET_MODE_TCP) {}

bool succeed_without_server;
bool log_succeed_without_server;
bool use_rdma;
SocketMode socket_mode;
HealthCheckOption hc_option;
ChannelSignature channel_signature;
std::shared_ptr<SocketSSLContext> ssl_ctx;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/input_message_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class InputMessageBase : public Destroyable {
friend class InputMessenger;
friend void* ProcessInputMessage(void*);
friend class Stream;
friend class Transport;
int64_t _received_us;
int64_t _base_real_us;
SocketUniquePtr _socket;
Expand Down
69 changes: 11 additions & 58 deletions src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "brpc/protocol.h" // ListProtocols
#include "brpc/rdma/rdma_endpoint.h"
#include "brpc/input_messenger.h"

#include "brpc/transport_factory.h"

namespace brpc {

Expand Down Expand Up @@ -112,8 +112,7 @@ ParseResult InputMessenger::CutInputMessage(
// The length of `data' must be PROTO_DUMMY_LEN + 1 to store extra ending char '\0'
char data[PROTO_DUMMY_LEN + 1];
m->_read_buf.copy_to_cstr(data, PROTO_DUMMY_LEN);
if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0 &&
m->_rdma_state == Socket::RDMA_OFF) {
if (strncmp(data, "RDMA", PROTO_DUMMY_LEN) == 0) {
// To avoid timeout when client uses RDMA but server uses TCP
return MakeParseError(PARSE_ERROR_TRY_OTHERS);
}
Expand Down Expand Up @@ -191,46 +190,13 @@ struct RunLastMessage {
}
};

static void QueueMessage(InputMessageBase* to_run_msg,
int* num_bthread_created,
bthread_keytable_pool_t* keytable_pool) {
if (!to_run_msg) {
return;
}

#if BRPC_WITH_RDMA
if (rdma::FLAGS_rdma_disable_bthread) {
ProcessInputMessage(to_run_msg);
return;
}
#endif
// Create bthread for last_msg. The bthread is not scheduled
// until bthread_flush() is called (in the worse case).

// TODO(gejun): Join threads.
bthread_t th;
bthread_attr_t tmp = (FLAGS_usercode_in_pthread ?
BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
tmp.keytable_pool = keytable_pool;
tmp.tag = bthread_self_tag();
bthread_attr_set_name(&tmp, "ProcessInputMessage");

if (!FLAGS_usercode_in_coroutine && bthread_start_background(
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
++*num_bthread_created;
} else {
ProcessInputMessage(to_run_msg);
}
}

InputMessenger::InputMessageClosure::~InputMessageClosure() noexcept(false) {
InputMessageClosure::~InputMessageClosure() noexcept(false) {
if (_msg) {
ProcessInputMessage(_msg);
}
}

void InputMessenger::InputMessageClosure::reset(InputMessageBase* m) {
void InputMessageClosure::reset(InputMessageBase* m) {
if (_msg) {
ProcessInputMessage(_msg);
}
Expand Down Expand Up @@ -303,7 +269,7 @@ int InputMessenger::ProcessNewMessage(
// This unique_ptr prevents msg to be lost before transfering
// ownership to last_msg
DestroyingPtr<InputMessageBase> msg(pr.message());
QueueMessage(last_msg.release(), &num_bthread_created, m->_keytable_pool);
m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
if (_handlers[index].process == NULL) {
LOG(ERROR) << "process of index=" << index << " is NULL";
continue;
Expand Down Expand Up @@ -336,22 +302,19 @@ int InputMessenger::ProcessNewMessage(
// Transfer ownership to last_msg
last_msg.reset(msg.release());
} else {
QueueMessage(msg.release(), &num_bthread_created,
m->_keytable_pool);
last_msg.reset(msg.release());
m->_transport->QueueMessage(last_msg, &num_bthread_created, false);
bthread_flush();
num_bthread_created = 0;
}
}
#if BRPC_WITH_RDMA
// In RDMA polling mode, all messages must be executed in a new bthread and
// not in the bthread where the polling bthread is located, because the
// method for processing messages may call synchronization primitives,
// causing the polling bthread to be scheduled out.
if (rdma::FLAGS_rdma_use_polling) {
QueueMessage(last_msg.release(), &num_bthread_created,
m->_keytable_pool);
if (m->_socket_mode == SOCKET_MODE_RDMA) {
m->_transport->QueueMessage(last_msg, &num_bthread_created, true);
}
#endif
if (num_bthread_created) {
bthread_flush();
}
Expand Down Expand Up @@ -414,8 +377,7 @@ void InputMessenger::OnNewMessages(Socket* m) {
}
}

if (m->_rdma_state == Socket::RDMA_OFF && messenger->ProcessNewMessage(
m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
if (messenger->ProcessNewMessage(m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
return;
}
}
Expand Down Expand Up @@ -533,16 +495,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,

int InputMessenger::Create(SocketOptions options, SocketId* id) {
options.user = this;
#if BRPC_WITH_RDMA
if (options.use_rdma) {
options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;
options.app_connect = std::make_shared<rdma::RdmaConnect>();
} else {
#else
{
#endif
options.on_edge_triggered_events = OnNewMessages;
}
options.need_on_edge_trigger = true;
// Enable keepalive by options or Gflag.
// Priority: options > Gflag.
if (options.keepalive_options || FLAGS_socket_keepalive) {
Expand Down
Loading