标签: pyspark-sql

将数据导入Spark时如何设置分区/节点数

问题:我想使用以下方法将数据从S3导入Spark EMR:

data = sqlContext.read.json("s3n://.....")
Run Code Online (Sandbox Code Playgroud)

有没有办法可以设置Spark用来加载处理数据的节点数量?这是我处理数据的示例:

data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")
Run Code Online (Sandbox Code Playgroud)

上下文:数据不是太大,需要很长时间才能加载到Spark中,也需要查询.我认为Spark将数据划分为太多节点.我希望能够手动设置.我知道在处理RDD时sc.parallelize我可以将分区数作为输入传递.此外,我已经看到了repartition(),但我不确定它是否可以解决我的问题.在我的例子中,变量data是一个DataFrame.

让我更准确地定义分区.定义一个:通常被称为"分区键",其中一列中选择和索引,以加快查询(这不是我想要的).定义二:(这是我关注的地方)假设你有一个数据集,Spark决定它将它分布在许多节点上,以便它可以并行地对数据进行操作.如果数据量太小,这可能会进一步减慢进程.我该如何设置该值

sql database-partitioning apache-spark pyspark-sql

14
推荐指数
2
解决办法
2万
查看次数

文本列上的Pyspark DataFrame UDF

我正在尝试在PySpark DataFrame中对一些Unicode列进行一些NLP文本清理.我已经尝试过Spark 1.3,1.5和1.6,似乎无法让事情在我的生活中发挥作用.我也尝试过使用Python 2.7和Python 3.4.

我已经创建了一个非常简单的udf,如下所示,它应该为新列中的每个记录返回一个字符串.其他函数将操作文本,然后将更改的文本返回到新列中.

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf

def dummy_function(data_str):
    cleaned_str = 'dummyData' 
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())
Run Code Online (Sandbox Code Playgroud)

一些样本数据可以从这里解压缩.

这是我用来导入数据然后应用udf的代码.

# Load a text file and convert each line to a Row.
lines = sc.textFile("classified_tweets.txt")
parts = lines.map(lambda l: l.split("\t"))
training = parts.map(lambda p: (p[0], p[1]))

# Create dataframe
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])

training_df.show(5)
+--------------------+--------------+
|               tweet|classification|
+--------------------+--------------+
|rt …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark pyspark-sql

13
推荐指数
2
解决办法
4万
查看次数

Apache spark处理case语句

我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!

基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.

     case  
               when (e."a" Like 'a%' Or e."b" Like 'b%') 
                And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'

               when (e."a" Like 'b%' Or e."b" Like 'a%') 
                And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'

else

'CallitC'
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark spark-dataframe pyspark-sql

13
推荐指数
2
解决办法
3万
查看次数

Pyspark:使用字符串格式的正则表达式过滤数据帧?

我已经阅读了几篇关于使用"like"运算符来过滤火花数据帧的帖子,条件是包含一个字符串/表达式,但是想知道以下是否是在所需条件下使用%s的"最佳实践"如下:

input_path = <s3_location_str>
my_expr = "Arizona.*hot"  # a regex expression
dx = sqlContext.read.parquet(input_path)  # "keyword" is a field in dx

# is the following correct?
substr = "'%%%s%%'" %my_keyword  # escape % via %% to get "%"
dk = dx.filter("keyword like %s" %substr)

# dk should contain rows with keyword values such as "Arizona is hot."
Run Code Online (Sandbox Code Playgroud)

注意

我正在尝试获取包含表达式my_keyword的dx中的所有行.否则,对于完全匹配,我们不需要周围百分号'%'.

regex apache-spark-sql pyspark spark-dataframe pyspark-sql

13
推荐指数
3
解决办法
2万
查看次数

如何从Amazon SQS加载流数据?

我使用Spark 2.2.0.

如何使用pyspark将Amazon SQS流提供给spark结构化流?

这个问题试图通过创建自定义接收器来解决非结构化流和scala的问题.
pyspark中有类似的东西吗?

spark.readStream \
   .format("s3-sqs") \
   .option("fileFormat", "json") \
   .option("queueUrl", ...) \
   .schema(...) \
   .load()
Run Code Online (Sandbox Code Playgroud)

