TsUdpSource.cs 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Diagnostics;
  5. using System.Linq;
  6. using System.Net;
  7. using System.Net.Sockets;
  8. using System.Text;
  9. using System.Threading;
  10. using MatrixIO.IO;
  11. namespace TsViewer
  12. {
  13. public class TsUdpSource : TsSource
  14. {
  15. private readonly object _syncObject = new Object();
  16. private Socket _socket;
  17. private volatile bool _running;
  18. protected IPAddress _localAddress = IPAddress.Any;
  19. protected IPAddress _sourceAddress;
  20. protected IPAddress _groupAddress;
  21. protected bool _multicast = true;
  22. protected int _receiveBufferSize = 128 * 1024;
  23. private readonly TimeAverage _bitrate = new TimeAverage();
  24. private readonly TimeAverage _packetrate = new TimeAverage();
  25. private readonly Stopwatch _highResTimer = new Stopwatch();
  26. private readonly BlockingCollection<UdpPacket> _packetQueue = new BlockingCollection<UdpPacket>(new ConcurrentQueue<UdpPacket>());
  27. private Thread _processingThread;
  28. private readonly ConcurrentQueue<byte[]> _bufferPool = new ConcurrentQueue<byte[]>();
  29. private readonly ConcurrentQueue<UdpPacket> _packetPool = new ConcurrentQueue<UdpPacket>();
  30. private readonly Stopwatch _stopwatch = new Stopwatch();
  31. private long _total;
  32. private int _count;
  33. private TimeSpan _average;
  34. ~TsUdpSource()
  35. {
  36. Stop();
  37. }
  38. public double Bitrate
  39. {
  40. get { return _bitrate.GetAverage(10); }
  41. }
  42. public double Packetrate
  43. {
  44. get { return _packetrate.GetAverage(10); }
  45. }
  46. public TimeSpan ProcessingTime
  47. {
  48. get { lock (_stopwatch) return _average; }
  49. }
  50. public override void Start(Uri uri)
  51. {
  52. lock (_syncObject)
  53. {
  54. if (_running) return;
  55. _running = true;
  56. Uri = uri;
  57. switch (uri.HostNameType)
  58. {
  59. case UriHostNameType.IPv4:
  60. case UriHostNameType.IPv6:
  61. _sourceAddress = IPAddress.Parse(uri.Host);
  62. break;
  63. default:
  64. var addresses = Dns.GetHostAddresses(uri.DnsSafeHost);
  65. if (addresses.Length < 1) throw new ArgumentException("Host not found.");
  66. _sourceAddress = addresses[0];
  67. break;
  68. }
  69. if ((_sourceAddress.AddressFamily == AddressFamily.InterNetwork &&
  70. _sourceAddress.GetAddressBytes()[0] >= 224 && _sourceAddress.GetAddressBytes()[0] <= 239) ||
  71. (_sourceAddress.AddressFamily == AddressFamily.InterNetworkV6 && _sourceAddress.IsIPv6Multicast))
  72. {
  73. _multicast = true;
  74. }
  75. if (_sourceAddress.AddressFamily == AddressFamily.InterNetworkV6)
  76. _localAddress = IPAddress.IPv6Any;
  77. _socket = new Socket(_sourceAddress.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
  78. _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ExclusiveAddressUse, false);
  79. _socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, _receiveBufferSize);
  80. _socket.Bind(new IPEndPoint(_localAddress, uri.Port));
  81. switch (_sourceAddress.AddressFamily)
  82. {
  83. case AddressFamily.InterNetwork:
  84. if (_multicast)
  85. _socket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
  86. new MulticastOption(_sourceAddress));
  87. break;
  88. case AddressFamily.InterNetworkV6:
  89. if (_multicast)
  90. _socket.SetSocketOption(SocketOptionLevel.IPv6, SocketOptionName.AddMembership,
  91. new IPv6MulticastOption(_sourceAddress));
  92. break;
  93. }
  94. var socketArgs = new SocketAsyncEventArgs();
  95. byte[] buffer;
  96. if (!_bufferPool.TryDequeue(out buffer)) buffer = new byte[9000];
  97. socketArgs.SetBuffer(buffer, 0, buffer.Length);
  98. socketArgs.Completed += ReceiveCompleted;
  99. _bitrate.Start();
  100. _packetrate.Start();
  101. _highResTimer.Start();
  102. _processingThread = new Thread(ProcessPackets) {IsBackground = true};
  103. _processingThread.Start();
  104. _socket.ReceiveAsync(socketArgs);
  105. }
  106. }
  107. private void ReceiveCompleted(object sender, SocketAsyncEventArgs e)
  108. {
  109. UdpPacket packet;
  110. if (!_packetPool.TryDequeue(out packet)) packet = new UdpPacket();
  111. packet.Buffer = e.Buffer;
  112. packet.Length = e.BytesTransferred;
  113. packet.ReceivedFrom = (IPEndPoint) e.RemoteEndPoint;
  114. packet.Tick = _highResTimer.ElapsedTicks;
  115. _packetQueue.Add(packet);
  116. // Start receiving next packet as quickly as possible.
  117. byte[] newBuffer;
  118. if (!_bufferPool.TryDequeue(out newBuffer)) newBuffer = new byte[65507];
  119. e.SetBuffer(newBuffer, 0, newBuffer.Length);
  120. if(_running) _socket.ReceiveAsync(e);
  121. }
  122. public void ProcessPackets()
  123. {
  124. foreach (var packet in _packetQueue.GetConsumingEnumerable())
  125. {
  126. Debug.WriteLine("Received " + packet.Length + " bytes at " + packet.Tick + " from " +
  127. packet.ReceivedFrom +
  128. (packet.Length > 0 && packet.Buffer != null
  129. ? " with a first byte of 0x" + packet.Buffer[0].ToString("X2")
  130. : ""));
  131. _stopwatch.Reset();
  132. _stopwatch.Start();
  133. _bitrate.Add(packet.Length*8);
  134. _packetrate.Add(1);
  135. ProcessPacket(packet);
  136. _bufferPool.Enqueue(packet.Buffer);
  137. packet.Buffer = null;
  138. _packetPool.Enqueue(packet);
  139. _stopwatch.Stop();
  140. _total += _stopwatch.ElapsedTicks;
  141. if (++_count >= 100)
  142. {
  143. _average = new TimeSpan(_total/_count);
  144. _total = _count = 0;
  145. }
  146. Debug.WriteLine("Processed packet in " + _stopwatch.Elapsed);
  147. }
  148. Debug.WriteLine("Exiting packet processing thread.");
  149. }
  150. internal virtual void ProcessPacket(UdpPacket packet)
  151. {
  152. Demuxer.ProcessInput(packet.Buffer, 0, packet.Length);
  153. }
  154. public override void Stop()
  155. {
  156. lock (_syncObject)
  157. {
  158. if (!_running) return;
  159. Trace.WriteLine("Stopping.");
  160. _packetQueue.CompleteAdding();
  161. _running = false;
  162. }
  163. }
  164. internal class UdpPacket
  165. {
  166. public byte[] Buffer { get; set; }
  167. public int Length { get; set; }
  168. public IPEndPoint ReceivedFrom { get; set; }
  169. public long Tick { get; set; }
  170. }
  171. }
  172. }