
Dropped first frame on failed send #659

Open
diehard2 opened this issue May 10, 2025 · 3 comments


diehard2 commented May 10, 2025

I'm experiencing an unusual issue. Under high load, the first frame of a message (the first frame after the routing ID) sometimes vanishes. I'm writing a blocking, synchronous client that uses ZMQ_IMMEDIATE together with ZMQ_SNDTIMEO, so if the message can't be sent, the client gives up and returns false after a number of retries. Something like this:

```cpp
#include <atomic>
#include <cerrno>
#include <iostream>
#include <string>
#include <zmq.hpp>
#include <zmq_addon.hpp>

bool cppzmq_client(zmq::context_t* context, bool dowhile)
{
  const int recvtimeout = 4000;
  static std::atomic<int> id{0};

  zmq::socket_t m_socket{*context, ZMQ_DEALER};
  m_socket.set(zmq::sockopt::sndhwm, 0);
  m_socket.set(zmq::sockopt::routing_id, std::to_string(id++));
  m_socket.set(zmq::sockopt::immediate, 1);  // queue only to completed connections
  m_socket.set(zmq::sockopt::rcvtimeo, recvtimeout);

  zmq::multipart_t message;
  message.pushstr("");  // empty first frame: the one that goes missing
  message.push_back(zmq::message_t{std::string{"stuff"}});
  message.push_back(zmq::message_t{std::string{"more stuff"}});
  message.push_back(zmq::message_t{std::string{"even more stuff"}});

  bool succeeded = false;
  const int sendtimeout = 1000;
  m_socket.set(zmq::sockopt::sndtimeo, sendtimeout);
  m_socket.connect("ipc://@/zmq-client");
  int retry_count = 0;
  do {
    try {
      // Returns false if a frame cannot be sent before sndtimeo expires.
      succeeded = message.send(m_socket);
    } catch (const zmq::error_t& e) {
      if (e.num() != EINTR && e.num() != EAGAIN) {
        throw;
      }
    }
    retry_count++;
    std::cout << "Retry count: " << retry_count << std::endl;
  } while (!succeeded && retry_count < 20);

  if (!succeeded) {
    std::cerr << "Failed to send message to broker: " << zmq_strerror(zmq_errno()) << std::endl;
    return false;
  }

  zmq::multipart_t result;
  bool read_succeeded = result.recv(m_socket);
  return read_succeeded && result.peekstr(0) != "MISSING_FRAME";
}
```

In multipart_t::send(), the first part of the message is popped off before it is sent, but it is never restored to the multipart_t if the send fails, so every retry goes out without that frame:

```cpp
bool send(socket_ref socket, int flags = 0)
{
    flags &= ~(ZMQ_SNDMORE);
    bool more = size() > 0;
    while (more) {
        message_t message = pop();
        more = size() > 0;
#ifdef ZMQ_CPP11
        if (!socket.send(message, static_cast<send_flags>(
                                    (more ? ZMQ_SNDMORE : 0) | flags)))
            return false;
#else
        if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
            return false;
#endif
    }
    clear();
    return true;
}
```

Would it be possible to restore the popped message so a retry can occur?
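To make the failure mode concrete, here is a small stdlib-only model of the pop-then-send loop quoted above, next to a variant that pushes the popped frame back when the send fails. `FakeSocket`, `consuming_send`, `restoring_send`, and `send_with_one_retry` are hypothetical names for this illustration, and `std::deque<std::string>` stands in for the deque of `message_t` inside `multipart_t`. It is a sketch of the idea under those assumptions, not a patch against cppzmq.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for a socket (not a cppzmq type): it refuses the
// first `fail_first` sends, mimicking EAGAIN when ZMQ_IMMEDIATE is set and
// no peer is ready yet, and counts the frames it accepts.
struct FakeSocket {
    int fail_first = 0;
    std::size_t frames_sent = 0;
    bool send(const std::string &, bool /*more*/) {
        if (fail_first > 0) {
            --fail_first;
            return false;
        }
        ++frames_sent;
        return true;
    }
};

// Model of multipart_t::send: each frame is popped before it is sent,
// so a failed send discards the popped frame.
bool consuming_send(std::deque<std::string> &parts, FakeSocket &s) {
    while (!parts.empty()) {
        std::string frame = std::move(parts.front());
        parts.pop_front();
        if (!s.send(frame, !parts.empty()))
            return false;  // `frame` is destroyed here and is lost for good
    }
    return true;
}

// Variant that puts the frame back at the front when the send fails,
// so the caller can retry with the message unchanged.
bool restoring_send(std::deque<std::string> &parts, FakeSocket &s) {
    while (!parts.empty()) {
        std::string frame = std::move(parts.front());
        parts.pop_front();
        if (!s.send(frame, !parts.empty())) {
            parts.push_front(std::move(frame));
            return false;
        }
    }
    return true;
}

// One refused attempt followed by a single retry; returns how many
// frames actually reached the socket.
template <class SendFn>
std::size_t send_with_one_retry(SendFn send_fn) {
    std::deque<std::string> parts{"", "stuff", "more stuff", "even more stuff"};
    FakeSocket s;
    s.fail_first = 1;
    if (!send_fn(parts, s))
        send_fn(parts, s);
    return s.frames_sent;
}
```

With `consuming_send` the retry delivers only 3 of the 4 frames (the empty first frame is gone), while `restoring_send` delivers all 4. Restoring just the failed frame is only sufficient if the socket can refuse nothing but the first frame of a multipart message; if a later frame could fail, the earlier frames would already be out and a retry would duplicate them.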

diehard2 changed the title from "Sporadic dropped first frame from client" to "Dropped first frame on failed send" on May 11, 2025
gummif (Member) commented May 12, 2025

I would suggest using zmq::recv_multipart and zmq::send_multipart; see if your problems go away. But you are right, that is clearly a bug.

diehard2 (Author) commented
Thanks @gummif. I think there are some subtle issues with zmq::send_multipart as well. The rvalue reference in its signature strongly suggests I should call it as

```cpp
zmq::send_multipart(m_socket, std::move(message));
```

in which case I can't retry since I've lost the message. However, despite the rvalue signature, I could do

```cpp
zmq::send_multipart(m_socket, message);
```

and, if I get back an empty optional, I could safely retry. However, with assertions disabled (i.e. not a debug build), it's possible to get an empty optional in the following scenario:

  1. first message sends
  2. second send fails
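The two-step scenario above can be modeled without libzmq. In this sketch `FlakySocket`, `model_send_multipart`, and `partial_failure_scenario` are hypothetical, `std::optional<std::size_t>` stands in for cppzmq's `send_result_t`, and the loop mirrors the shape of `send_multipart` with its debug-only assertion removed, as happens when `NDEBUG` is defined. It only shows what the caller would observe if a socket ever refused a frame after the first one.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical socket: accepts the first `accept_count` frames, then refuses.
struct FlakySocket {
    std::size_t accept_count;
    std::vector<std::string> wire;  // frames that were "sent"
    bool send(const std::string &frame, bool /*more*/) {
        if (wire.size() >= accept_count)
            return false;
        wire.push_back(frame);
        return true;
    }
};

// Simplified model of send_multipart with the assertion compiled out:
// any refused frame yields an empty optional, however many frames
// already went out.
template <class Range>
std::optional<std::size_t> model_send_multipart(FlakySocket &s, Range &&msgs) {
    using std::begin;
    using std::end;
    auto it = begin(msgs);
    const auto end_it = end(msgs);
    std::size_t count = 0;
    while (it != end_it) {
        const auto next = std::next(it);
        if (!s.send(*it, next != end_it))
            return std::nullopt;
        ++count;
        it = next;
    }
    return count;
}

// Step 1: the first frame is sent. Step 2: the second frame is refused.
// Returns {did the result hold a value?, frames already on the wire}.
std::pair<bool, std::size_t> partial_failure_scenario() {
    std::vector<std::string> msgs{"", "stuff", "more stuff"};
    FlakySocket s{1, {}};
    auto result = model_send_multipart(s, msgs);
    return {result.has_value(), s.wire.size()};
}
```

The caller gets the same empty result it would get for a clean refusal of the first frame, yet one frame is already on the wire, so blindly retrying would send a corrupted message.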

I think the following would make a graceful resend possible:

```cpp
template<class Range>
send_result_t
send_multipart(socket_ref s, Range &msgs, send_flags flags = send_flags::none)
{
    using std::begin;
    using std::end;
    auto it = begin(msgs);
    const auto end_it = end(msgs);
    size_t msg_count = 0;
    while (it != end_it) {
        const auto next = std::next(it);
        const auto msg_flags =
          flags | (next == end_it ? send_flags::none : send_flags::sndmore);
        if (!s.send(*it, msg_flags)) {
            // zmq ensures atomic delivery of messages
            if (it == begin(msgs)) {
                // nothing has been sent yet, so the caller can safely retry
                return {};
            }
            // part of the message is already out: report it as an error
            throw zmq::error_t();  // error_t() captures zmq_errno()
        }
        ++msg_count;
        it = next;
    }
    return msg_count;
}
```
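The proposed behavior, together with the bounded retry loop from the original client, can be sketched in a self-contained model. `ScriptedSocket`, `proposed_send_multipart`, `send_with_retries`, `retry_scenario`, and `throws_on_partial` are hypothetical names, and `std::runtime_error` is an assumed stand-in for `zmq::error_t`. The point is the split: an empty result means nothing was sent and the loop retries, while an exception means a partial send and propagates to the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical socket whose successive send() calls return the scripted
// outcomes; once the script runs out, every send succeeds.
struct ScriptedSocket {
    std::vector<bool> outcomes;
    std::size_t calls = 0;
    std::vector<std::string> wire;
    bool send(const std::string &frame, bool /*more*/) {
        const bool ok = calls < outcomes.size() ? outcomes[calls] : true;
        ++calls;
        if (ok)
            wire.push_back(frame);
        return ok;
    }
};

// Model of the proposed send_multipart: a refused first frame gives an
// empty optional (safe to retry); a refused later frame throws.
template <class Range>
std::optional<std::size_t> proposed_send_multipart(ScriptedSocket &s, Range &msgs) {
    using std::begin;
    using std::end;
    auto it = begin(msgs);
    const auto end_it = end(msgs);
    std::size_t count = 0;
    while (it != end_it) {
        const auto next = std::next(it);
        if (!s.send(*it, next != end_it)) {
            if (it == begin(msgs))
                return std::nullopt;
            throw std::runtime_error("partial multipart send");  // stand-in for zmq::error_t
        }
        ++count;
        it = next;
    }
    return count;
}

// Bounded retry loop: only empty results are retried; exceptions propagate.
std::size_t send_with_retries(ScriptedSocket &s, std::vector<std::string> &msgs, int max_tries) {
    for (int i = 0; i < max_tries; ++i) {
        if (auto n = proposed_send_multipart(s, msgs))
            return *n;
    }
    return 0;
}

// Two refused attempts, then success on the third: returns frames delivered.
std::size_t retry_scenario() {
    ScriptedSocket s{{false, false}};
    std::vector<std::string> msgs{"", "stuff", "more stuff"};
    return send_with_retries(s, msgs, 5);
}

// True if the scripted outcomes make the proposed function throw.
bool throws_on_partial(std::vector<bool> outcomes) {
    ScriptedSocket s{std::move(outcomes)};
    std::vector<std::string> msgs{"", "stuff", "more stuff"};
    try {
        proposed_send_multipart(s, msgs);
    } catch (const std::runtime_error &) {
        return true;
    }
    return false;
}
```

Because the message is taken by lvalue reference, the same `msgs` vector is reused on every attempt. In real cppzmq a successfully sent `message_t` is emptied by the send, so reusing the container is only safe in the "nothing was sent" case this design isolates.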


gummif (Member) commented May 12, 2025

The Range&& is a universal (forwarding) reference, so arguments with any const or reference qualification are accepted.

libzmq guarantees that the case you mention cannot happen, so if it did, that would be a bug in the API. Still, it might be pretty cheap to add a sanity check for it.
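To illustrate the forwarding-reference point in the reply above, here is a minimal stdlib-only sketch; `count_frames`, `deduced_as_lvalue_ref`, and the two helper functions are hypothetical. It shows that a deduced `Range&&` parameter accepts lvalues, const lvalues, and rvalues, and that passing `std::move(v)` to a function that only iterates its argument leaves `v` intact, because `std::move` is just a cast. This says nothing about what a real socket send does to the individual `message_t` frames.

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

// Range&& in a deduced context is a forwarding reference, so this accepts
// any container, whatever its value category or const qualification.
template <class Range>
std::size_t count_frames(Range &&msgs) {
    using std::begin;
    using std::end;
    return static_cast<std::size_t>(std::distance(begin(msgs), end(msgs)));
}

// Reports whether Range was deduced as an lvalue reference type.
template <class Range>
bool deduced_as_lvalue_ref(Range &&) {
    return std::is_lvalue_reference<Range>::value;
}

bool lvalue_deduces_reference() {
    std::vector<std::string> msgs{"a"};
    const std::vector<std::string> cmsgs{"b"};
    return deduced_as_lvalue_ref(msgs)       // Range = std::vector<std::string>&
        && deduced_as_lvalue_ref(cmsgs)      // Range = const std::vector<std::string>&
        && !deduced_as_lvalue_ref(std::vector<std::string>{"c"});  // Range = std::vector<std::string>
}

std::size_t frames_left_after_moved_call() {
    std::vector<std::string> msgs{"", "stuff", "more stuff"};
    count_frames(std::move(msgs));  // only a cast: count_frames never moves from msgs
    return msgs.size();             // still 3 frames
}
```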
