Kill ProgressiveFileCopier and seek to end for ongoing livetv

This commit is contained in:
cvium 2021-09-10 09:29:14 +02:00
parent b96dbbf553
commit 1603d1928e
15 changed files with 137 additions and 387 deletions

View File

@ -4,6 +4,7 @@
using System; using System;
using System.Globalization; using System.Globalization;
using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Controller.Library; using MediaBrowser.Controller.Library;
@ -41,6 +42,11 @@ namespace Emby.Server.Implementations.Library
return _closeFn(); return _closeFn();
} }
public Stream GetStream(bool seekNearEnd)
{
throw new NotSupportedException();
}
public Task Open(CancellationToken openCancellationToken) public Task Open(CancellationToken openCancellationToken)
{ {
return Task.CompletedTask; return Task.CompletedTask;

View File

@ -602,7 +602,7 @@ namespace Emby.Server.Implementations.Library
public async Task<MediaSourceInfo> GetLiveStreamMediaInfo(string id, CancellationToken cancellationToken) public async Task<MediaSourceInfo> GetLiveStreamMediaInfo(string id, CancellationToken cancellationToken)
{ {
var liveStreamInfo = await GetLiveStreamInfo(id, cancellationToken).ConfigureAwait(false); var liveStreamInfo = GetLiveStreamInfo(id);
var mediaSource = liveStreamInfo.MediaSource; var mediaSource = liveStreamInfo.MediaSource;
@ -771,18 +771,18 @@ namespace Emby.Server.Implementations.Library
mediaSource.InferTotalBitrate(true); mediaSource.InferTotalBitrate(true);
} }
public async Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken) public Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken)
{ {
if (string.IsNullOrEmpty(id)) if (string.IsNullOrEmpty(id))
{ {
throw new ArgumentNullException(nameof(id)); throw new ArgumentNullException(nameof(id));
} }
var info = await GetLiveStreamInfo(id, cancellationToken).ConfigureAwait(false); var info = GetLiveStreamInfo(id);
return new Tuple<MediaSourceInfo, IDirectStreamProvider>(info.MediaSource, info as IDirectStreamProvider); return Task.FromResult(new Tuple<MediaSourceInfo, IDirectStreamProvider>(info.MediaSource, info as IDirectStreamProvider));
} }
private Task<ILiveStream> GetLiveStreamInfo(string id, CancellationToken cancellationToken) public ILiveStream GetLiveStreamInfo(string id)
{ {
if (string.IsNullOrEmpty(id)) if (string.IsNullOrEmpty(id))
{ {
@ -791,12 +791,10 @@ namespace Emby.Server.Implementations.Library
if (_openStreams.TryGetValue(id, out ILiveStream info)) if (_openStreams.TryGetValue(id, out ILiveStream info))
{ {
return Task.FromResult(info); return info;
}
else
{
return Task.FromException<ILiveStream>(new ResourceNotFoundException());
} }
throw new ResourceNotFoundException();
} }
public async Task<MediaSourceInfo> GetLiveStream(string id, CancellationToken cancellationToken) public async Task<MediaSourceInfo> GetLiveStream(string id, CancellationToken cancellationToken)

View File

@ -5,6 +5,7 @@ using System.IO;
using System.Net.Http; using System.Net.Http;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Jellyfin.Api.Helpers;
using MediaBrowser.Common.Net; using MediaBrowser.Common.Net;
using MediaBrowser.Controller.Library; using MediaBrowser.Controller.Library;
using MediaBrowser.Model.Dto; using MediaBrowser.Model.Dto;
@ -50,16 +51,23 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
{ {
onStarted(); onStarted();
_logger.LogInformation("Copying recording stream to file {0}", targetFile); _logger.LogInformation("Copying recording to file {FilePath}", targetFile);
// The media source is infinite so we need to handle stopping ourselves // The media source is infinite so we need to handle stopping ourselves
using var durationToken = new CancellationTokenSource(duration); using var durationToken = new CancellationTokenSource(duration);
using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, durationToken.Token); using var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, durationToken.Token);
var linkedCancellationToken = cancellationTokenSource.Token;
await directStreamProvider.CopyToAsync(output, cancellationTokenSource.Token).ConfigureAwait(false); await using var fileStream = new ProgressiveFileStream(directStreamProvider.GetStream(), null, null);
await _streamHelper.CopyToAsync(
fileStream,
output,
IODefaults.CopyToBufferSize,
1000,
linkedCancellationToken).ConfigureAwait(false);
} }
_logger.LogInformation("Recording completed to file {0}", targetFile); _logger.LogInformation("Recording completed: {FilePath}", targetFile);
} }
private async Task RecordFromMediaSource(MediaSourceInfo mediaSource, string targetFile, TimeSpan duration, Action onStarted, CancellationToken cancellationToken) private async Task RecordFromMediaSource(MediaSourceInfo mediaSource, string targetFile, TimeSpan duration, Action onStarted, CancellationToken cancellationToken)

