我玩火花时遇到了这个异常.
线程"main"中的异常org.apache.spark.sql.AnalysisException:无法
price从string转换为int,因为它可能会截断目标对象的类型路径为: - field(class:"scala.Int",name:" price") - root class:"org.spark.code.executable.Main.Record"您可以向输入数据添加显式强制转换,也可以在目标对象中选择更高精度的字段类型;
如何解决这个异常?这是代码
object Main {
case class Record(transactionDate: Timestamp, product: String, price: Int, paymentType: String, name: String, city: String, state: String, country: String,
accountCreated: Timestamp, lastLogin: Timestamp, latitude: String, longitude: String)
def main(args: Array[String]) {
System.setProperty("hadoop.home.dir", "C:\\winutils\\");
val schema = Encoders.product[Record].schema
val df = SparkConfig.sparkSession.read
.option("header", "true")
.csv("SalesJan2009.csv");
import SparkConfig.sparkSession.implicits._
val ds = df.as[Record]
//ds.groupByKey(body => body.state).count().show()
import org.apache.spark.sql.expressions.scalalang.typed.{
count => typedCount,
sum => typedSum
}
ds.groupByKey(body => body.state)
.agg(typedSum[Record](_.price).name("sum(price)"))
.withColumnRenamed("value", …Run Code Online (Sandbox Code Playgroud) 我有一个Excel(xlsx and xls)包含多个工作表的大文件,我需要将其转换为RDD或者Dataframe以后可以将其连接到其他工作表dataframe.我正在考虑使用Apache POI并将其保存为a CSV然后读csv入dataframe.但是,如果有任何库或API可以帮助这个过程很容易.任何帮助都非常感谢.
当我使用 Spark DataSet 加载 csv 文件时。我更喜欢清楚地指定模式。但我发现有几行不符合我的架构。一列应该是双精度的,但有些行是非数字值。是否可以轻松地从 DataSet 中过滤出所有不符合我的架构的行?
val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")
Run Code Online (Sandbox Code Playgroud)
f.csv:
a
1.0
Run Code Online (Sandbox Code Playgroud)
我更喜欢可以轻松地从我的数据集中过滤“a”。谢谢!
我们需要合并两个具有不同列名的数据集,数据集中没有共同的列.
我们尝试了几种方法,这两种方法都没有产生结果.请告诉我们如何使用Apache spark Java组合两个数据集
输入数据集1
"405-048011-62815", "CRC Industries",
"630-0746","Dixon value",
"4444-444","3M INdustries",
"555-55","Dixon coupling valve"
Run Code Online (Sandbox Code Playgroud)
输入数据集2
"222-2222-5555", "Tata",
"7777-88886","WestSide",
"22222-22224","Reliance",
"33333-3333","V industries"
Run Code Online (Sandbox Code Playgroud)
期待的是
----------label1----|------sentence1------|------label2---|------sentence2-----------
| 405-048011-62815 | CRC Industries | 222-2222-5555 | Tata|
| 630-0746 | Dixon value | 7777-88886 | WestSide|
-------------------------------------------------------------------------------------
Run Code Online (Sandbox Code Playgroud)
`
List<Row> data = Arrays.asList(
RowFactory.create("405-048011-62815", "CRC Industries"),
RowFactory.create("630-0746","Dixon value"),
RowFactory.create("4444-444","3M INdustries"),
RowFactory.create("555-55","Dixon coupling valve"));
StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
List<String> listStrings …Run Code Online (Sandbox Code Playgroud) 我正在为火花流的实现而苦苦挣扎。
来自 kafka 的消息看起来像这样,但有更多的字段
{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}
Run Code Online (Sandbox Code Playgroud)
我正在尝试从 Kafka 主题(具有多个模式)读取消息。我需要阅读每条消息并查找事件和源字段并决定将其存储为数据集的位置。实际数据在字段有效负载中作为 JSON,它只是一个记录。
有人可以帮助我实施这个或任何其他替代方案吗?
在同一主题中发送具有多个模式的消息并使用它是一种好方法吗?
提前致谢,
apache-kafka apache-spark spark-streaming apache-spark-dataset
df1 有字段id和json;df2 有字段id和json
df1.count()=> 1200; df2.count()=> 20
df1 包含所有行。df2 有一个只有 20 行的增量更新。
我的目标是用 .df1 中的值更新 df1 df2。的所有 iddf2都在 df1 中。但是 df2 已经更新json了这些相同 ID 的值(在字段中)。
结果 df 应该具有来自 的所有值df1和来自 的更新值df2。
做这个的最好方式是什么?- 使用最少的连接和过滤器。
谢谢!
如何创建 sparksession?
scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext
scala> val conf = SparkSession.builder.master("local").appName("testing").enableHiveSupport().getOrCreate()
<console>:27: error: not found: value SparkSession
val conf = SparkSession.builder.master("local").appName("testing").enableHiveSupport().getOrCreate()
Run Code Online (Sandbox Code Playgroud) 我想创建一个独立的 Scala 代码,该代码使用自定义设置从 MongoDB 中读取,使用MongoDB 网站中的此代码。
当我运行 SBT 包时,我遇到了一些错误。我猜这与 SparkSession 的错误创建方法有关。你能给我一个提示来修复它吗?
我的Buid.sbt内容
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.mongodb.spark" %% "mongo-spark-connector" % "2.4.1",
"org.apache.spark" %% "spark-core" % "2.4.1",
"org.apache.spark" %% "spark-sql" % "2.4.1"
)
Run Code Online (Sandbox Code Playgroud)
Firstapp.scala 代码
package com.mongodb
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.config.{ReadConfig,WriteConfig}
import com.mongodb.spark.MongoSpark
import org.bson.Document
object FirstApp {
def main(args: Array[String]) {
val sc = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.getOrCreate()
val readConfig = ReadConfig(Map("collection" -> "spark", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
val customRdd …Run Code Online (Sandbox Code Playgroud) 我的笔记本电脑上安装了 Spark。我能够执行“spark-shell”命令并打开 scala shell,如下所示。
C:\Spark1_6\spark-1.6.0-bin-hadoop2.6\bin>spark-shell
scala>
但是当我尝试执行pyspark命令时。
C:\Spark1_6\spark-1.6.0-bin-hadoop2.6\bin>pyspark
我收到以下错误消息。
'python' 未被识别为内部或外部命令
我确实手动设置了环境用户“路径”变量。通过附加
";C:\Python27"
我重新启动了笔记本电脑,但仍然出现相同的错误。任何人都可以帮我解决这个问题吗?我没有正确更新环境变量吗?
版本:Spark:1.6.2 Windows:8.1
如果我有一个包含一列 Array[String] 的数据框:
scala> y.show
+---+----------+
|uid|event_comb|
+---+----------+
| c| [xx, zz]|
| b| [xx, xx]|
| b| [xx, yy]|
| b| [xx, zz]|
| b| [xx, yy]|
| b| [xx, zz]|
| b| [yy, zz]|
| a| [xx, yy]|
+---+----------+
Run Code Online (Sandbox Code Playgroud)
如何将列拆分"event_comb"为两列(例如"event1"和"event2")?