Skip to content

How to correctly pause and resume message decoding/encoding when writing custom filters? #119

Open
@yizhengx

Description

@yizhengx

I'm following the local rate limit codes to write my own custom filter.

The logic is following: when the service is being rate limited to x requests/sec and the actual input rate is larger than x, instead of sending response 'request{} is being rate limited", this filter will buffer all incoming requests and keep sending requests to the service with rate x.

Implementations: The code skeleton is similar to the local rate limit one. There is a queue maintained in the LocalRateLimiterImpl obj.

class FilterConfig : public Logger::Loggable<Logger::Id::filter> {
public:
  FilterConfig(const LocalRateLimitConfig& cfg, Stats::Scope& scope, Event::Dispatcher& dispatcher);
  ~FilterConfig() = default;

  LocalRateLimitStats& stats() { return stats_; }
  LocalRateLimiterImpl& rateLimiter() { return rate_limiter_; }

private:
  LocalRateLimitStats stats_;
  LocalRateLimiterImpl rate_limiter_;
};

class LocalRateLimit : public CodecFilter, Logger::Loggable<Logger::Id::filter> {
public:
  LocalRateLimit(std::shared_ptr<FilterConfig> filter_config) : filter_config_(filter_config){
    ENVOY_LOG(warn, "LocalRateLimit Constructor");
    has_buffered = false;
  };
  ~LocalRateLimit() override = default;

  void onDestroy() override;

  // DecoderFilter
  void setDecoderFilterCallbacks(DecoderFilterCallbacks& callbacks) override;
  FilterStatus onMessageDecoded(MetadataSharedPtr metadata, MutationSharedPtr mutation) override;

  void setEncoderFilterCallbacks(EncoderFilterCallbacks& callbacks) override;
  FilterStatus onMessageEncoded(MetadataSharedPtr, MutationSharedPtr) override;

private:
  void cleanup();

  DecoderFilterCallbacks* callbacks_{};
  EncoderFilterCallbacks* encoder_callbacks_{};

  std::shared_ptr<FilterConfig> filter_config_;
  bool has_buffered;
};

Every time a request comes in, it will be paused in OnMessageDecode() function and the callbacks will be pushed into the queue.

FilterStatus LocalRateLimit::onMessageDecoded(MetadataSharedPtr metadata, MutationSharedPtr) {
  if (has_buffered) {
    return FilterStatus::ContinueIteration;
  }
  has_buffered = true;
  filter_config_->rateLimiter().bufferRequest(callbacks_);
  return FilterStatus::PauseIteration;
}

The LocalRateLimiterImpl has a periodic timer to call TSQueue.pop() which will check some condition to resume the requests using callbacks->continueDecoding(). The queue implementation is as following:

struct QueueElement{
  DecoderFilterCallbacks* callbacks;
  std::chrono::time_point<std::chrono::system_clock> timeout;
};

class TSQueue: public Logger::Loggable<Logger::Id::filter>{
private:
    // Underlying queue
    std::queue<QueueElement> queue_;
    // mutex for thread synchronization
    mutable std::mutex mutex_;
    std::chrono::time_point<std::chrono::system_clock> last_timeout;
    std::chrono::microseconds delay;
  
public:
    TSQueue(std::chrono::microseconds delay_){
      delay = delay_;
    }

    // Pushes an element to the queue
    void push(DecoderFilterCallbacks* callbacks)
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (queue_.empty()){
        last_timeout = std::chrono::system_clock::now();
        QueueElement ele = {callbacks, std::chrono::time_point(last_timeout)};
        queue_.push(ele);
      }else{
        last_timeout += delay;
        QueueElement ele = {callbacks, std::chrono::time_point(last_timeout)};
        queue_.push(ele);
      }
    }
  
    // Pops an element off the queue
    void pop()
    {
      std::lock_guard<std::mutex> lock(mutex_);
      while (!queue_.empty())
      {
        QueueElement ele = queue_.front();
        std::chrono::time_point<std::chrono::system_clock> t = std::chrono::system_clock::now();
        if (ele.timeout < t){
          ele.callbacks->continueDecoding();
          queue_.pop();
        }else{
          break;
        }
      }
    }
};

However, when I complied it and tried to run it, it can successfully pause the first request. After it tried to resume the request, there is a thread safety error after the onMessageEncoding is called. It might be buggy to just call callbacks.continueDecoding() from a share FilterConfig.

thread-safety-bug

Any suggestions to fix this? Thanks!

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions