我有一个颗粒,在OnActivate方法中设置提醒.然后,提醒会定期执行某些操作,并且不需要从筒仓外部进行进一步的通信.
是否有可能在主机启动期间获得GrainProvider并激活主机本身内的颗粒?
或者我是否需要客户端才能启动首次激活?
我正在使用Mircosoft Orleans作为基础的工作流引擎,因为它提供了许多有用的功能,例如自动分配工作和处理故障转移.
我有三种类型的谷物:
我的问题是,当运行大量当前执行时,即> 1000时,性能确实受到影响.我做了一些分析,并将其缩小到谷物之间发生的沟通.反正我还能改进这个吗?
这是我的代码的轮廓以及晶粒如何相互作用
执行粒度位于循环中,从工作流中获取下一个工作块,然后在工作块上调用execute.谷物之间的这种不断调用导致我的一个测试工作流程的执行时间从运行单次执行时的10秒变为运行超过1000时的大约5分钟.这可以改进还是应该重新设计解决方案去除粮食沟通?
[StorageProvider(ProviderName = "WorkflowStore")]
[Reentrant]
[StatelessWorker]
public class Workflow : Grain<WorkflowState>, IWorkflow
{
public Task<BlockRef> GetNext(Guid currentBlockId, string connectionName)
{
//Lookup the next work block
}
}
[Reentrant]
[StatelessWorker]
public class WorkBlock : Grain<WorkBlock State>, IWorkBlock
{
public Task<string> Execute(IExecution execution)
{
//Do some work
}
}
[StorageProvider(ProviderName = "ExecutionStore")]
public class Execution : Grain<ExecutionState>, IExecution, IRemindable
{
private async Task ExecuteNext(bool skipBreakpointCheck = false) …Run Code Online (Sandbox Code Playgroud) 我在localHost Clustering模式下运行Orleans,目前有1个谷物和一个客户端.
// client code
for (int i = 0; i <num_scan; ++i)
{
Console.WriteLine("client " + i);
// the below call should have returned when first await is hit in foo()
// but it doesn't work like that
grain.foo(i);
}
// grain code
async Task foo(int i)
{
Console.WriteLine("grain "+i);
await Task.Delay(2000);
}
Run Code Online (Sandbox Code Playgroud)
输出结果如下:
client 0
client 1
client 2
client 3
client 4
client 5
client 6
grain 0
client 7
client 8
client 9
client 10
grain …Run Code Online (Sandbox Code Playgroud) 在奥尔良,PlacementStrategy如何运作?我看到PlacementStrategy有几种实现,包括RandomPlacement,PreferLocalPlacement,ActivationCountBasedPlacement和StatelessWorkerPlacement.
我有一些问题,如何使用它们如何实现?如何指定呼叫应该是Prefer-Local vs RandomPlacement?
我正在查看代码,我没有看到基于这些放置策略选择一个执行路径与另一个执行路径的代码.那会发生什么?
在Microsoft Orleans中,我尝试使用以下代码实现类似可用工作的列表:
public Task<WorkResponse> PerformWork(WorkRequest request)
{
Console.WriteLine("Performing work for id: {0}", request.Param1);
Thread.Sleep(TimeSpan.FromSeconds(10));
var result = Task.FromResult(new WorkResponse(request.Param1, request.Param2, request.Param3));
Console.WriteLine("Completed work for id: {0}", request.Param1);
return result;
}
Run Code Online (Sandbox Code Playgroud)
但是,如果我使用像这样的代码启动许多任务,那么事情就不会正常.
_work
.ToList()
.AsParallel()
.ForAll(x =>
{
Console.WriteLine("Requesting work for id: {0}", x.Key);
var worker = GrainFactory.GetGrain<IWork>(x.Key);
var response = worker.PerformWork(x.Value);
Console.WriteLine("Response for work id: {0}", x.Key);
});
Run Code Online (Sandbox Code Playgroud)
但是,如果另一个节点加入群集,则工作似乎永远不会移动到新节点.仅在该新节点上处理新计划的工作.
似乎如果奥尔良队列中有一堆额外的工作,那么新节点就会加入群集.
我有以下代码(https://github.com/avinash0161/OrleansExperiments/tree/c0155b4b0c8c1bfe60aea8624f2cc83a52853dc7):
// Client code
Console.WriteLine("Client making a call");
var hashGenerator = client.GetGrain<IGrainA>(0);
hashGenerator.Call_A_ToTemp();
await Task.Delay(1000);
hashGenerator.Call_B_ToTemp();
// GrainA code
public async Task Call_A_ToTemp()
{
Console.WriteLine("Making call A to a fellow grain");
IGrainB grain = this.GrainFactory.GetGrain<IGrainB>(1);
grain.CallA().ContinueWith((t)=>
{
if(t.IsFaulted)
{
// Silo message timeout is 32s so t.IsFaulted is true
Console.WriteLine("Task Faulted");
Call_A_ToTemp();
}
});
}
public async Task Call_B_ToTemp()
{
Console.WriteLine("Making call B to a fellow grain");
IGrainB grain = this.GrainFactory.GetGrain<IGrainB>(1);
await grain.CallB();
}
// GrainB code
public …Run Code Online (Sandbox Code Playgroud) 我们使用 Orleans Grain 作为会话,用事件填充会话,并希望在会话过期后(20 分钟不活动后)将会话保存到外部服务。最初我们打算使用 GrainCollectionOptions.CollectionAge 来保存停用时的会话,但在不同的来源中发现依赖 OnDeactivateAsync 并不安全,因为它可能并不总是被调用,特别是在筒仓崩溃或硬关闭期间。
有人可以建议针对此类用例的推荐方法吗?
这是我们的 Grain 代码:
public class SessionGrain : Grain, ISessionGrain
{
private readonly IPersistentState<Session> _persistentState;
public SessionGrain([PersistentState("sessionsState", "sessionsStorage")] IPersistentState<Session> persistentState)
{
_persistentState = persistentState;
}
public Task CompleteAsync()
{
DeactivateOnIdle();
return Task.CompletedTask;
}
public async Task<Session> TrackEventAsync(Event @event)
{
_persistentState.State.Events.Add(@event);
await _persistentState.WriteStateAsync();
return _persistentState.State;
}
public override async Task OnActivateAsync()
{
if (_persistentState.State == null)
{
_persistentState.State = new Session();
}
await base.OnActivateAsync();
}
public override async Task OnDeactivateAsync()
{
// …Run Code Online (Sandbox Code Playgroud) 我有以下来自奥尔良的客户和谷物的代码片段。(虽然在 Orleans 中推荐的开发方式是等待任务,但以下代码在某些时候并不等待,纯粹是出于实验目的)
// client code
while(true)
{
Console.WriteLine("Client giving another request");
double temperature = random.NextDouble() * 40;
var grain = client.GetGrain<ITemperatureSensorGrain>(500);
Task t = sensor.SubmitTemperatureAsync((float)temperature);
Console.WriteLine("Client Task Status - "+t.Status);
await Task.Delay(5000);
}
// ITemperatureSensorGrain code
public async Task SubmitTemperatureAsync(float temperature)
{
long grainId = this.GetPrimaryKeyLong();
Console.WriteLine($"{grainId} outer received temperature: {temperature}");
Task x = SubmitTemp(temperature); // SubmitTemp() is another function in the same grain
x.Ignore();
Console.WriteLine($"{grainId} outer received temperature: {temperature} exiting");
}
public async Task SubmitTemp(float temp)
{ …Run Code Online (Sandbox Code Playgroud) 如果我必须对奥尔良 Grains 集后面的持久层执行操作,如何确保受影响的 Grains 可以根据更新的数据重新激活?
我的示例是,我有一个 Employee 记录级别的 Grain(EmployeeID 作为 Grain ID),我需要对某些记录执行批量操作。显然,重要的是我确保在此操作后谷物重新加载到该状态。
我创建了一个奥尔良粮仓,用于执行各种操作,包括员工变更操作。我正在使用 GetGrain 功能,如下所示
var employeeGrain = _clusterClient.GetGrain<IEmployeeGrain>(employeeId, "employee");
Run Code Online (Sandbox Code Playgroud)
该颗粒在根据标准奥尔良生命周期使用时将保持活跃状态。我的问题是,如果我故意执行更改基础数据的操作,如何触发受影响的颗粒重新激活?
看来你可以强制单个颗粒停用
this.DeactivateOnIdle()
Run Code Online (Sandbox Code Playgroud)
但是,如果我进行更大的操作,我想避免对每粒颗粒都这样做吗?我想理想情况下回收特定类型的所有谷物。