为什么Scala和像Spark和Scalding这样的框架都有reduce
和foldLeft
?那么reduce
和之间的区别是fold
什么?
在shell中,我cleanJar
在Impatient/part1目录中输入了gradle .输出如下.错误是" 找不到org.apache.hadoop.mapred.JobConf的类文件 ".为什么编译失败?
:clean UP-TO-DATE
:compileJava
Download http://conjars.org/repo/cascading/cascading-core/2.0.1/cascading-core-2.0.1.pom
Download http://conjars.org/repo/cascading/cascading-hadoop/2.0.1/cascading-hadoop-2.0.1.pom
Download http://conjars.org/repo/riffle/riffle/0.1-dev/riffle-0.1-dev.pom
Download http://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.pom
Download http://repo1.maven.org/maven2/org/slf4j/slf4j-parent/1.6.1/slf4j-parent-1.6.1.pom
Download http://repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.pom
Download http://conjars.org/repo/thirdparty/jgrapht-jdk1.6/0.8.1/jgrapht-jdk1.6-0.8.1.pom
Download http://repo1.maven.org/maven2/org/codehaus/janino/janino/2.5.16/janino-2.5.16.pom
Download http://conjars.org/repo/cascading/cascading-core/2.0.1/cascading-core-2.0.1.jar
Download http://conjars.org/repo/cascading/cascading-hadoop/2.0.1/cascading-hadoop-2.0.1.jar
Download http://conjars.org/repo/riffle/riffle/0.1-dev/riffle-0.1-dev.jar
Download http://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar
Download http://repo1.maven.org/maven2/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar
Download http://conjars.org/repo/thirdparty/jgrapht-jdk1.6/0.8.1/jgrapht-jdk1.6-0.8.1.jar
Download http://repo1.maven.org/maven2/org/codehaus/janino/janino/2.5.16/janino-2.5.16.jar
/home/is_admin/lab/cascading/Impatient/part1/src/main/java/impatient/Main.java:50: error: cannot access JobConf
Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );
^
class file for org.apache.hadoop.mapred.JobConf not found
1 error
:compileJava FAILED
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for …
Run Code Online (Sandbox Code Playgroud) 在Scala中,如何解压缩包含的文本file.gz
以便可以处理它?我很高兴将文件的内容存储在变量中,或者将其保存为本地文件,以便以后可以通过程序读入.
具体来说,我使用Scalding处理压缩日志数据,但Scalding没有定义读取它们的方法FileSource.scala
.
从git获取代码后clone https://github.com/twitter/scalding.git
,./sbt update
我得到:
::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: com.hadoop.gplcompression#hadoop-lzo;0.4.16: not found
Run Code Online (Sandbox Code Playgroud)
然后:
sbt.ResolveException: unresolved dependency: com.hadoop.gplcompression#hadoop-lzo;0.4.16: not found
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:213)
at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:122)
at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:121)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:116)
Run Code Online (Sandbox Code Playgroud)
我已经删除了~/.sbt
目录,重新安装了最新的Scala和克隆的存储库.问题仍然存在.
我的配置:
[dk@localhost scalding]$ scala -version
Scala code runner version 2.10.3 -- Copyright 2002-2013, LAMP/EPFL
[dk@localhost scalding]$ uname -a
Linux localhost.localdomain 2.6.32-431.3.1.el6.x86_64 #1 SMP Fri Jan 3 21:39:27 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux
CentOS 6.5 (Final)
Run Code Online (Sandbox Code Playgroud)
什么可能是错的./sbt update
?我应该hadoop-lzo
单独安装依赖项并设置CLASSPATH来安装Scalding吗?还有其他方法吗? …
如果要在Scalding中创建一个包含超过22个字段的管道,则受Scala元组的限制,Scala元组的数量不能超过22个.
有没有办法使用集合而不是元组?我想象下面的例子,遗憾的是它不起作用:
input.read.mapTo('line -> aLotOfFields) { line: String =>
(1 to 24).map(_.toString)
}.write(output)
Run Code Online (Sandbox Code Playgroud) 所以我的输入数据有两个字段/列:id1和id2,我的代码如下:
TextLine(args("input"))
.read
.mapTo('line->('id1,'id2)) {line: String =>
val fields = line.split("\t")
(fields(0),fields(1))
}
.groupBy('id2){.size}
.write(Tsv(args("output")))
Run Code Online (Sandbox Code Playgroud)
输出结果(我假设)两个字段:id2*size.我有点坚持找出是否可以保留id2值并将其与id2分组并将其添加为另一个字段?
我正在使用scala 2.10和gradle 1.11
我的问题是,当我尝试在hadoop集群中运行时,编译后的jar会丢失一个错误.我想在hadoop上运行,因为我使用烫伤.
例外是:
Exception in thread "main" java.io.FileNotFoundException:
/tmp/hadoop-root/hadoop-unjar6538587701808097105/com/twitter/bijec
tion/GeneratedTupleCollectionInjections$$anon$31$$anonfun$invert$10$$anon
fun$apply$46$$anonfun$apply$47$$anonfun$apply$48$$anonfun$apply$49$$anonfu
n$apply$50$$anonfun$apply$51$$anonfun$apply$52$$anonfun$apply$53$$anonfun$app
ly$54$$anonfun$apply$55.class (File name too long)
Run Code Online (Sandbox Code Playgroud)
欢迎任何评论......
如何在单个Map Reduce作业中使用Scalding(/ cascading)写入依赖于键的多个输出.我当然可以使用.filter
所有可能的密钥,但这是一个可怕的黑客,它将启动许多工作.
我正在研究用于关系(类似SQL)运算符的DSL.我有一个Rep[Table]
类型.apply: ((Symbol, ...)) => Obj
,返回一个Obj
定义.flatMap: T1 => T2
和.map: T1 => T3
函数的对象的方法.由于类型Rep[Table]
对基础表的模式一无所知,因此该apply
方法就像投影一样 - 仅投影参数元组中指定的字段(很像无类型的烫印api).现在type T1
是一个"类似元组",它的长度受到一些无形魔法约束到投影元组的长度,但是否则元组元素的类型由api用户决定,所以代码就像
val a = loadTable(...)
val res = a(('x, 'y)).map { (t: Row2[Int, Int]) =>
(1, t(0))
}
Run Code Online (Sandbox Code Playgroud)
要么
val res = a(('x, 'y)).map { (t: Row2[String, String]) =>
(1, t(0))
}
Run Code Online (Sandbox Code Playgroud)
工作良好.请注意,必须显式指定map
/ flatMap
function 的参数类型.但是,当我尝试用它来理解时
val a = loadTable(...)
val b = loadTable(...)
val c = loadTable(...) …
Run Code Online (Sandbox Code Playgroud) 有这样的数据:
pid recom-pid
1 1
1 2
1 3
2 1
2 2
2 4
2 5
Run Code Online (Sandbox Code Playgroud)
需要做到:
pid, recommendations
1 2,3
2 1,4,5
Run Code Online (Sandbox Code Playgroud)
含义忽略第二列中的self,并将其余部分放入逗号分隔的字符串中.其标签分隔数据
尝试过变体,但不确定如何在foldLeft中引用productId
.groupBy('productId) {
_.foldLeft(('prodReco) -> 'prodsR)("") {
(s: String, s2: String) =>
{
println(" s " + s + ", s2 :" + s2 + "; pid :" + productId + ".")
if (productId.equals(s2)) {
s
} else {
s + "," + s2;
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
使用scala 2.10,烫洗0.10.0,级联2.5.3.需要一个烫伤的答案.我知道如何操纵scala中的数据.我只是想知道如何在分组过程中抓住列,并使用它们有条件地做左侧折叠或其他方法来获得过滤后的输出.
有关完整的工作示例,请参阅https://github.com/tgkprog/scaldingEx2/tree/master/Q1