refactor: replace mutex with semaphore

This commit is contained in:
Anna 2022-07-18 17:13:14 -04:00
parent 3d1a09937a
commit d7dfa0d495
1 changed files with 19 additions and 11 deletions

View File

@ -42,7 +42,7 @@ internal class Client : IDisposable {
private KeyPair KeyPair { get; } private KeyPair KeyPair { get; }
private readonly Mutex _waitersMutex = new(); private readonly SemaphoreSlim _waitersSemaphore = new(1, 1);
private Dictionary<uint, ChannelWriter<ResponseKind>> Waiters { get; set; } = new(); private Dictionary<uint, ChannelWriter<ResponseKind>> Waiters { get; set; } = new();
private Channel<(RequestContainer, ChannelWriter<ChannelReader<ResponseKind>>?)> ToSend { get; set; } = System.Threading.Channels.Channel.CreateUnbounded<(RequestContainer, ChannelWriter<ChannelReader<ResponseKind>>?)>(); private Channel<(RequestContainer, ChannelWriter<ChannelReader<ResponseKind>>?)> ToSend { get; set; } = System.Threading.Channels.Channel.CreateUnbounded<(RequestContainer, ChannelWriter<ChannelReader<ResponseKind>>?)>();
@ -69,6 +69,7 @@ internal class Client : IDisposable {
this._active = false; this._active = false;
this.WebSocket.Dispose(); this.WebSocket.Dispose();
this._waitersSemaphore.Dispose();
} }
private void Login(object? sender, EventArgs e) { private void Login(object? sender, EventArgs e) {
@ -112,11 +113,15 @@ internal class Client : IDisposable {
}); });
} }
private ChannelReader<ResponseKind> RegisterWaiter(uint number) { private async Task<ChannelReader<ResponseKind>> RegisterWaiter(uint number) {
var channel = System.Threading.Channels.Channel.CreateBounded<ResponseKind>(1); var channel = System.Threading.Channels.Channel.CreateBounded<ResponseKind>(1);
this._waitersMutex.WaitOne(); await this._waitersSemaphore.WaitAsync();
this.Waiters[number] = channel.Writer; try {
this._waitersMutex.ReleaseMutex(); this.Waiters[number] = channel.Writer;
} finally {
this._waitersSemaphore.Release();
}
return channel.Reader; return channel.Reader;
} }
@ -501,9 +506,12 @@ internal class Client : IDisposable {
} }
this.ToSend = System.Threading.Channels.Channel.CreateUnbounded<(RequestContainer, ChannelWriter<ChannelReader<ResponseKind>>?)>(); this.ToSend = System.Threading.Channels.Channel.CreateUnbounded<(RequestContainer, ChannelWriter<ChannelReader<ResponseKind>>?)>();
this._waitersMutex.WaitOne(); await this._waitersSemaphore.WaitAsync();
this.Waiters = new Dictionary<uint, ChannelWriter<ResponseKind>>(); try {
this._waitersMutex.ReleaseMutex(); this.Waiters = new Dictionary<uint, ChannelWriter<ResponseKind>>();
} finally {
this._waitersSemaphore.Release();
}
// If the websocket is closed, we need to reconnect // If the websocket is closed, we need to reconnect
this.WebSocket.Dispose(); this.WebSocket.Dispose();
@ -585,13 +593,13 @@ internal class Client : IDisposable {
break; break;
} }
default: { default: {
this._waitersMutex.WaitOne(); await this._waitersSemaphore.WaitAsync();
try { try {
if (this.Waiters.Remove(response.Number, out var waiter)) { if (this.Waiters.Remove(response.Number, out var waiter)) {
await waiter.WriteAsync(response.Kind); await waiter.WriteAsync(response.Kind);
} }
} finally { } finally {
this._waitersMutex.ReleaseMutex(); this._waitersSemaphore.Release();
} }
break; break;
@ -603,7 +611,7 @@ internal class Client : IDisposable {
await this.WebSocket.SendMessage(req); await this.WebSocket.SendMessage(req);
if (update != null) { if (update != null) {
await update.WriteAsync(this.RegisterWaiter(req.Number)); await update.WriteAsync(await this.RegisterWaiter(req.Number));
} }
} }
} }