小编use*_*714的帖子

sbt-assembly:重复数据删除发现错误

我不确定mergestrategy或exclude jars是否是最好的选择.任何有关如何进一步处理此错误的帮助都会很棒!

[sameert@pzxdcc0151 approxstrmatch]$ sbt assembly
[info] Loading project definition from /apps/sameert/software/approxstrmatch/project
[info] Set current project to approxstrmatch (in build file:/apps/sameert/software/approxstrmatch/)
[info] Including from cache: scala-library.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[info] Including from cache: curator-client-2.4.0.jar
[info] Including from cache: secondstring-20140729.jar
[info] Including from cache: slf4j-api-1.7.5.jar
[info] Including from cache: jsr305-1.3.9.jar
[info] Including from cache: jul-to-slf4j-1.7.5.jar
[info] Including from cache: jcl-over-slf4j-1.7.5.jar
[info] Including from cache: commons-digester-1.8.jar
[info] Including from cache: compress-lzf-1.0.0.jar
[info] Including from cache: commons-beanutils-1.7.0.jar …
Run Code Online (Sandbox Code Playgroud)

scala sbt sbt-assembly

113
推荐指数
4
解决办法
5万
查看次数

如何将包含外部jar文件包含到jar中?

我的项目依赖于外部jar.我创建了一个目录lib并将jar文件复制到其中.这是我的build.sbt:

name := "approxstrmatch"

version := "1.0"

scalaVersion := "2.10.4"

unmanagedJars in Compile += file("lib/secondstring-20140729.jar")

libraryDependencies+="org.apache.spark"%%"spark-core"%"1.0.0"

resolvers += "AkkaRepository" at "http://repo.akka.io/releases/"
Run Code Online (Sandbox Code Playgroud)

当我运行cleanpackage,外部jar文件不会被包含在生成jar文件.为什么?

这是项目布局:

project/build.sbt
lib/
src/.....
Run Code Online (Sandbox Code Playgroud)

sbt

35
推荐指数
3
解决办法
5万
查看次数

多个RDD的Spark联合

在我的猪代码中,我这样做:

all_combined = Union relation1, relation2, 
    relation3, relation4, relation5, relation 6.
Run Code Online (Sandbox Code Playgroud)

我想用火花做同样的事情.然而,不幸的是,我发现我必须继续这样做:

first = rdd1.union(rdd2)
second = first.union(rdd3)
third = second.union(rdd4)
# .... and so on
Run Code Online (Sandbox Code Playgroud)

是否有一个联合运算符可以让我一次操作多个rdds:

例如 union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)

这是一个方便的问题.

python apache-spark rdd pyspark

35
推荐指数
1
解决办法
6万
查看次数

如何在Pyspark中加入多个列?

我正在使用Spark 1.3,并希望使用python接口(SparkSQL)加入多个列

以下作品:

我首先将它们注册为临时表.

numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
Run Code Online (Sandbox Code Playgroud)

我现在想基于多个列加入它们.

我得到SyntaxError:语法无效:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
Run Code Online (Sandbox Code Playgroud)

python join apache-spark apache-spark-sql pyspark

31
推荐指数
3
解决办法
5万
查看次数

火花Word2vec矢量数学

我一直在寻找的例子星火网站Word2Vec的:

val input = sc.textFile("text8").map(line => line.split(" ").toSeq)

val word2vec = new Word2Vec()

val model = word2vec.fit(input)

val synonyms = model.findSynonyms("country name here", 40)
Run Code Online (Sandbox Code Playgroud)

我如何做有趣的矢量,如国王 - 男人+女人=女王.我可以使用model.getVectors,但不知道如何继续进行.

machine-learning apache-spark word2vec apache-spark-mllib

17
推荐指数
2
解决办法
5987
查看次数

PySpark和广播连接示例

我正在使用Spark 1.3

# Read from text file, parse it and then do some basic filtering to get   data1
data1.registerTempTable('data1')

# Read from text file, parse it and then do some basic filtering to get data1
data2.registerTempTable('data2')

# Perform join
data_joined = data1.join(data2, data1.id == data2.id);
Run Code Online (Sandbox Code Playgroud)

