我正在寻找一种java并发习惯用法来匹配具有最高吞吐量的大量元素.
考虑一下我有"人"来自多个线程.每个"人"都在寻找一场比赛.当它找到另一个等待的"人"时,它们相互匹配并被移除以进行处理.
我不想锁定一个大的结构来改变状态.考虑Person有getMatch和setMatch.在提交之前,每个人的#getMatch都是null.但是当他们解锁(或被捕获)时,他们要么已经过期,因为他们等待长时间的比赛或#getMatch是非空的.
保持高通过率的一些问题是,如果PersonA与PersonB同时提交.它们相互匹配,但PersonB也匹配已经在等待的PersonC.PersonB的状态在提交时变为"可用".但是当PersonB与PersonC匹配时,PersonA不需要偶然获得PersonB.合理?另外,我想以异步方式执行此操作.换句话说,我不希望每个提交者都必须在具有waitForMatch类型的东西的线程上持有Person.
同样,我不希望请求必须在不同的线程上运行,但是如果有一个额外的匹配器线程也没关系.
似乎应该有一些成语,因为它似乎是一个非常普遍的事情.但我的谷歌搜索已经枯竭(我可能使用错误的条款).
UPDATE
有几件事让我很难解决这个问题.一个是我不想在内存中有对象,我想让所有等待的候选人都使用redis或memcache或类似的东西.另一个是任何人可能有几个可能的比赛.考虑如下界面:
person.getId(); // lets call this an Integer
person.getFriendIds(); // a collection of other person ids
Run Code Online (Sandbox Code Playgroud)
然后我有一个看起来像这样的服务器:
MatchServer:
submit( personId, expiration ) -> void // non-blocking returns immediately
isDone( personId ) -> boolean // either expired or found a match
getMatch( personId ) -> matchId // also non-blocking
Run Code Online (Sandbox Code Playgroud)
这是一个休息界面,它会使用重定向,直到你得到结果.我的第一个想法是在MatchServer中有一个Cache,它由redis之类的东西支持,并且对于当前被锁定和被操作的对象具有并发的弱值哈希映射.每个personId将由持久状态对象包装,状态为已提交,匹配和过期.
到目前为止?非常简单,提交代码完成了初始工作,它是这样的:
public void submit( Person p, long expiration ) {
MatchStatus incoming = new MatchStatus( p.getId(), expiration );
if ( !tryMatch( incoming, p.getFriendIds() ) )
cache.put( p.getId(), incoming );
}
public boolean isDone( Integer personId ) {
MatchStatus status = cache.get( personId );
status.lock();
try {
return status.isMatched() || status.isExpired();
} finally {
status.unlock();
}
}
public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
for ( Integer friend : friends ) {
if ( match( incoming, friend ) )
return true;
}
return false;
}
private boolean match( MatchStatus incoming, Integer waitingId ) {
CallStatus waiting = cache.get( waitingId );
if ( waiting == null )
return false;
waiting.lock();
try {
if ( waiting.isMatched() )
return false;
waiting.setMatch( incoming.getId() );
incoming.setMatch( waiting.getId() );
return true
} finally {
waiting.unlock();
}
}
Run Code Online (Sandbox Code Playgroud)
所以这里的问题是,如果两个人同时进来并且他们是他们唯一的比赛,他们就不会找到对方.竞争条件对吗?我能看到解决它的唯一方法是同步"tryMatch()".但这会影响我的吞吐量.我不能无限期地循环tryMatch,因为我需要这些非常短的调用.
那么有什么更好的方法来解决这个问题呢?我提出的每一个解决方案都会一次一个地强迫人们使用吞吐量.例如,创建后台线程并使用阻塞队列一次放入和接收传入线程.
任何指导将不胜感激.
您也许可以使用ConcurrentHashMap. 我假设您的对象具有可以匹配的键,例如 PersonA 和 PersonB 将具有“Person”键。
ConcurrentHashMap<String, Match> map = new ConcurrentHashMap<>();
void addMatch(Match match) {
boolean success = false;
while(!success) {
Match oldMatch = map.remove(match.key);
if(oldMatch != null) {
match.setMatch(oldMatch);
success = true;
} else if(map.putIfAbsent(match.key, match) == null) {
success = true;
}
}
}
Run Code Online (Sandbox Code Playgroud)
您将继续循环,直到将匹配添加到地图,或者删除现有匹配并将其配对。 remove并且putIfAbsent都是原子的。
编辑:因为您希望将数据卸载到磁盘,所以您可以使用MongoDB及其findAndModify方法来达到此目的。如果具有该密钥的对象已存在,则该命令将删除并返回它,以便您可以将旧对象与新对象配对,并可能存储与新密钥关联的对;如果带有该键的对象不存在,则该命令将存储带有该键的对象。这与行为相同,ConcurrentHashMap只不过数据存储在磁盘上而不是内存中;您无需担心两个对象同时写入,因为逻辑findAndModify可以防止它们无意中占用相同的键。
如果您需要将对象序列化为 JSON,请使用Jackson 。
Mongo 有替代品,例如DynamoDB,尽管 Dynamo 仅对少量数据免费。
编辑:鉴于朋友列表不是自反的,我认为您可以通过结合 MongoDB(或另一个具有原子更新的键值数据库)和ConcurrentHashMap.
ConcurrentHashMap<key, boolean>为其创建一个,可能在全局ConcurrentHashMap<key, ConcurrentHashMap<key, boolean>>.findAndModify原子方式将其设置为“已匹配”,然后将新朋友以“已匹配”状态写入 MongoDB,最后将该对添加到 MongoDB 中可查询的“Pairs”集合中由最终用户。ConcurrentHashMap从全局地图中删除该人的位置。ConcurrentHashMap. 已经发生了,那么什么也不做;如果没有,则检查该好友是否与其关联ConcurrentHashMap;如果是,则将与当前人员的密钥关联的值设置为“true”。(请注意,两个朋友仍然有可能写入彼此的哈希映射,因为当前人无法检查自己的映射并通过一个原子操作修改朋友的映射,但自哈希映射检查减少了这种可能性。)ConcurrentHashMap从全局映射中删除,并创建一个延迟任务,该任务将迭代写入该人的所有朋友的 ID ConcurrentHashMap(即使用ConcurrentHashMap#keySet())。此任务的延迟应该是随机的(例如Thread.sleep(500 * rand.nextInt(30))),以便两个朋友不会总是尝试同时匹配。如果当前人员没有需要重新检查的好友,则不要为其创建延迟任务。在常见情况下,一个人要么与朋友匹配,要么在迭代朋友列表时在没有将朋友添加到系统的情况下匹配失败(即该人的列表ConcurrentHashMap将为空)。如果朋友同时写入:
同时添加好友 1 和好友 2。
ConcurrentHashMap表示想念对方。ConcurrentHashMap指示相同的情况(只有当 Friend2 检查 Friend1 是否在 Friend1 向其映射写入的同时向其映射写入信息时,才会发生这种情况 - 通常 Friend2 会检测到 Friend1 已向其映射写入信息,因此它不会写入 Friend1 的地图)。一些小问题:
ConcurrentHashMaps与它们关联,例如,当 Friend1 检查映射是否在内存中时,如果 Friend2 仍在初始化其哈希映射。这很好,因为 Friend2 将写入 Friend1 的哈希映射,因此我们保证最终会尝试匹配 - 至少其中一个将具有哈希映射,而另一个正在迭代,因为哈希映射创建先于迭代。ConcurrentHashMap,然后下一次迭代应该使用它作为新的朋友列表。最终该人将被匹配,否则该人的“重新检查”好友列表将被清空。Thread.sleep(500 * rand.nextInt(30))在第一次迭代、Thread.sleep(500 * rand.nextInt(60))第二次迭代、Thread.sleep(500 * rand.nextInt(90))第三次迭代等)。ConcurrentHashMap然后再从 MongoDB 中删除该人员,否则将会出现数据争用。同样,在迭代潜在匹配项时,您必须从 MongoDB 中删除某个人,否则您可能会无意中将其匹配两次。编辑:一些代码:
该方法addUnmatchedToMongo(person1)将“不匹配”的 person1 写入 MongoDB
setToMatched(friend1)用于findAndModify自动设置friend1为“匹配”;friend1如果已经匹配或不存在,该方法将返回 false ,如果更新成功,该方法将返回 true
isMatched(friend1)如果friend1存在且匹配则返回 true,如果不存在或存在且“不匹配”则返回 false
private ConcurrentHashMap<String, ConcurrentHashMap<String, Person>> globalMap;
private DelayQueue<DelayedRetry> delayQueue;
private ThreadPoolExecutor executor;
executor.execute(new Runnable() {
public void run() {
while(true) {
Runnable runnable = delayQueue.take();
executor.execute(runnable);
}
}
}
public static void findMatch(Person person, Collection<Person> friends) {
findMatch(person, friends, 1);
}
public static void findMatch(Person person, Collection<Person> friends, int delayMultiplier) {
globalMap.put(person.id, new ConcurrentHashMap<String, Person>());
for(Person friend : friends) {
if(**setToMatched(friend)**) {
// write person to MongoDB in "matched" state
// write "Pair(person, friend)" to MongoDB so it can be queried by the end user
globalMap.remove(person.id);
return;
} else {
if(**!isMatched(friend)** && globalMap.get(person.id).get(friend.id) == null) {
// the existence of "friendMap" indicates another thread is currently trying to match the friend
ConcurrentHashMap<String, Person> friendMap = globalMap.get(friend.id);
if(friendMap != null) {
friendMap.put(person.id, person);
}
}
}
}
**addUnmatchedToMongo(person)**;
Collection<Person> retryFriends = globalMap.remove(person.id).values();
if(retryFriends.size() > 0) {
delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier), person, retryFriends, delayMultiplier));
}
}
public class DelayedRetry implements Runnable, Delayed {
private final long delay;
private final Person person;
private final Collection<Person> friends;
private final int delayMultiplier;
public DelayedRetry(long delay, Person person, Collection<Person> friends, delayMultiplier) {
this.delay = delay;
this.person = person;
this.friends = friends;
this.delayMultiplier = delayMultiplier;
}
public long getDelay(TimeUnit unit) {
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
public void run {
findMatch(person, friends, delayMultiplier + 1);
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
502 次 |
| 最近记录: |