如何构建应用程序引擎以及使用 Windows Azure Storage 实现异步消息传送
扫描二维码
随时随地手机看文章
心存怀疑的开发人员提出的最大疑问之一是他们如何在云中继续运行后台进程,即他们的引擎如何继续运行。本文旨在通过向您演示如何构建应用程序引擎以及使用 Windows Azure Storage 实现异步消息传送和处理来为您揭开云中缺乏后台处理的神秘面纱。 为了证明开发人员可以抛开其有形的基础结构这条“安全毛毯”并将其应用程序引擎置于云中,我们将介绍如何实现电子商务应用程序的一个小型子集 Hollywood Hackers,您可以从中购买到 Hollywood 用于完全忽略物理法则和过时的常识的所有神奇技术。 我们将介绍的两个主要方案如下: 将异步文本消息 (“toasts”) 发送给使用该应用程序的用户,以通知他们发生的重要事件(如已提交他们的购物车)或在员工之间发送消息。此方案使用 Windows Azure Queue、Windows Azure Table 和 Windows Azure 工作者角色。 此方案使用 Windows Azure Queue 和 Windows Azure 工作者角色将购物车提交给执行引擎。 使用队列存储进行内部应用程序消息传送 在介绍具体的方案之前,我们需要先介绍一些有关 Windows Azure Queue 的基础知识。云中的队列与传统的 .NET 应用程序中的队列的运行方式不太一样。在处理 AppDomain 中的数据时,您知道该数据只有一份,它完整地位于单一托管进程中。 而在云中,您的一部分数据可能在加利福尼亚,而另一部分可能在纽约,并且您可能会安排一个工作者角色在德克萨斯州对该数据进行处理,而另一工作者角色在北达科他州进行数据处理。 很多开发人员在适应这种分布式计算和分布式数据时面临着一些不熟悉的问题,例如对可能出现的故障进行编码、针对数据提交形成多次重试的概念甚至幂等性的理念。 只要您不将 Windows Azure Queue 视为进程内的常规 CLR 队列,其工作方式其实非常简单。首先,应用程序将向队列获取一些数量的消息(需要记住,一次不会超过 20 条)并提供一个超时。此超时控制对其他队列处理客户端隐藏这些消息的时间。当应用程序成功完成需要对队列消息进行的所有处理后,将删除该消息。 如果应用程序引发异常或处理队列消息失败,则在超时期限过后,其他客户端可以再次看到该消息。因此,当一个工作者角色处理失败后,其他工作者角色可以继续进行处理。向队列提交消息非常简单:应用程序直接或借助客户端库形成适当的 HTTP POST 消息,然后提交字符串或字节数组。队列专门用于进行内部应用程序消息传送和非永久存储,因此这些消息占用的空间需要相当小。 如上所述,您可能安排多个工作者角色都尝试处理同一消息。虽然隐藏当前正在处理的消息的不可见超时很有帮助,但不能确保万无一失。要完全避免冲突,您应该将您的引擎处理设计为幂等。换句话说,同一队列消息应该可以由一个或多个工作者角色处理多次,而不会使应用程序处于不一致的状态。 理想情况下,您希望工作者角色可以检测出是否已完成对给定消息的处理。在编写工作者角色来处理队列消息时,请牢记您的代码可能会尝试处理已处理过的消息,尽管这个可能性微乎其微。 图 1 中的代码段显示了如何使用随 Windows Azure SDK 一起提供的 StorageClient 程序集创建消息并将其提交给 Windows Azure Queue。StorageClient 库实际上只是 Windows Azure Storage HTTP 接口周围的包装。 图 1 创建消息并将其提交给 Windows Azure Queue 对于本文中的其他示例,我们使用了可以简化此过程的一些包装类(位于 Hollywood Hackers 的 CodePlex 站点中,网址为:hollywoodhackers.codeplex.com/SourceControl/ListDownloadableCommits.aspx)。 异步消息传送 (Toast) 当今,交互式网站不仅是一种时尚,更是一种需求。用户已经习惯了完全交互式网站,以致于当他们遇到一个静态的非交互式页面时会认为什么地方出问题了。考虑到这一点,我们希望可以在我们的用户使用这样的站点时向他们发送通知。 为此,我们将利用 Windows Azure Queue 和 Windows Azure Table 存储机制构建一个消息传递框架。客户端将使用与 jQuery Gritter 插件结合的 jQuery 在用户的浏览器中将通知显示为一个 toast,类似于当您收到新的 Outlook 电子邮件、即时消息或警报声时在 Windows 系统托盘上方淡出的消息。 当需要向某个用户发送通知时,该用户将被插入到队列中。因为工作者角色负责处理队列中的每个项目,所以该角色将动态确定如何处理每个项目。在本例中,引擎只需要执行一个操作,但在复杂的 CRM 网站或支持站点中,要执行的操作可能不计其数。 工作者角色在队列中遇到用户通知时,会将该通知存储在表存储中并将其从队列中删除。这样,消息可以保留很长时间并等待用户登录进行处理。队列存储中的消息的最大保存期限比较短,不会超过几天。当用户访问网站时,我们的 jQuery 脚本将异步获取表中的所有消息,并通过在控制器上调用可返回 JavaScript Object Notation (JSON) 的方法在浏览器中以常见的形式显示这些消息。 尽管队列只处理字符串或字节数组,但我们可以通过将任何类型的结构化数据序列化为二进制文件来将其存储在队列中,然后在我们需要使用时再将其转换回来。这成为将强类型化的对象传递到队列中的出色技术。我们会将此技术构建到我们的队列消息的基类中。然后,我们的系统消息类可以包含我们的数据,而且可以将整个对象提交到队列中并根据需要进行利用(请参见图 2)。 图 2 在队列中存储结构化数据 请记住,要使用 BinaryFormatter 类,需要以完全信任模式(可以通过服务配置文件启用此模式)运行 Windows Azure 工作者角色。 [!--empirenews.page--] 图 3 用于与队列交互的包装 我们还需要为表存储设置一个包装,以便在用户登录到站点之前可以存储用户通知。可以使用 PartitionKey(行集合的标识符)和 RowKey(可唯一标识特定分区中的每个单独行)组织表数据。选择 PartitionKey 和 RowKey 使用的数据是在使用表存储时所做的最重要的设计决策之一。 这些特点允许跨存储节点进行负载平衡,并在应用程序中提供内置的可伸缩性选项。不考虑数据的数据中心关联性,使用同一分区键的表存储中的行将保留在相同的物理数据存储中。因为针对每个用户存储对应的消息,所以分区键将是 UserName,而 RowKey 则成为标识每行的 GUID(请参见图 4)。 图 4 表存储的包装 因为我们的存储机制已经确定,所以我们需要一个工作者角色作为引擎;以便在我们的电子商务站点的后台处理消息。为此,我们定义了一个从 Microsoft.ServiceHosting.ServiceRuntime.RoleEntryPoint 类继承的类,并将其与云服务项目中的工作者角色关联(请参见图 5)。 图 5 作为引擎的工作者角色 让我们看一下工作者角色代码。在初始化和设置所需的队列和表存储之后,此代码将进入一个循环。每 10 秒钟,它就会处理一次队列中的消息。每次我们通过处理循环时都将获取队列中的消息,直到最终返回 null,这表示队列为空。 您从队列中看到的消息永远不会超过 20 个,如果不信,您可以反复尝试来验证一下。对队列进行的任何处理都有时间限制,必须在该时间范围内对每个队列消息执行有意义的操作,否则队列消息将被视为超时,并在队列中显示备份,以便可以由其他工作者来处理此消息。每个消息都会作为用户通知添加到表存储中。关于工作者角色需要记住的重要一点是:一旦入口点方法完成,工作者角色也就结束了。这就是您需要在一个循环中保持逻辑运行的原因。 从客户端的角度来说,我们需要能够以 JSON 形式返回消息,以便 jQuery 可以异步轮询并显示新的用户通知。为此,我们会将一些代码添加到消息控制器中,以便可以访问这些通知(请参见图 6)。 图 6 以 JSON 形式返回消息 在 Visual Studio 2010 beta 2 下的 ASP.NET MVC 2(我们用于撰写本文的环境)中,如果没有 JsonRequestBehavior.AllowGet 选项,您无法将 JSON 数据返回到 jQuery 或其他客户端。在 ASP.NET MVC 1 中,不需要此选项。现在,我们可以编写 JavaScript,它每 15 秒将调用一次 GetMessages 方法并将以 toast 形式消息显示通知(请参见图 7)。 [!--empirenews.page--] 提交和处理购物车 在我们的示例应用程序中,我们希望使用队列存储执行的另一个关键方案是提交购物车。Hollywood Hackers 有一个第三方履行系统(Hollywood Hackers 无法在其空间有限的仓库中保留所有小工具),所以引擎需要对购物车进行一些处理。一旦引擎完成其处理,它会向用户通知队列提交一个消息,告知用户已经对购物车进行了处理(或者出现了问题)。如果处理购物车时用户处于在线状态,该用户将收到系统弹出的一个 toast 消息。如果用户不在线,则会在其下次登录到该站点时收到该弹出消息,如图 8 所示。 图 8 示例用户通知 查看原图(大图) 我们首先需要的是一些包装类,使我们可以与购物车队列交互。这些包装非常简单,如果要查看它们的源代码,可以在 CodePlex 站点上查看。 与标准 CRUD(创建、读取、更新、删除)存储库不同的是,队列中的读取操作不是单纯的读取操作。请记住,只要获取队列中的消息,必须在有限的时间内处理该消息,操作失败或删除消息都会显示处理完成。这种模式不能顺利地转换到存储库模式,所以我们已经不再借助包装类执行此操作。 现在,我们已经拥有了要与购物车队列交互的代码,我们可以将一些代码放在购物车控制器中,以便将购物车内容提交到队列中(请参见图 9)。 图 9 向队列提交购物车 在实际情况下,您可能会从进程外状态(例如会话存储、缓存或窗体发布)获得购物车。为了简化本文代码,我们仅仅构建了购物车的内容。 最后,购物车内容已处于队列中,我们可以修改工作者角色,以便它可以定期检查队列中挂起的购物车。它每次会从队列中选择一个购物车,用整整一分钟对该购物车进行处理,然后向用户通知队列提交一个消息,告知用户已经对该购物车进行了处理(请参见图 10)。 图 10 检查队列中挂起的购物车 经过对用户通知表中的队列消息的存取操作,位于主页面中的 jQuery Gritter 代码然后会在下一个 15 秒的轮询周期中检测是否存在新消息,然后向用户显示购物车 toast 通知。 总结和后续操作 本文的目的是使开发人员可以抛开其有形的数据中心这条“安全毛毯”,认识到他们可以使用 Windows Azure 执行很多操作,而不仅仅是创建简单的“Hello World”网站。借助 Windows Azure Queues 和 Windows Azure 表存储的强大功能,以及利用这些强大功能在应用程序和其工作者角色之间进行异步消息传送,您可以真正使用 Windows Azure 增强应用程序的引擎了。 为使文章简明易懂,我们将很多代码都保留为原样,没有进行重构。作为熟悉新 Windows Azure Muscle 的练习,请尝试重构本文中的一些代码,以加深对队列的熟练使用,甚至创建一个独立的程序集,其中包含为任何 ASP.NET MVC 网站进行异步消息传送和通知所需的所有代码。 主要是亲自动手实践,创建一些站点并看看您都可以执行哪些操作。本文中的代码位于 Hollywood Hackers 的 CodePlex 站点中。
string accountName;
string accountSharedKey;
string queueBaseUri;
string StorageCredentialsAccountAndKey credentials;
if (RoleEnvironment.IsAvailable)
{
// We are running in a cloud - INCLUDING LOCAL!
accountName =
RoleEnvironment.GetConfigurationSettingValue("AccountName");
accountSharedKey =
RoleEnvironment.GetConfigurationSettingValue("AccountSharedKey");
queueBaseUri = RoleEnvironment.GetConfigurationSettingValue
("QueueStorageEndpoint");
}
else
{
accountName = ConfigurationManager.AppSettings["AccountName"];
accountSharedKey =
ConfigurationManager.AppSettings["AccountSharedKey"];
queueBaseUri =
ConfigurationManager.AppSettings["QueueStorageEndpoint"];
}
credentials =
new StorageCredentialsAccountAndKey(accountName, accountSharedKey);
CloudQueueClient client =
new CloudQueueClient(queueBaseUri, credentials);
CloudQueue queue = client.GetQueueReference(queueName);
CloudQueueMessage m = new CloudQueueMessage(
/* string or byte[] representing message to enqueue */);
Queue.AddMessage(m);namespace HollywoodHackers.Storage.Queue
{
[Serializable]
public class QueueMessageBase
{
public byte[] ToBinary()
{
BinaryFormatter bf = new BinaryFormatter();
MemoryStream ms = new MemoryStream();
ms.Position = 0;
bf.Serialize(ms, this);
byte[] output = ms.GetBuffer();
ms.Close();
return output;
}
public static T FromMessage<T>(CloudQueueMessage m)
{
byte[] buffer = m.AsBytes();
MemoryStream ms = new MemoryStream(buffer);
ms.Position = 0;
BinaryFormatter bf = new BinaryFormatter();
return (T)bf.Deserialize(ms);
}
}
[Serializable]
public class ToastQueueMessage : QueueMessageBase
{
public ToastQueueMessage()
: base()
{
}
public string TargetUserName { get; set; }
public string MessageText { get; set; }
public string Title { get; set; }
public DateTime CreatedOn { get; set; }
}
现在,我们需要一个简单的包装来与我们的队列交互。从本质上说,我们需要能够将消息插入队列,获取任何挂起的消息并清除该队列(请参见图 3)。namespace HollywoodHackers.Storage.Queue
{
public class StdQueue<T> :
StorageBase where T : QueueMessageBase, new()
{
protected CloudQueue queue;
protected CloudQueueClient client;
public StdQueue(string queueName)
{
client = new CloudQueueClient
(StorageBase.QueueBaseUri, StorageBase.Credentials);
queue = client.GetQueueReference(queueName);
queue.CreateIfNotExist();
}
public void AddMessage(T message)
{
CloudQueueMessage msg =
new CloudQueueMessage(message.ToBinary());
queue.AddMessage(msg);
}
public void DeleteMessage(CloudQueueMessage msg)
{
queue.DeleteMessage(msg);
}
public CloudQueueMessage GetMessage()
{
return queue.GetMessage(TimeSpan.FromSeconds(60));
}
}
public class ToastQueue : StdQueue<ToastQueueMessage>
{
public ToastQueue()
: base("toasts")
{
}
}
}namespace HollywoodHackers.Storage.Repositories
{
public class UserTextNotificationRepository : StorageBase
{
public const string EntitySetName =
"UserTextNotifications";
CloudTableClient tableClient;
UserTextNotificationContext notificationContext;
public UserTextNotificationRepository()
: base()
{
tableClient = new CloudTableClient
(StorageBase.TableBaseUri, StorageBase.Credentials);
notificationContext = new UserTextNotificationContext
(StorageBase.TableBaseUri,StorageBase.Credentials);
tableClient.CreateTableIfNotExist(EntitySetName);
}
public UserTextNotification[]
GetNotificationsForUser(string userName)
{
var q = from notification in
notificationContext.UserNotifications
where notification.TargetUserName ==
userName select notification;
return q.ToArray();
}
public void AddNotification
(UserTextNotification notification)
{
notification.RowKey = Guid.NewGuid().ToString();
notificationContext.AddObject
(EntitySetName, notification);
notificationContext.SaveChanges();
}
}
}
public class WorkerRole : RoleEntryPoint
{
ShoppingCartQueue cartQueue;
ToastQueue toastQueue;
UserTextNotificationRepository toastRepository;
public override void Run()
{
// This is a sample worker implementation.
//Replace with your logic.
Trace.WriteLine("WorkerRole1 entry point called",
"Information");
toastRepository = new UserTextNotificationRepository();
InitQueue();
while (true)
{
Thread.Sleep(10000);
Trace.WriteLine("Working", "Information");
ProcessNewTextNotifications();
ProcessShoppingCarts();
}
}
private void InitQueue()
{
cartQueue = new ShoppingCartQueue();
toastQueue = new ToastQueue();
}
private void ProcessNewTextNotifications()
{
CloudQueueMessage cqm = toastQueue.GetMessage();
while (cqm != null)
{
ToastQueueMessage message =
QueueMessageBase.FromMessage<ToastQueueMessage>(cqm);
toastRepository.AddNotification(new
UserTextNotification()
{
MessageText = message.MessageText,
MessageDate = DateTime.Now,
TargetUserName = message.TargetUserName,
Title = message.Title
});
toastQueue.DeleteMessage(cqm);
cqm = toastQueue.GetMessage();
}
}
private void ProcessShoppingCarts()
{
// We will add this later in the article!
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
DiagnosticMonitor.Start("DiagnosticsConnectionString");
// For information on handling configuration changes
// see the MSDN topic at
//http://go.microsoft.com/fwlink/?LinkId=166357.
RoleEnvironment.Changing += RoleEnvironmentChanging;
return base.OnStart();
}
private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e)
{
// If a configuration setting is changing
if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange))
{
// Set e.Cancel to true to restart this role instance
e.Cancel = true;
}
}
}public JsonResult GetMessages()
{
if (User.Identity.IsAuthenticated)
{
UserTextNotification[] userToasts =
toastRepository.GetNotifications(User.Identity.Name);
object[] data =
(from UserTextNotification toast in userToasts
select new { title = toast.Title ?? "Notification",
text = toast.MessageText }).ToArray();
return Json(data, JsonRequestBehavior.AllowGet);
}
else
return Json(null);
}
图 7 以 toast 形式消息显示的通知$(document).ready(function() {
setInterval(function() {
$.ajax({
contentType: "application/json; charset=utf-8",
dataType: "json",
url: "/SystemMessage/GetMessages",
success: function(data) {
for (msg in data) {
$.gritter.add({
title: data[msg].title,
text: data[msg].text,
sticky: false
});
}
}
})
}, 15000)
});public ActionResult Submit()
{
ShoppingCartMessage cart = new ShoppingCartMessage();
cart.UserName = User.Identity.Name;
cart.Discounts = 12.50f;
cart.CartID = Guid.NewGuid().ToString();
List<ShoppingCartItem> items = new List<ShoppingCartItem>();
items.Add(new ShoppingCartItem()
{ Quantity = 12, SKU = "10000101010",
UnitPrice = 15.75f });
items.Add(new ShoppingCartItem()
{ Quantity = 27, SKU = "12390123j213",
UnitPrice = 99.92f });
cart.CartItems = items.ToArray();
cartQueue.AddMessage(cart);
return View();
}private void ProcessShoppingCarts()
{
CloudQueueMessage cqm = cartQueue.GetMessage();
while (cqm != null)
{
ShoppingCartMessage cart =
QueueMessageBase.FromMessage<ShoppingCartMessage>(cqm);
toastRepository.AddNotification(new UserTextNotification()
{
MessageText = String.Format
("Your shopping cart containing {0} items has been processed.",
cart.CartItems.Length),
MessageDate = DateTime.Now,
TargetUserName = cart.UserName
});
cartQueue.DeleteMessage(cqm);
cqm = cartQueue.GetMessage();
}
}