标签: distributed-computing

分布式作业调度,管理和报告

我最近玩过Hadoop,并对MapReduce作业的调度,管理和报告印象深刻.它似乎使新工作的分配和执行非常无缝,使开发人员能够专注于他们的工作实施.

我想知道Java域中是否存在任何不容易表示为MapReduce问题的作业的分布式执行?例如:

  • 需要任务协调和同步的工作.例如,它们可能涉及顺序执行任务,但同时执行某些任务是可行的:

                   .-- B --.
            .--A --|       |--.
            |      '-- C --'  |
    Start --|                 |-- Done
            |                 |
            '--D -------------'
    
    Run Code Online (Sandbox Code Playgroud)
  • 您希望分发的CPU密集型任务但不提供任何减少的输出 - 例如图像转换/调整大小.

那么是否有一个提供这种分布式计算环境的Java框架/平台?或者这种事情是否可以使用Hadoop接受/实现 - 如果有的话,这些工作的模式/指南是什么?

java hadoop distributed-computing job-scheduling

9
推荐指数
1
解决办法
1万
查看次数

"采用MapReduce模型"=可扩展性的通用答案吗?

我一直在尝试理解MapReduce概念并将其应用于我目前的情况.我的情况怎么样?好吧,我这里有一个ETL工具,其中数据转换发生在源和目标数据源(数据库)之外.因此,源数据源纯粹用于加载的提取和目标.

因此,今天的这种转变行为,对于一百万条记录来说需要大约X小时.我想解决一个我将拥有十亿条记录的情况,但我希望在相同的X小时内完成工作.因此,我的产品需要根据数据规模进行扩展(添加更多商品机器).正如您所看到的,我只担心将产品转换功能分配到不同机器的能力,并利用所有这些机器的CPU功率.

我开始寻找选项,我遇到了Apache Hadoop,最后是MapReduce的概念.我在快速安装Hadoop时非常成功,而不会遇到集群模式的问题,并且很高兴能够运行wordcount演示.很快,我意识到为了实现我自己的MapReduce模型,我必须将我的产品转换功能重新定义为MAP和REDUCE功能.

这是麻烦开始的时候.我阅读了Hadoop:Definitive Guide的副本,我理解Hadoop的许多常见用例都在以下情况中:

  • 未构造的数据和一个想要执行聚合/排序/或类似的东西.
  • 未经编译的文本,需要执行挖掘
  • 等等!

这是我的场景,我从数据库中提取并加载到数据库(具有结构化数据),我的唯一目的是以可靠的方式引入更多的CPU,并通过分发我的转换.重新定义我的转换以适应Map和Reduce模型本身就是一个巨大的挑战.所以这是我的问题:

  1. 您是否在ETL场景中使用过Hadoop?如果是,可以具体说明您如何处理转换的MapReducing?你是否纯粹使用Hadoop来利用额外的CPU能力?

  2. MapReduce概念是分布式计算的通用答案吗?还有其他同样好的选择吗?

  3. 我的理解是MapReduce适用于大型数据集进行排序/分析/分组/计数/聚合/等,我的描述是否正确?

java architecture hadoop design-patterns distributed-computing

9
推荐指数
1
解决办法
334
查看次数

在动态环境中使用Paxos

使用2F + 1处理器时,Paxos算法可以容忍高达F的故障.据我所知,此算法仅适用于固定数量的处理器.是否可以在动态环境中使用此算法,其中可以动态添加和删除节点?

algorithm distributed-computing paxos

9
推荐指数
2
解决办法
1004
查看次数

分布式算法设计

我一直在读"算法入门",并开始在我脑海里浮现出一些想法和问题.最令我困惑的是你如何设计一种算法来安排分发队列中的项目/消息.

我的想法让我浏览维基百科,主题包括排序,消息队列,调度,分布式哈希表,等等.

