Use BlockingCollection

This commit is contained in:
Bond_009 2023-04-14 21:38:12 +02:00
parent 858dadcdd1
commit 33f97045f9
2 changed files with 33 additions and 48 deletions

View File

@ -122,17 +122,16 @@ namespace Emby.Server.Implementations.Data
{ {
WriteConnections = new ConnectionPool(WriteConnectionsCount, CreateWriteConnection); WriteConnections = new ConnectionPool(WriteConnectionsCount, CreateWriteConnection);
ReadConnections = new ConnectionPool(ReadConnectionsCount, CreateReadConnection); ReadConnections = new ConnectionPool(ReadConnectionsCount, CreateReadConnection);
// Configuration and pragmas can affect VACUUM so it needs to be last.
using (var connection = GetConnection(true))
{
connection.Execute("VACUUM");
}
} }
protected ManagedConnection GetConnection(bool readOnly = false) protected ManagedConnection GetConnection(bool readOnly = false)
{ => readOnly ? ReadConnections.GetConnection() : WriteConnections.GetConnection();
if (readOnly)
{
return ReadConnections.GetConnection();
}
return WriteConnections.GetConnection();
}
protected SQLiteDatabaseConnection CreateWriteConnection() protected SQLiteDatabaseConnection CreateWriteConnection()
{ {
@ -173,52 +172,44 @@ namespace Emby.Server.Implementations.Data
writeConnection.Execute("PRAGMA temp_store=" + (int)TempStore); writeConnection.Execute("PRAGMA temp_store=" + (int)TempStore);
// Configuration and pragmas can affect VACUUM so it needs to be last.
writeConnection.Execute("VACUUM");
return writeConnection; return writeConnection;
} }
protected SQLiteDatabaseConnection CreateReadConnection() protected SQLiteDatabaseConnection CreateReadConnection()
{ {
var writeConnection = SQLite3.Open( var connection = SQLite3.Open(
DbFilePath, DbFilePath,
DefaultConnectionFlags | ConnectionFlags.ReadOnly, DefaultConnectionFlags | ConnectionFlags.ReadOnly,
null); null);
if (CacheSize.HasValue) if (CacheSize.HasValue)
{ {
writeConnection.Execute("PRAGMA cache_size=" + CacheSize.Value); connection.Execute("PRAGMA cache_size=" + CacheSize.Value);
} }
if (!string.IsNullOrWhiteSpace(LockingMode)) if (!string.IsNullOrWhiteSpace(LockingMode))
{ {
writeConnection.Execute("PRAGMA locking_mode=" + LockingMode); connection.Execute("PRAGMA locking_mode=" + LockingMode);
} }
if (!string.IsNullOrWhiteSpace(JournalMode)) if (!string.IsNullOrWhiteSpace(JournalMode))
{ {
writeConnection.Execute("PRAGMA journal_mode=" + JournalMode); connection.Execute("PRAGMA journal_mode=" + JournalMode);
} }
if (JournalSizeLimit.HasValue) if (JournalSizeLimit.HasValue)
{ {
writeConnection.Execute("PRAGMA journal_size_limit=" + JournalSizeLimit.Value); connection.Execute("PRAGMA journal_size_limit=" + JournalSizeLimit.Value);
} }
if (Synchronous.HasValue) if (Synchronous.HasValue)
{ {
writeConnection.Execute("PRAGMA synchronous=" + (int)Synchronous.Value); connection.Execute("PRAGMA synchronous=" + (int)Synchronous.Value);
} }
if (PageSize.HasValue) connection.Execute("PRAGMA temp_store=" + (int)TempStore);
{
writeConnection.Execute("PRAGMA page_size=" + PageSize.Value);
}
writeConnection.Execute("PRAGMA temp_store=" + (int)TempStore); return connection;
return writeConnection;
} }
public IStatement PrepareStatement(ManagedConnection connection, string sql) public IStatement PrepareStatement(ManagedConnection connection, string sql)

View File

@ -2,44 +2,47 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Threading;
using SQLitePCL.pretty; using SQLitePCL.pretty;
namespace Emby.Server.Implementations.Data; namespace Emby.Server.Implementations.Data;
public sealed class ConnectionPool : IDisposable public sealed class ConnectionPool : IDisposable
{ {
private readonly int _count; private readonly BlockingCollection<SQLiteDatabaseConnection> _connections = new();
private readonly SemaphoreSlim _lock;
private readonly ConcurrentQueue<SQLiteDatabaseConnection> _connections = new ConcurrentQueue<SQLiteDatabaseConnection>();
private bool _disposed; private bool _disposed;
public ConnectionPool(int count, Func<SQLiteDatabaseConnection> factory) public ConnectionPool(int count, Func<SQLiteDatabaseConnection> factory)
{ {
_count = count;
_lock = new SemaphoreSlim(count, count);
for (int i = 0; i < count; i++) for (int i = 0; i < count; i++)
{ {
_connections.Enqueue(factory.Invoke()); _connections.Add(factory.Invoke());
} }
} }
public ManagedConnection GetConnection() public ManagedConnection GetConnection()
{ {
_lock.Wait(); if (_disposed)
if (!_connections.TryDequeue(out var connection))
{ {
_lock.Release(); ThrowObjectDisposedException();
throw new InvalidOperationException();
} }
return new ManagedConnection(connection, this); return new ManagedConnection(_connections.Take(), this);
void ThrowObjectDisposedException()
{
throw new ObjectDisposedException(GetType().Name);
}
} }
public void Return(SQLiteDatabaseConnection connection) public void Return(SQLiteDatabaseConnection connection)
{ {
_connections.Enqueue(connection); if (_disposed)
_lock.Release(); {
connection.Dispose();
return;
}
_connections.Add(connection);
} }
public void Dispose() public void Dispose()
@ -49,20 +52,11 @@ public sealed class ConnectionPool : IDisposable
return; return;
} }
for (int i = 0; i < _count; i++) foreach (var connection in _connections)
{ {
_lock.Wait();
if (!_connections.TryDequeue(out var connection))
{
_lock.Release();
throw new InvalidOperationException();
}
connection.Dispose(); connection.Dispose();
} }
_lock.Dispose();
_disposed = true; _disposed = true;
} }
} }