标签: distributed-computing

如何控制Hadoop流媒体作业的输出文件名称和内容?

有没有办法控制Hadoop Streaming作业的输出文件名?具体来说,我希望我的作业的输出文件内容和名称由reducer输出组织 - 每个文件只包含一个键的值,其名称将是键.

更新:刚刚找到答案 - 使用派生自MultipleOutputFormat的Java类作为作业输出格式允许控制输出文件名. http://hadoop.apache.org/core/docs/current/api/org/apache/hadoop/mapred/lib/MultipleOutputFormat.html

我还没有看到任何样本......有人能指出使用自定义输出格式Java类的Hadoop Streaming示例吗?

hadoop mapreduce distributed-computing

6
推荐指数
1
解决办法
9443
查看次数

复制模式定义?

我目前正在研究有关其复制能力的不同NoSQL和RDBMS,以便构建分布式系统.

通过阅读几篇论文和书籍,我感觉到一些供应商或作者使用他们自己关于术语的定义

  • 主 - 主复制(两个服务器之间的复制)
  • 主从复制(mutliple服务器之间的复制,以提高读取速度,写入只能用于主服务器)
  • 多主复制(= Peer-to-Peer?)
  • 点对点复制(n个节点之间的复制,每个节点都可以读/写)
  • 合并复制(?)

例如:有些人将Master-Master和Peer-to-Peer这两个术语混为一谈,而在Mysql docus中我发现它在Master-Master和Multi-Master(= Peer-to-peer ???)之间有区别.复制.

Multi-Master和Peer-to-Peer复制的区别在哪里?Multi-Master复制的用例是否更倾向于聚类,而Peer-To-Peer将分布式内容定位到分布式应用程序?

我想解决问题,并确保我在这些条款中有正确的理解,所以也许这里的讨论将有助于合并一些知识.

问候,克里斯

编辑:添加合并复制到列表和一些解释,因为我理解他们...

database replication couchdb distributed-computing

6
推荐指数
1
解决办法
712
查看次数

Haskell框架,用于并行化非线程安全的C++库

我有一个闭源非线程安全的C++共享库,它提供了一个函数f :: ByteString - > ByteString.此功能的运行时间可能介于一秒到几个小时之间.

我正在寻找一种方法将计算分配到多个核心/服务器(SIMD).

简而言之,我正在寻找一个提供功能的框架

    g :: Strategy b -> (a -> b) -> a -> b
Run Code Online (Sandbox Code Playgroud)

提升一个只能被顺序调用到一个函数的函数,该函数的行为类似于Haskell中的任何其他纯函数.

例如,我希望能够写:

    parMap rwhnf f args -- will not work
Run Code Online (Sandbox Code Playgroud)

由于f通过FFI在非线程安全的lib中调用C函数,因此不起作用.因此,我可以用函数g替换函数f,函数g保存作业队列并将任务分派给N个单独的进程.这些进程可以在本地运行或分发:

    parMap rwhnf g args -- should works
Run Code Online (Sandbox Code Playgroud)

我已经研究过的潜在框架是

  1. MPI:客户端(Haskell)< - MPI - >代理(C++)< - MPI - >工作者(C++)< - > Lib(C++)

  2. ZeroMQ:客户端(Haskell)< - ZeroMQ - > Broker(C++)< - ZeroMQ - > Worker(C++)< - > Lib(C++)

  3. Cloud Haskell:客户端(Haskell)< - CloudHaskell - > Worker(Haskell)< - FFI - > Lib(C++) …

concurrency haskell distributed-computing gearman zeromq

6
推荐指数
1
解决办法
490
查看次数

是否有分布式数据处理管道框架,或组织一个好方法?

我正在设计一个应用程序,它需要一组分布式处理工作程序,这些工作程序需要异步使用并在特定流程中生成数据.例如:

  • 组件A获取页面.
  • 组件B分析来自A的页面.
  • 组件C存储来自B的分析的位和片段.

显然,涉及的不仅仅是三个组成部分.

进一步要求:

  • 每个组件都需要是一个单独的进程(或一组进程).
  • 生产者对消费者一无所知.换句话说,组件A只生成数据,而不知道哪些组件使用该数据.

这是一种由Storm等面向拓扑的系统解决的数据流.虽然风暴看起来很好,但我持怀疑态度; 它是一个Java系统,它基于Thrift,我都不喜欢它.