场景: 假设你想要一个排队消息的系统(例如字符串或一些序列化对象).该系统的一个关键特性是避免任何单点故障.系统必须分布在某个集群中的多个节点上,并且必须始终(或尽可能最好)甚至集群中每个节点的工作负载以避免热点.

您希望避免使用主/从设计进行复制和扩展(没有单点故障).该系统完全避免写入盘并保持在存储器数据结构中.

由于这是一种某种类型的队列,系统应该能够使用不同的调度算法(FIFO,最早期限,循环等......)来确定在下一个请求中应该返回哪个消息,而不管哪个节点在请求所针对的集群.

我最初的想法, 我可以想象这将如何在一台机器上工作,但当我开始思考你如何分配像这样的问题,如:

我如何散列每条消息?

我怎么知道邮件发送到哪个节点?

我如何安排每个项目,以便我可以确定下一个应该返回哪个消息和哪个节点?

我开始阅读有关分布式哈希表以及像Apache Cassandra这样的项目如何使用某种一致性散列来分发数据但我认为,因为查询不会提供密钥,我需要知道下一个项目的位置并且只是提供它.这导致阅读有关对等协议以及它们如何跨节点处理同步问题.

所以我的问题是,你将如何处理上述问题,或者这是一个太过牵强,只是一个愚蠢的想法......?

只是一个概述,指针,不同的方法,陷阱和每个的好处.可能适用的技术/概念/设计/理论.基本上任何可以用来理解这样的东西可能有用的东西.

如果你想知道,不,我不打算实施这样的事情,它只是在阅读时突然出现在我脑海中(碰巧的是,当我读一本好书时,我会被狂野的想法分心).

UPDATE

另一个有趣的问题是分布式删除.我知道像Cassandra这样的系统已经通过实施HintedHandoff,Read RepairAntiEntropy解决了这个问题,它似乎运行良好,但有没有其他(可行和有效)的方法解决这个问题?

algorithm p2p protocols distributed-computing message-queue

9
推荐指数
1
解决办法
1197
查看次数

弹性风暴拓扑/ Storm-Hadoop共存

我们正在评估为部署而追求Storm,但我有点担心.我们目前运行Hadoop MapReduce,并希望将我们的一些处理从MapReduce转换为Storm进程.请注意,这是一些,但不是全部.我们仍然会有一些MapReduce功能.

我找到了Mesos,它可能(可能)允许我们在同一硬件上维护Storm和Hadoop部署,但还有一些其他问题:

  • 我设想理想的情况是能够任意地"借用"Storm和Hadoop之间的插槽.恩.两者都会根据需要使用相同的资源.不幸的是,这是一个固定的部署,并不像EC2那样"基于云".

  • 我想避免在我们的Storm环境中遇到瓶颈.一个理想的情况是根据需求"旋转"(或反向)螺栓的更多实例.这可能/现实吗?

  • "重新启动"拓扑似乎是一个相当昂贵的操作,我不确定是否真的是一个选项.理想情况下,我希望它尽可能无缝.

我们正确接近这个问题吗?从本质上讲,Storm拓扑将"提供"MapReduce批处理作业.我们的一些处理可以以流方式处理,并且作为Storm拓扑会更好,而其中一些需要批处理.

任何一般性反馈,即使它没有解决我的具体问题,也会受到欢迎.在这一点上,这更像是一个探索阶段,我可能完全以错误的方式接近这一点.

java hadoop mapreduce distributed-computing apache-storm

9
推荐指数
1
解决办法
2559
查看次数

在Apache Spark中.如何设置worker/executor的环境变量?

我在EMR上的火花程序经常出现这个错误:

