From 394acbf707ab3ce59d0e0496526728a5baf5f6dd Mon Sep 17 00:00:00 2001 From: Kittisak Phormraksa Date: Fri, 30 Aug 2024 10:42:31 +0700 Subject: [PATCH] API remove stream consumers (#12) --- internal/streams/custom.go | 29 +++++++++++++++++++++++++++++ internal/streams/streams.go | 1 + 2 files changed, 30 insertions(+) diff --git a/internal/streams/custom.go b/internal/streams/custom.go index 8233875e2..05e6bb302 100644 --- a/internal/streams/custom.go +++ b/internal/streams/custom.go @@ -56,3 +56,32 @@ func apiStreamsSpeed(w http.ResponseWriter, r *http.Request) { http.Error(w, "", http.StatusNotFound) } + +func apiStreamsRemoveConsumers(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, "", http.StatusNotFound) + return + } + + query := r.URL.Query() + + for _, streamName := range query["name"] { + if streamName == "" { + continue + } + + streamsMu.RLock() + stream := Get(streamName) + streamsMu.RUnlock() + + if stream == nil { + continue + } + + for _, consumer := range stream.consumers { + stream.RemoveConsumer(consumer) + } + } + + http.Error(w, "", http.StatusOK) +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index a772a02dc..b0e44dcda 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -31,6 +31,7 @@ func Init() { // custom api.HandleFunc("api/custom/streams/speed", apiStreamsSpeed) + api.HandleFunc("api/custom/streams.removeConsumers", apiStreamsRemoveConsumers) if cfg.Publish == nil { return