Golang 實現 RTP

語言: CN / TW / HK

在 Coding 之前我們先來簡單介紹一下 RTP(Real-time Transport Protocol), 正如它的名字所説,用於互聯網的實時傳輸協議,通過 IP 網絡傳輸音頻和視頻的網絡協議。

由音視頻傳輸工作小組開發,1996 年首次發佈,並提出了以下使用設想。

  1. 簡單的多播音頻會議

使用 IP 的多播服務進行語音通信。通過某種分配機制,獲取多播組地址和端口對。一個端口用於音頻數據的,另一個用於控制(RTCP)包,地址和端口信息被分發給預期的參與者。如果需要加密,可通過特定格式進行加密。

  1. 音視頻會議

如果在會議中同時使用音視頻媒體,那麼它們是作為單獨的 RTP 會話傳輸。音頻,視頻兩個媒體分別使用不同的 UDP 端口對傳輸單獨的 RTP 和 RTCP 數組包,多播地址可能相同,可能不同。進行這種分離的動機是如果參與者只想接受一種媒體,可以進行選擇。

  1. Mixer 和 Translator

我們需要考慮這樣一種情況,在某個會議中,大多數人處於高速網絡鏈路中,而某個地方的一小部分人只能低速率連接。為了防止所有人使用低帶寬,可以在低帶寬區域放置一個 RTP 級的中繼器 Mixer。Mixer 將接收的音頻報文重新同步為發送方 20 ms 恆定間隔,重建音頻為單一流,音頻編碼為低速帶寬的音頻,然後轉發給低速鏈路上的帶寬數據包流。

  1. 分層編碼

多媒體應用程序應該能調節傳輸速率以匹配接收者容量或適應網絡擁塞。可以將調節速率的任務通過將分層編碼和分層傳輸系統相結合來實現接收器。在基於 IP 多播的 RTP 上下文中,每個 RTP 會話均承載在自己的多播組上。然後,接收者可以只通過加入組播組合適的子集來調整接收帶寬。

RTP 數據包頭部字段

只有當 Mixer 存在時,才會存在 CSRC 標識符列表。這些字段具有以下含義。前 12 個 8 位組在每一個包中都有。

  • version (V): 2 bits

RTP 版本。

  • 填充 §: 1 bit

如果設置了填充位,包中包含至少一個填充 8 位組,其他填充位不屬於 Payload。

  • 擴展 (X): 1 bit

如果設置了擴展位則存在。

  • CSRC 數量(CC): 4 bits

CSRC 數量包含在固定頭中,CSRC 標識符數量。

  • 標記 (M): 1 bit

標記由配置文件定義。用於標記數據包流中例如幀邊界之類的重要事件。

  • payload 類型(PT): 7 bits

該字段指出 RTP 有效載荷格式,由應用程序進行解釋。接收者必須忽略無法理解的有效載荷類型的數據包。

  • 序列號: 16 bits

每次 RTP 數據包發送時增加,可能用於接收者檢測包丟失並且恢復包序列。

  • 時間戳: 32 bits

該字段反映了 RTP 數據包中第一個 8 位組的採樣時刻。

  • SSRC: 32 bits

標識同步源,這個標識符應該選擇隨機,在同一個 RTP 對話的兩個同步源應該有不同的同步標識。

  • CSRC 列表:0 到 15 項, 其中每項 32 bits

該字段表示對該 payload 數據做出貢獻所有 SSRC。

Golang 的相關實現

RTP 的實現有一些,不過通過 Go 實現有一些好處。

  • 易於測試

這裏的易於測試不僅僅是體現在容易書寫,能夠快速通過源碼,函數直接生成相應測試函數。而且更重要的是能夠提供相應的基準測試,提供計時,並行執行,內存統計等參數供開發者進行相應調整。

  • 語言層面強大的 Web 開發能力

能夠基於語言層面快速的對例 JSON 解析,字段封裝 。無需引入三方庫。

  • 性能較為優異

相比於 Python,Ruby 等解釋型語言快,比 node, erlang 等語言更易書寫。如果服務中需要用併發,內置關鍵字 go 就可以快速起多個 goroutine。

Go 社區的 RTP 有 RTP 相關實現,對應的測試也比較全面,簡單介紹一下。

package_test.go (基礎測試)