我的数据非常偏斜,data2(几KB)<< data1(GB的10s),性能非常糟糕.我正在阅读有关广播加入的内容,但不确定如何使用Python API执行相同操作.

python apache-spark apache-spark-sql pyspark

16
推荐指数
1
解决办法
2万
查看次数

具有多个条件的Sparksql过滤(使用where子句选择)

嗨,我有以下问题:

numeric.registerTempTable("numeric"). 
Run Code Online (Sandbox Code Playgroud)

我要过滤的所有值都是文字空字符串,而不是N/A或Null值.

我试过这三个选项:

  1. numeric_filtered = numeric.filter(numeric['LOW'] != 'null').filter(numeric['HIGH'] != 'null').filter(numeric['NORMAL'] != 'null')

  2. numeric_filtered = numeric.filter(numeric['LOW'] != 'null' AND numeric['HIGH'] != 'null' AND numeric['NORMAL'] != 'null')

  3. sqlContext.sql("SELECT * from numeric WHERE LOW != 'null' AND HIGH != 'null' AND NORMAL != 'null'")

不幸的是,numeric_filtered总是空的.我检查并且数字具有应根据这些条件过滤的数据.

以下是一些示例值:

低高正常

3.5 5.0 null

2.0 14.0 null

null 38.0 null

null null null

1.0 null 4.0

python sql apache-spark apache-spark-sql pyspark

11
推荐指数
1
解决办法
4万
查看次数

Spark:广播变量:您似乎尝试从广播变量,操作或转换引用SparkContext

Class ProdsTransformer:

    def __init__(self):  
      self.products_lookup_hmap = {}
      self.broadcast_products_lookup_map = None

    def create_broadcast_variables(self):
      self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)

    def create_lookup_maps(self):
    // The code here builds the hashmap that maps Prod_ID to another space.

pt = ProdsTransformer ()
pt.create_broadcast_variables()  

pairs = distinct_users_projected.map(lambda x: (x.user_id,    
                         pt.broadcast_products_lookup_map.value[x.Prod_ID]))
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

"例外:您似乎尝试从广播变量,操作或转换中引用SparkContext.SparkContext只能用于驱动程序,而不能用于在工作程序上运行的代码.有关更多信息,请参阅SPARK-5063."

任何有关如何处理广播变量的帮助都会很棒!

python apache-spark pyspark

7
推荐指数
1
解决办法
1万
查看次数

pyspark和HDFS命令

我想在我的Spark程序(Pyspark)开始时做一些清理工作.例如,我想从之前的HDFS运行中删除数据.在猪中,这可以使用诸如的命令来完成

fs -copyFromLocal ....

rmf /path/to-/hdfs
Run Code Online (Sandbox Code Playgroud)

或本地使用sh命令.

我想知道如何与Pyspark做同样的事情.

python hdfs apache-spark pyspark

6
推荐指数
3
解决办法
1万
查看次数

Spark UDAF:java.lang.InternalError:格式错误的类名

我正在使用CDH 5.5.2发行版中的Spark 1.5.0。我从2.10.4切换到Scala 2.10.5。我将以下代码用于UDAF。这是String vs UTF8String的问题吗?如果是,任何帮助将不胜感激。

object GroupConcat extends UserDefinedAggregateFunction {
    def inputSchema = new StructType().add("x", StringType)
    def bufferSchema = new StructType().add("buff", ArrayType(StringType))
    def dataType = StringType
    def deterministic = true 

    def initialize(buffer: MutableAggregationBuffer) = {
      buffer.update(0, ArrayBuffer.empty[String])
    }

    def update(buffer: MutableAggregationBuffer, input: Row) = {
      if (!input.isNullAt(0)) 
        buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
    }

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))
    }

    def evaluate(buffer: Row) = UTF8String.fromString(
      buffer.getSeq[String](0).mkString(","))
}
Run Code Online (Sandbox Code Playgroud)

但是,我在运行时收到此错误消息:

Exception in thread "main" java.lang.InternalError: Malformed class …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

5
推荐指数
1
解决办法
4274
查看次数