View File

@ -156,11 +156,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
await taskCompletionSource.Task.ConfigureAwait(false); await taskCompletionSource.Task.ConfigureAwait(false);
} }
public string GetFilePath()
{
return TempFilePath;
}
private async Task StartStreaming(UdpClient udpClient, HdHomerunManager hdHomerunManager, IPAddress remoteAddress, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private async Task StartStreaming(UdpClient udpClient, HdHomerunManager hdHomerunManager, IPAddress remoteAddress, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{ {
using (udpClient) using (udpClient)
@ -184,7 +179,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
EnableStreamSharing = false; EnableStreamSharing = false;
} }
await DeleteTempFiles(new List<string> { TempFilePath }).ConfigureAwait(false); await DeleteTempFiles(TempFilePath).ConfigureAwait(false);
} }
private async Task CopyTo(UdpClient udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private async Task CopyTo(UdpClient udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)

View File

@ -3,10 +3,8 @@
#pragma warning disable CS1591 #pragma warning disable CS1591
using System; using System;
using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO; using System.IO;
using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Common.Configuration; using MediaBrowser.Common.Configuration;
@ -97,6 +95,18 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
return Task.CompletedTask; return Task.CompletedTask;
} }
public Stream GetStream(bool seekNearEnd = true)
{
var stream = GetInputStream(TempFilePath, AsyncFile.UseAsyncIO);
bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10;
if (seekFile)
{
TrySeek(stream, -20000);
}
return stream;
}
protected FileStream GetInputStream(string path, bool allowAsyncFileRead) protected FileStream GetInputStream(string path, bool allowAsyncFileRead)
=> new FileStream( => new FileStream(
path, path,
@ -106,25 +116,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
IODefaults.FileStreamBufferSize, IODefaults.FileStreamBufferSize,
allowAsyncFileRead ? FileOptions.SequentialScan | FileOptions.Asynchronous : FileOptions.SequentialScan); allowAsyncFileRead ? FileOptions.SequentialScan | FileOptions.Asynchronous : FileOptions.SequentialScan);
public Task DeleteTempFiles() protected async Task DeleteTempFiles(string path, int retryCount = 0)
{
return DeleteTempFiles(GetStreamFilePaths());
}
protected async Task DeleteTempFiles(IEnumerable<string> paths, int retryCount = 0)
{ {
if (retryCount == 0) if (retryCount == 0)
{ {
Logger.LogInformation("Deleting temp files {0}", paths); Logger.LogInformation("Deleting temp file {FilePath}", path);
}
var failedFiles = new List<string>();
foreach (var path in paths)
{
if (!File.Exists(path))
{
continue;
} }
try try
@ -133,85 +129,16 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
} }
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, "Error deleting file {path}", path); Logger.LogError(ex, "Error deleting file {FilePath}", path);
failedFiles.Add(path); if (retryCount <= 40)
}
}
if (failedFiles.Count > 0 && retryCount <= 40)
{ {
await Task.Delay(500).ConfigureAwait(false); await Task.Delay(500).ConfigureAwait(false);
await DeleteTempFiles(failedFiles, retryCount + 1).ConfigureAwait(false); await DeleteTempFiles(path, retryCount + 1).ConfigureAwait(false);
}
} }
} }
protected virtual List<string> GetStreamFilePaths() private void TrySeek(Stream stream, long offset)
{
return new List<string> { TempFilePath };
}
public async Task CopyToAsync(Stream stream, CancellationToken cancellationToken)
{
using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, LiveStreamCancellationTokenSource.Token);
cancellationToken = linkedCancellationTokenSource.Token;
bool seekFile = (DateTime.UtcNow - DateOpened).TotalSeconds > 10;
var nextFileInfo = GetNextFile(null);
var nextFile = nextFileInfo.file;
var isLastFile = nextFileInfo.isLastFile;
var allowAsync = AsyncFile.UseAsyncIO;
while (!string.IsNullOrEmpty(nextFile))
{
var emptyReadLimit = isLastFile ? EmptyReadLimit : 1;
await CopyFile(nextFile, seekFile, emptyReadLimit, allowAsync, stream, cancellationToken).ConfigureAwait(false);
seekFile = false;
nextFileInfo = GetNextFile(nextFile);
nextFile = nextFileInfo.file;
isLastFile = nextFileInfo.isLastFile;
}
Logger.LogInformation("Live Stream ended.");
}
private (string file, bool isLastFile) GetNextFile(string currentFile)
{
var files = GetStreamFilePaths();
if (string.IsNullOrEmpty(currentFile))
{
return (files[^1], true);
}
var nextIndex = files.FindIndex(i => string.Equals(i, currentFile, StringComparison.OrdinalIgnoreCase)) + 1;
var isLastFile = nextIndex == files.Count - 1;
return (files.ElementAtOrDefault(nextIndex), isLastFile);
}
private async Task CopyFile(string path, bool seekFile, int emptyReadLimit, bool allowAsync, Stream stream, CancellationToken cancellationToken)
{
using (var inputStream = GetInputStream(path, allowAsync))
{
if (seekFile)
{
TrySeek(inputStream, -20000);
}
await StreamHelper.CopyToAsync(
inputStream,
stream,
IODefaults.CopyToBufferSize,
emptyReadLimit,
cancellationToken).ConfigureAwait(false);
}
}
private void TrySeek(FileStream stream, long offset)
{ {
if (!stream.CanSeek) if (!stream.CanSeek)
{ {

View File

@ -3,7 +3,6 @@
#pragma warning disable CS1591 #pragma warning disable CS1591
using System; using System;
using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.IO; using System.IO;
using System.Net.Http; using System.Net.Http;
@ -55,39 +54,26 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
Directory.CreateDirectory(Path.GetDirectoryName(TempFilePath)); Directory.CreateDirectory(Path.GetDirectoryName(TempFilePath));
var typeName = GetType().Name; var typeName = GetType().Name;
Logger.LogInformation("Opening " + typeName + " Live stream from {0}", url); Logger.LogInformation("Opening {StreamType} Live stream from {Url}", typeName, url);
// Response stream is disposed manually. // Response stream is disposed manually.
var response = await _httpClientFactory.CreateClient(NamedClient.Default) var response = await _httpClientFactory.CreateClient(NamedClient.Default)
.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, CancellationToken.None) .GetAsync(url, HttpCompletionOption.ResponseHeadersRead, CancellationToken.None)
.ConfigureAwait(false); .ConfigureAwait(false);
var extension = "ts";
var requiresRemux = false;
var contentType = response.Content.Headers.ContentType?.ToString() ?? string.Empty; var contentType = response.Content.Headers.ContentType?.ToString() ?? string.Empty;
if (contentType.IndexOf("matroska", StringComparison.OrdinalIgnoreCase) != -1) if (contentType.Contains("matroska", StringComparison.OrdinalIgnoreCase)
|| contentType.Contains("mp4", StringComparison.OrdinalIgnoreCase)
|| contentType.Contains("dash", StringComparison.OrdinalIgnoreCase)
|| contentType.Contains("mpegURL", StringComparison.OrdinalIgnoreCase)
|| contentType.Contains("text/", StringComparison.OrdinalIgnoreCase))
{ {
requiresRemux = true;
}
else if (contentType.IndexOf("mp4", StringComparison.OrdinalIgnoreCase) != -1 ||
contentType.IndexOf("dash", StringComparison.OrdinalIgnoreCase) != -1 ||
contentType.IndexOf("mpegURL", StringComparison.OrdinalIgnoreCase) != -1 ||
contentType.IndexOf("text/", StringComparison.OrdinalIgnoreCase) != -1)
{
requiresRemux = true;
}
// Close the stream without any sharing features // Close the stream without any sharing features
if (requiresRemux) response.Dispose();
{
using (response)
{
return; return;
} }
}
SetTempFilePath(extension); SetTempFilePath("ts");
var taskCompletionSource = new TaskCompletionSource<bool>(); var taskCompletionSource = new TaskCompletionSource<bool>();
@ -117,16 +103,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
if (!taskCompletionSource.Task.Result) if (!taskCompletionSource.Task.Result)
{ {
Logger.LogWarning("Zero bytes copied from stream {0} to {1} but no exception raised", GetType().Name, TempFilePath); Logger.LogWarning("Zero bytes copied from stream {StreamType} to {FilePath} but no exception raised", GetType().Name, TempFilePath);
throw new EndOfStreamException(string.Format(CultureInfo.InvariantCulture, "Zero bytes copied from stream {0}", GetType().Name)); throw new EndOfStreamException(string.Format(CultureInfo.InvariantCulture, "Zero bytes copied from stream {0}", GetType().Name));
} }
} }
public string GetFilePath()
{
return TempFilePath;
}
private Task StartStreaming(HttpResponseMessage response, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private Task StartStreaming(HttpResponseMessage response, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{ {
return Task.Run( return Task.Run(
@ -134,7 +115,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
{ {
try try
{ {
Logger.LogInformation("Beginning {0} stream to {1}", GetType().Name, TempFilePath); Logger.LogInformation("Beginning {StreamType} stream to {FilePath}", GetType().Name, TempFilePath);
using var message = response; using var message = response;
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
await using var fileStream = new FileStream(TempFilePath, FileMode.Create, FileAccess.Write, FileShare.Read, IODefaults.FileStreamBufferSize, AsyncFile.UseAsyncIO); await using var fileStream = new FileStream(TempFilePath, FileMode.Create, FileAccess.Write, FileShare.Read, IODefaults.FileStreamBufferSize, AsyncFile.UseAsyncIO);
@ -147,19 +128,19 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
} }
catch (OperationCanceledException ex) catch (OperationCanceledException ex)
{ {
Logger.LogInformation("Copying of {0} to {1} was canceled", GetType().Name, TempFilePath); Logger.LogInformation("Copying of {StreamType} to {FilePath} was canceled", GetType().Name, TempFilePath);
openTaskCompletionSource.TrySetException(ex); openTaskCompletionSource.TrySetException(ex);
} }
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, "Error copying live stream {0} to {1}.", GetType().Name, TempFilePath); Logger.LogError(ex, "Error copying live stream {StreamType} to {FilePath}", GetType().Name, TempFilePath);
openTaskCompletionSource.TrySetException(ex); openTaskCompletionSource.TrySetException(ex);
} }
openTaskCompletionSource.TrySetResult(false); openTaskCompletionSource.TrySetResult(false);
EnableStreamSharing = false; EnableStreamSharing = false;
await DeleteTempFiles(new List<string> { TempFilePath }).ConfigureAwait(false); await DeleteTempFiles(TempFilePath).ConfigureAwait(false);
}, },
CancellationToken.None); CancellationToken.None);
} }

