我正在尝试复制正在传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流(在 pyspark 上),这是我的架构:
1-Kafka生产者以以下字符串格式将数据发送到名为test的主题:
"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
Run Code Online (Sandbox Code Playgroud)
生产者代码:
from kafka import KafkaProducer
import random
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(0,10000):
lg_value = str(random.uniform(5000, 10000))
lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)
producer.flush()
Run Code Online (Sandbox Code Playgroud)
生产者工作正常,我在消费者中获取流数据(甚至在 Spark 中)
2- Spark Streaming正在接收这个流,我可以甚至pprint()它
Spark流处理代码
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})
lines = kvs.map(lambda …Run Code Online (Sandbox Code Playgroud) python-2.7 apache-spark spark-streaming pyspark kafka-python
我需要从 pyspark 数据框中的字符串列中删除正则表达式
df = spark.createDataFrame(
[
("Dog 10H03", "10H03"),
("Cat 09H24 eats rat", "09H24"),
("Mouse 09H45 runs away", "09H45"),
("Mouse 09H45 enters room", "09H45"),
],
["Animal", "Time"],
)
Run Code Online (Sandbox Code Playgroud)
时间戳(例如10H03)是必须删除的正则表达式。
+--------------------+------------------+-----+
| Animal| Animal_strip_time| Time|
+--------------------+------------------+-----+
| Dog 10H03| Dog |10H03|
| Cat 09H24 eats rat| Cat eats rat|09H24|
|Mouse 09H45 runs ...| Mouse runs away|09H45|
|Mouse 09H45 enter...|Mouse enters room|09H45|
+--------------------+------------------+-----+
Run Code Online (Sandbox Code Playgroud)
该列中的时间戳Time可能与该列中的时间戳不同Animal。因此,它不能用于匹配字符串。
正则表达式应遵循 XXHXX 模式,其中 X 是 0-9 之间的数字
我尝试使用 to_date 将日期格式 2018-07-12 转换为 ddMMyy 但在转换日期格式后我得到 null
df = spark.createDataFrame([('2018-07-12',)], ['Date_col'])
df = df.withColumn('new_date',to_date('Date_col', 'ddMMyy'))
Run Code Online (Sandbox Code Playgroud)
我需要使用这个逻辑来转换数据帧列。我是 Spark 编程的新手,尝试了很多解决方案,但没有任何帮助。
我需要连接一列中的 ddMMyy 和另一列中的 hhss
有什么帮助吗?
假设我有以下日期时间列,如下所示。我想将字符串中的列转换为日期时间类型,以便提取月份,日期和年份等。
+---+------------+
|agg| datetime|
+---+------------+
| A|1/2/17 12:00|
| B| null|
| C|1/4/17 15:00|
+---+------------+
Run Code Online (Sandbox Code Playgroud)
我已经在下面尝试了以下代码,但是datetime列中的返回值为null,目前我不了解其原因。
df.select(df['datetime'].cast(DateType())).show()
Run Code Online (Sandbox Code Playgroud)
而且我也尝试了以下代码:
df = df.withColumn('datetime2', from_unixtime(unix_timestamp(df['datetime']), 'dd/MM/yy HH:mm'))
Run Code Online (Sandbox Code Playgroud)
但是,它们都产生以下数据帧:
+---+------------+---------+
|agg| datetime|datetime2|
+---+------------+---------+
| A|1/2/17 12:00| null|
| B| null | null|
| C|1/4/17 12:00| null|
Run Code Online (Sandbox Code Playgroud)
我已经阅读并尝试了本文中指定的解决方案,但无济于事:PySpark数据帧将异常的字符串格式转换为时间戳
我有一个数据框,并希望删除所有括号,并替换为两个连字符.
之前:
+------------+
| dob_concat|
+------------+
|[1983][6][3]|
+------------+
Run Code Online (Sandbox Code Playgroud)
后:
+------------+
| dob_concat |
+------------+
| 1983-6-3 |
+------------+
Run Code Online (Sandbox Code Playgroud) 我正在从 hbase 获取数据并将其转换为数据帧。现在,我在数据框中有一列是string数据类型。但我需要将其数据类型转换为Int.
尝试了下面的代码,但它给我一个错误
df.withColumn("order", 'order.cast(int)')
Run Code Online (Sandbox Code Playgroud)
我面临的错误如下
error:col should be column
Run Code Online (Sandbox Code Playgroud)
我在这里给出了正确的列名,我需要在 pyspark 中更改上述代码的语法吗?
已使用 databricks 中的 pyspark 在 Azure Blob 存储中创建了按日期分区的 Parquet 文件,但在一个日期文件夹中收到了如此多的文件,例如 500 个文件。我需要使用 PySpark 减少文件数量,例如一个日期文件夹中的 10 或 15 个文件。
df.write.format("parquet").mode("overwrite").partitionBy("Date").save(
"/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)
我尝试过coalesce:
df.write.format("parquet").mode("overwrite").partitionBy("Date").coalesce(15).save(
"/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)
但会抛出错误:
AttributeError:“DataFrameWriter”对象没有属性“coalesce”
请帮忙。
我有一个这样的字符串,每一行都用 \n 分隔。
我尝试了多种方法,但找不到任何合适的方法来做到这一点。
列名称 \n 第一行 \n 第二行 例如
"Name,ID,Number\n abc,1,123 \n xyz,2,456"
I want to convert it into pyspark dataframe like this
Name ID Number
abc 1 123
xyz 2 456
Run Code Online (Sandbox Code Playgroud) 我有一个火花的RDD.RDD的每个元素都是一个列表.而且,所有元素都是相似模式的列表,所以它有点像表.我需要按特定优先级顺序按某些列排序RDD.
我怎样才能做到这一点?
PS:这是我试过的.
我尝试按优先级最高的字段排序,然后按其分组,然后按优先级次高的字段对每个结果进行排序.我递归地做了这个,并加入了结果.但是,使用RDD.groupBy这么多次使它非常慢.
将字符串转换为十进制 (18,2)
from pyspark.sql.types import *
DF1 = DF.withColumn("New_col", DF["New_col"].cast(DecimalType(12,2)))
display(DF1)
Run Code Online (Sandbox Code Playgroud)
需要帮助将字符串转换为十进制以将 DF 加载到数据库中。
pyspark ×11
apache-spark ×7
python ×4
databricks ×2
dataframe ×2
date-format ×1
kafka-python ×1
pandas ×1
python-2.7 ×1
regex ×1
sorting ×1