小编Alb*_*bin的帖子

使用AtomicBoolean的Java循环调度算法

我想在向外部系统发送请求时实施严格的循环调度.有两个外部系统服务器.第一个请求应该转到'System1',第二个请求必须转到'System2',然后转到'System1',依此类推.

因为我只有两个服务器来发送请求,并且因为我想要最大性能而没有任何阻塞和上下文切换,所以我已经使用了AtomicBoolean,因为它使用了CAS操作.

我的实现类

1. RoundRobinTest.java

package com.concurrency;

import java.util.Iterator;

public class RoundRobinTest 
{
    public static void main(String[] args) 
    {
        for (int i = 0; i < 500; i++) 
        {
            new Thread(new RoundRobinLogic()).start();
        }
        try 
        {
            // Giving a few seconds for the threads to complete
            Thread.currentThread().sleep(2000);
            Iterator<String> output = RoundRobinLogic.output.iterator();
            int i=0;
            while (output.hasNext()) 
            {
                System.out.println(i+++":"+output.next());
                // Sleeping after each out.print 
                Thread.currentThread().sleep(20);
            }
        } 
        catch (Exception ex) 
        {
            // do nothing
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

2.RoundRobinLogic.java(具有静态AtomicBoolean对象的类)

package com.concurrency;

import java.util.Queue; …
Run Code Online (Sandbox Code Playgroud)

java algorithm multithreading atomicity atomicboolean

5
推荐指数
1
解决办法
5736
查看次数

如果对持久化RDD执行多个操作,缓存RDD的工作原理

val logList: RDD[String] = ...
val errorLogs = logList.filter(_.contains("Error")).persist()
//first action   
val first100 = errorLogs.take(100)
//second action
val count = errorLogs.count 
Run Code Online (Sandbox Code Playgroud)

这个案子将如何坚持工作?在下面的代码的情况下

val errorLogs = logList.filter(_.contains("Error")).take(100)
Run Code Online (Sandbox Code Playgroud)

Spark不会扫描所有日志,因为Spark知道我们只对100行日志感兴趣.但是当我们缓存这个RDD并在其上调用多个动作时会发生什么,第一个动作只需要很少的记录,后来需要转换整个RDD的记录.

调用第一个动作时它会缓存记录吗?或者它会仅缓存调用第一个操作时第一个操作所需的部分记录?

caching scala apache-spark rdd

3
推荐指数
1
解决办法
218
查看次数

有没有办法为同一 DAG 中的任务配置不同的“重试”

我有一个包含许多子任务的 DAG。在 DAG 的中间,有一个验证任务,根据任务的结果/返回代码,我想采用两条不同的路径。如果成功,将遵循一条路线(一系列任务),如果失败,我们将执行一组不同的任务。当前方法有两个问题,一是如果退出代码为 1,验证任务将执行多次(根据配置的重试次数)。二是无法采取不同的执行分支

为了解决问题 1,我们可以使用任务实例中可用的重试次数,它可以通过宏 {{ task_instance }} 获得。感谢有人可以为我们指出更简洁的方法,而采取不同路径的问题 2 仍未解决。

airflow

3
推荐指数
2
解决办法
5194
查看次数