View File

@ -1199,15 +1199,15 @@ namespace Jellyfin.Api.Controllers
[ProducesResponseType(StatusCodes.Status200OK)] [ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status404NotFound)] [ProducesResponseType(StatusCodes.Status404NotFound)]
[ProducesVideoFile] [ProducesVideoFile]
public async Task<ActionResult> GetLiveStreamFile([FromRoute, Required] string streamId, [FromRoute, Required] string container) public ActionResult GetLiveStreamFile([FromRoute, Required] string streamId, [FromRoute, Required] string container)
{ {
var liveStreamInfo = await _mediaSourceManager.GetDirectStreamProviderByUniqueId(streamId, CancellationToken.None).ConfigureAwait(false); var liveStreamInfo = _mediaSourceManager.GetLiveStreamInfo(streamId);
if (liveStreamInfo == null) if (liveStreamInfo == null)
{ {
return NotFound(); return NotFound();
} }
var liveStream = new ProgressiveFileStream(liveStreamInfo.GetFilePath(), null, _transcodingJobHelper); var liveStream = new ProgressiveFileStream(liveStreamInfo.GetStream(), null, _transcodingJobHelper);
return new FileStreamResult(liveStream, MimeTypes.GetMimeType("file." + container)); return new FileStreamResult(liveStream, MimeTypes.GetMimeType("file." + container));
} }

