标签: apache-spark

选择DataFrame中数组的最后一个元素

我正在一个项目上,正在处理具有复杂模式/数据结构的一些嵌套JSON日期。基本上,我想做的是在数据框中过滤掉其中的一列,以便选择数组中的最后一个元素。我完全坚持如何做到这一点。我希望这是有道理的。

以下是我要完成的示例:

val singersDF = Seq(
  ("beatles", "help,hey,jude"),
  ("romeo", "eres,mia"),
  ("elvis", "this,is,an,example")
).toDF("name", "hit_songs")

val actualDF = singersDF.withColumn(
  "hit_songs",
  split(col("hit_songs"), "\\,")
)

actualDF.show(false)
actualDF.printSchema() 

+-------+-----------------------+
|name   |hit_songs              |
+-------+-----------------------+
|beatles|[help, hey, jude]      |
|romeo  |[eres, mia]            |
|elvis  |[this, is, an, example]|
+-------+-----------------------+
root
 |-- name: string (nullable = true)
 |-- hit_songs: array (nullable = true)
 |    |-- element: string (containsNull = true)
Run Code Online (Sandbox Code Playgroud)

输出的最终目标将是以下内容,以选择hit_songs数组中的最后一个“字符串”。

我不担心之后的架构是什么样的。

+-------+---------+
|name   |hit_songs|
+-------+---------+
|beatles|jude     |
|romeo  |mia      |
|elvis  |example  |
+-------+---------+
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

1
推荐指数
3
解决办法
2184
查看次数

如何添加具有最大值的新列?

我有一个包含2列tag和的数据框value

我想补充一点,包含新列maxvalue列。(对于每行它将是相同的值)。

我尝试执行以下操作,但是没有成功。

val df2 = df.withColumn("max",max($"value"))
Run Code Online (Sandbox Code Playgroud)

如何将max列添加到数据集?

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
1395
查看次数

基于列单调增加ID

我正在尝试将新列添加到我的spark DF。我了解可以使用以下代码:

df.withColumn("row",monotonically_increasing_id)
Run Code Online (Sandbox Code Playgroud)

但是我的用例是:

输入DF:

col value
  1
  2
  2
  3
  3
  3
Run Code Online (Sandbox Code Playgroud)

输出DF:

col_value      identifier
  1               1
  2               1
  2               2
  3               1
  3               2
  3               3
Run Code Online (Sandbox Code Playgroud)

关于使用monotonically_increasing或rowWithUniqueIndex进行获取的任何建议。

scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
415
查看次数

Spark - 如何计算Spark中的百分位数?

我试图获得单列数据帧的0.8百分位数.我试过这样的方式:

val limit80 = 0.8
val dfSize = df.count()
val perfentileIndex = dfSize*limit80 

dfSorted = df.sort()
val percentile80 = dfSorted .take(perfentileIndex).last()
Run Code Online (Sandbox Code Playgroud)

但我认为这对大型数据帧来说是失败的,因为它们可能分布在不同的节点上.

有没有更好的方法来计算百分位数?或者我怎么能在同一台机器中拥有数据帧的所有行(即使这是非常反模式的),所以这df.take(index)将真正考虑整个数据集而不仅仅是节点中的分区.

scala apache-spark

1
推荐指数
2
解决办法
5128
查看次数

Spark Shell依赖异常

我的主机系统是Windows 10,我有cloudera vm,我的spark版本是1.6。我试图使用以下命令加载spark-shell。

spark-shell --packages org.apache.spark:spark-streaming-twitter_2.10:1.6.0
Run Code Online (Sandbox Code Playgroud)

但是它抛出以下异常:

:::: ERRORS Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.6.0/spark-streaming-twitter_2.10-1.6.0.pom (javax.net.ssl.SSLException: Received fatal alert: protocol_version)
    Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.6.0/spark-streaming-twitter_2.10-1.6.0.jar (javax.net.ssl.SSLException: Received fatal alert: protocol_version)
