diff --git a/chunk_header.go b/chunk_header.go index cd2ba50..acf48ce 100644 --- a/chunk_header.go +++ b/chunk_header.go @@ -79,14 +79,15 @@ func encodeChunkBasicHeader(w io.Writer, mh *chunkBasicHeader) error { } type chunkMessageHeader struct { - timestamp uint32 // fmt = 0 - timestampDelta uint32 // fmt = 1 | 2 - messageLength uint32 // fmt = 0 | 1 - messageTypeID byte // fmt = 0 | 1 - messageStreamID uint32 // fmt = 0 + timestamp uint32 // fmt = 0 + timestampDelta uint32 // fmt = 1 | 2 + messageLength uint32 // fmt = 0 | 1 + messageTypeID byte // fmt = 0 | 1 + messageStreamID uint32 // fmt = 0 + extendedTimestampMode ExtendedTimestampMode // fmt = 0 | 1 | 2 } -func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessageHeader) error { +func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessageHeader, extendedTimestampMode ExtendedTimestampMode) error { if buf == nil || len(buf) < 11 { buf = make([]byte, 11) } @@ -104,8 +105,10 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag mh.messageLength = binary.BigEndian.Uint32(cache32bits) mh.messageTypeID = buf[6] // 8bits mh.messageStreamID = binary.LittleEndian.Uint32(buf[7:11]) // 32bits + mh.extendedTimestampMode = ExtendedTimestampUnused if mh.timestamp == 0xffffff { + mh.extendedTimestampMode = ExtendedTimestampUsed _, err := io.ReadAtLeast(r, cache32bits, 4) if err != nil { return err @@ -123,8 +126,10 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag copy(cache32bits[1:], buf[3:6]) // 24bits BE mh.messageLength = binary.BigEndian.Uint32(cache32bits) mh.messageTypeID = buf[6] // 8bits + mh.extendedTimestampMode = ExtendedTimestampUnused if mh.timestampDelta == 0xffffff { + mh.extendedTimestampMode = ExtendedTimestampDeltaUsed _, err := io.ReadAtLeast(r, cache32bits, 4) if err != nil { return err @@ -139,8 +144,10 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag copy(cache32bits[1:], buf[0:3]) // 24bits BE mh.timestampDelta = binary.BigEndian.Uint32(cache32bits) + mh.extendedTimestampMode = ExtendedTimestampUnused if mh.timestampDelta == 0xffffff { + mh.extendedTimestampMode = ExtendedTimestampDeltaUsed _, err := io.ReadAtLeast(r, cache32bits, 4) if err != nil { return err @@ -149,7 +156,22 @@ func decodeChunkMessageHeader(r io.Reader, fmt byte, buf []byte, mh *chunkMessag } case 3: - // DO NOTHING + // DO NOTHING unless an extended timestamp was used in preceding messages + switch extendedTimestampMode { + case ExtendedTimestampUsed: + _, err := io.ReadAtLeast(r, cache32bits, 4) + if err != nil { + return err + } + mh.timestamp = binary.BigEndian.Uint32(cache32bits) + + case ExtendedTimestampDeltaUsed: + _, err := io.ReadAtLeast(r, cache32bits, 4) + if err != nil { + return err + } + mh.timestampDelta = binary.BigEndian.Uint32(cache32bits) + } default: panic("Unexpected fmt") diff --git a/chunk_stream_reader.go b/chunk_stream_reader.go index 9760298..11c60ce 100644 --- a/chunk_stream_reader.go +++ b/chunk_stream_reader.go @@ -11,6 +11,14 @@ import ( "bytes" ) +type ExtendedTimestampMode byte + +const ( + ExtendedTimestampUnused ExtendedTimestampMode = 0 + ExtendedTimestampUsed ExtendedTimestampMode = 1 + ExtendedTimestampDeltaUsed ExtendedTimestampMode = 2 +) + type ChunkStreamReader struct { basicHeader chunkBasicHeader messageHeader chunkMessageHeader @@ -21,6 +29,8 @@ type ChunkStreamReader struct { messageTypeID byte messageStreamID uint32 + extendedTimestampMode ExtendedTimestampMode + buf bytes.Buffer completed bool } diff --git a/chunk_streamer.go b/chunk_streamer.go index 786e293..d314610 100644 --- a/chunk_streamer.go +++ b/chunk_streamer.go @@ -16,7 +16,6 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/yutopp/go-rtmp/message" ) @@ -200,16 +199,17 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) { } //cs.logger.Debugf("(READ) BasicHeader = %+v", bh) + reader, err := cs.prepareChunkReader(bh.chunkStreamID) + if err != nil { + return nil, errors.Wrapf(err, "Failed to prepare chunk reader") + } + var mh chunkMessageHeader - if err := decodeChunkMessageHeader(cs.r, bh.fmt, cs.cacheBuffer, &mh); err != nil { + if err := decodeChunkMessageHeader(cs.r, bh.fmt, cs.cacheBuffer, &mh, reader.extendedTimestampMode); err != nil { return nil, err } //cs.logger.Debugf("(READ) MessageHeader = %+v", mh) - reader, err := cs.prepareChunkReader(bh.chunkStreamID) - if err != nil { - return nil, errors.Wrapf(err, "Failed to prepare chunk reader") - } if reader.completed { reader.buf.Reset() reader.completed = false @@ -225,17 +225,27 @@ func (cs *ChunkStreamer) readChunk() (*ChunkStreamReader, error) { reader.messageLength = mh.messageLength reader.messageTypeID = mh.messageTypeID reader.messageStreamID = mh.messageStreamID + reader.extendedTimestampMode = mh.extendedTimestampMode case 1: reader.timestampDelta = mh.timestampDelta reader.messageLength = mh.messageLength reader.messageTypeID = mh.messageTypeID + reader.extendedTimestampMode = mh.extendedTimestampMode case 2: reader.timestampDelta = mh.timestampDelta + reader.extendedTimestampMode = mh.extendedTimestampMode case 3: - // DO NOTHING + // DO NOTHING unless an extended timestamp was used in preceding messages + switch reader.extendedTimestampMode { + case ExtendedTimestampUsed: + reader.timestamp = mh.timestamp + + case ExtendedTimestampDeltaUsed: + reader.timestampDelta = mh.timestampDelta + } default: panic("unsupported chunk") // TODO: fix @@ -454,23 +464,23 @@ func (sched *chunkStreamerWriterSched) Run() (err error) { } }() +writerLoop: for { select { case writer := <-sched.writers: - isCompleted, err := sched.streamer.writeChunk(writer) - if err != nil { - writer.lastErr = err - close(writer.doneCh) - return err + isCompleted := false + for !isCompleted { + isCompleted, err = sched.streamer.writeChunk(writer) + if err != nil { + writer.lastErr = err + close(writer.doneCh) + return err + } + if isCompleted { + close(writer.doneCh) + continue writerLoop + } } - if isCompleted { - close(writer.doneCh) - continue - } - - // Enqueue writer - sched.writers <- writer - case <-sched.stopCh: return nil } diff --git a/server.go b/server.go index 224b39f..31e48c7 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,7 @@ package rtmp import ( "io" "net" + "strings" "sync" "github.com/pkg/errors" @@ -117,6 +118,9 @@ func (srv *Server) handleConn(conn net.Conn) { if err == io.EOF { c.logger.Infof("Server closed") return + } else if strings.HasPrefix(err.Error(), "Failed to handshake") { + c.logger.Debugf("Server handshake failed") + return } c.logger.Errorf("Server closed by error: Err = %+v", err) }