标签: pyspark

PySpark SQL中的日期时间范围过滤器

按时间戳字段过滤数据帧的正确方法是什么?

我尝试了不同的日期格式和过滤形式,没有任何帮助:pyspark返回0个对象,或者抛出一个错误,它不理解日期时间格式

这是我到目前为止所得到的:

from pyspark import SparkContext
from pyspark.sql import SQLContext

from django.utils import timezone
from django.conf import settings

from myapp.models import Collection

sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')
Run Code Online (Sandbox Code Playgroud)

时间戳字段的范围:

system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)
Run Code Online (Sandbox Code Playgroud)

尝试1

date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
    date_from.isoformat(), date_to.isoformat()
)
sf …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

22
推荐指数
2
解决办法
3万
查看次数

Pyspark替换Spark数据帧列中的字符串

我想通过替换子字符串在Spark Dataframe列上执行一些基本的词干.最快的方法是什么?

在我目前的用例中,我有一个我想要规范化的地址列表.例如,这个数据帧:

id     address
1       2 foo lane
2       10 bar lane
3       24 pants ln
Run Code Online (Sandbox Code Playgroud)

会成为

id     address
1       2 foo ln
2       10 bar ln
3       24 pants ln
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

22
推荐指数
2
解决办法
5万
查看次数

加载Parquet文件时无法推断架构

response = "mi_or_chd_5"

outcome = sqlc.sql("""select eid,{response} as response
from outcomes
where {response} IS NOT NULL""".format(response=response))
outcome.write.parquet(response, mode="overwrite") # Success
print outcome.schema
StructType(List(StructField(eid,IntegerType,true),StructField(response,ShortType,true)))
Run Code Online (Sandbox Code Playgroud)

但是之后:

outcome2 = sqlc.read.parquet(response)  # fail
Run Code Online (Sandbox Code Playgroud)

失败了:

AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'
Run Code Online (Sandbox Code Playgroud)

/usr/local/lib/python2.7/dist-packages/pyspark-2.1.0+hadoop2.7-py2.7.egg/pyspark/sql/utils.pyc in deco(*a, **kw)
Run Code Online (Sandbox Code Playgroud)

镶木地板的文档说格式是自我描述的,并且在保存镶木地板文件时可以使用完整的模式.是什么赋予了?

使用Spark 2.1.1.在2.2.0中也失败了.

发现此错误报告,但已在2.0.1,2.1.0中修复.

更新:当与master ="local"连接时,此工作,当连接到master ="mysparkcluster"时失败.

apache-spark parquet pyspark

22
推荐指数
5
解决办法
4万
查看次数

AWS Glue to Redshift:是否可以替换,更新或删除数据?

以下是关于我如何设置的一些要点:

  • 我有上传到S3的CSV文件和一个Glue爬虫设置来创建表和模式.
  • 我有一个Glue作业设置,它使用JDBC连接将数据从Glue表写入我们的Amazon Redshift数据库.Job还负责映射列并创建redshift表.

通过重新运行作业,我在redshift中获得重复的行(正如预期的那样).但是,有没有办法在插入新数据之前替换或删除行,使用密钥或胶水中的分区设置?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = …
Run Code Online (Sandbox Code Playgroud)

jdbc amazon-web-services pyspark aws-glue

22
推荐指数
4
解决办法
1万
查看次数

pyspark:有效地使partitionBy写入与原始表相同数量的总分区

我有一个与pyspark repartitionBy()函数相关的问题,我最初在这个问题的评论中发布了这个问题.我被要求将其作为一个单独的问题发布,所以这里是:

据我所知,df.partitionBy(COL)将每个值写入所有行COL到他们自己的文件夹,并且每个文件夹将(假设行以前通过其他键分布在所有分区上)具有与之前在文件中大致相同的文件数.整张桌子.我发现这种行为很烦人.如果我有一个包含500个分区的大表,并且我partitionBy(COL)在一些属性列上使用,我现在有100个文件夹,每个文件夹包含500个(现在非常小)文件.

我想要的是partitionBy(COL)行为,但文件大小和文件数量大致相同.

作为演示,上一个问题共享一个玩具示例,其中有一个包含10个分区的表,partitionBy(dayOfWeek)并且现在有70个文件,因为每个文件夹中有10个.我想要~10个文件,每天一个,可能需要2或3天,有更多的数据.