View File

@ -453,14 +453,10 @@ namespace Jellyfin.Api.Controllers
{ {
StreamingHelpers.AddDlnaHeaders(state, Response.Headers, true, startTimeTicks, Request, _dlnaManager); StreamingHelpers.AddDlnaHeaders(state, Response.Headers, true, startTimeTicks, Request, _dlnaManager);
await new ProgressiveFileCopier(state.DirectStreamProvider, null, _transcodingJobHelper, CancellationToken.None) var liveStreamInfo = _mediaSourceManager.GetLiveStreamInfo(streamingRequest.LiveStreamId);
{ var liveStream = new ProgressiveFileStream(liveStreamInfo.GetStream(), null, _transcodingJobHelper);
AllowEndOfFile = false
}.WriteToAsync(Response.Body, CancellationToken.None)
.ConfigureAwait(false);
// TODO (moved from MediaBrowser.Api): Don't hardcode contentType // TODO (moved from MediaBrowser.Api): Don't hardcode contentType
return File(Response.Body, MimeTypes.GetMimeType("file.ts")!); return File(liveStream, MimeTypes.GetMimeType("file.ts")!);
} }
// Static remote stream // Static remote stream
@ -492,13 +488,8 @@ namespace Jellyfin.Api.Controllers
if (state.MediaSource.IsInfiniteStream) if (state.MediaSource.IsInfiniteStream)
{ {
await new ProgressiveFileCopier(state.MediaPath, null, _transcodingJobHelper, CancellationToken.None) var liveStream = new ProgressiveFileStream(state.MediaPath, null, _transcodingJobHelper);
{ return File(liveStream, contentType);
AllowEndOfFile = false
}.WriteToAsync(Response.Body, CancellationToken.None)
.ConfigureAwait(false);
return File(Response.Body, contentType);
} }
return FileStreamResponseHelpers.GetStaticFileResult( return FileStreamResponseHelpers.GetStaticFileResult(

View File

@ -120,14 +120,10 @@ namespace Jellyfin.Api.Helpers
{ {
StreamingHelpers.AddDlnaHeaders(state, _httpContextAccessor.HttpContext.Response.Headers, true, streamingRequest.StartTimeTicks, _httpContextAccessor.HttpContext.Request, _dlnaManager); StreamingHelpers.AddDlnaHeaders(state, _httpContextAccessor.HttpContext.Response.Headers, true, streamingRequest.StartTimeTicks, _httpContextAccessor.HttpContext.Request, _dlnaManager);
await new ProgressiveFileCopier(state.DirectStreamProvider, null, _transcodingJobHelper, CancellationToken.None) var liveStreamInfo = _mediaSourceManager.GetLiveStreamInfo(streamingRequest.LiveStreamId);
{ var liveStream = new ProgressiveFileStream(liveStreamInfo.GetStream(), null, _transcodingJobHelper);
AllowEndOfFile = false
}.WriteToAsync(_httpContextAccessor.HttpContext.Response.Body, CancellationToken.None)
.ConfigureAwait(false);
// TODO (moved from MediaBrowser.Api): Don't hardcode contentType // TODO (moved from MediaBrowser.Api): Don't hardcode contentType
return new FileStreamResult(_httpContextAccessor.HttpContext.Response.Body, MimeTypes.GetMimeType("file.ts")!); return new FileStreamResult(liveStream, MimeTypes.GetMimeType("file.ts"));
} }
// Static remote stream // Static remote stream
@ -159,13 +155,8 @@ namespace Jellyfin.Api.Helpers
if (state.MediaSource.IsInfiniteStream) if (state.MediaSource.IsInfiniteStream)
{ {
await new ProgressiveFileCopier(state.MediaPath, null, _transcodingJobHelper, CancellationToken.None) var stream = new ProgressiveFileStream(state.MediaPath, null, _transcodingJobHelper);
{ return new FileStreamResult(stream, contentType);
AllowEndOfFile = false
}.WriteToAsync(_httpContextAccessor.HttpContext.Response.Body, CancellationToken.None)
.ConfigureAwait(false);
return new FileStreamResult(_httpContextAccessor.HttpContext.Response.Body, contentType);
} }
return FileStreamResponseHelpers.GetStaticFileResult( return FileStreamResponseHelpers.GetStaticFileResult(

View File

@ -1,187 +0,0 @@
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Jellyfin.Api.Models.PlaybackDtos;
using MediaBrowser.Common.Extensions;
using MediaBrowser.Controller.Library;
using MediaBrowser.Model.IO;
namespace Jellyfin.Api.Helpers
{
/// <summary>
/// Progressive file copier.
/// </summary>
public class ProgressiveFileCopier
{
private readonly TranscodingJobDto? _job;
private readonly string? _path;
private readonly CancellationToken _cancellationToken;
private readonly IDirectStreamProvider? _directStreamProvider;
private readonly TranscodingJobHelper _transcodingJobHelper;
private long _bytesWritten;
/// <summary>
/// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class.
/// </summary>
/// <param name="path">The path to copy from.</param>
/// <param name="job">The transcoding job.</param>
/// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public ProgressiveFileCopier(string path, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken)
{
_path = path;
_job = job;
_cancellationToken = cancellationToken;
_transcodingJobHelper = transcodingJobHelper;
}
/// <summary>
/// Initializes a new instance of the <see cref="ProgressiveFileCopier"/> class.
/// </summary>
/// <param name="directStreamProvider">Instance of the <see cref="IDirectStreamProvider"/> interface.</param>
/// <param name="job">The transcoding job.</param>
/// <param name="transcodingJobHelper">Instance of the <see cref="TranscodingJobHelper"/>.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public ProgressiveFileCopier(IDirectStreamProvider directStreamProvider, TranscodingJobDto? job, TranscodingJobHelper transcodingJobHelper, CancellationToken cancellationToken)
{
_directStreamProvider = directStreamProvider;
_job = job;
_cancellationToken = cancellationToken;
_transcodingJobHelper = transcodingJobHelper;
}
/// <summary>
/// Gets or sets a value indicating whether allow read end of file.
/// </summary>
public bool AllowEndOfFile { get; set; } = true;
/// <summary>
/// Gets or sets copy start position.
/// </summary>
public long StartPosition { get; set; }
/// <summary>
/// Write source stream to output.
/// </summary>
/// <param name="outputStream">Output stream.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A <see cref="Task"/>.</returns>
public async Task WriteToAsync(Stream outputStream, CancellationToken cancellationToken)
{
using var linkedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken);
cancellationToken = linkedCancellationTokenSource.Token;
try
{
if (_directStreamProvider != null)
{
await _directStreamProvider.CopyToAsync(outputStream, cancellationToken).ConfigureAwait(false);
return;
}
var fileOptions = FileOptions.SequentialScan;
var allowAsyncFileRead = false;
if (AsyncFile.UseAsyncIO)
{
fileOptions |= FileOptions.Asynchronous;
allowAsyncFileRead = true;
}
if (_path == null)
{
throw new ResourceNotFoundException(nameof(_path));
}
await using var inputStream = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions);
var eofCount = 0;
const int EmptyReadLimit = 20;
if (StartPosition > 0)
{
inputStream.Position = StartPosition;
}
while (eofCount < EmptyReadLimit || !AllowEndOfFile)
{
var bytesRead = await CopyToInternalAsync(inputStream, outputStream, allowAsyncFileRead, cancellationToken).ConfigureAwait(false);
if (bytesRead == 0)
{
if (_job == null || _job.HasExited)
{
eofCount++;
}
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
}
else
{
eofCount = 0;
}
}
}
finally
{
if (_job != null)
{
_transcodingJobHelper.OnTranscodeEndRequest(_job);
}
}
}
private async Task<int> CopyToInternalAsync(Stream source, Stream destination, bool readAsync, CancellationToken cancellationToken)
{
var array = ArrayPool<byte>.Shared.Rent(IODefaults.CopyToBufferSize);
try
{
int bytesRead;
int totalBytesRead = 0;
if (readAsync)
{
bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false);
}
else
{
bytesRead = source.Read(array, 0, array.Length);
}
while (bytesRead != 0)
{
var bytesToWrite = bytesRead;
if (bytesToWrite > 0)
{
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToWrite), cancellationToken).ConfigureAwait(false);
_bytesWritten += bytesRead;
totalBytesRead += bytesRead;
if (_job != null)
{
_job.BytesDownloaded = Math.Max(_job.BytesDownloaded ?? _bytesWritten, _bytesWritten);
}
}
if (readAsync)
{
bytesRead = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false);
}
else
{
bytesRead = source.Read(array, 0, array.Length);
}
}
return totalBytesRead;
}
finally
{
ArrayPool<byte>.Shared.Return(array);
}
}
}
}

