标签: pyspark

PySpark 处理流数据并将处理后的数据保存到文件

我正在尝试复制正在传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流(在 pyspark 上),这是我的架构:

1-Kafka生产者以以下字符串格式将数据发送到名为test的主题:

"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
Run Code Online (Sandbox Code Playgroud)

生产者代码:

from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(0,10000):
    lg_value = str(random.uniform(5000, 10000))
    lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)

producer.flush()
Run Code Online (Sandbox Code Playgroud)

生产者工作正常,我在消费者中获取流数据(甚至在 Spark 中)

2- Spark Streaming正在接收这个流,我可以甚至pprint()

Spark流处理代码

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})

lines = kvs.map(lambda …
Run Code Online (Sandbox Code Playgroud)

python-2.7 apache-spark spark-streaming pyspark kafka-python

-1
推荐指数
1
解决办法
3129
查看次数

从字符串 PySpark Dataframe 列中删除正则表达式

我需要从 pyspark 数据框中的字符串列中删除正则表达式

df = spark.createDataFrame(
    [
        ("Dog 10H03", "10H03"),
        ("Cat 09H24 eats rat", "09H24"),
        ("Mouse 09H45 runs away", "09H45"),
        ("Mouse 09H45 enters room", "09H45"),
    ],
    ["Animal", "Time"],
)
Run Code Online (Sandbox Code Playgroud)

时间戳(例如10H03)是必须删除的正则表达式。

+--------------------+------------------+-----+
|              Animal| Animal_strip_time| Time|
+--------------------+------------------+-----+
|           Dog 10H03|              Dog |10H03|
|  Cat 09H24 eats rat|     Cat  eats rat|09H24|
|Mouse 09H45 runs ...|  Mouse  runs away|09H45|
|Mouse 09H45 enter...|Mouse  enters room|09H45|
+--------------------+------------------+-----+
Run Code Online (Sandbox Code Playgroud)

该列中的时间戳Time可能与该列中的时间戳不同Animal。因此,它不能用于匹配字符串。

正则表达式应遵循 XXHXX 模式,其中 X 是 0-9 之间的数字

python regex apache-spark-sql pyspark

-1
推荐指数
1
解决办法
1万
查看次数

如何在pyspark中将日期格式“YYYY-MM-DD”转换为ddMMyy?

我尝试使用 to_date 将日期格式 2018-07-12 转换为 ddMMyy 但在转换日期格式后我得到 null

df = spark.createDataFrame([('2018-07-12',)], ['Date_col'])

df = df.withColumn('new_date',to_date('Date_col', 'ddMMyy'))
Run Code Online (Sandbox Code Playgroud)

我需要使用这个逻辑来转换数据帧列。我是 Spark 编程的新手,尝试了很多解决方案,但没有任何帮助。

我需要连接一列中的 ddMMyy 和另一列中的 hhss

有什么帮助吗?

date-format pyspark

-1
推荐指数
1
解决办法
2万
查看次数

从Pyspark中的字符串列创建日期时间

假设我有以下日期时间列,如下所示。我想将字符串中的列转换为日期时间类型,以便提取月份,日期和年份等。

+---+------------+
|agg|    datetime|
+---+------------+
|  A|1/2/17 12:00|
|  B|        null|
|  C|1/4/17 15:00|
+---+------------+
Run Code Online (Sandbox Code Playgroud)

我已经在下面尝试了以下代码,但是datetime列中的返回值为null,目前我不了解其原因。

df.select(df['datetime'].cast(DateType())).show()
Run Code Online (Sandbox Code Playgroud)

而且我也尝试了以下代码:

df = df.withColumn('datetime2', from_unixtime(unix_timestamp(df['datetime']), 'dd/MM/yy HH:mm'))
Run Code Online (Sandbox Code Playgroud)

但是,它们都产生以下数据帧:

+---+------------+---------+
|agg|    datetime|datetime2|
+---+------------+---------+
|  A|1/2/17 12:00|     null|
|  B|       null |     null|
|  C|1/4/17 12:00|     null|
Run Code Online (Sandbox Code Playgroud)

我已经阅读并尝试了本文中指定的解决方案,但无济于事:PySpark数据帧将异常的字符串格式转换为时间戳

python apache-spark pyspark

-2
推荐指数
1
解决办法
7445
查看次数

使用PySpark删除和替换字符

我有一个数据框,并希望删除所有括号,并替换为两个连字符.

之前:

+------------+
|  dob_concat|
+------------+
|[1983][6][3]|
+------------+
Run Code Online (Sandbox Code Playgroud)

后:

+------------+
| dob_concat |
+------------+
| 1983-6-3   |
+------------+
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark databricks

-2
推荐指数
1
解决办法
3363
查看次数

如何在数据框中投射一列?

我正在从 hbase 获取数据并将其转换为数据帧。现在,我在数据框中有一列是string数据类型。但我需要将其数据类型转换为Int.

尝试了下面的代码,但它给我一个错误

df.withColumn("order", 'order.cast(int)')
Run Code Online (Sandbox Code Playgroud)

我面临的错误如下

error:col should be column
Run Code Online (Sandbox Code Playgroud)

我在这里给出了正确的列名,我需要在 pyspark 中更改上述代码的语法吗?

dataframe apache-spark apache-spark-sql pyspark

-2
推荐指数
1
解决办法
3万
查看次数

删除前导零 pyspark?

我想删除pyspark中一列的前导零?

在此处输入图片说明

pyspark

-2
推荐指数
1
解决办法
7315
查看次数

减少 pyspark 中的 parquet 文件数量

已使用 databricks 中的 pyspark 在 Azure Blob 存储中创建了按日期分区的 Parquet 文件,但在一个日期文件夹中收到了如此多的文件,例如 500 个文件。我需要使用 PySpark 减少文件数量,例如一个日期文件夹中的 10 或 15 个文件。

df.write.format("parquet").mode("overwrite").partitionBy("Date").save(
    "/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)

我尝试过coalesce

df.write.format("parquet").mode("overwrite").partitionBy("Date").coalesce(15).save(
    "/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)

但会抛出错误:

AttributeError:“DataFrameWriter”对象没有属性“coalesce”

请帮忙。

apache-spark pyspark azure-blob-storage databricks

-2
推荐指数
1
解决办法
1004
查看次数

将字符串转换为单独的行,然后转换为 Pyspark 数据框

我有一个这样的字符串,每一行都用 \n 分隔。

我尝试了多种方法,但找不到任何合适的方法来做到这一点。

列名称 \n 第一行 \n 第二行 例如

"Name,ID,Number\n abc,1,123 \n xyz,2,456"

I want to convert it into pyspark dataframe like this

Name     ID   Number
abc      1      123
xyz      2      456

Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

-2
推荐指数
1
解决办法
293
查看次数

在Apache Spark中按多个字段排序

我有一个火花的RDD.RDD的每个元素都是一个列表.而且,所有元素都是相似模式的列表,所以它有点像表.我需要按特定优先级顺序按某些列排序RDD.

我怎样才能做到这一点?

PS:这是我试过的.

我尝试按优先级最高的字段排序,然后按其分组,然后按优先级次高的字段对每个结果进行排序.我递归地做了这个,并加入了结果.但是,使用RDD.groupBy这么多次使它非常慢.

python sorting apache-spark pyspark

-3
推荐指数
1
解决办法
2万
查看次数

在 pyspark 数据框中将字符串转换为十进制 (18, 2)

将字符串转换为十进制 (18,2)

from pyspark.sql.types import *

DF1 = DF.withColumn("New_col", DF["New_col"].cast(DecimalType(12,2)))

display(DF1)
Run Code Online (Sandbox Code Playgroud)

我看到的预期和实际 O/P

需要帮助将字符串转换为十进制以将 DF 加载到数据库中。

python dataframe pandas apache-spark pyspark

-4
推荐指数
1
解决办法
4万
查看次数