Caused by: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
    at sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:421)
    at org.apache.http.conn.ssl.AbstractVerifier.verify(AbstractVerifier.java:128)
    at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:397)
    at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:148)
    at org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:149)
    at org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:121)
    at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:573)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:425)
    at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:820)
    at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:754)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2148)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2075)
    at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1093)
    at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:548)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:172)
    at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at org.apache.hadoop.fs.s3native.$Proxy8.retrieveMetadata(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
Run Code Online (Sandbox Code Playgroud)

我做了一些研究,发现通过设置环境变量,可以在低安全性情况下禁用此身份验证:

com.amazonaws.sdk.disableCertChecking=true
Run Code Online (Sandbox Code Playgroud)

但是我只能用spark-submit.sh --conf设置它,它只影响驱动程序,而大多数错误都在工作者身上.

有没有办法将它们传播给工人?

非常感谢.

distributed-computing amazon-s3 amazon-web-services apache-spark

9
推荐指数
2
解决办法
4197
查看次数

rdd.collect().toMap to rdd.collectAsMap()之间的区别?

是否有任何性能的影响,当我在我的RDD使用collectAsMap代替rdd.collect().toMap的?

我有一个键值RDD,我要转换为HashMap类,据我所知收集()是没有效率的大型数据集,因为它运行在驱动程序我可以用collectAsMap而不是有任何的性能影响?

原版的:

val QuoteHashMap=QuoteRDD.collect().toMap 
val QuoteRDDData=QuoteHashMap.values.toSeq 
val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x => x.toString.replace("(","").replace(")",""))) 
QuoteRDDSet.saveAsTextFile(Quotepath) 
Run Code Online (Sandbox Code Playgroud)

更改:

val QuoteHashMap=QuoteRDD.collectAsMap() 
val QuoteRDDData=QuoteHashMap.values.toSeq 
val QuoteRDDSet=sc.parallelize(QuoteRDDData.map(x => x.toString.replace("(","").replace(")",""))) 
QuoteRDDSet.saveAsTextFile(Quotepath)
Run Code Online (Sandbox Code Playgroud)

scala distributed-computing apache-spark

9
推荐指数
1
解决办法
8942
查看次数

为什么Windows7上的TCP/IP需要500个预热才能进行预热?(w10,w8证明不会受到影响)

我们看到ZeroMQ上有一个奇怪且无法解释的现象,Windows 7通过TCP发送消息.(或者inproc,因为ZeroMQ在Windows内部使用TCP进行信令).

这种现象是前500条消息越来越慢,延迟越来越慢.然后,除了由CPU /网络争用引起的峰值之外,延迟下降和消息一直快速到达.

这里描述了这个问题:https://github.com/zeromq/libzmq/issues/1608

它始终是500条消息.如果我们发送没有延迟,那么消息被批处理,所以我们看到这个现象延伸了几千个发送.如果我们在发送之间延迟,我们会更清楚地看到图表.即使在发送之间延迟多达50-100毫秒也不会改变事情.

邮件大小也无关紧要.我已经测试了10字节消息和10K消息,结果相同.

最大延迟始终为2毫秒(2,000 usec).

在Linux机器上,我们没有看到这种现象.

我们想要做的是消除这个初始曲线,因此消息与正常的低延迟(大约20-100 usec)保持新的连接.


更新:该问题不会在Windows 10和8上显示.它似乎发生在Windows 7上.

tcp winsock distributed-computing zeromq low-latency

9
推荐指数
1
解决办法
533
查看次数

使用Java中的DynamoDBMapper更新DynamoDB项

如何使用DynamoDBMapper更新DynamoDB项?

我有多个进程,使用DynamoDB表,因此,get + save会产生不一致.我找不到使用DynamoDBMapper更新项目的方法.

java distributed-computing amazon-dynamodb

9
推荐指数
1
解决办法
9798
查看次数

Redis的set命令是原子操作吗?

我正在尝试使用Redis的set命令来实现一个最简单的分布式锁组件,但我无法通过官方文档找到关于原子性的任何确切依据,Redis是否SET key value [EX seconds] [PX milliseconds] [NX|XX]命令进行原子操作?

locking distributed-computing redis

9
推荐指数
1
解决办法
2986
查看次数