func TestBasic(t *testing.T) {
  p := &Packet{}

  if err := p.Unmarshal([]byte{}); err == nil {
    t.Fatal("Unmarshal did not error on zero length packet")
  }

  rawPkt := []byte{
    0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64,
    0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e,
  }
  parsedPacket := &Packet{
        // 固定頭部
    Header: Header{
      Marker:           true,
      Extension:        true,
      ExtensionProfile: 1,
      Extensions: []Extension{
        {0, []byte{
          0xFF, 0xFF, 0xFF, 0xFF,
        }},
      },
      Version:        2,
      PayloadOffset:  20,
      PayloadType:    96,
      SequenceNumber: 27023,
      Timestamp:      3653407706,
      SSRC:           476325762,
      CSRC:           []uint32{},
    },
        // 有效負載
    Payload: rawPkt[20:],
    Raw:     rawPkt,
  }

  // Unmarshal to the used Packet should work as well.
  for i := 0; i < 2; i++ {
    t.Run(fmt.Sprintf("Run%d", i+1), func(t *testing.T) {
      if err := p.Unmarshal(rawPkt); err != nil {
        t.Error(err)
      } else if !reflect.DeepEqual(p, parsedPacket) {
        t.Errorf("TestBasic unmarshal: got %#v, want %#v", p, parsedPacket)
      }

      if parsedPacket.Header.MarshalSize() != 20 {
        t.Errorf("wrong computed header marshal size")
      } else if parsedPacket.MarshalSize() != len(rawPkt) {
        t.Errorf("wrong computed marshal size")
      }

      if p.PayloadOffset != 20 {
        t.Errorf("wrong payload offset: %d != %d", p.PayloadOffset, 20)
      }

      raw, err := p.Marshal()
      if err != nil {
        t.Error(err)
      } else if !reflect.DeepEqual(raw, rawPkt) {
        t.Errorf("TestBasic marshal: got %#v, want %#v", raw, rawPkt)
      }

      if p.PayloadOffset != 20 {
        t.Errorf("wrong payload offset: %d != %d", p.PayloadOffset, 20)
      }
    })
  }
}

基本測試中,利用 Golang 自帶的 Unmarshal 快速將 byte 切片轉換為相應結構體。減少了相關封包,解包等代碼的工作量。在網絡傳輸中,也能夠在語言層面直接完成大端,小端編碼的轉換,減少編碼的煩惱。

h.SequenceNumber = binary.BigEndian.Uint16(rawPacket[seqNumOffset : seqNumOffset+seqNumLength])
h.Timestamp = binary.BigEndian.Uint32(rawPacket[timestampOffset : timestampOffset+timestampLength])
h.SSRC = binary.BigEndian.Uint32(rawPacket[ssrcOffset : ssrcOffset+ssrcLength])

其中關於切片的相關操作十分便捷,可以獲取數組中的某一段數據,操作比較靈活,在協議數據的傳輸過程中,通過切片,獲取某段數據進行相應處理。

m := copy(buf[n:], p.Payload)
p.Raw = buf[:n+m]

在實現完成後,Golang 的子測試能夠進行嵌套測試。對執行特定測試用例特別有用,只有子測試完成後,父測試才會返回。

func TestVP8PartitionHeadChecker_IsPartitionHead(t *testing.T) {
	checker := &VP8PartitionHeadChecker{}
	t.Run("SmallPacket", func(t *testing.T) {
		if checker.IsPartitionHead([]byte{0x00}) {
			t.Fatal("Small packet should not be the head of a new partition")
		}
	})
	t.Run("SFlagON", func(t *testing.T) {
		if !checker.IsPartitionHead([]byte{0x10, 0x00, 0x00, 0x00}) {
			t.Fatal("Packet with S flag should be the head of a new partition")
		}
	})
	t.Run("SFlagOFF", func(t *testing.T) {
		if checker.IsPartitionHead([]byte{0x00, 0x00, 0x00, 0x00}) {
			t.Fatal("Packet without S flag should not be the head of a new partition")
		}
	})
}

更多的相關實現可以去 GitHub( http://github.com/pion/rtp ) 上看一下實現源碼。

如果人為去關注相關的傳輸細節,可能在底層耗費大量時間,目前市面上有很多相關的實現方案,有開源的,和一些公司提供的一些方案。目前經過業界實踐,陌陌,小米眾多公司都採用了聲網的 SDK 去進行相關的業務時間,部分公司甚至已經將核心業務交由處理,可見其穩定性。個人去測試了一下他們的雲課堂相關服務,回放,在線演示等功能十分便捷,可以節約大量開發時間。