我只是使用标准缩放器来规范我的ML应用程序的功能.选择缩放功能后,我想将其转换回双打数据帧,尽管我的矢量长度是任意的.我知道如何使用特定的3个功能
myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")
Run Code Online (Sandbox Code Playgroud)
但不是任意数量的功能.是否有捷径可寻?
例:
val testDF = sc.parallelize(List(Vectors.dense(5D, 6D, 7D), Vectors.dense(8D, 9D, 10D), Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double]
Run Code Online (Sandbox Code Playgroud)
编辑
我在创建数据帧时发现了如何解压缩到列名,但是仍然无法将向量转换为创建数据帧所需的序列:
finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)
Run Code Online (Sandbox Code Playgroud) 如何在分组数据中插入 PySpark 数据帧?
例如:
我有一个包含以下列的 PySpark 数据框:
+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:05:00|70 |
|John |2018-02-01 03:08:00|76 |
|Mo |2017-06-04 01:05:00|10 |
|Mo |2017-06-04 01:07:00|20 |
|Mo |2017-06-04 01:10:00|35 |
|Mo |2017-06-04 01:11:00|40 |
+--------+----------------- -+--------+
Run Code Online (Sandbox Code Playgroud)
我需要在他们自己的时间间隔内每分钟将 John 和 Mo 的计数数据插入一个数据点。我对任何简单的线性插值持开放态度 - 但请注意,我的真实数据是每隔几秒一次,我想插值到每一秒。
所以结果应该是:
+--------+-------------------+--------+
|webID |timestamp |counts |
+--------+-------------------+--------+
|John |2018-02-01 03:00:00|60 |
|John |2018-02-01 03:01:00|62 |
|John |2018-02-01 03:02:00|64 |
|John |2018-02-01 03:03:00|66 |
|John |2018-02-01 03:04:00|68 …Run Code Online (Sandbox Code Playgroud) 我们正在使用具有 8 个核心和 32GB RAM 的 Spark 独立集群,以及具有相同配置的 3 节点集群。
有时流批处理会在不到 1 秒的时间内完成。有时需要超过 10 秒,此时控制台中会出现下面的日志。
2016-03-29 11:35:25,044 INFO TaskSchedulerImpl:59 - Removed TaskSet 18.0, whose tasks have all completed, from pool
2016-03-29 11:35:25,044 INFO DAGScheduler:59 - Job 18 finished: foreachRDD at EventProcessor.java:87, took 1.128755 s
2016-03-29 11:35:31,471 INFO JobScheduler:59 - Added jobs for time 1459231530000 ms
2016-03-29 11:35:35,004 INFO JobScheduler:59 - Added jobs for time 1459231535000 ms
2016-03-29 11:35:40,004 INFO JobScheduler:59 - Added jobs for time 1459231540000 ms
2016-03-29 11:35:45,136 INFO …Run Code Online (Sandbox Code Playgroud) 我通过groupby column1和date在Spark中创建了一个数据框,并计算了数量.
val table = df1.groupBy($"column1",$"date").sum("amount")
Run Code Online (Sandbox Code Playgroud)
Column1 |Date |Amount
A |1-jul |1000
A |1-june |2000
A |1-May |2000
A |1-dec |3000
A |1-Nov |2000
B |1-jul |100
B |1-june |300
B |1-May |400
B |1-dec |300
Run Code Online (Sandbox Code Playgroud)
现在,我想添加新列,表中任意两个日期的数量之间存在差异.
这是代码段:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext()
spark = SparkSession(sc)
d = spark.read.format("csv").option("header", True).option("inferSchema", True).load('file.csv')
d.show()
Run Code Online (Sandbox Code Playgroud)
之后遇到错误:
An error occurred while calling o163.showString. Trace:
py4j.Py4JException: Method showString([class java.lang.Integer, class java.lang.Integer, class java.lang.Boolean]) does not exist
Run Code Online (Sandbox Code Playgroud)
所有其他方法都可以正常工作。试图进行大量研究但徒劳无功。任何线索将不胜感激
我有一个以下格式的 Spark 数据框。
df = spark.createDataFrame([(1, 2, 3), (1, 4, 100), (20, 30, 50)],['a', 'b', 'c'])
df.show()
Run Code Online (Sandbox Code Playgroud)
输入:
我想添加一个新列“中位数”作为列“a”、“b”、“c”的中位数。如何在 PySpark 中执行此操作。
预期输出:
我正在使用 Spark 2.3.1
我正在尝试本地 Kubernetes(Docker-on-mac),并尝试提交 Spark 作业。Spark 作业连接 PostgreSQL 数据库并执行一些计算。
PostgreSQL 正在我的 Kube 上运行,由于我已经发布了它,我可以通过 localhost:5432 从主机访问它。但是,当 Spark 应用程序尝试连接到 PostgreSQL 时,它会抛出异常
Exception in thread "main" org.postgresql.util.PSQLException: Connection to localhost:5432 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
Run Code Online (Sandbox Code Playgroud)
kubectl 集群信息
Kubernetes master is running at https://kubernetes.docker.internal:6443
KubeDNS is running at https://kubernetes.docker.internal:6443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
Run Code Online (Sandbox Code Playgroud)
kubectl 获取服务 postgresql-published
kubectl描述服务spark-store-1588217023181-driver-svc
Name: spark-store-1588217023181-driver-svc
Namespace: default
Labels: <none>
Annotations: <none>
Selector: spark-app-selector=spark-533ecb8556b6439eb938d487cc77c330,spark-role=driver
Type: ClusterIP
IP: None
Port: driver-rpc-port 7078/TCP
TargetPort: 7078/TCP
Endpoints: <none> …Run Code Online (Sandbox Code Playgroud) 我是 Spark 和 Scala 的新手。我正在阅读 Spark 的 distinct() 函数。但我找不到任何适当的细节。我有一些我无法解决的疑问,并已将它们写下来。
在 Spark 中如何实现 distinct() ?
我不太擅长使用 Spark 源代码来识别整个流程。当我检查执行计划时,我只能看到一个 ShuffleRDD
distinct 的时间复杂度是多少?
我还从 Google 搜索中发现,它还以某种方式使用了散列和排序。
所以,我想它是否使用与在 Hashset 的帮助下从数组中获取唯一元素相同的原理。如果它是一个系统,我会猜到时间复杂度是 O(nlogn) 。
但是它分布在许多分区中并被打乱,时间复杂度的顺序是什么?
有没有办法避免在特定情况下改组?
如果我确保按照我的用例正确分区我的数据,我可以避免改组吗?
即,例如,假设在具有唯一行的数据框中分解一个 ArrayType 列会创建新行,而其他列被复制。我将选择其他列。通过这种方式,我确保每个分区的重复项都是唯一的。因为我知道每个分区的重复项是唯一的,所以我可以避免洗牌,只是敏锐地删除该分区中的重复项
我还发现这是否 spark 的 distinct() 函数只对每个分区中的不同元组进行洗牌。
谢谢你的帮助 。如果我在任何地方错了,请纠正我。
我们想使用 Grafana 来显示测量数据。现在,我们的测量设置创建了大量数据并保存在文件中。我们按原样保留文件,并直接使用 Spark(“数据湖”方法)对它们进行后处理。
我们现在想要创建一些可视化,我想在运行 Spark 和 HDFS(存储文件的位置)的集群上设置 Cassandra。将有一个服务(或 Spark-Streaming 作业)将选定的通道从测量数据文件转储到 Kafka 主题,另一个作业将它们放入 Cassandra 中。我使用这种方法是因为我们还有其他流处理作业也可以进行即时计算。
我现在考虑编写一个小型 REST 服务,使 Grafana 的简单 JSON 数据源可用于提取数据并将其可视化。到目前为止一切都很好,但由于我们收集的数据量很大(有时每分钟大约 300MiB),Cassandra 数据库应该只保存最近几个小时的数据。
我现在的问题是:如果有人查看数据,发现一些有趣的东西并创建仪表板或面板的快照(或发生某个事件并自动拍摄快照),并且原始数据从 Cassandra 中删除,快照是否可以还可以查看吗?数据是一起保存的吗?还是快照只保存元数据,重新查询数据源?
我有一个PySpark数据框,在列中包含时间戳(调用列'dt'),如下所示:
2018-04-07 16:46:00
2018-03-06 22:18:00
Run Code Online (Sandbox Code Playgroud)
当我执行:
SELECT trunc(dt, 'day') as day
Run Code Online (Sandbox Code Playgroud)
...我期望:
2018-04-07 00:00:00
2018-03-06 00:00:00
Run Code Online (Sandbox Code Playgroud)
但我得到了:
null
null
Run Code Online (Sandbox Code Playgroud)
我如何截断到一天而不是一小时?
我想将包含纪元时间的时间戳列转换为日期时间(人类可读)。from_unixtime没有给我正确的日期和时间。请帮忙。
df = spark.createDataFrame([('1535934855077532656',), ('1535934855077532656',),('1535935539886503614',)], ['timestamp',])
df.show()
Run Code Online (Sandbox Code Playgroud)
df = spark.createDataFrame([('1535934855077532656',), ('1535934855077532656',),('1535935539886503614',)], ['timestamp',])
df.show()
Run Code Online (Sandbox Code Playgroud)
+-------------------+
| timestamp|
+-------------------+
|1535934855077532656|
|1535934855077532656|
|1535935539886503614|
+-------------------+
Run Code Online (Sandbox Code Playgroud)
df.withColumn('datetime',from_unixtime(df.timestamp,"yyyy-MM-dd HH:mm:ss:SSS")).select(['timestamp','datetime']).show(15,False)
Run Code Online (Sandbox Code Playgroud) 我有一个熊猫数据框。我尝试将包含字符串值的两列首先连接到列表中,然后使用zip将列表中的每个元素都用'_'连接。我的数据集如下:
df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'
Run Code Online (Sandbox Code Playgroud)
我想将这两列连接到第三列,如下所示,用于数据框的每一行。
df['column_3']: [abc_1.0, def_2.0, ghi_3.0]
Run Code Online (Sandbox Code Playgroud)
我已经使用下面的代码在python中成功完成了此操作,但该数据框非常大,并且需要花费很长时间才能为整个数据框运行它。我想在PySpark中做同样的事情以提高效率。我已经成功读取了spark数据框中的数据,但是我很难确定如何使用PySpark等效函数复制Pandas函数。如何在PySpark中获得想要的结果?
df['column_3'] = df['column_2']
for index, row in df.iterrows():
while index < 3:
if isinstance(row['column_1'], str):
row['column_1'] = list(row['column_1'].split(','))
row['column_2'] = list(row['column_2'].split(','))
row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]
Run Code Online (Sandbox Code Playgroud)
我已使用以下代码将两列转换为PySpark中的数组
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split
crash.withColumn("column_1",
split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)
Run Code Online (Sandbox Code Playgroud)
现在,我需要使用'_'在两列中压缩数组的每个元素。我该如何使用zip?任何帮助表示赞赏。
我正在学习 Scala 和 Spark,想要打印一些格式化的日志。
这是我得到的一些例子:
val flag : Boolean = true
val charA : Char = 'a'
val piVal : Float = 3.14159265f
val num : Int = 1
println(f"val of pi = $piVal%.3f")
println(f"another formatting : $num%05d")
println(s"values like $num $flag $charA")
println(s"evaluate expression = ${1+2}")
Run Code Online (Sandbox Code Playgroud)
但我不明白这一切的意义。
请建议何时使用 f 以及何时将 s 与 printf 一起使用以及使用它的格式是什么?
apache-spark ×10
pyspark ×6
scala ×4
dataframe ×1
docker ×1
formatting ×1
grafana ×1
java ×1
kubectl ×1
kubernetes ×1
pandas ×1
py4j ×1
pyspark-sql ×1
python ×1
sorting ×1
timestamp ×1