Callback timeouts.

feature-npv2
Icedream 2014-05-08 14:54:30 +02:00
parent be3cfd1f2b
commit 483842b7d9
2 changed files with 21 additions and 11 deletions

View File

@ -117,7 +117,7 @@ namespace NPSharp
LoginId = result.NPID; LoginId = result.NPID;
SessionToken = result.SessionToken; SessionToken = result.SessionToken;
tcs.SetResult(true); tcs.SetResult(true);
}); }, 10);
_rpc.Send(new AuthenticateWithTokenMessage {Token = token}); _rpc.Send(new AuthenticateWithTokenMessage {Token = token});
return await tcs.Task; return await tcs.Task;
@ -140,7 +140,7 @@ namespace NPSharp
if (result.Result != 0) if (result.Result != 0)
tcs.SetResult(false); tcs.SetResult(false);
tcs.SetResult(true); tcs.SetResult(true);
}); }, 10);
_rpc.Send(new StorageWriteUserFileMessage {FileData = contents, FileName = filename, NPID = LoginId}); _rpc.Send(new StorageWriteUserFileMessage {FileData = contents, FileName = filename, NPID = LoginId});
return await tcs.Task; return await tcs.Task;
@ -164,7 +164,7 @@ namespace NPSharp
return; return;
} }
tcs.SetResult(result.FileData); tcs.SetResult(result.FileData);
}); }, 10);
_rpc.Send(new StorageGetUserFileMessage {FileName = filename, NPID = LoginId}); _rpc.Send(new StorageGetUserFileMessage {FileName = filename, NPID = LoginId});
return await tcs.Task; return await tcs.Task;
@ -202,7 +202,7 @@ namespace NPSharp
return; return;
} }
tcs.SetResult(result.FileData); tcs.SetResult(result.FileData);
}); }, 10);
_rpc.Send(new StorageGetPublisherFileMessage {FileName = filename}); _rpc.Send(new StorageGetPublisherFileMessage {FileName = filename});
return await tcs.Task; return await tcs.Task;

View File

@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets; using System.Net.Sockets;
using log4net; using log4net;
using NPSharp.RPC.Messages; using NPSharp.RPC.Messages;
@ -18,7 +19,7 @@ namespace NPSharp.RPC
private readonly string _host; private readonly string _host;
private readonly ushort _port; private readonly ushort _port;
private readonly Dictionary<uint, Action<RPCServerMessage>> _callbacks = new Dictionary<uint, Action<RPCServerMessage>>(); private readonly Dictionary<uint, Tuple<DateTime, Action<RPCServerMessage>>> _callbacks = new Dictionary<uint, Tuple<DateTime, Action<RPCServerMessage>>>();
/// <summary> /// <summary>
/// Initializes an RPC connection stream with a specified host and port. /// Initializes an RPC connection stream with a specified host and port.
@ -71,6 +72,7 @@ namespace NPSharp.RPC
try try
{ {
_callbacks.Clear();
_ns.Close(timeout); _ns.Close(timeout);
_ns.Dispose(); _ns.Dispose();
} }
@ -84,12 +86,14 @@ namespace NPSharp.RPC
/// Attaches a callback to the next message being sent out. This allows handling response packets. /// Attaches a callback to the next message being sent out. This allows handling response packets.
/// </summary> /// </summary>
/// <param name="callback">The method to call when we receive a response to the next message</param> /// <param name="callback">The method to call when we receive a response to the next message</param>
public void AttachCallback(Action<RPCServerMessage> callback) /// <param name="timeout">Time in seconds from now in which this callback will expire for the next packet</param>
public void AttachCallback(Action<RPCServerMessage> callback, double timeout)
{ {
_cleanupCallbacks();
_log.DebugFormat("AttachCallback for packet id {0}", _id); _log.DebugFormat("AttachCallback for packet id {0}", _id);
if (_callbacks.ContainsKey(_id)) if (_callbacks.ContainsKey(_id))
throw new Exception("There is already a callback for the current message. You can only add max. one callback."); throw new Exception("There is already a callback for the current message. You can only add max. one callback.");
_callbacks.Add(_id, callback); _callbacks.Add(_id, new Tuple<DateTime, Action<RPCServerMessage>>(DateTime.Now + TimeSpan.FromSeconds(timeout), callback));
} }
// TODO: Exposure of message ID needed or no? // TODO: Exposure of message ID needed or no?
@ -130,15 +134,21 @@ namespace NPSharp.RPC
return null; return null;
} }
_log.DebugFormat("Received packet ID {1} (type {0})", message.GetType().Name, message.MessageId);
if (!_callbacks.ContainsKey(message.MessageId)) if (!_callbacks.ContainsKey(message.MessageId))
return message; return message;
_callbacks[message.MessageId].Invoke(message); _cleanupCallbacks();
// TODO: Callback cleanup if (_callbacks.ContainsKey(message.MessageId))
_callbacks[message.MessageId].Item2.Invoke(message);
return message; return message;
} }
private void _cleanupCallbacks()
{
var cbr = (from item in _callbacks where item.Value.Item1 < DateTime.Now select item.Key).ToArray();
foreach (var cb in cbr)
_callbacks.Remove(cb);
}
} }
} }