diff --git a/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs b/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs index 9ee219854d..a83817cb96 100644 --- a/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs +++ b/Emby.Server.Implementations/EntryPoints/UdpServerEntryPoint.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Emby.Server.Implementations.Udp; using MediaBrowser.Controller; @@ -12,7 +13,7 @@ namespace Emby.Server.Implementations.EntryPoints /// /// Class UdpServerEntryPoint. /// - public class UdpServerEntryPoint : IServerEntryPoint + public sealed class UdpServerEntryPoint : IServerEntryPoint { /// /// The port of the UDP server. @@ -31,61 +32,44 @@ namespace Emby.Server.Implementations.EntryPoints /// The UDP server. /// private UdpServer _udpServer; + private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private bool _disposed = false; /// /// Initializes a new instance of the class. /// public UdpServerEntryPoint( - ILogger logger, - IServerApplicationHost appHost, - IJsonSerializer json, - ISocketFactory socketFactory) + ILogger logger, + IServerApplicationHost appHost) { _logger = logger; _appHost = appHost; - _json = json; - _socketFactory = socketFactory; + + } /// - public Task RunAsync() + public async Task RunAsync() { - var udpServer = new UdpServer(_logger, _appHost, _json, _socketFactory); - - try - { - udpServer.Start(PortNumber); - - _udpServer = udpServer; - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to start UDP Server"); - } - - return Task.CompletedTask; + _udpServer = new UdpServer(_logger, _appHost); + _udpServer.Start(PortNumber, _cancellationTokenSource.Token); } /// public void Dispose() { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// - /// Releases unmanaged and - optionally - managed resources. - /// - /// true to release both managed and unmanaged resources; false to release only unmanaged resources. - protected virtual void Dispose(bool dispose) - { - if (dispose) + if (_disposed) { - if (_udpServer != null) - { - _udpServer.Dispose(); - } + return; } + + _cancellationTokenSource.Cancel(); + _udpServer.Dispose(); + + _cancellationTokenSource = null; + _udpServer = null; + + _disposed = true; } } } diff --git a/Emby.Server.Implementations/Net/SocketFactory.cs b/Emby.Server.Implementations/Net/SocketFactory.cs index 0870db003f..4e04cde78c 100644 --- a/Emby.Server.Implementations/Net/SocketFactory.cs +++ b/Emby.Server.Implementations/Net/SocketFactory.cs @@ -8,32 +8,6 @@ namespace Emby.Server.Implementations.Net { public class SocketFactory : ISocketFactory { - /// - /// Creates a new UDP acceptSocket and binds it to the specified local port. - /// - /// An integer specifying the local port to bind the acceptSocket to. - public ISocket CreateUdpSocket(int localPort) - { - if (localPort < 0) - { - throw new ArgumentException("localPort cannot be less than zero.", nameof(localPort)); - } - - var retVal = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - - try - { - retVal.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); - return new UdpSocket(retVal, localPort, IPAddress.Any); - } - catch - { - retVal?.Dispose(); - - throw; - } - } - public ISocket CreateUdpBroadcastSocket(int localPort) { if (localPort < 0) @@ -156,8 +130,5 @@ namespace Emby.Server.Implementations.Net throw; } } - - public Stream CreateNetworkStream(ISocket socket, bool ownsSocket) - => new NetworkStream(((UdpSocket)socket).Socket, ownsSocket); } } diff --git a/Emby.Server.Implementations/Net/UdpSocket.cs b/Emby.Server.Implementations/Net/UdpSocket.cs index dde4a2a34c..211ca67841 100644 --- a/Emby.Server.Implementations/Net/UdpSocket.cs +++ b/Emby.Server.Implementations/Net/UdpSocket.cs @@ -181,15 +181,6 @@ namespace Emby.Server.Implementations.Net return taskCompletion.Task; } - public Task ReceiveAsync(CancellationToken cancellationToken) - { - ThrowIfDisposed(); - - var buffer = new byte[8192]; - - return ReceiveAsync(buffer, 0, buffer.Length, cancellationToken); - } - public Task SendToAsync(byte[] buffer, int offset, int size, IPEndPoint endPoint, CancellationToken cancellationToken) { ThrowIfDisposed(); diff --git a/Emby.Server.Implementations/Udp/UdpServer.cs b/Emby.Server.Implementations/Udp/UdpServer.cs index 185a282ac4..19fc1e316d 100644 --- a/Emby.Server.Implementations/Udp/UdpServer.cs +++ b/Emby.Server.Implementations/Udp/UdpServer.cs @@ -1,112 +1,47 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Net; +using System.Net.Sockets; using System.Text; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Controller; using MediaBrowser.Model.ApiClient; -using MediaBrowser.Model.Events; -using MediaBrowser.Model.Net; -using MediaBrowser.Model.Serialization; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.Udp { /// - /// Provides a Udp Server + /// Provides a Udp Server. /// - public class UdpServer : IDisposable + public sealed class UdpServer : IDisposable { /// /// The _logger /// private readonly ILogger _logger; - - private bool _isDisposed; - - private readonly List>> _responders = new List>>(); - private readonly IServerApplicationHost _appHost; - private readonly IJsonSerializer _json; + + /// + /// The _udp client. + /// + private Socket _udpSocket; + private IPEndPoint _endpoint; + private readonly byte[] _receiveBuffer = new byte[8192]; + + private bool _disposed = false; /// /// Initializes a new instance of the class. /// - public UdpServer(ILogger logger, IServerApplicationHost appHost, IJsonSerializer json, ISocketFactory socketFactory) + public UdpServer(ILogger logger, IServerApplicationHost appHost) { _logger = logger; _appHost = appHost; - _json = json; - _socketFactory = socketFactory; - - AddMessageResponder("who is JellyfinServer?", true, RespondToV2Message); } - private void AddMessageResponder(string message, bool isSubstring, Func responder) + private async Task RespondToV2Message(string messageText, EndPoint endpoint, CancellationToken cancellationToken) { - _responders.Add(new Tuple>(message, isSubstring, responder)); - } - - /// - /// Raises the event. - /// - private async void OnMessageReceived(GenericEventArgs e) - { - var message = e.Argument; - - var encoding = Encoding.UTF8; - var responder = GetResponder(message.Buffer, message.ReceivedBytes, encoding); - - if (responder == null) - { - encoding = Encoding.Unicode; - responder = GetResponder(message.Buffer, message.ReceivedBytes, encoding); - } - - if (responder != null) - { - var cancellationToken = CancellationToken.None; - - try - { - await responder.Item2.Item3(responder.Item1, message.RemoteEndPoint, encoding, cancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - - } - catch (Exception ex) - { - _logger.LogError(ex, "Error in OnMessageReceived"); - } - } - } - - private Tuple>> GetResponder(byte[] buffer, int bytesReceived, Encoding encoding) - { - var text = encoding.GetString(buffer, 0, bytesReceived); - var responder = _responders.FirstOrDefault(i => - { - if (i.Item2) - { - return text.IndexOf(i.Item1, StringComparison.OrdinalIgnoreCase) != -1; - } - return string.Equals(i.Item1, text, StringComparison.OrdinalIgnoreCase); - }); - - if (responder == null) - { - return null; - } - return new Tuple>>(text, responder); - } - - private async Task RespondToV2Message(string messageText, IPEndPoint endpoint, Encoding encoding, CancellationToken cancellationToken) - { - var parts = messageText.Split('|'); - var localUrl = await _appHost.GetLocalApiUrl(cancellationToken).ConfigureAwait(false); if (!string.IsNullOrEmpty(localUrl)) @@ -118,8 +53,16 @@ namespace Emby.Server.Implementations.Udp Name = _appHost.FriendlyName }; - await SendAsync(encoding.GetBytes(_json.SerializeToString(response)), endpoint, cancellationToken).ConfigureAwait(false); + try + { + await _udpSocket.SendToAsync(JsonSerializer.SerializeToUtf8Bytes(response), SocketFlags.None, endpoint).ConfigureAwait(false); + } + catch (SocketException ex) + { + _logger.LogError(ex, "Error sending response message"); + } + var parts = messageText.Split('|'); if (parts.Length > 1) { _appHost.EnableLoopback(parts[1]); @@ -131,162 +74,60 @@ namespace Emby.Server.Implementations.Udp } } - /// - /// The _udp client - /// - private ISocket _udpClient; - private readonly ISocketFactory _socketFactory; - /// /// Starts the specified port. /// /// The port. - public void Start(int port) + /// + public void Start(int port, CancellationToken cancellationToken) { - _udpClient = _socketFactory.CreateUdpSocket(port); + _endpoint = new IPEndPoint(IPAddress.Any, port); - Task.Run(() => BeginReceive()); + _udpSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + _udpSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + _udpSocket.Bind(_endpoint); + + _ = Task.Run(async () => await BeginReceiveAsync(cancellationToken).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } - private readonly byte[] _receiveBuffer = new byte[8192]; - - private void BeginReceive() + private async Task BeginReceiveAsync(CancellationToken cancellationToken) { - if (_isDisposed) + while (!cancellationToken.IsCancellationRequested) { - return; - } - - try - { - var result = _udpClient.BeginReceive(_receiveBuffer, 0, _receiveBuffer.Length, OnReceiveResult); - - if (result.CompletedSynchronously) + try { - OnReceiveResult(result); + var result = await _udpSocket.ReceiveFromAsync(_receiveBuffer, SocketFlags.None, _endpoint).ConfigureAwait(false); + + cancellationToken.ThrowIfCancellationRequested(); + + var text = Encoding.UTF8.GetString(_receiveBuffer, 0, result.ReceivedBytes); + if (text.Contains("who is JellyfinServer?", StringComparison.OrdinalIgnoreCase)) + { + await RespondToV2Message(text, result.RemoteEndPoint, cancellationToken).ConfigureAwait(false); + } + } + catch (SocketException ex) + { + _logger.LogError(ex, "Failed to receive data drom socket"); + } + catch (OperationCanceledException) + { + // Don't throw } } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - catch (Exception ex) - { - _logger.LogError(ex, "Error receiving udp message"); - } } - private void OnReceiveResult(IAsyncResult result) - { - if (_isDisposed) - { - return; - } - - try - { - var socketResult = _udpClient.EndReceive(result); - - OnMessageReceived(socketResult); - } - catch (ObjectDisposedException) - { - //TODO Investigate and properly fix. - } - catch (Exception ex) - { - _logger.LogError(ex, "Error receiving udp message"); - } - - BeginReceive(); - } - - /// - /// Called when [message received]. - /// - /// The message. - private void OnMessageReceived(SocketReceiveResult message) - { - if (_isDisposed) - { - return; - } - - if (message.RemoteEndPoint.Port == 0) - { - return; - } - - try - { - OnMessageReceived(new GenericEventArgs - { - Argument = message - }); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error handling UDP message"); - } - } - - /// - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// + /// public void Dispose() { - Dispose(true); - } - - /// - /// Releases unmanaged and - optionally - managed resources. - /// - /// true to release both managed and unmanaged resources; false to release only unmanaged resources. - protected virtual void Dispose(bool dispose) - { - if (dispose) + if (_disposed) { - _isDisposed = true; - - if (_udpClient != null) - { - _udpClient.Dispose(); - } - } - } - - public async Task SendAsync(byte[] bytes, IPEndPoint remoteEndPoint, CancellationToken cancellationToken) - { - if (_isDisposed) - { - throw new ObjectDisposedException(GetType().Name); + return; } - if (bytes == null) - { - throw new ArgumentNullException(nameof(bytes)); - } + _udpSocket?.Dispose(); - if (remoteEndPoint == null) - { - throw new ArgumentNullException(nameof(remoteEndPoint)); - } - - try - { - await _udpClient.SendToAsync(bytes, 0, bytes.Length, remoteEndPoint, cancellationToken).ConfigureAwait(false); - - _logger.LogInformation("Udp message sent to {remoteEndPoint}", remoteEndPoint); - } - catch (OperationCanceledException) - { - - } - catch (Exception ex) - { - _logger.LogError(ex, "Error sending message to {remoteEndPoint}", remoteEndPoint); - } + GC.SuppressFinalize(this); } } - } diff --git a/MediaBrowser.Model/Net/ISocket.cs b/MediaBrowser.Model/Net/ISocket.cs index f80de5524c..3fdc40bbe2 100644 --- a/MediaBrowser.Model/Net/ISocket.cs +++ b/MediaBrowser.Model/Net/ISocket.cs @@ -14,8 +14,6 @@ namespace MediaBrowser.Model.Net Task ReceiveAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken); - int Receive(byte[] buffer, int offset, int count); - IAsyncResult BeginReceive(byte[] buffer, int offset, int count, AsyncCallback callback); SocketReceiveResult EndReceive(IAsyncResult result); diff --git a/MediaBrowser.Model/Net/ISocketFactory.cs b/MediaBrowser.Model/Net/ISocketFactory.cs index 2f857f1af0..dc69b1fb25 100644 --- a/MediaBrowser.Model/Net/ISocketFactory.cs +++ b/MediaBrowser.Model/Net/ISocketFactory.cs @@ -8,13 +8,6 @@ namespace MediaBrowser.Model.Net /// public interface ISocketFactory { - /// - /// Creates a new unicast socket using the specified local port number. - /// - /// The local port to bind to. - /// A implementation. - ISocket CreateUdpSocket(int localPort); - ISocket CreateUdpBroadcastSocket(int localPort); /// @@ -30,7 +23,5 @@ namespace MediaBrowser.Model.Net /// The local port to bind to. /// A implementation. ISocket CreateUdpMulticastSocket(string ipAddress, int multicastTimeToLive, int localPort); - - Stream CreateNetworkStream(ISocket socket, bool ownsSocket); } }