[Tip,C#] 간단히 만들어보는 메시지 처리 구조 2
관련링크
본문
이전에 만들었던 코드는 Manager 클래스에서 메시지 큐처리를 맡고 단일 스레드에서 처리하기 때문에 어느 한 객체에서 메시지 처리에 지연이 발생하면 모두 영향을 받게 됩니다.
메시지 전송에는 동시성이 확보되지만 해당 메시지를 처리하는 부분에서는 동시성이 확보되질 못 합니다.
따라서 메시지 큐처리를 개별 구독 객체 단위로 처리하도록 바꿔 줍니다.
이렇게 하면 객체 단위로 지연 확산이 격리가 되고 동시성을 확보할 수 있습니다.
abstract class SubscribeBase : IDisposable
{
bool disposed = false;
readonly ConcurrentQueue<IMessage> messageQ = new ConcurrentQueue<IMessage>();
readonly EventWaitHandle msgEvent = new EventWaitHandle(false, EventResetMode.AutoReset);
CancellationTokenSource cts;
Task msgProc;
protected SubscribeBase()
{
cts = new CancellationTokenSource();
msgProc = Task.Factory.StartNew(MsgProc);
}
~SubscribeBase()
{
Dispose(false);
}
........
void MsgProc()
{
while (cts.IsCancellationRequested == false)
{
msgEvent.WaitOne();
if (cts.IsCancellationRequested)
{
return;
}
while (messageQ.TryDequeue(out var m))
{
......
}
}
}
.........
}
Manager 클래스는 Send 호출시 바로 Map 을 뒤져서 해당 구독객체의 메시지 큐에 메시지를 밀어넣습니다.
public void Send(IMessage message)
{
if (MsgSubscriptionMap.ContainsKey(message.ID))
{
var subIds = MsgSubscriptionMap[message.ID];
foreach (var id in subIds)
{
if (Subscribers.ContainsKey(id))
{
Subscribers[id].Receive(message);
}
}
}
}