View File

@ -13,9 +13,9 @@ namespace Jellyfin.Api.Helpers
/// </summary> /// </summary>
public class ProgressiveFileStream : Stream public class ProgressiveFileStream : Stream
{ {
private readonly FileStream _fileStream; private readonly Stream _stream;
private readonly TranscodingJobDto? _job; private readonly TranscodingJobDto? _job;
private readonly TranscodingJobHelper _transcodingJobHelper; private readonly TranscodingJobHelper? _transcodingJobHelper;
private readonly int _timeoutMs; private readonly int _timeoutMs;
private readonly bool _allowAsyncFileRead; private readonly bool _allowAsyncFileRead;
private int _bytesWritten; private int _bytesWritten;
@ -33,7 +33,6 @@ namespace Jellyfin.Api.Helpers
_job = job; _job = job;
_transcodingJobHelper = transcodingJobHelper; _transcodingJobHelper = transcodingJobHelper;
_timeoutMs = timeoutMs; _timeoutMs = timeoutMs;
_bytesWritten = 0;
var fileOptions = FileOptions.SequentialScan; var fileOptions = FileOptions.SequentialScan;
_allowAsyncFileRead = false; _allowAsyncFileRead = false;
@ -45,11 +44,27 @@ namespace Jellyfin.Api.Helpers
_allowAsyncFileRead = true; _allowAsyncFileRead = true;
} }
_fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions); _stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, IODefaults.FileStreamBufferSize, fileOptions);
}
/// <summary>
/// Initializes a new instance of the <see cref="ProgressiveFileStream"/> class.
/// </summary>
/// <param name="stream">The stream to progressively copy.</param>
/// <param name="job">The transcoding job information.</param>
/// <param name="transcodingJobHelper">The transcoding job helper.</param>
/// <param name="timeoutMs">The timeout duration in milliseconds.</param>
public ProgressiveFileStream(Stream stream, TranscodingJobDto? job, TranscodingJobHelper? transcodingJobHelper, int timeoutMs = 30000)
{
_job = job;
_transcodingJobHelper = transcodingJobHelper;
_timeoutMs = timeoutMs;
_allowAsyncFileRead = AsyncFile.UseAsyncIO;
_stream = stream;
} }
/// <inheritdoc /> /// <inheritdoc />
public override bool CanRead => _fileStream.CanRead; public override bool CanRead => _stream.CanRead;
/// <inheritdoc /> /// <inheritdoc />
public override bool CanSeek => false; public override bool CanSeek => false;
@ -70,13 +85,13 @@ namespace Jellyfin.Api.Helpers
/// <inheritdoc /> /// <inheritdoc />
public override void Flush() public override void Flush()
{ {
_fileStream.Flush(); _stream.Flush();
} }
/// <inheritdoc /> /// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count) public override int Read(byte[] buffer, int offset, int count)
{ {
return _fileStream.Read(buffer, offset, count); return _stream.Read(buffer, offset, count);
} }
/// <inheritdoc /> /// <inheritdoc />
@ -93,11 +108,11 @@ namespace Jellyfin.Api.Helpers
int bytesRead; int bytesRead;
if (_allowAsyncFileRead) if (_allowAsyncFileRead)
{ {
bytesRead = await _fileStream.ReadAsync(buffer, newOffset, remainingBytesToRead, cancellationToken).ConfigureAwait(false); bytesRead = await _stream.ReadAsync(buffer, newOffset, remainingBytesToRead, cancellationToken).ConfigureAwait(false);
} }
else else
{ {
bytesRead = _fileStream.Read(buffer, newOffset, remainingBytesToRead); bytesRead = _stream.Read(buffer, newOffset, remainingBytesToRead);
} }
remainingBytesToRead -= bytesRead; remainingBytesToRead -= bytesRead;
@ -152,11 +167,11 @@ namespace Jellyfin.Api.Helpers
{ {
if (disposing) if (disposing)
{ {
_fileStream.Dispose(); _stream.Dispose();
if (_job != null) if (_job != null)
{ {
_transcodingJobHelper.OnTranscodeEndRequest(_job); _transcodingJobHelper?.OnTranscodeEndRequest(_job);
} }
} }
} }

