96 changes: 68 additions & 28 deletions pkg/sip/inbound.go
@@ -30,6 +30,7 @@ import (

msdk "github.com/livekit/media-sdk"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"

"github.com/frostbyte73/core"
"github.com/icholy/digest"
@@ -64,6 +65,8 @@ const (
inviteOKRetryAttempts = 5
inviteOKRetryAttemptsNoACK = 2
inviteOkAckLateTimeout = inviteOkRetryIntervalMax

inviteCredentialValidity = 60 * time.Minute // Allow reuse of credentials for 1h
Contributor Author:
Not sure this is smart, might want to extend this to max_call_duration or something.

Also, keep in mind that this is per Call-ID for now, so new calls would still need re-auth.

)

var errNoACK = errors.New("no ACK received for 200 OK")
@@ -134,23 +137,44 @@ func (s *Server) getCallInfo(id string) *inboundCallInfo {
return c
}

func (s *Server) getInvite(sipCallID string) *inProgressInvite {
s.imu.Lock()
defer s.imu.Unlock()
for i := range s.inProgressInvites {
if s.inProgressInvites[i].sipCallID == sipCallID {
return s.inProgressInvites[i]
func (s *Server) cleanupInvites() {
ticker := time.NewTicker(5 * time.Minute) // Periodic cleanup every 5 minutes
defer ticker.Stop()
for {
select {
case <-s.closing.Watch():
return
case <-ticker.C:
s.imu.Lock()
for it := s.inviteTimeoutQueue.IterateRemoveAfter(inviteCredentialValidity); it.Next(); {
key := it.Item().Value
delete(s.inProgressInvites, *key)
}
s.imu.Unlock()
}
}
if len(s.inProgressInvites) >= digestLimit {
s.inProgressInvites = s.inProgressInvites[1:]
}

func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
key := dialogKey{
sipCallID: sipCallID,
toTag: toTag,
fromTag: fromTag,
}
s.imu.Lock()
Contributor:
A common practice to avoid lock contention is to use an RWMutex and do a two-stage lock:

s.imu.RLock()
is, ok := s.inProgressInvites[key]
s.imu.RUnlock()
if ok {
   return is
}
s.imu.Lock()
defer s.imu.Unlock()
is, ok = s.inProgressInvites[key]
if ok {
   return is
}
// ... the rest ...

This allows multiple readers to get the invite state without blocking each other. Also notice that we redo the check after taking the write lock: another goroutine might have created the state in the meantime.
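
Spelled out against this function with the PR's own names, a minimal sketch assuming imu is changed to sync.RWMutex (the PR keeps a plain sync.Mutex):

func (s *Server) getInvite(sipCallID, toTag, fromTag string) *inProgressInvite {
	key := dialogKey{sipCallID: sipCallID, toTag: toTag, fromTag: fromTag}

	// Fast path: shared lock only, so concurrent INVITEs for known
	// dialogs do not serialize on the write lock.
	s.imu.RLock()
	is, ok := s.inProgressInvites[key]
	s.imu.RUnlock()
	if ok {
		return is
	}

	// Slow path: take the write lock, then re-check, since another
	// goroutine may have inserted the state between the two locks.
	s.imu.Lock()
	defer s.imu.Unlock()
	if is, ok := s.inProgressInvites[key]; ok {
		return is
	}
	is = &inProgressInvite{sipCallID: sipCallID}
	s.inProgressInvites[key] = is
	s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key})
	return is
}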

defer s.imu.Unlock()
is, ok := s.inProgressInvites[key]
if ok {
return is
}
is := &inProgressInvite{sipCallID: sipCallID}
s.inProgressInvites = append(s.inProgressInvites, is)
is = &inProgressInvite{sipCallID: sipCallID}
s.inProgressInvites[key] = is
s.inviteTimeoutQueue.Reset(&utils.TimeoutQueueItem[*dialogKey]{Value: &key})
Contributor:
Looks like we do not reset the cache expiry time if we have a cache hit. Is it intentional?
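
If the expiry should slide on reuse, one hypothetical way to do it (not part of this PR; assumes TimeoutQueue.Reset re-arms the same item, so the item must live on the invite state rather than being recreated per call):

type inProgressInvite struct {
	sipCallID string
	challenge digest.Challenge
	lkCallID  string
	timeout   utils.TimeoutQueueItem[*dialogKey] // hypothetical field
}

// In getInvite, after the lookup:
is, ok := s.inProgressInvites[key]
if ok {
	s.inviteTimeoutQueue.Reset(&is.timeout) // slide the expiry window on every hit
	return is
}
is = &inProgressInvite{sipCallID: sipCallID}
is.timeout.Value = &key
s.inProgressInvites[key] = is
s.inviteTimeoutQueue.Reset(&is.timeout)
return is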


return is
}

func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string, inviteState *inProgressInvite) (ok bool) {
log = log.WithValues(
"username", username,
"passwordHash", hashPassword(password),
@@ -178,8 +202,6 @@ func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.Se
}
ci := s.getCallInfo(sipCallID)
ci.countInvite(log, req)
inviteState := s.getInvite(sipCallID)
log = log.WithValues("inviteStateSipCallID", sipCallID)

h := req.GetHeader("Proxy-Authorization")
if h == nil {
Expand Down Expand Up @@ -220,7 +242,6 @@ func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.Se
// Check if we have a valid challenge state
if inviteState.challenge.Realm == "" {
log.Warnw("No challenge state found for authentication attempt", errors.New("missing challenge state"),
"sipCallID", sipCallID,
"expectedRealm", UserAgent,
)
_ = tx.Respond(sip.NewResponseFromRequest(req, 401, "Bad credentials", nil))
@@ -295,18 +316,18 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
s.log.Errorw("cannot parse source IP", err, "fromIP", src)
return psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP"))
}
callID := lksip.NewCallID()
sipCallID := legCallIDFromReq(req)
tr := callTransportFromReq(req)
legTr := legTransportFromReq(req)
log := s.log.WithValues(
"callID", callID,
Contributor Author:
The one log line that now won't have a callID is the "Bad request" one, logged when validation fails.

"sipCallID", sipCallID,
"fromIP", src.Addr(),
"toIP", req.Destination(),
"transport", tr,
)

var call *inboundCall
cc := s.newInbound(log, LocalTag(callID), s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string {
cc := s.newInbound(log, s.ContactURI(legTr), req, tx, func(headers map[string]string) map[string]string {
c := call
if c == nil || len(c.attrsToHdr) == 0 {
return headers
Expand All @@ -319,17 +340,43 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
})
log = LoggerWithParams(log, cc)
log = LoggerWithHeaders(log, cc)
cc.log = log
log.Infow("processing invite")

if err := cc.ValidateInvite(); err != nil {
log.Errorw("invalid invite", err)
if s.conf.HideInboundPort {
cc.Drop()
} else {
cc.RespondAndDrop(sip.StatusBadRequest, "Bad request")
}
return psrpc.NewError(psrpc.InvalidArgument, errors.Wrap(err, "invite validation failed"))
}

// Establish ID
fromTag, _ := req.From().Params.Get("tag") // always exists, via ValidateInvite() check
toParams := req.To().Params // To() always exists, via ValidateInvite() check
if toParams == nil {
toParams = sip.NewParams()
req.To().Params = toParams
}
toTag, ok := toParams.Get("tag")
if !ok {
// No to-tag on the invite means we need to generate one per RFC 3261 section 12.
// Generate a new to-tag early, to make sure both INVITES have the same ID.
toTag = utils.NewGuid("")
toParams.Add("tag", toTag)
}
inviteProgress := s.getInvite(sipCallID, toTag, fromTag)
callID := inviteProgress.lkCallID
Contributor:
No locking here. I assume our re-invite handling will catch a duplicate invite if it arrives at the same time?

Contributor Author:
That's right - locking is done in getInvite(), but we're using inviteProgress.lkCallID outside of it.
If two separate INVITEs come along and they are retransmissions (same Via branch), sipgo swallows them. If they are not retransmissions, they are separate invites and should be processed independently, which the early to-tag generation here ensures (different toTag = different map key).
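
A toy, runnable illustration of that last point (the struct mirrors the PR's dialogKey; the values are made up):

package main

import "fmt"

// Same shape as the PR's dialogKey; comparable, so usable directly as a map key.
type dialogKey struct{ sipCallID, toTag, fromTag string }

func main() {
	k1 := dialogKey{sipCallID: "abc", toTag: "t1", fromTag: "f1"}
	k2 := dialogKey{sipCallID: "abc", toTag: "t2", fromTag: "f1"}
	fmt.Println(k1 == k2) // false: a different to-tag selects a different invite state
}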

if callID == "" {
callID = lksip.NewCallID()
inviteProgress.lkCallID = callID
}
cc.id = LocalTag(callID)

log = log.WithValues("callID", callID)
cc.log = log
log.Infow("processing invite")

ctx, span := tracer.Start(ctx, "Server.onInvite")
defer span.End()

Expand All @@ -352,12 +399,6 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
cc.Processing()
}

// Extract SIP Call ID directly from the request
sipCallID := ""
if h := req.CallID(); h != nil {
sipCallID = h.Value()
}

callInfo := &rpc.SIPCall{
LkCallId: callID,
SipCallId: sipCallID,
@@ -421,7 +462,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
// We will send password request anyway, so might as well signal that the progress is made.
cc.Processing()
}
if !s.handleInviteAuth(log, req, tx, from.User, r.Username, r.Password) {
if !s.handleInviteAuth(log, req, tx, from.User, r.Username, r.Password, inviteProgress) {
cmon.InviteErrorShort("unauthorized")
// handleInviteAuth will generate the SIP Response as needed
return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided")
@@ -1293,11 +1334,10 @@ func (c *inboundCall) transferCall(ctx context.Context, transferTo string, heade

}

func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound {
func (s *Server) newInbound(log logger.Logger, contact URI, invite *sip.Request, inviteTx sip.ServerTransaction, getHeaders setHeadersFunc) *sipInbound {
c := &sipInbound{
log: log,
s: s,
id: id,
invite: invite,
inviteTx: inviteTx,
legTr: legTransportFromReq(invite),
2 changes: 1 addition & 1 deletion pkg/sip/outbound.go
@@ -828,7 +828,7 @@ authLoop:
if err != nil {
return nil, fmt.Errorf("invalid challenge %q: %w", challengeStr, err)
}
toHeader := resp.To()
Contributor Author (@alexlivekit, Oct 20, 2025):
Now that we're doing the right validation on the inbound side, the outbound side E2E test caught this error!
But this also means our clients might run into the same issue, in case some of them are not spec-compliant.
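
The underlying pitfall is Go's := inside a nested scope; a generic, runnable illustration (not this PR's code):

package main

import "fmt"

func main() {
	to := "initial"
	for i := 0; i < 1; i++ {
		to := "updated" // BUG: ':=' declares a new 'to' scoped to the loop body
		_ = to
	}
	fmt.Println(to) // prints "initial": the outer variable was never updated
}

In outbound.go, the := inside authLoop meant code after the loop kept seeing the earlier To header instead of the one from the final response.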

toHeader = resp.To()
if toHeader == nil {
return nil, errors.New("no 'To' header on Response")
}
7 changes: 7 additions & 0 deletions pkg/sip/protocol.go
@@ -173,6 +173,13 @@ func legTransportFromReq(req *sip.Request) Transport {
return ""
}

func legCallIDFromReq(req *sip.Request) string {
if callID := req.CallID(); callID != nil {
return callID.Value()
}
return ""
}

func transportPort(c *config.Config, t Transport) int {
if t == TransportTLS {
if tc := c.TLS; tc != nil {
2 changes: 1 addition & 1 deletion pkg/sip/protocol_test.go
@@ -88,7 +88,7 @@ func TestParseReason(t *testing.T) {
Cause: 200,
Text: "Call completed elsewhere",
},
Normal: false,
Normal: true,
},
{
Name: "Q.850",
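Context for the flipped expectation (my reading of RFC 3326 semantics, not something stated in the diff): SIP cause 200 with "Call completed elsewhere" indicates normal call clearing, typically when a forked call was answered on another branch, so parsing it as Normal: true fits. On the wire such a header looks like:

Reason: SIP;cause=200;text="Call completed elsewhere"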
36 changes: 25 additions & 11 deletions pkg/sip/server.go
@@ -24,6 +24,7 @@ import (
"net"
"net/netip"
"sync"
"sync/atomic"
"time"

"github.com/frostbyte73/core"
Expand All @@ -35,6 +36,7 @@ import (
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/utils"
"github.com/livekit/sipgo"
"github.com/livekit/sipgo/sip"

@@ -43,8 +45,7 @@
)

const (
UserAgent = "LiveKit"
digestLimit = 500
UserAgent = "LiveKit"
)

const (
@@ -123,6 +124,12 @@ type Handler interface {
OnSessionEnd(ctx context.Context, callIdentifier *CallIdentifier, callInfo *livekit.SIPCallInfo, reason string)
}

type dialogKey struct {
sipCallID string
toTag string
fromTag string
}

type Server struct {
log logger.Logger
mon *stats.Monitor
@@ -132,8 +139,10 @@ type Server struct {
sipListeners []io.Closer
sipUnhandled RequestHandler

imu sync.Mutex
inProgressInvites []*inProgressInvite
imu sync.Mutex
inProgressInvites map[dialogKey]*inProgressInvite
inviteTimeoutQueue utils.TimeoutQueue[*dialogKey]
Contributor:
Suggested change
inviteTimeoutQueue utils.TimeoutQueue[*dialogKey]
inviteTimeoutQueue utils.TimeoutQueue[dialogKey]

We probably don't need a pointer here.

Contributor:
There's a convention in Go called a Mutex hat, which implies inviteTimeoutQueue is protected by imu.

But the queue implementation already has a mutex internally, so it might be worth moving it out of the imu group.
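
A generic, runnable sketch of the convention (illustrative names, not this PR's struct):

package main

import "sync"

// queue locks internally, so callers never guard it with their own mutex.
type queue struct{ mu sync.Mutex }

// server shows the "mutex hat": by convention, the fields listed directly
// under mu, up to the next blank line, are the ones mu protects.
type server struct {
	mu      sync.Mutex
	invites map[string]int // guarded by mu

	timeouts queue // self-locking, so it sits outside mu's group
}

func main() { _ = server{invites: map[string]int{}} }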

Contributor Author:
Oh, this is neat, thank you for pointing it out!

isCleanupTaskRunning atomic.Bool
Contributor:
Seems unused.


closing core.Fuse
cmu sync.RWMutex
Expand All @@ -155,20 +164,22 @@ type Server struct {
type inProgressInvite struct {
sipCallID string
challenge digest.Challenge
lkCallID string // SCL_* LiveKit call ID assigned to this dialog
}

func NewServer(region string, conf *config.Config, log logger.Logger, mon *stats.Monitor, getIOClient GetIOInfoClient) *Server {
if log == nil {
log = logger.GetLogger()
}
s := &Server{
log: log,
conf: conf,
region: region,
mon: mon,
getIOClient: getIOClient,
activeCalls: make(map[RemoteTag]*inboundCall),
byLocal: make(map[LocalTag]*inboundCall),
log: log,
conf: conf,
region: region,
mon: mon,
getIOClient: getIOClient,
inProgressInvites: make(map[dialogKey]*inProgressInvite),
activeCalls: make(map[RemoteTag]*inboundCall),
byLocal: make(map[LocalTag]*inboundCall),
}
s.infos.byCallID = expirable.NewLRU[string, *inboundCallInfo](maxCallCache, nil, callCacheTTL)
s.initMediaRes()
@@ -309,6 +320,9 @@ func (s *Server) Start(agent *sipgo.UserAgent, sc *ServiceConfig, tlsConf *tls.C
}
}

// Start the cleanup task
go s.cleanupInvites()

return nil
}

Expand Down