问题:我想使用以下方法将数据从S3导入Spark EMR:
data = sqlContext.read.json("s3n://.....")
Run Code Online (Sandbox Code Playgroud)
有没有办法可以设置Spark用来加载和处理数据的节点数量?这是我处理数据的示例:
data.registerTempTable("table")
SqlData = sqlContext.sql("SELECT * FROM table")
Run Code Online (Sandbox Code Playgroud)
上下文:数据不是太大,需要很长时间才能加载到Spark中,也需要查询.我认为Spark将数据划分为太多节点.我希望能够手动设置.我知道在处理RDD时sc.parallelize我可以将分区数作为输入传递.此外,我已经看到了repartition(),但我不确定它是否可以解决我的问题.在我的例子中,变量data是一个DataFrame.
让我更准确地定义分区.定义一个:通常被称为"分区键",其中一列中选择和索引,以加快查询(这不是我想要的).定义二:(这是我关注的地方)假设你有一个数据集,Spark决定它将它分布在许多节点上,以便它可以并行地对数据进行操作.如果数据量太小,这可能会进一步减慢进程.我该如何设置该值
我正在尝试在PySpark DataFrame中对一些Unicode列进行一些NLP文本清理.我已经尝试过Spark 1.3,1.5和1.6,似乎无法让事情在我的生活中发挥作用.我也尝试过使用Python 2.7和Python 3.4.
我已经创建了一个非常简单的udf,如下所示,它应该为新列中的每个记录返回一个字符串.其他函数将操作文本,然后将更改的文本返回到新列中.
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
def dummy_function(data_str):
cleaned_str = 'dummyData'
return cleaned_str
dummy_function_udf = udf(dummy_function, StringType())
Run Code Online (Sandbox Code Playgroud)
这是我用来导入数据然后应用udf的代码.
# Load a text file and convert each line to a Row.
lines = sc.textFile("classified_tweets.txt")
parts = lines.map(lambda l: l.split("\t"))
training = parts.map(lambda p: (p[0], p[1]))
# Create dataframe
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
training_df.show(5)
+--------------------+--------------+
| tweet|classification|
+--------------------+--------------+
|rt …Run Code Online (Sandbox Code Playgroud) 我正在处理将SQL代码转换为PySpark代码并遇到一些SQL语句.我不知道如何处理pyspark中的案例陈述?我打算创建一个RDD然后使用rdd.map然后做一些逻辑检查.这是正确的方法吗?请帮忙!
基本上我需要遍历RDD或DF中的每一行,并根据我需要编辑其中一个列值的逻辑.
case
when (e."a" Like 'a%' Or e."b" Like 'b%')
And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'
when (e."a" Like 'b%' Or e."b" Like 'a%')
And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'
else
'CallitC'
Run Code Online (Sandbox Code Playgroud) 我已经阅读了几篇关于使用"like"运算符来过滤火花数据帧的帖子,条件是包含一个字符串/表达式,但是想知道以下是否是在所需条件下使用%s的"最佳实践"如下:
input_path = <s3_location_str>
my_expr = "Arizona.*hot" # a regex expression
dx = sqlContext.read.parquet(input_path) # "keyword" is a field in dx
# is the following correct?
substr = "'%%%s%%'" %my_keyword # escape % via %% to get "%"
dk = dx.filter("keyword like %s" %substr)
# dk should contain rows with keyword values such as "Arizona is hot."
Run Code Online (Sandbox Code Playgroud)
注意
我正在尝试获取包含表达式my_keyword的dx中的所有行.否则,对于完全匹配,我们不需要周围百分号'%'.
我使用Spark 2.2.0.
如何使用pyspark将Amazon SQS流提供给spark结构化流?
这个问题试图通过创建自定义接收器来解决非结构化流和scala的问题.
pyspark中有类似的东西吗?
spark.readStream \
.format("s3-sqs") \
.option("fileFormat", "json") \
.option("queueUrl", ...) \
.schema(...) \
.load()
Run Code Online (Sandbox Code Playgroud)
根据Databricks上面的接收器可以用于S3-SQS文件源.但是,对于只有SQS,如何才能采用一种方法.
我尝试从AWS-SQS-Receive_Message理解接收消息.但是,如何直接将流发送到火花流还不清楚.
amazon-sqs apache-spark pyspark-sql spark-structured-streaming
好吧,我正在使用PySpark并且我有一个Spark数据帧,我使用它将数据插入到mysql表中.
url = "jdbc:mysql://hostname/myDB?user=xyz&password=pwd"
df.write.jdbc(url=url, table="myTable", mode="append")
我想通过列值和特定数字的总和更新列值(不在主键中).
我尝试过不同的模式(追加,覆盖)DataFrameWriter.jdbc()函数.
我的问题是我们如何ON DUPLICATE KEY UPDATE在mysql中更新列值,同时将pyspark数据帧数据插入表中.
apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql
我在Spark中创建Row对象.我不希望我的字段按字母顺序排序.但是,如果我执行以下操作,则按字母顺序排序.
row = Row(foo=1, bar=2)
Run Code Online (Sandbox Code Playgroud)
然后它创建一个如下所示的对象:
Row(bar=2, foo=1)
Run Code Online (Sandbox Code Playgroud)
当我然后在这个对象上创建一个数据帧时,列顺序将是第一个,第二个是foo,当我更喜欢用它来反过来时.
我知道我可以使用"_1"和"_2"(分别用于"foo"和"bar"),然后分配一个模式(带有适当的"foo"和"bar"名称).但是有什么方法可以阻止Row对象对它们进行排序吗?
在 Databricks 中运行简单的 SQL 命令时,有时会收到以下消息:
确定 DBIO 文件片段的位置。此操作可能需要一些时间。
这是什么意思,我如何防止它每次都必须执行这种看似昂贵的操作?即使所有基础表都是增量表,也会发生这种情况。
我想将RDD转换为DataFrame并想要缓存RDD的结果:
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as fn
schema = StructType([StructField('t', DoubleType()), StructField('value', DoubleType())])
df = spark.createDataFrame(
sc.parallelize([Row(t=float(i/10), value=float(i*i)) for i in range(1000)], 4), #.cache(),
schema=schema,
verifySchema=False
).orderBy("t") #.cache()
Run Code Online (Sandbox Code Playgroud)
为什么cache在这种情况下生成一份工作?如何避免生成cache(缓存DataFrame而没有RDD)?
编辑:我对问题进行了更多调查,发现orderBy("t")没有生成任何工作.为什么?
我有这个PySpark数据帧
+-----------+--------------------+
|uuid | test_123 |
+-----------+--------------------+
| 1 |[test, test2, test3]|
| 2 |[test4, test, test6]|
| 3 |[test6, test9, t55o]|
Run Code Online (Sandbox Code Playgroud)
我想将列转换为test_123这样:
+-----------+--------------------+
|uuid | test_123 |
+-----------+--------------------+
| 1 |"test,test2,test3" |
| 2 |"test4,test,test6" |
| 3 |"test6,test9,t55o" |
Run Code Online (Sandbox Code Playgroud)
所以从列表到字符串.
我怎么能用PySpark做到这一点?
pyspark-sql ×10
apache-spark ×8
pyspark ×7
python ×4
amazon-sqs ×1
databricks ×1
rdd ×1
regex ×1
sql ×1