在Spark中,每次我们对RDD执行任何操作时,都会重新计算RDD.因此,如果我们知道RDD将被重用,我们应该明确地缓存RDD.
让我们说,Spark决定懒惰地缓存所有RDD并使用LRU自动将最相关的RDD保存在内存中(这是大多数缓存以任何方式工作的方式).这对开发人员有很大的帮助,因为他不必考虑缓存并专注于应用程序.此外,我不知道它如何对性能产生负面影响,因为很难跟踪程序内部使用变量(RDD)的次数,大多数程序员将决定以任何方式缓存大多数RDD.
缓存通常会自动发生.以OS /平台或框架或工具为例.但是由于分布式计算中缓存的复杂性,我可能会忽略为什么缓存不能自动或性能影响.
所以我无法理解,为什么我必须显式缓存为,
我们在 Red Hat 4.4.7 上使用 Spark 1.6 和 JVM 1.6 来运行我们的 Spark 流应用程序/作业。我们的一些流作业使用复杂的状态,我们有 Scala 案例类来表示它们。但是在测试作业的升级周期时,我们遇到了一些问题,如下所示。由于流作业将永远运行,因此在设计易于升级的应用程序方面需要帮助。
我正在检查作业无法从检查点重新启动的确切用例。
在做了一些谷歌搜索之后,处理这个问题的一般准则似乎是,
由于信息分散在整个网络上,我感到很困惑,无法得出结论。以下是我的问题,
我们有一个 spark 1.6.1 应用程序,它从两个 kafka 主题中获取输入并将结果写入另一个 kafka 主题。应用程序在第一个输入主题中接收一些大(大约 1MB)文件,并从第二个输入主题接收一些简单的条件。如果满足条件,则将文件写入输出主题 else 中的状态(我们使用 mapWithState)。
该逻辑适用于较少(几百)数量的输入文件,但失败org.apache.spark.rpc.RpcTimeoutException并建议增加spark.rpc.askTimeout. 从默认值(120 秒)增加到 300 秒后,运行时间更长,但在 1 小时后因相同的错误而崩溃。把值改成500s后,工作正常运行了2个多小时。
注意:我们在本地模式下运行 spark 作业,而 kafka 也在机器本地运行。另外,有一段时间我看到警告"[2016-09-06 17:36:05,491] [WARN] - [org.apache.spark.storage.MemoryStore] - Not enough space to cache rdd_2123_0 in memory! (computed 2.6 GB so far)"
现在,考虑到所有本地配置,300 秒似乎足够大了。但是任何想法,如何根据测试得出理想的超时值,而不是仅使用 500 秒或更高,因为我看到使用 800 秒的崩溃案例和建议使用 60000 秒的案例?
我试图仅使用互斥锁实现读/写锁定(仅用于学习).就在我认为我已经覆盖所有角落的情况下(因为程序使用各种组合),我已经意识到,我忽略了事实(因为它在ubuntu中工作),互斥体应该由线程的所有者释放.以下是我的实施,
class rw_lock_t{
int NoOfReaders;
int NoOfWriters, NoOfWritersWaiting;
pthread_mutex_t class_mutex;
pthread_cond_t class_cond;
pthread_mutex_t data_mutex;
public:
rw_lock_t()
: NoOfReaders(0),
NoOfWriters(0), NoOfWritersWaiting(0)
{
pthread_mutex_init(&class_mutex, NULL);
pthread_mutex_init(&data_mutex, NULL);
pthread_cond_init(&class_cond, NULL);
}
void r_lock()
{
pthread_mutex_lock(&class_mutex);
//while(NoOfWriters!=0 || NoOfWritersWaiting!=0) //Writer Preference
while(NoOfWriters!=0)
{
pthread_cond_wait(&class_cond, &class_mutex);
}
if(NoOfReaders==0)
{
pthread_mutex_unlock(&class_mutex);
pthread_mutex_lock(&data_mutex);
pthread_mutex_lock(&class_mutex);
NoOfReaders++;
pthread_mutex_unlock(&class_mutex);
}
else if(NoOfReaders>0) //Already Locked
{
NoOfReaders++;
pthread_mutex_unlock(&class_mutex);
}
}
void w_lock()
{
pthread_mutex_lock(&class_mutex);
NoOfWritersWaiting++;
while(NoOfReaders!=0 && NoOfWriters!=0)
{
pthread_cond_wait(&class_cond, &class_mutex);
}
pthread_mutex_unlock(&class_mutex);
pthread_mutex_lock(&data_mutex);
pthread_mutex_lock(&class_mutex);
NoOfWritersWaiting--; NoOfWriters++;
pthread_mutex_unlock(&class_mutex);
}
void r_unlock() …Run Code Online (Sandbox Code Playgroud)