View File

@ -60,6 +60,9 @@ namespace Jellyfin.Api.Models.StreamingDtos
/// <summary> /// <summary>
/// Gets or sets the direct stream provicer. /// Gets or sets the direct stream provicer.
/// </summary> /// </summary>
/// <remarks>
/// Deprecated.
/// </remarks>
public IDirectStreamProvider? DirectStreamProvider { get; set; } public IDirectStreamProvider? DirectStreamProvider { get; set; }
/// <summary> /// <summary>

View File

@ -0,0 +1,20 @@
using System.IO;
namespace MediaBrowser.Controller.Library
{
/// <summary>
/// The direct live TV stream provider.
/// </summary>
/// <remarks>
/// Deprecated.
/// </remarks>
public interface IDirectStreamProvider
{
/// <summary>
/// Gets the live stream, optionally seeks to the end of the file first.
/// </summary>
/// <param name="seekNearEnd">A value indicating whether to seek to the end of the file.</param>
/// <returns>The stream.</returns>
Stream GetStream(bool seekNearEnd = true);
}
}

View File

@ -2,6 +2,7 @@
#pragma warning disable CA1711, CS1591 #pragma warning disable CA1711, CS1591
using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Model.Dto; using MediaBrowser.Model.Dto;
@ -25,5 +26,7 @@ namespace MediaBrowser.Controller.Library
Task Open(CancellationToken openCancellationToken); Task Open(CancellationToken openCancellationToken);
Task Close(); Task Close();
Stream GetStream(bool seekNearEnd = true);
} }
} }

View File

@ -4,7 +4,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Jellyfin.Data.Entities; using Jellyfin.Data.Entities;
@ -110,6 +109,13 @@ namespace MediaBrowser.Controller.Library
Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken); Task<Tuple<MediaSourceInfo, IDirectStreamProvider>> GetLiveStreamWithDirectStreamProvider(string id, CancellationToken cancellationToken);
/// <summary>
/// Gets the live stream info.
/// </summary>
/// <param name="id">The identifier.</param>
/// <returns>An instance of <see cref="ILiveStream"/>.</returns>
public ILiveStream GetLiveStreamInfo(string id);
/// <summary> /// <summary>
/// Closes the media source. /// Closes the media source.
/// </summary> /// </summary>
@ -129,11 +135,4 @@ namespace MediaBrowser.Controller.Library
Task<IDirectStreamProvider> GetDirectStreamProviderByUniqueId(string uniqueId, CancellationToken cancellationToken); Task<IDirectStreamProvider> GetDirectStreamProviderByUniqueId(string uniqueId, CancellationToken cancellationToken);
} }
public interface IDirectStreamProvider
{
Task CopyToAsync(Stream stream, CancellationToken cancellationToken);
string GetFilePath();
}
} }