Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 129 additions & 97 deletions src/Client/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,24 @@ struct ConnectionImpl
void ref();
void unref();

typename NetProvider::Stream_t &get_strm() { return strm; }
const typename NetProvider::Stream_t &get_strm() const { return strm; }

void setError(const std::string &msg, int errno_ = 0);
bool hasError() const;

size_t getFutureCount() const;

BUFFER &getInBuf();
BUFFER &getOutBuf();

void prepare_auth(std::string_view user, std::string_view passwd);
void commit_auth(std::string_view user, std::string_view passwd);

Connector<BUFFER, NetProvider> &connector;
BUFFER inBuf;
static constexpr size_t GC_STEP_CNT = 100;
size_t gc_step = 0;
BUFFER outBuf;
RequestEncoder<BUFFER> enc;
ResponseDecoder<BUFFER> dec;
Expand Down Expand Up @@ -111,8 +127,8 @@ template<class BUFFER, class NetProvider>
ConnectionImpl<BUFFER, NetProvider>::~ConnectionImpl()
{
assert(refs == 0);
if (!strm.has_status(SS_DEAD)) {
connector.close(*this);
if (strm.is_open()) {
connector.close(this);
}
}

Expand All @@ -133,6 +149,56 @@ ConnectionImpl<BUFFER, NetProvider>::unref()
delete this;
}

template <class BUFFER, class NetProvider>
void
ConnectionImpl<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
{
error.emplace(msg, errno_);
}

template <class BUFFER, class NetProvider>
bool
ConnectionImpl<BUFFER, NetProvider>::hasError() const
{
return error.has_value();
}

template <class BUFFER, class NetProvider>
size_t
ConnectionImpl<BUFFER, NetProvider>::getFutureCount() const
{
return futures.size();
}

template <class BUFFER, class NetProvider>
BUFFER &
ConnectionImpl<BUFFER, NetProvider>::getInBuf()
{
return inBuf;
}

template <class BUFFER, class NetProvider>
BUFFER &
ConnectionImpl<BUFFER, NetProvider>::getOutBuf()
{
return outBuf;
}

template <class BUFFER, class NetProvider>
void
ConnectionImpl<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
{
enc.encodeAuth(user, passwd, greeting);
}

template <class BUFFER, class NetProvider>
void
ConnectionImpl<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
{
enc.reencodeAuth(user, passwd, greeting);
connector.readyToSend(this);
}

/** Each connection is supposed to be bound to a single socket. */
template<class BUFFER, class NetProvider>
class Connection
Expand Down Expand Up @@ -212,44 +278,11 @@ class Connection
BUFFER& getInBuf();
BUFFER& getOutBuf();

template<class B, class N>
friend
void hasSentBytes(Connection<B, N> &conn, size_t bytes);

template<class B, class N>
friend
void hasNotRecvBytes(Connection<B, N> &conn, size_t bytes);

template<class B, class N>
friend
bool hasDataToSend(Connection<B, N> &conn);

template<class B, class N>
friend
bool hasDataToDecode(Connection<B, N> &conn);

template<class B, class N>
friend
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);

template<class B, class N>
friend
void inputBufGC(Connection<B, N> &conn);

template<class B, class N>
friend
int decodeGreeting(Connection<B, N> &conn);

rid_t prepare_auth(std::string_view user,
std::string_view passwd);

rid_t commit_auth(std::string_view user,
std::string_view passwd);
void prepare_auth(std::string_view user, std::string_view passwd);
void commit_auth(std::string_view user, std::string_view passwd);

private:
ConnectionImpl<BUFFER, NetProvider> *impl;
static constexpr size_t GC_STEP_CNT = 100;
size_t gc_step = 0;

template <class T>
rid_t insert(const T &tuple, uint32_t space_id);
Expand Down Expand Up @@ -435,21 +468,21 @@ template<class BUFFER, class NetProvider>
size_t
Connection<BUFFER, NetProvider>::getFutureCount() const
{
return impl->futures.size();
return impl->getFutureCount();
}

