小编Gam*_*ows的帖子

在本地加载Spark数据不完整的HDFS URI

我在本地CSV文件中遇到了SBT加载问题.基本上,我在Scala Eclipse中编写了一个Spark程序,它读取以下文件:

val searches = sc.textFile("hdfs:///data/searches")
Run Code Online (Sandbox Code Playgroud)

这在hdfs上工作正常,但出于de-bug原因,我希望从本地目录加载此文件,我已将其设置为项目目录.

所以我厌倦了以下几点:

val searches = sc.textFile("file:///data/searches")
val searches = sc.textFile("./data/searches")
val searches = sc.textFile("/data/searches")
Run Code Online (Sandbox Code Playgroud)

这些都不允许我从本地读取文件,并且所有这些都在SBT上返回此错误:

Exception in thread "main" java.io.IOException: Incomplete HDFS URI, no host: hdfs:/data/pages
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:143)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:304)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:179)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120) …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark

5
推荐指数
2
解决办法
9112
查看次数

朱莉娅组按名称和总结计数

我是 Julia 的新手,有一个简单的问题。我有一个具有以下结构的 csv 文件:[类别、名称、计数]。我有两件事要创造。

1,我想在 julia 中创建一个函数,其中 groupBy 类别并将计数相加(名称被忽略)。所以输出是 [Name, Count]。然后我将通过设置 x= Name 和 y= Count 来生成条形图

2,我想为每个类别生成多个图,其中每个名称的计数绘制在单独的条形图上。那么迭代绘图过程?

我想我已经掌握了绘图的窍门,但我不确定如何执行 groupBy 过程。对教程的任何帮助/重定向将不胜感激。

我的数据示例:

(net_worth,khan,14)
(net_worth,kevin,15)
(net_worth,bill,16)
Run Code Online (Sandbox Code Playgroud)

我目前正在研究的功能:

function wordcount(text,opinion,number)
words= text
counts= Dict()
  for w = words
    counts[w]= number
  end
return counts
end

function wcreduce(wcs)
counts=Dict()
  for c in wcs, (k,v) in c
    counts[k] = get(counts,k,0)+v
  end
return counts
end
Run Code Online (Sandbox Code Playgroud)

我想我正在寻找像 reduceByKey 或 GroupByKey 这样的函数。

plot julia

5
推荐指数
1
解决办法
1766
查看次数

scala匹配从布尔列表到求和Int列表

我有一个关于scala/spark列表匹配的一般问题.假设我有以下形式的布尔列表:

List(true, false, false ,true, true)
Run Code Online (Sandbox Code Playgroud)

我希望将此布尔列表转换为:

List(1, 1, 1, 2, 3)
Run Code Online (Sandbox Code Playgroud)

这样每次有一个true时,List加1,每次有一个false,它输出前一个结果.我认为有一些非常有效的方法来做到这一点没有循环,但现在不能想到任何...

scala

2
推荐指数
1
解决办法
509
查看次数

Scala/Spark高效的部分字符串匹配

我正在使用Scala在Spark中编写一个小程序,并遇到了一个问题.我有一个单字串的List/RDD和一个List/RDD的句子,这些句子可能包含也可能不包含单个单词列表中的单词.即

val singles = Array("this", "is")
val sentence = Array("this Date", "is there something", "where are something", "this is a string")
Run Code Online (Sandbox Code Playgroud)

我想选择包含单个单词中一个或多个单词的句子,结果应该是这样的:

output[(this, Array(this Date, this is a String)),(is, Array(is there something, this is a string))]
Run Code Online (Sandbox Code Playgroud)

我考虑了两种方法,一种是通过拆分句子并使用.contains进行过滤.另一种是将句子分割并格式化为RDD并使用.join进行RDD交集.我正在查看大约50个单词和500万个句子,哪种方法会更快?还有其他解决方案吗?你能不能帮我编码,我的代码似乎没有结果(尽管编译并运行没有错误)

string scala apache-spark

