可以通过以下两种方式之一声明grain类型:
继承Grain<T>
:如果它们有一些持久化状态,这些状态希望由Orleans运行时来处理。换句话说,通过继承Grain<T>
,grain类型自动选择加入Orleans系统管理的持久性框架。对于本节的其余部分,我们将仅考虑第2种声明方式 (扩展Grain<T>
) ,因为第1种声明方式,grain将继续像现在一样运行,而不会有任何行为更改。
从Grain<T>
继承的grain类,(其中T
是需要被持久化的、特定于应用程序的状态数据类型)将从指定的存储中自动加载其状态。
grain将标记一个[StorageProvider]
属性,该属性指定存储提供程序的命名实例,用于读取/写入此grain的状态数据。
[StorageProvider(ProviderName="store1")]
public class MyGrain<MyGrainState> ...
{
...
}
Orleans框架提供了一种机制,来指定和注册不同的存储提供程序,并使用ISiloHostBuilder
来配置它们。
var silo = new SiloHostBuilder()
.AddMemoryGrainStorage("DevStore")
.AddAzureTableGrainStorage("store1", options => options.ConnectionString = "DefaultEndpointsProtocol=https;AccountName=data1;AccountKey=SOMETHING1")
.AddAzureBlobGrainStorage("store2", options => options.ConnectionString = "DefaultEndpointsProtocol=https;AccountName=data2;AccountKey=SOMETHING2")
.Build();
Orleans本身支持一系列的IGrainStorage实现,您可以将这些实现用于您的应用程序,来存储grain的状态。在本节中,我们将介绍如何在一个silo中,配置AzureTableGrainStorage
,AzureBlobGrainStorage
,DynamoDBGrainStorage
,MemoryGrainStorage
,和AdoNetGrainStorage
在。其他IGrainStorage
提供程序的配置类似。
var silo = new SiloHostBuilder()
.AddAzureTableGrainStorage("TableStore", options => options.ConnectionString = "UseDevelopmentStorage=true")
...
.Build();
通过AzureTableGrainStorageOptions
,以下设置可用于配置AzureTableGrainStorage
提供程序:
/// <summary>
/// Configuration for AzureTableGrainStorage
/// </summary>
public class AzureTableStorageOptions
{
/// <summary>
/// Azure table connection string
/// </summary>
[RedactConnectionString]
public string ConnectionString { get; set; }
/// <summary>
/// Table name where grain stage is stored
/// </summary>
public string TableName { get; set; } = DEFAULT_TABLE_NAME;
public const string DEFAULT_TABLE_NAME = "OrleansGrainState";
/// <summary>
/// Indicates if grain data should be deleted or reset to defaults when a grain clears it's state.
/// </summary>
public bool DeleteStateOnClear { get; set; } = false;
/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialzed prior to use.
/// </summary>
public int InitStage { get; set; } = DEFAULT_INIT_STAGE;
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;
#region json serialization
public bool UseJson { get; set; }
public bool UseFullAssemblyNames { get; set; }
public bool IndentJson { get; set; }
public TypeNameHandling? TypeNameHandling { get; set; }
#endregion json serialization
}
注意:状态大小不应超过64KB,这是Azure Table Storage强加的限制。
var silo = new SiloHostBuilder()
.AddAzureBlobGrainStorage("BlobStore", options => options.ConnectionString = "UseDevelopmentStorage=true")
...
.Build();
通过AzureBlobStorageOptions
,以下设置可用于配置AzureBlobGrainStorage
提供程序:
public class AzureBlobStorageOptions
{
/// <summary>
/// Azure connection string
/// </summary>
[RedactConnectionString]
public string ConnectionString { get; set; }
/// <summary>
/// Container name where grain stage is stored
/// </summary>
public string ContainerName { get; set; } = DEFAULT_CONTAINER_NAME;
public const string DEFAULT_CONTAINER_NAME = "grainstate";
/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialzed prior to use.
/// </summary>
public int InitStage { get; set; } = DEFAULT_INIT_STAGE;
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;
#region json serialization
public bool UseJson { get; set; }
public bool UseFullAssemblyNames { get; set; }
public bool IndentJson { get; set; }
public TypeNameHandling? TypeNameHandling { get; set; }
#endregion json serialization
}
var silo = new SiloHostBuilder()
.AddDynamoDBGrainStorage("DDBStore", options =>
{
options.AccessKey = "MY_ACCESS_KEY";
options.SecretKey = "MY_SECRET_KEY";
options.Service = "us-wes-1";
})
...
.Build();
通过DynamoDBStorageOptions
,以下设置可用于配置DynamoDBGrainStorage
提供程序:
public class DynamoDBStorageOptions
{
/// <summary>
/// Gets or sets a unique identifier for this service, which should survive deployment and redeployment.
/// </summary>
public string ServiceId { get; set; } = string.Empty;
/// <summary>
/// AccessKey string for DynamoDB Storage
/// </summary>
[Redact]
public string AccessKey { get; set; }
/// <summary>
/// Secret key for DynamoDB storage
/// </summary>
[Redact]
public string SecretKey { get; set; }
/// <summary>
/// DynamoDB Service name
/// </summary>
public string Service { get; set; }
/// <summary>
/// Read capacity unit for DynamoDB storage
/// </summary>
public int ReadCapacityUnits { get; set; } = DynamoDBStorage.DefaultReadCapacityUnits;
/// <summary>
/// Write capacity unit for DynamoDB storage
/// </summary>
public int WriteCapacityUnits { get; set; } = DynamoDBStorage.DefaultWriteCapacityUnits;
/// <summary>
/// DynamoDB table name.
/// Defaults to 'OrleansGrainState'.
/// </summary>
public string TableName { get; set; } = "OrleansGrainState";
/// <summary>
/// Indicates if grain data should be deleted or reset to defaults when a grain clears it's state.
/// </summary>
public bool DeleteStateOnClear { get; set; } = false;
/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialzed prior to use.
/// </summary>
public int InitStage { get; set; } = DEFAULT_INIT_STAGE;
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;
#region JSON Serialization
public bool UseJson { get; set; }
public bool UseFullAssemblyNames { get; set; }
public bool IndentJson { get; set; }
public TypeNameHandling? TypeNameHandling { get; set; }
#endregion
}
ADO .NET grain 存储提供程序,允许您在关系数据库中存储grain状态。目前支持以下数据库:
首先,安装基础包:
Install-Package Microsoft.Orleans.Persistence.AdoNet
还原项目的nuget包后,您就会找到受支持的数据库供应商的不同SQL脚本,这些脚本被复制到项目目录\OrleansAdoNetContent中,其中每个受支持的ADO.NET扩展都有自己的目录。您也可以从Orleans.Persistence.AdoNet库中获取它们。创建数据库,然后运行相应的脚本来创建表。
接下来的步骤是,安装第二个特定于所需数据库供应商的NuGet包(参见下表),并以编程方式或通过XML,配置存储提供程序。
数据库 | 脚本 | NuGet包 | AdoInvariant | 备注 |
---|---|---|---|---|
SQL Server | SQLServer的-Persistence.sql | System.Data.SqlClient | System.Data.SqlClient | |
MySQL / MariaDB | MySQL-Persistence.sql | MySql.Data | MySql.Data.MySqlClient | |
PostgreSQL | PostgreSQL-Persistence.sql | Npgsql | Npgsql | |
Oralce | Oracle-Persistence.sql | ODP.net | Oracle.DataAccess.Client | 不支持.net core |
以下是如何通过ISiloHostBuilder
配置ADO.NET存储提供程序的示例:
var siloHostBuilder = new SiloHostBuilder()
.AddAdoNetGrainStorage("OrleansStorage", options=>
{
options.Invariant = "<Invariant>";
options.ConnectionString = "<ConnectionString>";
options.UseJsonFormat = true;
});
实质上,您只需要设置特定于数据库供应商的连接字符串和标识供应商的 Invariant
(参见上表)。您还可以选择保存数据的格式,可以是二进制(默认),JSON或XML。虽然二进制是最紧凑的选项,但它是不透明的,您将无法读取或处理数据。建议使用JSON。
您可以通过AdoNetGrainStorageOptions
,设置以下属性:
/// <summary>
/// Options for AdonetGrainStorage
/// </summary>
public class AdoNetGrainStorageOptions
{
/// <summary>
/// Connection string for AdoNet storage.
/// </summary>
[Redact]
public string ConnectionString { get; set; }
/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialzed prior to use.
/// </summary>
public int InitStage { get; set; } = DEFAULT_INIT_STAGE;
/// <summary>
/// Default init stage in silo lifecycle.
/// </summary>
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;
/// <summary>
/// The default ADO.NET invariant used for storage if none is given.
/// </summary>
public const string DEFAULT_ADONET_INVARIANT = AdoNetInvariants.InvariantNameSqlServer;
/// <summary>
/// The invariant name for storage.
/// </summary>
public string Invariant { get; set; } = DEFAULT_ADONET_INVARIANT;
#region json serialization related settings
/// <summary>
/// Whether storage string payload should be formatted in JSON.
/// <remarks>If neither <see cref="UseJsonFormat"/> nor <see cref="UseXmlFormat"/> is set to true, then BinaryFormatSerializer will be configured to format storage string payload.</remarks>
/// </summary>
public bool UseJsonFormat { get; set; }
public bool UseFullAssemblyNames { get; set; }
public bool IndentJson { get; set; }
public TypeNameHandling? TypeNameHandling { get; set; }
#endregion
/// <summary>
/// Whether storage string payload should be formatted in Xml.
/// <remarks>If neither <see cref="UseJsonFormat"/> nor <see cref="UseXmlFormat"/> is set to true, then BinaryFormatSerializer will be configured to format storage string payload.</remarks>
/// </summary>
public bool UseXmlFormat { get; set; }
}
ADO.NET持久化具有:版本数据,以及使用任意应用程序规则和流定义任意(反)序列化程序的功能,但目前没有方法将它们暴露给应用程序代码。参见ADO.NET Persistence Rationale中的更多信息。
MemoryGrainStorage
是一个简单的grain存储实现,它背后实际上并没有使用持久化数据存储。便于快速学习使用grain 存储,但不要打算用于生产场景。
注意:此提供程序将状态持久化为易失性内存,该内存将在silo关闭时被删除。仅用于测试。
以下通过ISiloHostBuilder
,如何设置内存存储提供程序 。
var siloHostBuilder = new SiloHostBuilder()
.AddMemoryGrainStorage("OrleansStorage", options=>options.NumStorageGrains = 10);
您可以通过MemoryGrainStorageOptions
,设置以下属性:
/// <summary>
/// Options for MemoryGrainStorage
/// </summary>
public class MemoryGrainStorageOptions
{
/// <summary>
/// Default number of queue storage grains.
/// </summary>
public const int NumStorageGrainsDefaultValue = 10;
/// <summary>
/// Number of store grains to use.
/// </summary>
public int NumStorageGrains { get; set; } = NumStorageGrainsDefaultValue;
/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialzed prior to use.
/// </summary>
public int InitStage { get; set; } = DEFAULT_INIT_STAGE;
/// <summary>
/// Default init stage
/// </summary>
public const int DEFAULT_INIT_STAGE = ServiceLifecycleStage.ApplicationServices;
}
如果没有为一个Grain<T>
的grain类指定[StorageProvider]
属性,则将搜索名为Default
的提供程序。如果未找到,则将其视为缺少存储提供程序。
如果在配置时,未将grain类引用的存储提供程序添加到silo中,则该类型的grain将无法在运行时激活,并且对它们的调用将失败,异常信息Orleans.Storage.BadProviderConfigException
指出此grain类型未被加载。但其余的grain类型不会受到影响。
不同的grain类型,可以使用不同的已配置的存储提供程序,即使两者都是相同的类型:例如,两个不同的Azure表存储提供程序实例,它们连接到不同的Azure存储帐户(请参阅上面的配置文件示例)。
存储提供程序的所有详细配置的信息都是通过ISiloHostBuilder定义的。目前没有提供动态更新或更改一个silo所使用的存储提供程序列表的机制。但是,这是一个优先级/工作负载约束,而不是一个基本设计约束。
状态持久化API有两个部分:grain状态API和存储提供程序API。
Orleans 运行时中的grain状态存储功能,将提供读写操作,以自动填充/保存该grain的数据对象GrainState
。在幕后,这些功能将连接(在Orleans client-gen工具生成的代码中)到为该grain配置的适当的持久化提供程序。
当一个grain被激活时,grain状态会被自动读取,不过,grain负责在必要时显式地触发任何改变的grain状态的写入。有关错误处理机制的详细信息,请参阅下面的“ 故障模式”部分。
在为该激活调用OnActivateAsync()
方法之前,GrainState
被自动读取(等同于使用base.ReadStateAsync()
)。 在任何方法调用一个grain之前,除非该grain被这个调用激活,否则GrainState
不会被刷新。
在任何grain方法调用期间,grain都可以通过调用base.WriteStateAsync()
,请求Orleans运行时将该激活体的当前grain状态数据,写入指定的存储提供程序。当对其状态数据进行重大更新时,grain负责显式地执行写操作。最常见的做法是,grain方法把base.WriteStateAsync()
的Task
,作为从该grain方法返回的最终结果Task
,但不是必须要遵循此模式。在任何grain方法调用之后,运行时不会自动更新存储的grain状态。
在grain中的任何grain方法或定时器回调处理方法中,grain通过调用base.ReadStateAsync()
,请求Orleans运行时从指定的存储提供程序中,重新读取当前grain状态数据,以进行该激活操作。这将使用从持久化存储中读取的最新状态数据,完全覆盖存储在grain状态对象中的当前的状态数据。
一个隐性的特定于提供程序的Etag
值(string
),当状态被读取时,可以由存储提供程序设置为占据grain状态的元数据的一部分。某些提供程序如果不使用Etag
的话,可以选择将其保留为null
。
从概念上讲,Orleans 运行时会持有grain状态数据对象的深拷贝,以供自己在任何写操作时使用。在幕后,运行时可以使用优化规则和启发式方法,来避免在某些情况下执行部分或全部的深拷贝,前提是预留了预期的逻辑隔离语义。
grain必须继承Grain<T>
类,才能参与Orleans的grain状态持久化机制。上述定义中的T
,将被替换为该grain的特定于应用程序的grain状态类;见下面的例子。
grain类还应该使用一个[StorageProvider]
属性进行注解,该属性告诉运行时,哪个存储提供程序(实例)与此类型的grain一起使用。
public class MyGrainState
{
public int Field1 { get; set; }
public string Field2 { get; set; }
}
[StorageProvider(ProviderName="store1")]
public class MyPersistenceGrain : Grain<MyGrainState>, IMyPersistenceGrain
{
...
}
在调用grain的OnActivateAsync()
方法之前,Orleans运行时将自动进行grain状态的初始读取;不需要任何应用程序代码即可实现此操作。从这一刻起,grain的状态可以通过grain类内的属性Grain<T>.State
来获得。
在对grain的内存状态进行任何适当的更改之后,grain应该调用base.WriteStateAsync()
方法,通过为此grain类型定义的存储提供程序,将更改写入持久化存储。此方法是异步的,并返回一个Task
,此Task
通常被grain方法作为它自己的完成Task
返回。
public Task DoWrite(int val)
{
State.Field1 = val;
return base.WriteStateAsync();
}
如果grain希望从后端存储显式地重读此grain的最新状态,那么grain应该调用base.ReadStateAsync()
方法。这将通过为该grain类型定义的存储提供程序,从持久化存储中重新加载grain状态,当ReadStateAsync()
Task
完成时,将覆盖并替换在先前的内存中的grain状态副本。
public async Task<int> DoRead()
{
await base.ReadStateAsync();
return State.Field1;
}
在初始读取特定grain的状态数据期间,存储提供程序返回的失败,将导致该grain的激活操作失败; 在这种情况下,不会调用该grain的生命周期回调方法OnActivateAsync()
。对引发该grain激活的原始请求,将以与grain激活期间的任何其他失败相同的方式,以失败的形式被返回给调用者。存储提供程序在读取特定grain的状态数据时,所遭遇的失败将导致ReadStateAsync()
Task
失败。grain可以选择处理或忽略失败的Task
,就像Orleans的其他任何Task
一样。
在silo启动时,某个grain由于存储提供程序配置的缺少/错误,而未能成功加载,此时尝试将消息发送到该grain,将返回永久性错误Orleans.BadProviderConfigException
。
存储提供程序写入特定grain的状态数据时遇到的失败,将导致WriteStateAsync()
Task
失败。通常,这意味着,grain调用将以失败的形式,被返回给客户端调用程序,前提是WriteStateAsync()
Task
被正确地链入到此grain方法的最终返回的Task
。但是,某些高级场景下,可能会编写grain代码来专门处理此类写入错误,就像它们可以处理任何其他失败的Task
一样。
执行错误处理/恢复代码的grain,必须捕获异常/失败的WriteStateAsync()
Task
,而不是重新抛出,以表示它们已成功处理写入错误。
有一个服务提供程序API,用于编写额外的持久化提供程序 —— IGrainStorage
。
Persistence Provider API涵盖了GrainState数据的读写操作。
/// <summary>
/// Interface to be implemented for a storage able to read and write Orleans grain state data.
/// </summary>
public interface IGrainStorage
{
/// <summary>Read data function for this storage instance.</summary>
/// <param name="grainType">Type of this grain [fully qualified class name]</param>
/// <param name="grainReference">Grain reference object for this grain.</param>
/// <param name="grainState">State data object to be populated for this grain.</param>
/// <returns>Completion promise for the Read operation on the specified grain.</returns>
Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState);
/// <summary>Write data function for this storage instance.</summary>
/// <param name="grainType">Type of this grain [fully qualified class name]</param>
/// <param name="grainReference">Grain reference object for this grain.</param>
/// <param name="grainState">State data object to be written for this grain.</param>
/// <returns>Completion promise for the Write operation on the specified grain.</returns>
Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState);
/// <summary>Delete / Clear data function for this storage instance.</summary>
/// <param name="grainType">Type of this grain [fully qualified class name]</param>
/// <param name="grainReference">Grain reference object for this grain.</param>
/// <param name="grainState">Copy of last-known state data object for this grain.</param>
/// <returns>Completion promise for the Delete operation on the specified grain.</returns>
Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState);
}
当存储提供程序检测到Etag
约束冲突时,任何执行写入操作的尝试,都应该导致写入Task
失败,并出现瞬时错误Orleans.InconsistentStateException
,并包装底层的存储异常。
public class InconsistentStateException : AggregateException
{
/// <summary>The Etag value currently held in persistent storage.</summary>
public string StoredEtag { get; private set; }
/// <summary>The Etag value currently held in memory, and attempting to be updated.</summary>
public string CurrentEtag { get; private set; }
public InconsistentStateException(
string errorMsg,
string storedEtag,
string currentEtag,
Exception storageException
) : base(errorMsg, storageException)
{
this.StoredEtag = storedEtag;
this.CurrentEtag = currentEtag;
}
public InconsistentStateException(string storedEtag, string currentEtag, Exception storageException)
: this(storageException.Message, storedEtag, currentEtag, storageException)
{ }
}
写入操作的任何其他失败,都应该导致写入Task
中断,并带有一个异常信息,此异常信息包含底层的存储异常。
各个存储提供程序应该决定如何最好地存储grain状态 ——blob(各种格式/序列化形式)或column-per-field是显而易见的选择。
Azure Table的基本存储提供程序,使用Orleans二进制序列化,将状态数据字段编码为单个表列。
ADO.NET支持的持久化存储的原则是:
实际上,这意味着要遵循ADO.NET的实现目标,并在特定于ADO.NET的存储提供程序中,添加一些实现逻辑,以使存储中的数据形态不断演化。
除了通常的存储提供程序功能外,ADO.NET提供程序还具有内置功能:
1.和
2.
可以被应用任意的决策参数,例如grain ID,grain类型,有效载荷数据。
这样做是为了选择一种格式,例如简单二进制编码(SBE),并实现了 IStorageDeserializer和IStorageSerializer。内置的(反)序列化器是使用此方法构建的。该OrleansStorageDefault(反)序列化 可以用作如何实现其他格式的示例。
当(de)序列化器被实现后,它们需要添加到AdoNetGrainStorage中的StorageSerializationPicker
属性中。这是IStorageSerializationPicker的一个实现。默认情况下, 将使用StorageSerializationPicker。在RelationalStorageTests中,可以看到更改数据存储格式或使用(反)序列化器的示例。
目前没有方法将此公开给Orleans应用程序消费,因为没有方法可以访问框架创建的AdoNetGrainStorage。