Run Code Online (Sandbox Code Playgroud)

::使用详细或调试消息级别获取更多详细信息线程中的异常

"main" java.lang.RuntimeException: [unresolved dependency: org.apache.spark#spark-streaming-twitter_2.10;1.6.0: not found] at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1067) at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:287) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:154) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

1
推荐指数
1
解决办法
840
查看次数

Pyspark驱动程序中Python子进程的内存分配

在PySpark驱动程序中创建新的Python进程时(例如,使用JobLib或其他多处理库),这些进程是否共享Spark驱动程序内存,或者它们在该PySpark驱动程序JVM之外分配了内存?

python apache-spark pyspark

1
推荐指数
1
解决办法
177
查看次数

不兼容的Jackson版本:Spark结构化流

我正在尝试运行一个非常简单的Spark Streaming字数统计程序,该程序从一个Kafka主题读取。下面是我的代码:

val spark = SparkSession
  .builder()
  .appName("KafkaWordCount")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val lines = df.selectExpr("CAST(value AS STRING)").as[String]

val words = lines.flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

运行该程序时,出现以下异常:

Exception in thread "stream execution thread for [id = f704d6e5-14bf-4bd7-94a0-38c4b77986ea, runId = d277eaac-e18c-4128-954b-6a318bb8039c]" Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.map(RDD.scala:370)
    at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:394)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3816
查看次数

如何在spark-jdbc连接中操作numPartitions,lowerBound,upperBound?

我正在尝试使用spark-jdbc在postgres db上读取表。为此,我想出了以下代码:

object PartitionRetrieval {
  var conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
  val log   = LogManager.getLogger("Spark-JDBC Program")
  Logger.getLogger("org").setLevel(Level.ERROR)
  val conFile       = "/home/myuser/ReconTest/inputdir/testconnection.properties"
  val properties    = new Properties()
  properties.load(new FileInputStream(conFile))
  val connectionUrl = properties.getProperty("gpDevUrl")
  val devUserName   = properties.getProperty("devUserName")
  val devPassword   = properties.getProperty("devPassword")
  val driverClass   = properties.getProperty("gpDriverClass")
  val tableName     = "base.ledgers"
  try {
    Class.forName(driverClass).newInstance()
  } catch {
    case cnf: ClassNotFoundException =>
      log.error("Driver class: " + driverClass + " not found")
      System.exit(1)
    case e: Exception =>
      log.error("Exception: " + e.printStackTrace())
      System.exit(1)
  }
  def main(args: …
Run Code Online (Sandbox Code Playgroud)

apache-spark

1
推荐指数
1
解决办法
1933
查看次数

spark-计算2列或更多列中的平均值,并在每行中放入新列

假设我有一个包含以下内容的数据集/数据框:

name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
Run Code Online (Sandbox Code Playgroud)

我想添加一个新列,该列应具有列B和C的平均值。

预期结果:-

name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
Run Code Online (Sandbox Code Playgroud)

用于求和或任何其他算术运算df.withColumn("xyz", $"marks1"+$"marks2")。我找不到平均值的类似方法。请帮忙。

另外:-列数不是固定的。就像有时它可能是2列的平均值,有时是3列甚至更多列。所以我想要一个通用的代码,它应该可以工作。

apache-spark apache-spark-sql pyspark pyspark-sql

1
推荐指数
2
解决办法
5581
查看次数

java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO $ Windows.createFileWithMode0(Ljava / lang / String; JJJI)Ljava / io / FileDescriptor

我有一个Spark项目,该项目最近可以工作。

该项目获得一个CSV,并向其中添加两个字段,然后输出带有JavaPairRddsaveasTextfile()的内容。

我的Spark版本是:2.3.0我的Java版本是:1.8

该项目在Windows 10下的Eclipse IDE中运行。

这是错误:

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
... 32 more
 Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911) …
Run Code Online (Sandbox Code Playgroud)

java eclipse apache-spark

1
推荐指数
1
解决办法
1054
查看次数