小编rak*_*esh的帖子

为什么我必须明确告诉Spark要缓存什么?

在Spark中,每次我们对RDD执行任何操作时,都会重新计算RDD.因此,如果我们知道RDD将被重用,我们应该明确地缓存RDD.

让我们说,Spark决定懒惰地缓存所有RDD并使用LRU自动将最相关的RDD保存在内存中(这是大多数缓存以任何方式工作的方式).这对开发人员有很大的帮助,因为他不必考虑缓存并专注于应用程序.此外,我不知道它如何对性能产生负面影响,因为很难跟踪程序内部使用变量(RDD)的次数,大多数程序员将决定以任何方式缓存大多数RDD.

缓存通常会自动发生.以OS /平台或框架或工具为例.但是由于分布式计算中缓存的复杂性,我可能会忽略为什么缓存不能自动或性能影响.

所以我无法理解,为什么我必须显式缓存为,

  1. 它看起来很难看
  2. 它很容易被遗漏
  3. 它可以很容易地使用过度/不足

caching apache-spark

5
推荐指数
1
解决办法
1402
查看次数

Spark 流作业的可靠检查点(保持复杂状态)

我们在 Red Hat 4.4.7 上使用 Spark 1.6 和 JVM 1.6 来运行我们的 Spark 流应用程序/作业。我们的一些流作业使用复杂的状态,我们有 Scala 案例类来表示它们。但是在测试作业的升级周期时,我们遇到了一些问题,如下所示。由于流作业将永远运行,因此在设计易于升级的应用程序方面需要帮助。

我正在检查作业无法从检查点重新启动的确切用例。

  • 只是重新启动作业而不更改任何内容并不会产生问题。
  • 在进行随机更改(与状态无关)后重新启动作业不会产生问题。
  • 在更改状态处理功能(例如通过添加打印)后重新启动作业不会产生问题。
  • 在更改状态(通过添加新的布尔字段)后重新启动作业确实会产生问题。

在做了一些谷歌搜索之后,处理这个问题的一般准则似乎是,

  1. 将状态实现为“将模式与数据一起存储的格式”,如 json 或 avro。
    • 客户端代码必须在将其放入状态之前进行序列化,并在从状态中读取后反序列化。序列化和反序列化将在每个流间隔后发生,mapWithState 可能会有所帮助。
    • 如果作业的多个版本可以共存,则必须明确处理将状态从版本 x 升级到 y !!!
  2. 停止输入,完成输入的处理,重新开始作为具有新检查点的新作业。
    • 虽然这很容易实现,但对于我们的一些工作来说是不可能的。升级周期也会变得稍微复杂一些。
  3. 同时将数据保存到外部存储,并在升级时将其加载为初始 RDD。
    • 这将引入保持状态的外部依赖。
    • 如果作业的多个版本可以共存,则必须明确处理将状态从版本 x 升级到 y !!!

由于信息分散在整个网络上,我感到很困惑,无法得出结论。以下是我的问题,

  1. 如果状态类的结构发生变化,检查点将变得无效,但是,如果状态类的程序集 jar 或功能(不是结构)发生变化,是否还有其他已知问题使检查点变得无效?
  2. 您使用什么策略来轻松升级有状态的 Spark 流作业?

apache-spark spark-streaming

5
推荐指数
1
解决办法
692
查看次数

如何调整“spark.rpc.askTimeout”?

我们有一个 spark 1.6.1 应用程序,它从两个 kafka 主题中获取输入并将结果写入另一个 kafka 主题。应用程序在第一个输入主题中接收一些大(大约 1MB)文件,并从第二个输入主题接收一些简单的条件。如果满足条件,则将文件写入输出主题 else 中的状态(我们使用 mapWithState)。

该逻辑适用于较少(几百)数量的输入文件,但失败org.apache.spark.rpc.RpcTimeoutException并建议增加spark.rpc.askTimeout. 从默认值(120 秒)增加到 300 秒后,运行时间更长,但在 1 小时后因相同的错误而崩溃。把值改成500s后,工作正常运行了2个多小时。

注意:我们在本地模式下运行 spark 作业,而 kafka 也在机器本地运行。另外,有一段时间我看到警告"[2016-09-06 17:36:05,491] [WARN] - [org.apache.spark.storage.MemoryStore] - Not enough space to cache rdd_2123_0 in memory! (computed 2.6 GB so far)"

现在,考虑到所有本地配置,300 秒似乎足够大了。但是任何想法,如何根据测试得出理想的超时值,而不是仅使用 500 秒或更高,因为我看到使用 800 秒的崩溃案例和建议使用 60000 秒的案例?

apache-spark spark-streaming

5
推荐指数
1
解决办法
5817
查看次数

仅使用互斥锁读/写锁实现?

我试图仅使用互斥锁实现读/写锁定(仅用于学习).就在我认为我已经覆盖所有角落的情况下(因为程序使用各种组合),我已经意识到,我忽略了事实(因为它在ubuntu中工作),互斥体应该由线程的所有者释放.以下是我的实施,

class rw_lock_t{

    int NoOfReaders;
    int NoOfWriters, NoOfWritersWaiting;
    pthread_mutex_t class_mutex;
    pthread_cond_t class_cond;
    pthread_mutex_t data_mutex;

public:

    rw_lock_t()
    : NoOfReaders(0),
      NoOfWriters(0), NoOfWritersWaiting(0)
    {
            pthread_mutex_init(&class_mutex, NULL);
            pthread_mutex_init(&data_mutex, NULL);
            pthread_cond_init(&class_cond, NULL);
    }
    void r_lock()
    {
            pthread_mutex_lock(&class_mutex);
            //while(NoOfWriters!=0 || NoOfWritersWaiting!=0) //Writer Preference
            while(NoOfWriters!=0)
            {
                    pthread_cond_wait(&class_cond, &class_mutex);
            }
            if(NoOfReaders==0)
            {
                    pthread_mutex_unlock(&class_mutex);
                    pthread_mutex_lock(&data_mutex);
                    pthread_mutex_lock(&class_mutex);
                    NoOfReaders++;
                    pthread_mutex_unlock(&class_mutex);
            }
            else if(NoOfReaders>0) //Already Locked
            {
                    NoOfReaders++;
                    pthread_mutex_unlock(&class_mutex);
            }
    }
    void w_lock()
    {
            pthread_mutex_lock(&class_mutex);
            NoOfWritersWaiting++;
            while(NoOfReaders!=0 && NoOfWriters!=0)
            {
                    pthread_cond_wait(&class_cond, &class_mutex);
            }
            pthread_mutex_unlock(&class_mutex);

            pthread_mutex_lock(&data_mutex);
            pthread_mutex_lock(&class_mutex);
            NoOfWritersWaiting--; NoOfWriters++;
            pthread_mutex_unlock(&class_mutex);
    }
    void r_unlock() …
Run Code Online (Sandbox Code Playgroud)

c++ multithreading mutex pthreads

4
推荐指数
1
解决办法
1万
查看次数