我目前倾向于使用AMQP作为数据传输的pub/sub-style方法,使用HTTP作为数据共享/存储的协议.这意味着AMQP队列模型成为公共API - 换句话说,消费者需要知道生产者使用哪个AMQP主机和队列 - 我不是特别高兴,但它可能值得妥协.

AMQP方法的另一个问题是每个组件必须具有非常相似的逻辑:

  • 连接到队列
  • 处理连接错误
  • 将数据序列化/反序列化为通用格式
  • 运行实际的工作人员(goroutines或分叉子进程)
  • 动态扩展工人
  • 容错
  • 节点注册
  • 处理指标
  • 队列限制
  • 队列优先级(一些工人不如其他工人重要)

......以及每个组件需要的许多其他细节.

即使消费者在逻辑上非常简单(想想MapReduce作业,比如将文本分成标记),也有很多样板.当然,我可以自己完成所有这些 - 我非常熟悉AMQP和队列以及其他所有内容 - 并将所有这些包装在一个由所有组件共享的公共包中,但后来我已经在发明框架了.

这类东西是否存在良好的框架?

请注意,我特别询问Go.我想避免使用Hadoop和整个Java堆栈.

编辑:为清晰起见添加了一些要点.

mapreduce distributed-computing go

6
推荐指数
1
解决办法
1502
查看次数

Hazelcast单节点快速启动进行调试

我正在编写一个使用Hazelcast的应用程序.我的应用程序开始很慢,因为Hazelcast尝试在启动时与网络上的其他节点通信.这对于生产来说很好,但是当我进行单节点调试时,它确实减慢了我的编辑 - 编译 - 运行 - 调试周期.

是否有Hazelcast设置告诉它只有一个节点,所以请快速启动,不要打扰ping网络的其余部分?

distributed-computing hazelcast

6
推荐指数
2
解决办法
2289
查看次数

COMPSs Monitor不显示任何应用程序

我正在使用COMPS运行COMPSs示例应用程序手册中显示的Increment应用程序.我添加了-m标志以启用监视功能:

$ runcompss -m --debug increment.Increment 5 1 2 3
Run Code Online (Sandbox Code Playgroud)

应用程序正常运行并完成(std输出/错误中没有显示错误,并且.COMPSs文件夹中的runtime.log没有任何堆栈跟踪).

我还启动了运行以下命令的COMPSs Monitor服务(我也添加了它的输出)

$ /etc/init.d/compss-monitor start
* Starting COMPSs Monitor
* Checking JAVA Installation...
   Success
* Checking IT_HOME...
WARNING: IT_HOME not defined. Trying default location /opt/COMPSs/
   Success
* Checking IT_MONITOR...
     IT_MONITOR=/root/.COMPSs/
   Success
* Checking COMPSs Monitor Port...
Warning: COMPSs_MONITOR_PORT not defined.
  Loading from configuration file.
      COMPSs_MONITOR_PORT=8080
   Success
* Checking COMPSs Monitor Timeout...
Warning: COMPSs_MONITOR_TIMEOUT not defined.
  Loading from configuration file.
      COMPSs_MONITOR_TIMEOUT=20000
   Success
* Configuring COMPSs Monitor …
Run Code Online (Sandbox Code Playgroud)

java hpc distributed-computing compss

6
推荐指数
1
解决办法
92
查看次数

C COMPS执行失败的所有作业

我已经从http://www.bsc.es/computer-sciences/grid-computing/comp-superscalar/downloads-and-documentation下载了COMPS 1.4和一些测试程序,我正在尝试测试它们.Java执行很顺利; 但是,我对C有问题.

我目前正在尝试执行Simple.自述文件声明我只需要两个命令:

buidapp simple

runcompss --lang=c master/simple 1
Run Code Online (Sandbox Code Playgroud)

应用程序构建正常,但使用此命令执行时,我收到以下错误:

[ERRMGR]  -  WARNING: Job 1 for running task 1 on worker localhost has failed; resubmitting task to the same worker.

[ERRMGR]  -  WARNING: Task 1 execution on worker localhost has failed; rescheduling task execution. (changing worker)