这可以轻松完成吗?喜欢的东西,df.write().repartition(COL).partitionBy(COL)好像它可能工作,但我担心,(在一个非常大的表,该表将被划分为多个文件夹的情况下),其首先将它结合到一些小的分区数之前做的partitionBy(COL)似乎是一个坏主意.

任何建议都非常感谢!

apache-spark pyspark

22
推荐指数
1
解决办法
5787
查看次数

Spark:如何使用Scala或Java用户定义函数映射Python?

比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.

难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?

python java scala apache-spark pyspark

21
推荐指数
1
解决办法
1万
查看次数

你需要在Pyspark SQL中使用lit()?

我试图弄清楚你需要在哪里使用一个lit值,这个值literal column在文档中被定义为.

以此为例udf,它返回SQL列数组的索引:

def find_index(column, index):
    return column[index]
Run Code Online (Sandbox Code Playgroud)

如果我要将一个整数传入此中,我会收到错误.我需要将lit(n)值传递给udf以获取数组的正确索引.

有没有我可以更好地学习的时候用硬性规定的地方lit和可能col呢?

python apache-spark apache-spark-sql pyspark

21
推荐指数
2
解决办法
4万
查看次数

在YARN上运行的Spark如何解释Python内存使用情况?

阅读完文档之后,我不明白在YARN上运行的Spark是如何考虑Python内存消耗的.

是否计入spark.executor.memory,spark.executor.memoryOverhead还是在哪里?

特别是我有一个PySpark应用程序spark.executor.memory=25G,spark.executor.cores=4我遇到YARN因超出内存限制而被杀死的频繁容器.map在RDD上运行时出错.它运行在相当大量的复杂Python对象上,因此预计会占用一些非常重要的内存但不会占用25GB.我应该如何配置不同的内存变量以用于繁重的Python代码?

python hadoop hadoop-yarn apache-spark pyspark

21
推荐指数
1
解决办法
2536
查看次数

使用isin排除过滤pyspark数据帧

我试图获取数据框中的所有行,其中列值不在列表中(因此通过排除进行过滤).

举个例子:

df = sqlContext.createDataFrame([('1','a'),('2','b'),('3','b'),('4','c'),('5','d')]
,schema=('id','bar'))
Run Code Online (Sandbox Code Playgroud)

我得到了数据框:

+---+---+
| id|bar|
+---+---+
|  1|  a|
|  2|  b|
|  3|  b|
|  4|  c|
|  5|  d|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我只想排除bar所在的行('a'或'b').

使用SQL表达式字符串,它将是:

df.filter('bar not in ("a","b")').show()
Run Code Online (Sandbox Code Playgroud)

有没有办法在不使用SQL表达式的字符串或一次排除一个项目的情况下执行此操作?

编辑:

我可能有一个列表,['a','b'],我想要使用的排除值.

python apache-spark pyspark pyspark-sql

21
推荐指数
4
解决办法
7万
查看次数

退出代码和退出状态是否意味着什么火花?

在纱线上运行火花时,我一直看到退出代码和退出状态:

以下是一些:

  • CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

  • ...failed 2 times due to AM Container for application_1431523563856_0001_000002 exited with exitCode: 10...

  • ...Exit status: 143. Diagnostics: Container killed on request

  • ...Container exited with a non-zero exit code 52:...

  • ...Container killed on request. Exit code is 137...

我从来没有发现任何这些消息是有用的......有没有机会解释这些消息究竟出了什么问题?我搜索了高低不一的表格来解释错误,但没有.

我能够从上面解释的唯一一个是退出代码52,但那是因为我在这里查看了源代码.这是说这是一个OOM.

我是否应该停止尝试解释其余的退出代码并退出状态?或者我错过了一些明显的方式,这些数字实际意味着什么?

即使有人能告诉我之间的差异exit code,exit status以及SIGNAL这将是有益的.但我现在只是随机猜测,而且我周围的其他所有人都使用了火花.

最后,为什么一些退出代码小于零以及如何解释这些?

例如 Exit status: -100. Diagnostics: Container released on a *lost* node

hadoop hadoop-yarn apache-spark pyspark spark-dataframe

21
推荐指数
1
解决办法
1万
查看次数