根据Databricks上面的接收器可以用于S3-SQS文件源.但是,对于只有SQS,如何才能采用一种方法.

我尝试从AWS-SQS-Receive_Message理解接收消息.但是,如何直接将流发送到火花流还不清楚.

amazon-sqs apache-spark pyspark-sql spark-structured-streaming

13
推荐指数
1
解决办法
2719
查看次数

在通过JDBC从pyspark数据帧插入外部数据库表时,打开DUPLICATE KEY UPDATE

好吧,我正在使用PySpark并且我有一个Spark数据帧,我使用它将数据插入到mysql表中.

url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"

df.write.jdbc(url=url, table="myTable", mode="append")

我想通过列值和特定数字的总和更新列值(不在主键中).

我尝试过不同的模式(追加,覆盖)DataFrameWriter.jdbc()函数.

我的问题是我们如何ON DUPLICATE KEY UPDATE在mysql中更新列值,同时将pyspark数据帧数据插入表中.

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

12
推荐指数
1
解决办法
1412
查看次数

如何在Spark(Python)中订购我的Row对象的字段

我在Spark中创建Row对象.我不希望我的字段按字母顺序排序.但是,如果我执行以下操作,则按字母顺序排序.

row = Row(foo=1, bar=2)
Run Code Online (Sandbox Code Playgroud)

然后它创建一个如下所示的对象:

Row(bar=2, foo=1)
Run Code Online (Sandbox Code Playgroud)

当我然后在这个对象上创建一个数据帧时,列顺序将是第一个,第二个是foo,当我更喜欢用它来反过来时.

我知道我可以使用"_1"和"_2"(分别用于"foo"和"bar"),然后分配一个模式(带有适当的"foo"和"bar"名称).但是有什么方法可以阻止Row对象对它们进行排序吗?

python apache-spark apache-spark-sql pyspark pyspark-sql

12
推荐指数
1
解决办法
7314
查看次数

“正在确定 DBIO 文件片段的位置...”是什么意思,我该如何加快速度?

在 Databricks 中运行简单的 SQL 命令时,有时会收到以下消息:

确定 DBIO 文件片段的位置。此操作可能需要一些时间。

这是什么意思,我如何防止它每次都必须执行这种看似昂贵的操作?即使所有基础表都是增量表,也会发生这种情况。

pyspark-sql databricks

12
推荐指数
1
解决办法
3392
查看次数

缓存有序Spark DataFrame会创建不需要的作业

我想将RDD转换为DataFrame并想要缓存RDD的结果:

from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as fn

schema = StructType([StructField('t', DoubleType()), StructField('value', DoubleType())])

df = spark.createDataFrame(
    sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(),
    schema=schema,
    verifySchema=False
).orderBy("t") #.cache()
Run Code Online (Sandbox Code Playgroud)
  • 如果您不使用某个cache功能,则不会生成任何作业.
  • 如果cache仅在为orderBy1生成1个作业后使用cache:在此输入图像描述
  • 如果cache仅在parallelize生成无作业后使用.

为什么cache在这种情况下生成一份工作?如何避免生成cache(缓存DataFrame而没有RDD)?

编辑:我对问题进行了更多调查,发现orderBy("t")没有生成任何工作.为什么?

python apache-spark apache-spark-sql pyspark pyspark-sql

11
推荐指数
1
解决办法
789
查看次数

将PySpark dataframe列从列表转换为字符串

我有这个PySpark数据帧

+-----------+--------------------+
|uuid       |   test_123         |    
+-----------+--------------------+
|      1    |[test, test2, test3]|
|      2    |[test4, test, test6]|
|      3    |[test6, test9, t55o]|
Run Code Online (Sandbox Code Playgroud)

我想将列转换为test_123这样:

+-----------+--------------------+
|uuid       |   test_123         |    
+-----------+--------------------+
|      1    |"test,test2,test3"  |
|      2    |"test4,test,test6"  |
|      3    |"test6,test9,t55o"  |
Run Code Online (Sandbox Code Playgroud)

所以从列表到字符串.

我怎么能用PySpark做到这一点?

python apache-spark apache-spark-sql pyspark pyspark-sql

11
推荐指数
2
解决办法
9591
查看次数