多个消费者是否可以接收同一条消息。我有一个生产者,它从网络套接字生成刻度数据(股票市场)。我现在有一个消费者每秒收到 1000 条消息,效果很好。但现在我想让多个消费者使用System.Threading.Channels
. 单个生产者/消费者的完整工作代码。
class ConsumerOne
{
private readonly Channel<DummyData> _tickQueue;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly string _tag;
public ConsumerOne(Channel<DummyData> tickQueue, CancellationTokenSource cancellationTokenSource, string tag)
{
_tickQueue = tickQueue;
_cancellationTokenSource = cancellationTokenSource;
_tag = tag;
}
public async Task StartConsuming()
{
await foreach (var message in _tickQueue.Reader.ReadAllAsync(
cancellationToken: _cancellationTokenSource.Token))
{
// Business logic of One
Console.WriteLine($"from consumer {_tag} ==> {message.Price}");
}
}
}
public class DummyData
{
public long Ticks { get; set; }
public DateTime DateTime …
Run Code Online (Sandbox Code Playgroud) c# producer-consumer publish-subscribe .net-core system.threading.channels
我有一个场景,我正在从数据库中读取一些数据。该数据以 的形式返回IAsyncEnumerable<MyData>
。读取数据后我想将其发送给消费者。该消费者是异步的。现在我的代码看起来像这样:
// C#
IAsyncEnumerable<MyData> enumerable = this.dataSource.Read(query);
await foreach (var data in enumerable)
{
await this.consumer.Write(data);
}
Run Code Online (Sandbox Code Playgroud)
我的问题是,当我枚举数据库时,我持有数据的锁。我不想持有这把锁的时间超过我需要的时间。
如果消费者消耗数据的速度比生产者生成数据的速度慢,有什么方法可以让我急切地从数据源中读取数据,而无需仅调用ToList
or ToListAsync
。我想避免一次将所有数据读入内存,如果现在生产者比消费者慢,这会导致相反的问题。如果数据库上的锁不是越短越好,我想要在内存中一次有多少数据与我们保持枚举运行多长时间之间进行可配置的权衡。
我的想法是,有某种方法可以使用队列或类似通道的数据结构来充当生产者和消费者之间的缓冲区。
在 Golang 中我会做这样的事情:
// go
queue := make(chan MyData, BUFFER_SIZE)
go dataSource.Read(query, queue)
// Read sends data on the channel, closes it when done
for data := range queue {
consumer.Write(data)
}
Run Code Online (Sandbox Code Playgroud)
有没有办法在 C# 中获得类似的行为?
这是我的设置......
/* Bounded Buffer item structure */
struct item {
int id; /* string index value */
char str[80]; /* string value */
};
/* Structure for the shared memory region */
typedef struct {
int debug; /* debug flag */
int in; /* index of next empty slot */
int out; /* index of next full slot */
char MUTEXname[32]; /* name of the MUTEX semaphore */
char EMPTYname[32]; /* name of the EMPTY semaphore */
char FULLname[32]; /* …
Run Code Online (Sandbox Code Playgroud) 以下是摘自"操作系统概念"第7版高尔文,加涅第3章的硬拷贝本身:
以下变量驻留在生产者和使用者进程共享的内存区域中:
#define BUFFER_SIZE 10
typedef struct {
. . .
} item;
item buffer[ BUFFER_SIZE ];
int in = 0;
int out = 0;
Run Code Online (Sandbox Code Playgroud)
共享缓冲器被实现为具有两个逻辑指针的圆形阵列:在与出 .The变量在指向在缓冲器中的下一个自由位置; out指向缓冲区中的第一个完整位置.当in==out;
缓冲区已满时,缓冲区为空((in+1)%BUFFER_SIZE)==out
.
该方案允许BUFFER_SIZE-1
缓冲区中的大多数项目同时存在.
我用粗体突出了我的困惑.这是该章的在线幻灯片的链接(但它编辑了本书的几行).转到"使用共享内存的生产者 - 消费者示例"部分
http://www.cs.uic.edu/~jbell/CourseNotes/OperatingSystems/3_Processes.html
为什么BUFFER_SIZE-1
缓冲区中有项目?如果我们从开始buffer[0]
到buffer[BUFFERSIZE-1]
,是不是等于BUFFER_SIZE
项目数?作者是否意味着说缓冲区的索引不能超过BUFFER_SIZE-1
?但是,他已经明确写出了同时不能超过的项目数BUFFER_SIZE-1
.请解释整件事.
我们说有两个生产者P1 and P2
。
P1尝试在时间t1将日志L1写入分区PT,但是由于网络等待时间,它在t2被写入。
P2尝试在时间t3将日志L2写入分区PT,但是由于网络等待时间而在t4写入日志L2。
t1<t3
但是t2>t4
,让我们说,分区PT中L1和L2的最终顺序是什么?
logging publish producer-consumer publish-subscribe apache-kafka
我试图以spark
本地模式运行我的应用程序.为了全部设置,我按照本教程:http://blog.d2-si.fr/2015/11/05/apache-kafka-3/,(法语)显示构建本地kafka
/ zookeeper
环境的每个步骤.
而且,我使用IntelliJ
以下配置:
val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]")
Run Code Online (Sandbox Code Playgroud)
我的运行配置,为消费者:
"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1"
Run Code Online (Sandbox Code Playgroud)
而对于制片人:
"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300
Run Code Online (Sandbox Code Playgroud)
在此之前,我检查如果消费者从默认的生产者收到的意见console-producer
和console-consumer
的kafka_2.10-0.9.0.1
; 它确实.
但是,我面临以下错误:
java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921)
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348)
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) …
Run Code Online (Sandbox Code Playgroud) producer-consumer apache-kafka apache-spark apache-zookeeper
我使用的消费者代码编写和制作wait()
,并notify()
在Java中.创建并调用Thread-0并创建produce()
Thread-1并调用它consume()
.
public class Processor {
private volatile List<Integer> list = new ArrayList<>();
private final int MAX_CAPACITY = 5;
Object lock = new Object();
public void produce() throws InterruptedException {
while (true) {
while (list.size() == MAX_CAPACITY) {
System.out.println("List is full! Producer is Waiting....");
synchronized (lock) {
lock.wait();
}
}
synchronized (lock) {
int random = new Random().nextInt(100);
list.add(random);
System.out.println("Added to list:" + random);
lock.notify();
}
}
}
public void consume() throws InterruptedException …
Run Code Online (Sandbox Code Playgroud) 我正在尝试了解RxJava并发性的一些细节,但不确定自己的想法是否正确。我对SubscribeOn / ObserveOn的工作方式有很好的了解,但是我正在尝试确定池调度程序的某些细节。为此,我正在考虑实现一个1-N生产者-消费者链,该链应尽可能多地拥有尽可能多的CPU。
根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据无功合同,操作员只能获得顺序呼叫。
因此,这样的设置
Observable.range(1, 1000) // Whatever has to be processed
.observeOn(Schedulers.computation())
.doOnNext(/* heavy computation */)
.doOnCompleted(() -> System.out.println("COMPLETED"))
.forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)
即使使用线程池,也只会收到并发调用doOnNext。进行睡眠检查的实验OperatorObserveOn.java
似乎证实了这一点,因为每次observeOn
呼叫都会获得一名工人。另外,如果不是这样,则必须对OnCompleted进行复杂的管理,必须等待任何未完成的OnNext完成,但我发现这并不存在。
假设我在正确的轨道上(也就是说,只涉及一个线程,尽管您可以使用observeOn在其中几个线程之间跳转),那么正确的模式是什么?我可以找到相反情况的示例(将多个异步事件生成器同步到一个使用者中),但是对于这种典型情况,不是简单的示例。
我猜想涉及了flatMap,也许使用了beta版本(在1.x中),该版本限制了并发订阅的数量。可能像使用window / flatMap这样简单吗?
Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example
.flatMap(/* Processing */, 4) // For 4-concurrent processing
.subscribe()
Run Code Online (Sandbox Code Playgroud)
在这种方法中,我仍然缺少以Rx通用方式最大化CPU的简单方法(即,指定计算调度程序而不是对flatMap进行最大预订)。所以,也许...:
Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example
.flatMap(v -> Observable.just(v) …
Run Code Online (Sandbox Code Playgroud) 我看到它的方式,生产者和消费者线程都可以单独缓存计数并因此做出错误的决定.如果变量不是volatile,count ++可能只是更新缓存吗?
class Buffer {
private char [] buffer;
private int count = 0, in = 0, out = 0;
Buffer(int size)
{
buffer = new char[size];
}
public synchronized void Put(char c) {
while(count == buffer.length)
{
try { wait(); }
catch (InterruptedException e) { }
finally { }
}
System.out.println("Producing " + c + " ...");
buffer[in] = c;
in = (in + 1) % buffer.length;
count++;
notify();
}
public synchronized char Get() {
while (count == 0) …
Run Code Online (Sandbox Code Playgroud) 原始问题:
我得到了一个结构数组,并在主线程中读取它时将其填充到一个单独的线程中:
struct DataModel MyData[1024];
struct DataModel
{
bool IsFilled;
float a;
float b;
}
Run Code Online (Sandbox Code Playgroud)
我有一个Thread,它将Mydata
数组从0索引填充到最后一个索引(在上面是1024).
然后我从填充线程中获取最后一个填充的结构索引.
然后我尝试读取元素的值,其中一个索引低于填充的索引.
我们假设当第500个元素被填充时,我从MyData
数组的499元素中读取值,所以我确保我没有读取正在写入的数组元素.
Q1:这个线程安全吗?
Q2:是否有可能发生未定义的行为或误读vales?
进一步编辑:
问题是编辑不当以添加更多细节,这就是为什么它引入了不一致的答案,所以我分开了以前的编辑,以提高答案和接受答案的一致性.
编辑1:这是可能实施的建议.虽然它可能显示错误的结果,但只是我想询问线程安全和未定义的行为,以下解决方案可能会显示各种结果,但我试图首先询问线程安全性.
std::atomic<int> FilledIndex;
void FillingMyData(struct DataModel myData[])
{
for(size_t i = 0; i < 1024; i++)
{
myData[i].a = rand();
myData[i].b = rand();
myData[i].IsFilled = true;
FilledIndex = i;
}
}
int main()
{
std::thread ReadThread(FillingMyData, MyData);
while(FilledIndex < 1024)
{
std::cout << MyData[FilledIndex].a;
}
ReadThread.join();
return 0;
}
Run Code Online (Sandbox Code Playgroud) apache-kafka ×2
c ×2
c# ×2
concurrency ×2
java ×2
.net-core ×1
apache-spark ×1
async-await ×1
c++ ×1
linq ×1
logging ×1
posix ×1
publish ×1
rx-java ×1
stdatomic ×1
synchronized ×1