-
Notifications
You must be signed in to change notification settings - Fork 53
Enh/add reactions #1565
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Enh/add reactions #1565
Changes from all commits
6d4277b
f16b1b2
e10d5b9
bcafd91
bfefdc3
8e57dee
fff7704
94d51c9
d9a9261
3eed1a9
c06c2f8
ec6e2a3
e0d0722
878fec4
1597514
e45b5c4
c92b347
e69c80e
07da05b
d2e10c9
7408bbd
dabe712
d803fb2
bec3075
7dcd7de
03faf0c
e602b32
5a25136
1d39bf0
3f52e95
4c02557
b905cdb
b3b6957
f84d02a
297bd09
09733ab
ddaab5b
377084f
a6c16d7
c4d58b1
9d4356f
8fbd965
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,356 @@ | ||
| package api | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "net/http" | ||
| "slices" | ||
| "strconv" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/TUM-Dev/gocast/dao" | ||
| "github.com/TUM-Dev/gocast/model" | ||
| "github.com/TUM-Dev/gocast/tools" | ||
| "github.com/TUM-Dev/gocast/tools/realtime" | ||
| "github.com/getsentry/sentry-go" | ||
| "github.com/gin-gonic/gin" | ||
| ) | ||
|
|
||
| type StreamReactionRoutes struct { | ||
| dao.DaoWrapper | ||
| } | ||
|
|
||
| // TODO: This can be modified to allow different reactions for different streams | ||
| func (r StreamReactionRoutes) allowedReactions(c *gin.Context) { | ||
| c.JSON(http.StatusOK, tools.Cfg.AllowedReactions) | ||
| } | ||
|
|
||
| func (r StreamReactionRoutes) addReaction(c *gin.Context) { | ||
| tumLiveContext := c.MustGet("TUMLiveContext").(tools.TUMLiveContext) | ||
| user := tumLiveContext.User | ||
| stream := tumLiveContext.Stream | ||
|
|
||
| if stream == nil { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusNotFound, | ||
| CustomMessage: "stream not found", | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| course, err := r.DaoWrapper.CoursesDao.GetCourseById(c, stream.CourseID) | ||
|
|
||
| if user == nil || err != nil { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusInternalServerError, | ||
| CustomMessage: "user or course not found", | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| if !user.IsEligibleToWatchCourse(course) { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusForbidden, | ||
| CustomMessage: "user not eligible to watch course", | ||
| }) | ||
| return | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. optional: these three conditionals seem like they could be a separate function that could be reused?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will leave them there for now and they can be extracted if we need them somewhere else |
||
|
|
||
| type reactionRequest struct { | ||
| Reaction string `json:"reaction"` | ||
| } | ||
|
|
||
| var reaction reactionRequest | ||
| if err := c.ShouldBindJSON(&reaction); err != nil { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusBadRequest, | ||
| CustomMessage: "can not bind body", | ||
| Err: err, | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| // This can be modified to allow different reactions for different streams | ||
| if !slices.Contains(tools.Cfg.AllowedReactions, reaction.Reaction) { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusBadRequest, | ||
| CustomMessage: "reaction not allowed", | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| lastReaction, _ := r.DaoWrapper.StreamReactionDao.GetLastReactionOfUser(c, user.ID) | ||
| // This contains the cooldown logic, to change this value change the time.Duration(10) to the desired cooldown time | ||
| if lastReaction.Reaction != "" && lastReaction.CreatedAt.Add(time.Duration(10)*time.Second).After(time.Now()) { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusTooManyRequests, | ||
| CustomMessage: "cooldown not over", | ||
| }) | ||
| return | ||
| } | ||
|
|
||
| reactionObj := model.StreamReaction{ | ||
| Reaction: reaction.Reaction, | ||
| StreamID: stream.ID, | ||
| UserID: user.ID, | ||
| } | ||
|
|
||
| err = r.DaoWrapper.StreamReactionDao.Create(c, &reactionObj) | ||
| if err != nil { | ||
| _ = c.Error(tools.RequestError{ | ||
| Status: http.StatusInternalServerError, | ||
| CustomMessage: "can not create reaction", | ||
| Err: err, | ||
| }) | ||
| return | ||
| } | ||
| NotifyAdminsOnReaction(stream.ID, reaction.Reaction) | ||
| c.JSON(http.StatusOK, "") | ||
| } | ||
|
|
||
| // The part below is used for Realtime Connection to the client | ||
|
|
||
| const ( | ||
| ReactionUpdateRoomName = "reaction-update" | ||
| ) | ||
|
|
||
| var ( | ||
| liveReactionListenerMutex sync.RWMutex | ||
| liveReactionListener = map[uint]*liveReactionAdminSessionsWrapper{} | ||
| ) | ||
|
|
||
| type liveReactionAdminSessionsWrapper struct { | ||
| sessions []*realtime.Context | ||
| stream uint | ||
| } | ||
|
|
||
| func RegisterReactionUpdateRealtimeChannel() { | ||
| RealtimeInstance.RegisterChannel(ReactionUpdateRoomName, realtime.ChannelHandlers{ | ||
| OnSubscribe: reactionUpdateOnSubscribe, | ||
| OnUnsubscribe: reactionUpdateOnUnsubscribe, | ||
| OnMessage: reactionUpdateSetStream, | ||
| }) | ||
|
|
||
| go func() { | ||
| // Notify admins every 5 seconds | ||
| logger.Info("Starting periodic notification of reaction percentages") | ||
| for { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this a leaking goroutine? You never end it or cancel it via a context?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be ok as this is only started once at the initialize and it will be stopped when the program stops. |
||
| time.Sleep(5 * time.Second) | ||
| NotifyAdminsOnReactionPercentages(context.Background()) | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| func reactionUpdateOnUnsubscribe(psc *realtime.Context) { | ||
| logger.Debug("Unsubscribing from reaction Update") | ||
| ctx, _ := psc.Client.Get("ctx") // get gin context | ||
| foundContext, exists := ctx.(*gin.Context).Get("TUMLiveContext") | ||
| if !exists { | ||
| sentry.CaptureException(errors.New("context should exist but doesn't")) | ||
| return | ||
| } | ||
|
|
||
| tumLiveContext := foundContext.(tools.TUMLiveContext) | ||
|
|
||
| var userId uint | ||
| if tumLiveContext.User != nil { | ||
| userId = tumLiveContext.User.ID | ||
| } | ||
|
|
||
| liveReactionListenerMutex.Lock() | ||
| defer liveReactionListenerMutex.Unlock() | ||
| var newSessions []*realtime.Context | ||
| for _, session := range liveReactionListener[userId].sessions { | ||
| if session != psc { | ||
| newSessions = append(newSessions, session) | ||
| } | ||
| } | ||
| if len(newSessions) == 0 { | ||
| delete(liveReactionListener, userId) | ||
| } else { | ||
| liveReactionListener[userId].sessions = newSessions | ||
| } | ||
| logger.Debug("Successfully unsubscribed from reaction Update") | ||
| } | ||
|
|
||
| func reactionUpdateOnSubscribe(psc *realtime.Context) { | ||
| ctx, _ := psc.Client.Get("ctx") // get gin context | ||
|
|
||
| foundContext, exists := ctx.(*gin.Context).Get("TUMLiveContext") | ||
| if !exists { | ||
| sentry.CaptureException(errors.New("context should exist but doesn't")) | ||
| return | ||
| } | ||
|
|
||
| tumLiveContext := foundContext.(tools.TUMLiveContext) | ||
|
|
||
| var userId uint | ||
| var err error | ||
|
|
||
| if tumLiveContext.User != nil { | ||
| userId = tumLiveContext.User.ID | ||
| } else { | ||
| logger.Error("could not fetch public courses", "err", err) | ||
| return | ||
|
|
||
| } | ||
|
|
||
| liveReactionListenerMutex.Lock() | ||
| defer liveReactionListenerMutex.Unlock() | ||
| existing := liveReactionListener[userId] | ||
| if existing != nil { | ||
| liveReactionListener[userId] = &liveReactionAdminSessionsWrapper{append(existing.sessions, psc), liveReactionListener[userId].stream} | ||
| } else { | ||
| liveReactionListener[userId] = &liveReactionAdminSessionsWrapper{[]*realtime.Context{psc}, 0} | ||
| } | ||
| } | ||
|
|
||
| func reactionUpdateSetStream(psc *realtime.Context, message *realtime.Message) { | ||
| logger.Info("reactionUpdateSetStream", "message", string(message.Payload)) | ||
| ctx, _ := psc.Client.Get("ctx") // get gin context | ||
|
|
||
| foundContext, exists := ctx.(*gin.Context).Get("TUMLiveContext") | ||
| if !exists { | ||
| sentry.CaptureException(errors.New("context should exist but doesn't")) | ||
| return | ||
| } | ||
|
|
||
| tumLiveContext := foundContext.(tools.TUMLiveContext) | ||
|
|
||
| var userId uint | ||
| var err error | ||
|
|
||
| if tumLiveContext.User != nil { | ||
| userId = tumLiveContext.User.ID | ||
| } else { | ||
| logger.Error("could not get user from request", "err", err) | ||
| return | ||
| } | ||
|
|
||
| type Message struct { | ||
| StreamID string `json:"streamId"` | ||
| } | ||
|
|
||
| var messageObj Message | ||
| err = json.Unmarshal(message.Payload, &messageObj) | ||
| if err != nil { | ||
| logger.Error("could not unmarshal message", "err", err) | ||
| return | ||
| } | ||
|
|
||
| stream, err := daoWrapper.StreamsDao.GetStreamByID(context.TODO(), messageObj.StreamID) | ||
| if err != nil { | ||
| logger.Error("Cant get stream by id", "err", err) | ||
| return | ||
| } | ||
| course, err := daoWrapper.CoursesDao.GetCourseById(context.TODO(), stream.CourseID) | ||
| if err != nil { | ||
| logger.Error("Cant get course by id", "err", err) | ||
| return | ||
| } | ||
| if !tumLiveContext.User.IsAdminOfCourse(course) { | ||
| logger.Error("User is not admin of course") | ||
| reactionUpdateOnUnsubscribe(psc) | ||
| return | ||
| } | ||
|
|
||
| liveReactionListenerMutex.Lock() | ||
| defer liveReactionListenerMutex.Unlock() | ||
| if liveReactionListener[userId] != nil { | ||
| uId, err := strconv.Atoi(messageObj.StreamID) | ||
| if err != nil { | ||
| logger.Error("could not convert streamID to int", "err", err) | ||
| return | ||
| } | ||
| liveReactionListener[userId].stream = uint(uId) | ||
| } else { | ||
| logger.Error("User has no live reaction listener") | ||
| } | ||
| } | ||
|
|
||
| func NotifyAdminsOnReaction(streamID uint, reaction string) { | ||
| liveReactionListenerMutex.Lock() | ||
| defer liveReactionListenerMutex.Unlock() | ||
| reactionStruct := struct { | ||
| Reaction string `json:"reaction"` | ||
| }{ | ||
| Reaction: reaction, | ||
| } | ||
| reactionMarshaled, err := json.Marshal(reactionStruct) | ||
| if err != nil { | ||
| logger.Error("could not marshal reaction", "err", err) | ||
| return | ||
| } | ||
| for _, session := range liveReactionListener { | ||
| if session.stream == streamID { | ||
| for _, s := range session.sessions { | ||
| err := s.Send(reactionMarshaled) | ||
| if err != nil { | ||
| logger.Error("can't write reaction to session", "err", err) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func NotifyAdminsOnReactionPercentages(context context.Context) { | ||
| liveReactionListenerMutex.Lock() | ||
| defer liveReactionListenerMutex.Unlock() | ||
| streams := make([]uint, 0) | ||
| for _, session := range liveReactionListener { | ||
| streams = append(streams, session.stream) | ||
| } | ||
| liveReactionListenerMutex.Unlock() | ||
|
|
||
| streamReactionPercentages := map[uint]map[string]float64{} | ||
|
|
||
| for _, stream := range streams { | ||
| reactionsRaw, err := daoWrapper.StreamReactionDao.GetByStreamWithinMinutes(context, stream, 2) // TODO: Make this variable for the lecturer | ||
| if err != nil { | ||
| logger.Error("could not get reactions for stream", "stream", stream, "err", err) | ||
| return | ||
| } | ||
|
|
||
| reactions := make(map[string]int) | ||
| for _, reaction := range reactionsRaw { | ||
| reactions[reaction.Reaction]++ | ||
| } | ||
|
|
||
| totalReactions := 0 | ||
| for _, count := range reactions { | ||
| totalReactions += count | ||
| } | ||
| if totalReactions == 0 { | ||
| // logger.Debug("no reactions for stream", "stream", stream) | ||
| continue | ||
| } | ||
|
|
||
| streamReactionPercentages[stream] = make(map[string]float64) | ||
| for reaction, count := range reactions { | ||
| streamReactionPercentages[stream][reaction] = float64(count) / float64(totalReactions) | ||
| } | ||
| } | ||
|
|
||
| // Send the percentages to the admin sessions | ||
| liveReactionListenerMutex.Lock() | ||
|
|
||
| for _, session := range liveReactionListener { | ||
| if session.stream == 0 { | ||
| continue | ||
| } | ||
| reactionPercentages := streamReactionPercentages[session.stream] | ||
| reactionPercentagesMarshaled, err := json.Marshal(reactionPercentages) | ||
| if err != nil { | ||
| logger.Error("could not marshal reaction percentages", "err", err) | ||
| return | ||
| } | ||
| for _, s := range session.sessions { | ||
| err := s.Send([]byte("{\"percentages\": " + string(reactionPercentagesMarshaled) + "}")) | ||
| if err != nil { | ||
| logger.Error("can't write reaction percentages to session", "err", err) | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.