小编Sha*_*ala的帖子

Spark Scala:无法从字符串转换为int,因为它可能会截断

我玩火花时遇到了这个异常.

线程"main"中的异常org.apache.spark.sql.AnalysisException:无法price从string转换为int,因为它可能会截断目标对象的类型路径为: - field(class:"scala.Int",name:" price") - root class:"org.spark.code.executable.Main.Record"您可以向输入数据添加显式强制转换,也可以在目标对象中选择更高精度的字段类型;

如何解决这个异常?这是代码

object Main {

 case class Record(transactionDate: Timestamp, product: String, price: Int, paymentType: String, name: String, city: String, state: String, country: String,
                accountCreated: Timestamp, lastLogin: Timestamp, latitude: String, longitude: String)
 def main(args: Array[String]) {

   System.setProperty("hadoop.home.dir", "C:\\winutils\\");

   val schema = Encoders.product[Record].schema

   val df = SparkConfig.sparkSession.read
  .option("header", "true")
  .csv("SalesJan2009.csv");

   import SparkConfig.sparkSession.implicits._
   val ds = df.as[Record]

  //ds.groupByKey(body => body.state).count().show()

  import org.apache.spark.sql.expressions.scalalang.typed.{
  count => typedCount,
  sum => typedSum
}

  ds.groupByKey(body => body.state)
  .agg(typedSum[Record](_.price).name("sum(price)"))
  .withColumnRenamed("value", …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

12
推荐指数
1
解决办法
7479
查看次数

如何从Scala Spark中的Excel(xls,xlsx)文件构造Dataframe?

我有一个Excel(xlsx and xls)包含多个工作表的大文件,我需要将其转换为RDD或者Dataframe以后可以将其连接到其他工作表dataframe.我正在考虑使用Apache POI并将其保存为a CSV然后读csvdataframe.但是,如果有任何库或API可以帮助这个过程很容易.任何帮助都非常感谢.

excel scala apache-spark apache-spark-sql

10
推荐指数
2
解决办法
3万
查看次数

如何在使用模式 Spark 读取 csv 时删除格式错误的行?

当我使用 Spark DataSet 加载 csv 文件时。我更喜欢清楚地指定模式。但我发现有几行不符合我的架构。一列应该是双精度的,但有些行是非数字值。是否可以轻松地从 DataSet 中过滤出所有不符合我的架构的行?

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")
Run Code Online (Sandbox Code Playgroud)

f.csv:

a
1.0
Run Code Online (Sandbox Code Playgroud)

我更喜欢可以轻松地从我的数据集中过滤“a”。谢谢!

apache-spark apache-spark-dataset

8
推荐指数
2
解决办法
1万
查看次数

合并两个在Apache spark中具有不同列名的数据集

我们需要合并两个具有不同列名的数据集,数据集中没有共同的列.

我们尝试了几种方法,这两种方法都没有产生结果.请告诉我们如何使用Apache spark Java组合两个数据集

输入数据集1

"405-048011-62815", "CRC Industries",

"630-0746","Dixon value",

"4444-444","3M INdustries",

"555-55","Dixon coupling valve"
Run Code Online (Sandbox Code Playgroud)

输入数据集2

"222-2222-5555", "Tata",

"7777-88886","WestSide",

"22222-22224","Reliance",

"33333-3333","V industries"
Run Code Online (Sandbox Code Playgroud)

期待的是

    ----------label1----|------sentence1------|------label2---|------sentence2-----------
    | 405-048011-62815  | CRC Industries      | 222-2222-5555 |                      Tata|
    |        630-0746   |   Dixon value       |   7777-88886  |                  WestSide|
    -------------------------------------------------------------------------------------
Run Code Online (Sandbox Code Playgroud)

`

    List<Row> data = Arrays.asList(
                    RowFactory.create("405-048011-62815", "CRC Industries"),
                    RowFactory.create("630-0746","Dixon value"),
                    RowFactory.create("4444-444","3M INdustries"),
                    RowFactory.create("555-55","Dixon coupling valve"));

    StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
            new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });

    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    List<String> listStrings …
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-sql spark-dataframe

6
推荐指数
1
解决办法
3762
查看次数

Spark Streamming:从具有多个模式的 kafka 读取数据

我正在为火花流的实现而苦苦挣扎。

来自 kafka 的消息看起来像这样,但有更多的字段

{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}
Run Code Online (Sandbox Code Playgroud)

我正在尝试从 Kafka 主题(具有多个模式)读取消息。我需要阅读每条消息并查找事件和源字段并决定将其存储为数据集的位置。实际数据在字段有效负载中作为 JSON,它只是一个记录。

有人可以帮助我实施这个或任何其他替代方案吗?

在同一主题中发送具有多个模式的消息并使用它是一种好方法吗?

提前致谢,

apache-kafka apache-spark spark-streaming apache-spark-dataset

5
推荐指数
1
解决办法
776
查看次数

用新值更新数据框列

df1 有字段idjson;df2 有字段idjson

df1.count()=> 1200; df2.count()=> 20

df1 包含所有行。df2 有一个只有 20 行的增量更新。

我的目标是用 .df1 中的值更新 df1 df2。的所有 iddf2都在 df1 中。但是 df2 已经更新json了这些相同 ID 的值(在字段中)。

结果 df 应该具有来自 的所有值df1和来自 的更新值df2

做这个的最好方式是什么?- 使用最少的连接和过滤器。

谢谢!

apache-spark pyspark

5
推荐指数
1
解决办法
5275
查看次数

如何导入 sparksession

如何创建 sparksession?

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

scala>    import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext

scala> val conf = SparkSession.builder.master("local").appName("testing").enableHiveSupport().getOrCreate()  

<console>:27: error: not found: value SparkSession
         val conf = SparkSession.builder.master("local").appName("testing").enableHiveSupport().getOrCreate()
Run Code Online (Sandbox Code Playgroud)

apache-spark

5
推荐指数
1
解决办法
4590
查看次数

在 Scala 中从 mongoDB 读取

我想创建一个独立的 Scala 代码,该代码使用自定义设置从 MongoDB 中读取,使用MongoDB 网站中的此代码

当我运行 SBT 包时,我遇到了一些错误。我猜这与 SparkSession 的错误创建方法有关。你能给我一个提示来修复它吗?

我的Buid.sbt内容

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1"
)
Run Code Online (Sandbox Code Playgroud)

Firstapp.scala 代码

package com.mongodb
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig,WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document

object FirstApp {
  def main(args: Array[String]) {

    val sc = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
    .getOrCreate()

    val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    val customRdd …
Run Code Online (Sandbox Code Playgroud)

scala mongodb sbt apache-spark

5
推荐指数
1
解决办法
426
查看次数

在 Windows 中为 pyspark 设置的环境变量

我的笔记本电脑上安装了 Spark。我能够执行“spark-shell”命令并打开 scala shell,如下所示。 C:\Spark1_6\spark-1.6.0-bin-hadoop2.6\bin>spark-shell scala> 但是当我尝试执行pyspark命令时。 C:\Spark1_6\spark-1.6.0-bin-hadoop2.6\bin>pyspark

我收到以下错误消息。

'python' 未被识别为内部或外部命令

我确实手动设置了环境用户“路径”变量。通过附加

";C:\Python27"

我重新启动了笔记本电脑,但仍然出现相同的错误。任何人都可以帮我解决这个问题吗?我没有正确更新环境变量吗?

版本:Spark:1.6.2 Windows:8.1

environment-variables apache-spark pyspark

4
推荐指数
1
解决办法
9344
查看次数

将 Array[String] 的 Spark 列拆分为 String 列

如果我有一个包含一列 Array[String] 的数据框:

scala> y.show
+---+----------+
|uid|event_comb|
+---+----------+
|  c|  [xx, zz]|
|  b|  [xx, xx]|
|  b|  [xx, yy]|
|  b|  [xx, zz]|
|  b|  [xx, yy]|
|  b|  [xx, zz]|
|  b|  [yy, zz]|
|  a|  [xx, yy]|
+---+----------+
Run Code Online (Sandbox Code Playgroud)

如何将列拆分"event_comb"为两列(例如"event1""event2")?

arrays string split apache-spark

4
推荐指数
1
解决办法
3821
查看次数