我有几个文件(每个近1GB)和数据.数据是一个字符串行.
我需要与数百名消费者一起处理这些文件.这些消费者中的每一个都做一些与其他消费者不同的处 消费者不会同时写任何地方.他们只需要输入字符串.处理后,他们更新本地缓冲区.消费者可以轻松地并行执行.
重要提示:对于一个特定文件,每个消费者必须以正确的顺序处理所有行(不跳过)(因为它们出现在文件中).处理不同文件的顺序无关紧要.
一个消费者对单行的处理速度相当快.我希望Corei5上的时间不到50微秒.
所以现在我正在寻找解决这个问题的好方法.这将成为.NET项目的一部分,所以请让我们坚持使用.NET(最好是C#).
我知道TPL和DataFlow.我猜最相关的是BroadcastBlock.但我认为这里的问题是,每一行我都要等待所有消费者完成才能发布新的消费者.我猜它效率不高.
我认为理想的情况是这样的:
我采用这种方法是对的吗?无论是否,我如何实施良好的解决方案?
我有一个数据帧:
df = pd.DataFrame(np.random.randint(0,100,size=(5, 2)), columns=list('AB'))
A B
0 92 65
1 61 97
2 17 39
3 70 47
4 56 6
Run Code Online (Sandbox Code Playgroud)
这是5%的分位数:
down_quantiles = df.quantile(0.05)
A 24.8
B 12.6
Run Code Online (Sandbox Code Playgroud)
这里是低于分位数的值的掩码:
outliers_low = (df < down_quantiles)
A B
0 False False
1 False False
2 True False
3 False False
4 False True
Run Code Online (Sandbox Code Playgroud)
我想将df低于分位数的值设置为其列分位数.我可以这样做:
df[outliers_low] = np.nan
df.fillna(down_quantiles, inplace=True)
A B
0 92.0 65.0
1 61.0 97.0
2 24.8 39.0
3 70.0 47.0
4 56.0 12.6
Run Code Online (Sandbox Code Playgroud)
但肯定应该有一种更优雅的方式.我怎么能不这样做 …
我需要有一些像BroadcastBlock一样的对象,但保证交付.所以我用了这个问题的答案.但我真的不清楚这里的执行流程.我有一个控制台应用程序.这是我的代码:
static void Main(string[] args)
{
ExecutionDataflowBlockOptions execopt = new ExecutionDataflowBlockOptions { BoundedCapacity = 5 };
List<ActionBlock<int>> blocks = new List<ActionBlock<int>>();
for (int i = 0; i <= 10; i++)
blocks.Add(new ActionBlock<int>(num =>
{
int coef = i;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ". " + num * coef);
}, execopt));
ActionBlock<int> broadcaster = new ActionBlock<int>(async num =>
{
foreach (ActionBlock<int> block in blocks) await block.SendAsync(num);
}, execopt);
broadcaster.Completion.ContinueWith(task =>
{
foreach (ActionBlock<int> block in blocks) block.Complete();
});
Task producer = …Run Code Online (Sandbox Code Playgroud) c# multithreading task-parallel-library async-await tpl-dataflow
假设我有两个数据框:
df1 df2
A B C D
1 3 -2 7
2 4 0 10
Run Code Online (Sandbox Code Playgroud)
我需要创建一个由两个数据帧的列组成的相关矩阵。
corrmat_df
C D
A 1 *
B * 1
Run Code Online (Sandbox Code Playgroud)
我可以在嵌套循环中逐元素执行此操作,但也许还有更多Pythonic 方式?谢谢。
c# ×2
dataframe ×2
pandas ×2
python ×2
tpl-dataflow ×2
.net ×1
async-await ×1
correlation ×1
quantile ×1