template<class BUFFER, class NetProvider>
void
Connection<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
{
impl->error.emplace(msg, errno_);
impl->setError(msg, errno_);
}

template<class BUFFER, class NetProvider>
bool
Connection<BUFFER, NetProvider>::hasError() const
{
return impl->error.has_value();
return impl->hasError();
}

template<class BUFFER, class NetProvider>
Expand All @@ -471,73 +504,79 @@ template<class BUFFER, class NetProvider>
BUFFER&
Connection<BUFFER, NetProvider>::getInBuf()
{
return impl->inBuf;
return impl->getInBuf();
}

template<class BUFFER, class NetProvider>
BUFFER&
Connection<BUFFER, NetProvider>::getOutBuf()
{
return impl->outBuf;
return impl->getOutBuf();
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
void
hasSentBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
hasSentBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about moving all these functions to ConnectionImpl?

  1. It's weird to see non-method functions here at all.
  2. These utility functions will be hidden in ConnectionImpl so that only advanced user will see them (only implementor of its own NetProvider will work with ConnectionImpl directly.

{
//dropBack()/dropFront() interfaces require number of bytes be greater
//than zero so let's check it first.
if (bytes > 0)
conn.impl->outBuf.dropFront(bytes);
conn->getOutBuf().dropFront(bytes);
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
void
hasNotRecvBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
hasNotRecvBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
{
if (bytes > 0)
conn.impl->inBuf.dropBack(bytes);
conn->getInBuf().dropBack(bytes);
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
bool
hasDataToSend(Connection<BUFFER, NetProvider> &conn)
hasDataToSend(ConnectionImpl<BUFFER, NetProvider> *conn)
{
//We drop content of input buffer once it has been sent. So to detect
//if there's any data to send it's enough to check buffer's emptiness.
return !conn.impl->outBuf.empty();
return !conn->getOutBuf().empty();
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
bool
hasDataToDecode(Connection<BUFFER, NetProvider> &conn)
{
assert(conn.impl->endDecoded < conn.impl->inBuf.end() ||
conn.impl->endDecoded == conn.impl->inBuf.end());
return conn.impl->endDecoded != conn.impl->inBuf.end();
return hasDataToDecode(conn.getImpl());
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
bool
hasDataToDecode(ConnectionImpl<BUFFER, NetProvider> *conn)
{
assert(conn->endDecoded < conn->getInBuf().end() || conn->endDecoded == conn->getInBuf().end());
return conn->endDecoded != conn->getInBuf().end();
}

template <class BUFFER, class NetProvider>
static void
inputBufGC(Connection<BUFFER, NetProvider> &conn)
inputBufGC(ConnectionImpl<BUFFER, NetProvider> *conn)
{
if ((conn.gc_step++ % Connection<BUFFER, NetProvider>::GC_STEP_CNT) == 0) {
LOG_DEBUG("Flushed input buffer of the connection %p", &conn);
conn.impl->inBuf.flush();
if (conn->gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
LOG_DEBUG("Flushed input buffer of the connection %p", conn);
conn->getInBuf().flush();
}
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
DecodeStatus
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
processResponse(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync, Response<BUFFER> *result)
{
//Decode response. In case of success - fill in feature map
//and adjust end-of-decoded data pointer. Call GC if needed.
if (! conn.impl->inBuf.has(conn.impl->endDecoded, MP_RESPONSE_SIZE))
if (!conn->getInBuf().has(conn->endDecoded, MP_RESPONSE_SIZE))
return DECODE_NEEDMORE;

Response<BUFFER> response;
response.size = conn.impl->dec.decodeResponseSize();
response.size = conn->dec.decodeResponseSize();
if (response.size < 0) {
LOG_ERROR("Failed to decode response size");
//In case of corrupted response size all other data in the buffer
Expand All @@ -548,55 +587,53 @@ processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BU

}
response.size += MP_RESPONSE_SIZE;
if (! conn.impl->inBuf.has(conn.impl->endDecoded, response.size)) {
if (!conn->getInBuf().has(conn->endDecoded, response.size)) {
//Response was received only partially. Reset decoder position
//to the start of response to make this function re-entered.
conn.impl->dec.reset(conn.impl->endDecoded);
conn->dec.reset(conn->endDecoded);
return DECODE_NEEDMORE;
}
if (conn.impl->dec.decodeResponse(response) != 0) {
conn.setError("Failed to decode response, skipping bytes..");
conn.impl->endDecoded += response.size;
if (conn->dec.decodeResponse(response) != 0) {
conn->setError("Failed to decode response, skipping bytes..");
conn->endDecoded += response.size;
return DECODE_ERR;
}
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
response.header.code, ", schema=", response.header.schema_id);
if (result != nullptr && response.header.sync == req_sync) {
*result = std::move(response);
} else {
conn.impl->futures.insert({response.header.sync,
std::move(response)});
conn->futures.insert({response.header.sync, std::move(response)});
}
conn.impl->endDecoded += response.size;
conn->endDecoded += response.size;
inputBufGC(conn);
return DECODE_SUCC;
}

template<class BUFFER, class NetProvider>
template <class BUFFER, class NetProvider>
int
decodeGreeting(Connection<BUFFER, NetProvider> &conn)
decodeGreeting(ConnectionImpl<BUFFER, NetProvider> *conn)
{
//TODO: that's not zero-copy, should be rewritten in that pattern.
assert(conn.getInBuf().has(conn.impl->endDecoded, Iproto::GREETING_SIZE));
assert(conn->getInBuf().has(conn->endDecoded, Iproto::GREETING_SIZE));
char greeting_buf[Iproto::GREETING_SIZE];
conn.impl->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
conn.impl->dec.reset(conn.impl->endDecoded);
if (parseGreeting(std::string_view{greeting_buf, Iproto::GREETING_SIZE},
conn.impl->greeting) != 0)
conn->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
conn->dec.reset(conn->endDecoded);
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, conn->greeting) != 0)
return -1;
conn.impl->is_greeting_received = true;
LOG_DEBUG("Version: ", conn.impl->greeting.version_id);
conn->is_greeting_received = true;
LOG_DEBUG("Version: ", conn->greeting.version_id);

#ifndef NDEBUG
//print salt in hex format.
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
const char *hex = "0123456789abcdef";
for (size_t i = 0; i < conn.impl->greeting.salt_size; i++) {
uint8_t u = conn.impl->greeting.salt[i];
for (size_t i = 0; i < conn->greeting.salt_size; i++) {
uint8_t u = conn->greeting.salt[i];
hex_salt[i * 2] = hex[u / 16];
hex_salt[i * 2 + 1] = hex[u % 16];
}
hex_salt[conn.impl->greeting.salt_size * 2] = 0;
hex_salt[conn->greeting.salt_size * 2] = 0;
LOG_DEBUG("Salt: ", hex_salt);
#endif
return 0;
Expand Down Expand Up @@ -717,21 +754,16 @@ Connection<BUFFER, NetProvider>::select(const T &key, uint32_t space_id,
return impl->enc.getSync();
}

template<class BUFFER, class NetProvider>
rid_t
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user,
std::string_view passwd)
template <class BUFFER, class NetProvider>
void
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
{
impl->enc.encodeAuth(user, passwd, impl->greeting);
return 0;
impl->prepare_auth(user, passwd);
}

template<class BUFFER, class NetProvider>
rid_t
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user,
std::string_view passwd)
template <class BUFFER, class NetProvider>
void
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
{
impl->enc.reencodeAuth(user, passwd, impl->greeting);;
impl->connector.readyToSend(*this);
return 0;
impl->commit_auth(user, passwd);
}
Loading
Loading