[ERRMGR]  -  WARNING: No task could be scheduled to any of the available resources.
                      This could end up blocking COMPSs. Will check it again in 20 seconds.
                      Possible causes: 
                          -Network problems: …
Run Code Online (Sandbox Code Playgroud)

c c++ hpc distributed-computing compss

6
推荐指数
1
解决办法
51
查看次数

火花的环境副本有多少?

我有一个PySpark应用程序,必须详细说明5GB的压缩数据(字符串).我正在使用一个12核(24线程)和72Gb RAM的小型服务器.我的PySpark程序只包含2个地图操作,由3个非常大的正则表达式(每个已经编译3gb)和加载pickle.Spark在独立模式下工作,同一台机器上有worker和master.

我的问题是:spark是否为每个执行器核心复制每个变量?因为它使用了所有可用的内存,然后使用了大量的交换空间.或者它可能加载RAM中的所有分区?RDD包含大约1000万个字符串,必须由3个正则表达式进行搜索.RDD计数大约1000个分区.我很难完成这项任务,因为几分钟后内存已满并且火花开始使用交换空间变得非常慢.我注意到没有正则表达式的情况是一样的.

这是我的代码,它会删除twitter推文的所有无用字段,并扫描推文的特定单词的文本和描述:

import json
import re
import twitter_util as twu
import pickle

from pyspark import SparkContext
sc = SparkContext()

prefix = '/home/lucadiliello'

source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'

#Regex's path
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'

#Loading the regex's
comp_regex = pickle.load(open(companies_names_regex))
comp_dict = pickle.load(open(companies_names_dict))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal))

#Loading the RDD from textfile 
tx = sc.textFile(source).map(lambda a: json.loads(a))


def get_device(input_text):
    output_text = …
Run Code Online (Sandbox Code Playgroud)

python distributed-computing bigdata apache-spark pyspark

6
推荐指数
1
解决办法
94
查看次数

水平扩展聊天记录工作者

我已经考虑了很多,但无法想出一个我很满意的解决方案.

基本上这就是问题:记录100k + Chats(有些慢,有些更快)进入cassandra.因此保存userId,channelId,timestamp和消息.

Cassandra已经支持水平扩展,我在这里没有问题.

现在,我读取这些聊天的软件通过TCP(IRC)完成.对于顶级1k通道,通常会有300条消息/秒,而单个IRC连接无法处理我的实验.

我现在想要构建的是记录器的多个实例(使用Docker/Kubernetes)并在它们之间共享负载.理想情况下,如果我有4个工人和1k个聊天(例如).他们每个人都会加入至少250个频道.我说至少是因为我想要可选的冗余,所以我可以在同一个聊天中有2个记录器,以确保没有消息丢失.重复项没有问题,因为所有消息都有唯一的ID.

现在,我将如何最好地动态分享工人之间加入的当前渠道.我想避免拥有主人或控制点.还应该很容易添加更多的工人,然后减少其他工人的负担.

有关这种行为的好文章吗?也许已经定义了好的概念或协议?就像我说的,我想避免另一个中央控制点,所以没有rabbitmq,redis或其他什么.

编辑:我已经研究过像Raft Consensus Algorithm这样的东西了,但是我认为这没有意义,因为我不希望我的客户就共享状态达成一致,而是将它们之间的状态"平等地"分开.

parallel-processing tcp distributed-computing docker kubernetes

6
推荐指数
1
解决办法
162
查看次数

拆分数组以找到分布式环境中两个子数组之和之间的最小差异

昨天有人问我这个问题。我必须编写代码将数组分为两部分,以使这两部分之和之间的差异最小。

这是我写的复杂度为O(n)的代码

function solution(a) {
  let leftSum = 0;
  let rightSum = a.reduce((acc, value) => acc + value ,0);
  let min = Math.abs(rightSum - leftSum);
  a.forEach((item, i) => {
   leftSum += a[i];
   rightSum -= a[i]; 
   const tempMin = Math.abs(rightSum - leftSum);
   if(tempMin < min) min = tempMin;
  })
  return min;
}
Run Code Online (Sandbox Code Playgroud)

但是随后我被问到输入数组的长度是否为1000万,如何在分布式环境中解决此问题?

我是分布式编程的新手,在这方面需要帮助。

javascript algorithm distributed distributed-computing distributed-system

6
推荐指数
1
解决办法
144
查看次数