我正在尝试从运行在IBM Analytics Engine上的Spark 2.3连接到在IBM Cloud上运行的ScyllaDB数据库.
我这样开始火花壳......
$ spark-shell --master local[1] \
--files jaas.conf \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,datastax:spark-cassandra-connector:2.3.0-s_2.11,commons-configuration:commons-configuration:1.10 \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf spark.cassandra.connection.host=xxx1.composedb.com,xxx2.composedb.com,xxx3.composedb.com \
--conf spark.cassandra.connection.port=28730 \
--conf spark.cassandra.auth.username=scylla \
--conf spark.cassandra.auth.password=SECRET \
--conf spark.cassandra.connection.ssl.enabled=true \
--num-executors 1 \
--executor-cores 1
Run Code Online (Sandbox Code Playgroud)
然后执行以下spark scala代码:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
val stocksRdd = sc.cassandraTable("stocks", "stocks")
stocksRdd.count()
Run Code Online (Sandbox Code Playgroud)
但是,我看到一堆警告:
18/08/23 10:11:01 WARN Cluster: You listed xxx1.composedb.com/xxx.xxx.xxx.xxx:28730 in your contact points, but it wasn't found in the control host's system.peers at startup
18/08/23 10:11:01 …Run Code Online (Sandbox Code Playgroud) 我有以下源文件.我的文件中有一个名为" john" 的名称想要拆分为列表['j','o','h','n'].请按以下方式查找人员档案.
源文件:
id,name,class,start_data,end_date
1,john,xii,20170909,20210909
Run Code Online (Sandbox Code Playgroud)
码:
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder.appName("PersonProcessing").getOrCreate()
df = spark.read.csv('person.txt', header=True)
nameList = [x['name'] for x in df.rdd.collect()]
print(list(nameList))
df.show()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
实际产量:
[u'john']
Run Code Online (Sandbox Code Playgroud)
期望的输出:
['j','o','h','n']
Run Code Online (Sandbox Code Playgroud) 我有DataFrame以下架构:
root
|- documentId
|- timestamp
|- anotherField
Run Code Online (Sandbox Code Playgroud)
例如,
"d1", "2018-09-20 10:00:00", "blah1"
"d2", "2018-09-20 09:00:00", "blah2"
"d1", "2018-09-20 10:01:00", "blahnew"
Run Code Online (Sandbox Code Playgroud)
请注意,为了理解(和方便起见),我将时间戳显示为字符串。实际上,它是long自纪元以来的毫秒数。
如此处所示,存在重复的行(第1行和第3行),它们具有相同documentId但不同的timestamp(可能还有其他字段)。我想timestamp为每个重复数据删除并仅保留最新行(基于)documentId。
一种简单的方法df.groupBy("documentId").agg(max("timestamp), ...)似乎不太可能在这里工作,因为我不知道如何将其他字段保留在与满足条件的字段相对应的行中max("timestamp")。
因此,我想出了一种复杂的方法。
// first find the max timestamp corresponding to each documentId
val mostRecent = df
.select("documentId", "timestamp")
.groupBy("documentId")
.agg(max("timestamp"))
// now join with the original df on timestamp to retain
val dedupedDf = df.join(mostRecent, Seq("documentId", "timestamp"), "inner")
Run Code Online (Sandbox Code Playgroud)
此结果dedupedDf …
大家好日子.我会尝试解释我的问题,这样你就能理解我.
在一些地方我发现它认为Scala比Python更快:
另外,据说Scala是最适合在Apache Spark中运行应用程序的编程语言:
https://www.dezyre.com/article/scala-vs-python-for-apache-spark/213
但是,在这个网站上,另一个用户(@Mrityunjay)问了一个类似于我在这里提出的问题:
在这篇文章中,来自@ zero323的回复强调了以下内容:
一般来说,回复说明是特殊的,并且通过@ zero323在Scala和Python之间进行修改可以实现非常相似的执行时间.
考虑到这些信息,我给自己编写了一个简单程序的任务,这个程序可以让我解释我的应用程序发生的类似情况,突出显示我在Scala中的代码比用Python编写的代码慢.为此,我避免使用ReduceByKey操作,只使用了map操作.
我将尝试执行任何超复杂的操作,以最大化群集占用(96核,48 GB RAM)并实现大延迟.为此,该代码生成一组100万个人工数据(仅用于计算处理100万个数据的执行时间,无论它们是否被复制),其中包含标识符ID,长度为10的向量DoubleS.
由于我的应用程序是使用DataFrame实现的,我在Scala中创建了两个程序,一个使用RDD,另一个使用DataFrame,目的是观察问题是否使用DataFrame.同样,一个等效的程序是用Python编写的.
通常,操作应用于每个RDD/DataFrame记录,其结果放在附加字段中,从而生成包含原始字段的新RDD/DataFrame和包含结果的新字段.
这是Scala中的代码:
import org.apache.spark.sql.SparkSession
import scala.math.BigDecimal
object RDDvsDFMapComparison {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("Test").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val parts = 96
val repl = 1000000
val rep = 60000000
val ary = (0 until 10).toArray
val m = Array.ofDim[Int](repl, ary.length)
for (i <- 0 until repl)
m(i) = ary
val …Run Code Online (Sandbox Code Playgroud) 我读json为:
val df = spark.read.json(rdd)
Run Code Online (Sandbox Code Playgroud)
我阅读了来自不同主题的消息,因此无法指定显式架构。某些消息包含带有嵌套json的字段,它们将转换为StructType。例如:
{"name": "John", "son": {"name":"Tom"}}
Run Code Online (Sandbox Code Playgroud)
如何将其转换为String?我需要将“儿子”字段读取为字符串:
"{\"name\":\"Tom\"}"
Run Code Online (Sandbox Code Playgroud)
使用cast方法或sql函数失败:
df.selectExpr("cast(son as string)")
Run Code Online (Sandbox Code Playgroud)
错误:
java.lang.String is not a valid external type for schema of struct<name:string>
Run Code Online (Sandbox Code Playgroud) 我是Spark的新手,我想了解两个JavaRDD与JavaPairRDD之间的区别,以及如果将JavaRDD转换为JavaPairRDD,此操作的繁重程度
JavaRDD<Tuple2<String, String>> myRdd // This is my JavaRDD
JavaPairRDD<String, String> pairRDD = JavaPairRDD.fromJavaRDD(myRdd);
Run Code Online (Sandbox Code Playgroud) 我使用自制软件在我的Mac上安装了spark。我正在尝试查找安装目录。我已经尝试使用Google搜索,但是运气不佳。似乎应该不会那么棘手。谁能告诉我在Mac终端中或从spark shell运行需要什么才能找到spark的安装目录?
更新:
码:
brew info apache-spark
Run Code Online (Sandbox Code Playgroud)
输出:
apache-spark: stable 2.3.2, HEAD
Engine for large-scale data processing
https://spark.apache.org/
/usr/local/Cellar/apache-spark/2.3.2 (1,058 files, 244.6MB) *
Built from source on 2018-10-30 at 14:16:30
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-spark.rb
==> Requirements
Required: java = 1.8 ?
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 4,534 (30 days), 14,340 (90 days), 56,698 (365 days)
install_on_request: 4,263 (30 days), 13,490 (90 days), 51,876 (365 days)
build_error: 0 (30 days)
Run Code Online (Sandbox Code Playgroud)
码:
which spark-shell
Run Code Online (Sandbox Code Playgroud)
输出:
/Users/sshields/anaconda2/bin/spark-shell
Run Code Online (Sandbox Code Playgroud) 我写了一份火花工作。工作顺利,没有任何问题。但是,当我查看错误日志文件时,会看到很多类型的消息
[error] 18/11/25 17:28:14 INFO CodeGenerator: Code
generated in 16.947005 ms
Run Code Online (Sandbox Code Playgroud)
和
[error] 18/11/25 17:28:15 INFO ContextCleaner: Cleaned
accumulator 239819
Run Code Online (Sandbox Code Playgroud)
和
[error] 18/11/25 17:28:06 INFO BlockManagerInfo: Removed
broadcast_13354_piece0 on 192.168.2.101:43753 in memory
(size: 20.5 KB, free: 6.2 GB)
Run Code Online (Sandbox Code Playgroud)
有什么办法可以抑制这些消息。他们只是在膨胀我的日志文件。
不知道为什么spark将这些报告为错误。当它们看起来像某种调试消息时。
我想在spark中扩展SparkSession类.我复制了部分原版SparkSession的构造函数:
class SparkSession private(
@transient val sparkContext: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient private val parentSessionState: Option[SessionState],
@transient private[sql] val extensions: SparkSessionExtensions)
extends Serializable with Closeable with Logging { self =>
private[sql] def this(sc: SparkContext) {
this(sc, None, None, new SparkSessionExtensions)
}
// other implementations
}
Run Code Online (Sandbox Code Playgroud)
这是我尝试扩展它:
class CustomSparkSession private(
@transient override val sparkContext: SparkContext,
@transient private val existingSharedState: Option[SharedState],
@transient private val parentSessionState: Option[SessionState],
@transient override private[sql] val extensions: SparkSessionExtensions)
extends SparkSession {
// implementation
}
Run Code Online (Sandbox Code Playgroud)
但我得到的错误SparkSession …
我正在尝试从数据框中删除多个列。我已经在/sf/answers/2787305181/上遵循了该示例,但是它对我不起作用。我正在使用Spark 2.4.0
这就是我现在正在做的事情。它确实有效,但是我想知道是否有更好的方法可以做到这一点。
val colsToRemove = Seq("colA", "colB", "colC", etc)
var filterdDF = df
for(i <- 0 until colsToRemove.size){
filterdDF = filterdDF.drop(colsToRemove(i))
}
Run Code Online (Sandbox Code Playgroud) apache-spark ×10
scala ×5
python ×2
rdd ×2
compose-db ×1
dataframe ×1
duplicates ×1
homebrew ×1
ibm-cloud ×1
inner-join ×1
macos ×1
pyspark ×1
scylla ×1