I'm working on a project where I'm dealing with some nested JSON data with a complicated schema/data structure. Basically, what I want to do is filter one of the columns in a dataframe so that I select the last element in the array. I'm totally stuck on how to do this. I hope that makes sense.
Here is an example of what I'm trying to accomplish:
val singersDF = Seq(
("beatles", "help,hey,jude"),
("romeo", "eres,mia"),
("elvis", "this,is,an,example")
).toDF("name", "hit_songs")
val actualDF = singersDF.withColumn(
"hit_songs",
split(col("hit_songs"), "\\,")
)
actualDF.show(false)
actualDF.printSchema()
+-------+-----------------------+
|name |hit_songs |
+-------+-----------------------+
|beatles|[help, hey, jude] |
|romeo |[eres, mia] |
|elvis |[this, is, an, example]|
+-------+-----------------------+
root
|-- name: string (nullable = true)
|-- hit_songs: array (nullable = true)
| |-- element: string (containsNull = true)
The end goal for the output would be the following, selecting the last "string" in the hit_songs array.
I'm not worried about what the schema looks like afterwards.
+-------+---------+
|name |hit_songs|
+-------+---------+
|beatles|jude |
|romeo |mia |
|elvis |example |
+-------+---------+
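One way to get there (a sketch rather than the only option): on Spark 2.4+ the built-in element_at function accepts a negative index, and on older versions the same selection can be written as a SQL expression over size(). Both lines assume the actualDF built above.

import org.apache.spark.sql.functions.{col, element_at, expr}

// Spark 2.4+: a negative index in element_at counts from the end of the array
val lastSongDF = actualDF.withColumn("hit_songs", element_at(col("hit_songs"), -1))

// Pre-2.4 alternative: index the array at size(...) - 1 via a SQL expression
val lastSongDFOld = actualDF.withColumn("hit_songs", expr("hit_songs[size(hit_songs) - 1]"))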

I have a dataframe with 2 columns, tag and value.
I would like to add a new column, max, that contains the max of the value column. (It would be the same value for every row.)
I tried to do the following, but it didn't work:
val df2 = df.withColumn("max",max($"value"))
How can I add the max column to the dataset?
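max is an aggregate function, so it can't be used directly inside withColumn like that. A minimal sketch of two common workarounds, assuming the column really is called value and that spark.implicits._ is in scope for the $ syntax: aggregate once and cross-join the single-row result back, or use max over an empty window.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.max

// Option 1: compute the aggregate once, then attach the single-row result to every row
val withMax1 = df.crossJoin(df.agg(max($"value").as("max")))

// Option 2: window aggregate over the whole dataframe (note: this collapses everything
// into a single partition, which can be expensive for large data)
val withMax2 = df.withColumn("max", max($"value").over(Window.partitionBy()))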

I'm trying to add a new column to my Spark DF. I understand that I can use the following code:
df.withColumn("row",monotonically_increasing_id)
But my use case is:
Input DF:
col_value
1
2
2
3
3
3
Output DF:
col_value identifier
1 1
2 1
2 2
3 1
3 2
3 3
Any suggestions on how to achieve this with monotonically_increasing or rowWithUniqueIndex?
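monotonically_increasing_id by itself produces globally unique (and non-consecutive) ids rather than a per-value counter, so one sketch of the desired output is row_number over a window partitioned by the value column. The ordering inside each group is arbitrary in the example, so I order by monotonically_increasing_id; the names col_value and identifier are taken from the tables above.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Tag each row with a stable ordering key, then number the rows within each col_value group
val withOrder = df.withColumn("_order", monotonically_increasing_id())
val w = Window.partitionBy("col_value").orderBy("_order")
val result = withOrder
  .withColumn("identifier", row_number().over(w))
  .drop("_order")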

I'm trying to get the 0.8 percentile of a single-column dataframe. I tried it this way:
val limit80 = 0.8
val dfSize = df.count()
val percentileIndex = (dfSize * limit80).toInt
val dfSorted = df.sort()
val percentile80 = dfSorted.take(percentileIndex).last
But I think this will fail for large dataframes, since they may be distributed across different nodes.
Is there a better way to compute the percentile? Or how could I get all the rows of the dataframe onto the same machine (even though that is a real anti-pattern), so that df.take(index) would really consider the whole dataset and not just one node's partition?
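Rather than sorting and collecting manually, Spark can compute the quantile itself. A short sketch using approxQuantile (available since Spark 2.0), assuming the single column is named "value": a relativeError of 0.0 asks for the exact quantile, while a larger value (e.g. 0.01) trades accuracy for speed on big data.

// Returns an array with one element per requested probability
val Array(percentile80) = df.stat.approxQuantile("value", Array(0.8), 0.0)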

My host system is Windows 10, I have a Cloudera VM, and my Spark version is 1.6. I tried to launch spark-shell with the following command:
spark-shell --packages org.apache.spark:spark-streaming-twitter_2.10:1.6.0
But it throws the following exception:
:::: ERRORS
Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.6.0/spark-streaming-twitter_2.10-1.6.0.pom (javax.net.ssl.SSLException: Received fatal alert: protocol_version)
Server access error at url https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.6.0/spark-streaming-twitter_2.10-1.6.0.jar (javax.net.ssl.SSLException: Received fatal alert: protocol_version)
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: org.apache.spark#spark-streaming-twitter_2.10;1.6.0: not found]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1067)
at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:287)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:154)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
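The protocol_version alert usually means the JVM doing the --packages resolution only offers an old TLS version, which repo1.maven.org no longer accepts. A commonly suggested workaround (a sketch, assuming the JDK in the VM actually supports TLS 1.2) is to force TLSv1.2 for that JVM; upgrading to a newer JDK, where TLS 1.2 is the default, is the more permanent fix.

spark-shell --driver-java-options "-Dhttps.protocols=TLSv1.2" --packages org.apache.spark:spark-streaming-twitter_2.10:1.6.0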

When new Python processes are created in the PySpark driver (for example, with joblib or another multiprocessing library), do those processes share the Spark driver's memory, or is their memory allocated outside the PySpark driver JVM?

I'm trying to run a very simple Spark Streaming word count program that reads from a Kafka topic. Below is my code:
val spark = SparkSession
.builder()
.appName("KafkaWordCount")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words = lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
When I run the program, I get the following exception:
Exception in thread "stream execution thread for [id = f704d6e5-14bf-4bd7-94a0-38c4b77986ea, runId = d277eaac-e18c-4128-954b-6a318bb8039c]" Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:394)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
…

I'm trying to read a table from a Postgres DB using spark-jdbc. For that, I came up with the following code:
object PartitionRetrieval {
var conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
val log = LogManager.getLogger("Spark-JDBC Program")
Logger.getLogger("org").setLevel(Level.ERROR)
val conFile = "/home/myuser/ReconTest/inputdir/testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
val tableName = "base.ledgers"
try {
Class.forName(driverClass).newInstance()
} catch {
case cnf: ClassNotFoundException =>
log.error("Driver class: " + driverClass + " not found")
System.exit(1)
case e: Exception =>
log.error("Exception: " + e.printStackTrace())
System.exit(1)
}
def main(args: …

Suppose I have a dataset/dataframe with the following contents:
name, marks1, marks2
Alice, 10, 20
Bob, 20, 30
I want to add a new column that should contain the average of columns B and C.
Expected result:
name, marks1, marks2, Result(Avg)
Alice, 10, 20, 15
Bob, 20, 30, 25
For a sum or any other arithmetic operation I use df.withColumn("xyz", $"marks1" + $"marks2"). I can't find a similar way for the average. Please help.
Also, the number of columns is not fixed: sometimes it might be the average of 2 columns, sometimes 3 or even more. So I want generic code that works in all cases.
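One generic sketch, assuming the columns to average are known by name (marks1 and marks2 in the example): build the sum by reducing over Column objects and divide by the number of columns. The list can be any length, which covers the 2-, 3-, or more-column cases.

import org.apache.spark.sql.functions.{col, lit}

val markCols = Seq("marks1", "marks2")   // any number of column names
val avgExpr = markCols.map(col).reduce(_ + _) / lit(markCols.size)
// Note: a null in any of the columns makes the result null; wrap each col in coalesce if needed
val result = df.withColumn("Result(Avg)", avgExpr)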

I have a Spark project that was working recently.
The project takes a CSV, adds two fields to it, and then outputs the contents of a JavaPairRdd with saveAsTextFile().
My Spark version is 2.3.0 and my Java version is 1.8.
The project runs in the Eclipse IDE under Windows 10.
Here is the error:
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2080)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
... 32 more
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Ljava/lang/String;JJJI)Ljava/io/FileDescriptor;
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileWithMode0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.createFileOutputStreamWithMode(NativeIO.java:559)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:219)
at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
…
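This UnsatisfiedLinkError on NativeIO$Windows usually points at the Hadoop native Windows binaries (winutils.exe and hadoop.dll): either they are not where Hadoop looks for them, or their version does not match the Hadoop libraries on the classpath. A hedged sketch of the usual local setup, where C:\hadoop is only an assumed install location: put matching winutils.exe and hadoop.dll under C:\hadoop\bin, make sure hadoop.dll is on the PATH (or in C:\Windows\System32), and point hadoop.home.dir at the install before the SparkContext is created.

// The same call works from Java; run it before creating the SparkContext / SparkSession
System.setProperty("hadoop.home.dir", "C:\\hadoop")   // assumed winutils location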