2
推荐指数
1
解决办法
4702
查看次数

spark scala最有效的方法来进行部分字符串计数

我有一个问题,关于在1000万长度的火花RDD(或scala数组)中进行部分字符串匹配的最有效方法.考虑以下:

val set1 = Array("star wars", "ipad") //These are the String I am looking for
val set2 = RDD[("user1", "star wars 7 is coming out"),
           ("user1", "where to watch star wars"),
           ("user2", "star wars"),
           ("user2", "cheap ipad")]
Run Code Online (Sandbox Code Playgroud)

我希望能够计算Set1中属于Set1的每个字符串的出现次数.所以结果应该是这样的:

Result = ("star wars", 3),("ipad", 1)
Run Code Online (Sandbox Code Playgroud)

我还想计算搜索该术语的用户数(即不同的用户),因此结果应为:

Result = ("star wars", 2), ("ipad", 1)
Run Code Online (Sandbox Code Playgroud)

我尝试了两种方法,第一种方法是将RDD字符串转换为set,flatMapValues然后进行连接操作,但它耗费内存.我正在考虑的另一种方法是正则表达式方法,因为只需要计数并给出确切的字符串,但我不知道如何使其有效(通过创建函数并在映射RDD时调用它?)

我似乎能够在使用LIKE的pgsql中很容易地做到这一点,但不确定是否存在以相同方式工作的RDD连接.

任何帮助将不胜感激.

string scala match apache-spark

2
推荐指数
1
解决办法
721
查看次数

bash cat包含文件名中某个字符串的所有文件

bash,如何cat在目录中包含文件名中的某个字符串的所有文件.例如,我有名为的文件:

test001.csv
test002.csv
test003.csv
result001.csv
result002.csv
Run Code Online (Sandbox Code Playgroud)

我希望cat所有.csv包含test文件名.csv中的字符串的所有内容,以及包含result文件名中的字符串的所有内容.

bash shell command-line cat

2
推荐指数
1
解决办法
3510
查看次数

在序列化的Spark中使用maxmind geoip

我正在尝试将MaxMind GeoIP API用于scala-spark,该https://github.com/snowplow/scala-maxmind-iplookups找到了。我使用标准加载文件:

val ipLookups = IpLookups(geoFile = Some("GeoLiteCity.dat"), memCache = false, lruCache = 20000)
Run Code Online (Sandbox Code Playgroud)

我有一个基本的csv文件,该文件中包含时间和IP地址:

val sweek1 = week1.map{line=> IP(parse(line))}.collect{
  case Some(ip) => {
    val ipadress = ipdetect(ip.ip)
    (ip.time, ipadress)
    }
}
Run Code Online (Sandbox Code Playgroud)

ipdetect函数的基本定义是:

def ipdetect(a:String)={
  ipLookups.performLookups(a)._1 match{
    case Some(value) => value.toString
    case _ => "Unknown"
  }
}
Run Code Online (Sandbox Code Playgroud)

当我运行该程序时,它提示“任务不可序列化”。所以我读了几篇文章,似乎有一些解决方法。

1,包装 2,使用SparkContext.addFile(在整个群集中分发文件)

但是我无法弄清楚它们中的任何一个是如何工作的,我尝试了包装器,但是我不知道如何以及在何处调用它。我尝试了addFile,但是它返回的是Unit而不是String,我认为您需要以某种方式通过管道传输Binary文件。因此,我不确定现在该怎么办。任何帮助深表感谢

因此,我已经能够通过使用mapPartitions对其进行某种程度的序列化,并在每个本地分区上进行迭代,但是我想知道是否存在一种更有效的方法,因为我拥有数百万个数据集

scala geoip apache-spark

1
推荐指数
1
解决办法
3359
查看次数

标签 统计

scala ×5

apache-spark ×4

string ×2

bash ×1

cat ×1

command-line ×1

geoip ×1

julia ×1

match ×1

plot ×1

sbt ×1

shell ×1