小编SCo*_*uto的帖子

Scala Spark中的NullPointerException似乎是由集合类型引起的?

sessionIdList的类型是:

scala> sessionIdList res19:org.apache.spark.rdd.RDD [String] = MappedRDD [17] at distinct at:30

当我尝试运行以下代码时:

scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30
Run Code Online (Sandbox Code Playgroud)

我收到例外:

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x))

val kDistanceNeighbourhood = sessionIdList.map(s => {
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)
Run Code Online (Sandbox Code Playgroud)

但是,如果我使用:

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
Run Code Online (Sandbox Code Playgroud)

然后没有显示异常

两个代码片段之间的区别在于,在第一个片段中,sessionIdList的类型为:

val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s => …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

13
推荐指数
2
解决办法
2万
查看次数

如果 Spark 中的数据帧是不可变的,为什么我们能够使用诸如 withColumn() 之类的操作来修改它?

这可能是一个愚蠢的问题,源于我的无知。我已经在 PySpark 上工作了几个星期,并没有太多的编程经验可以开始。

我的理解是,在 Spark 中,RDD、数据帧和数据集都是不可变的——我再次理解,这意味着您无法更改数据。如果是这样,为什么我们能够使用 编辑 Dataframe 的现有列withColumn()

apache-spark pyspark

10
推荐指数
1
解决办法
6044
查看次数

带有TimeZone的SimpleDateFormat

我正在尝试从java.util.Date格式化日期.我需要这种格式:

2016-06-10T13:38:13.687+02:00.

如何从标准日期格式正确转换它

May 04 09:51:52 CDT 2009

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss z", Locale.getDefault());
sdf.format(new Date());
Run Code Online (Sandbox Code Playgroud)

遗憾的是,此代码没有返回值+02:00.

java android

9
推荐指数
3
解决办法
3万
查看次数

具有AWS Code管道的自定义版本标签

我将AWS CodePipeline与CodeBuild结合使用来构建应用程序并将其部署到ElasticBeanstalk。

CodePipeline生成如下版本名称: code-pipeline-1122334455667-MyApp-1ac31f7c-1343-471x-a7e8-46b24f1785a

是否可以自定义这些标签?

amazon-web-services amazon-elastic-beanstalk aws-codepipeline aws-codebuild

6
推荐指数
1
解决办法
222
查看次数

Apache Spark startsWith 在 SQL 表达式中

在 Apache Spark API 中,我可以使用startsWith函数来测试列的值:

myDataFrame.filter(col("columnName").startsWith("PREFIX"))
Run Code Online (Sandbox Code Playgroud)

是否可以在 Spark SQL 表达式中执行相同的操作,如果可以,请举例说明?

scala apache-spark apache-spark-sql

6
推荐指数
2
解决办法
6644
查看次数

在 Scala 中编写阶乘尾递归函数

我正在尝试以下面的方式编写尾递归函数,但编译器抛出错误:

方法的参数太多了:(v1: Int)Int in trait Function1 else factorial(x-1, x*acc)

我曾尝试用 Function2 替换 Function1 并给 Function2[Int, Int, Int] = new Function2[Int, Int, Int]

但它仍然给我带来了同样的错误。有人能指出我哪里出错了吗?

import scala.annotation.tailrec
var factorial: Function1[Int, Int] = new Function1[Int, Int] {
    @tailrec override def apply (x:Int, acc:Int=1): Int = {
        if (x<=1) acc
        else factorial(x-1, x*acc)
    }
}

factorial(5)
Run Code Online (Sandbox Code Playgroud)

recursion functional-programming scala tail-recursion function

5
推荐指数
1
解决办法
3831
查看次数

如何在 Scala 中解析 JSON 数据?

我是 Scala 的新手。我想在 Scala 中解析 JSON 数据。

我想循环这些数据,并在每次迭代中为id,v,qt从值中提取数据

我正在使用以下代码将其解析为 JSON

import scala.util.parsing.json._

val data =
  """
{
  "timestamp":
  1518501114949
  , "values":
  [
  {
    "id":
    "abc"
    , "v":
    0
    , "q":
    true
    , "t":
    1518501114487
  }
  ,
  {
    "id":
    "xyz"
    , "v":
    15
    , "q":
    true
    , "t":
    1518501114494
  }
  ]
}
"""

val parsed = JSON.parseFull(data)
Run Code Online (Sandbox Code Playgroud)

我得到如下输出

 Some(Map(timestamp -> 1.518501114949E12, values -> List(Map(id -> abc, v -> 0.0, q -> true, t -> 1.518501114487E12), Map(id -> …
Run Code Online (Sandbox Code Playgroud)

scala spark-streaming

5
推荐指数
1
解决办法
6020
查看次数

如何在Map 8中将Map <String,List <String >>转换为Map <String,String>

map喜欢

key= ["a1", "a2", "a3"] 
value = [["a1.value1", "a1.value2"],["a2.value1", "a2.value2"]]
Run Code Online (Sandbox Code Playgroud)

生成的Map应该是这样的

key = ["a1", "a2", "a3"]
value = ["a1.value1, a1.value2", "a2.value1, a2.value2"]
Run Code Online (Sandbox Code Playgroud)

我们如何Collectors.joining用作中间步骤?

java java-8 java-stream

5
推荐指数
1
解决办法
351
查看次数

ClassNotFoundException:找不到数据源:bigquery

我正在尝试将数据从 Google BigQuery 加载到在 Google Dataproc 上运行的 Spark(我正在使用 Java)。我尝试按照此处的说明进行操作:https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example

我收到错误:“ ClassNotFoundException: Failed to find data source: bigquery。”

我的 pom.xml 如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.virtualpairprogrammers</groupId>
    <artifactId>learningSpark</artifactId>
    <version>0.0.3-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.spark</groupId>
            <artifactId>spark-bigquery_2.11</artifactId>
            <version>0.9.1-beta</version>
            <classifier>shaded</classifier>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target> …
Run Code Online (Sandbox Code Playgroud)

java maven google-bigquery apache-spark google-cloud-dataproc

5
推荐指数
1
解决办法
1万
查看次数

如何在 Spark 3.0+ 中获取一年中的第几周?

我正在尝试创建一个包含日、月等列的日历文件。以下代码工作正常,但我找不到一种干净的方法来提取一年中的星期(1-52)。在 中spark 3.0+,以下代码行不起作用:.withColumn("week_of_year", date_format(col("day_id"), "W"))

我知道我可以创建一个视图/表,然后对其运行 SQL 查询来提取week_of_year,但有没有更好的方法来做到这一点?`

df.withColumn("day_id", to_date(col("day_id"), date_fmt))
.withColumn("week_day", date_format(col("day_id"), "EEEE"))
.withColumn("month_of_year", date_format(col("day_id"), "M"))
.withColumn("year", date_format(col("day_id"), "y"))
.withColumn("day_of_month", date_format(col("day_id"), "d"))
.withColumn("quarter_of_year", date_format(col("day_id"), "Q"))
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

5
推荐指数
1
解决办法
8713
查看次数