按时间戳字段过滤数据帧的正确方法是什么?
我尝试了不同的日期格式和过滤形式,没有任何帮助:pyspark返回0个对象,或者抛出一个错误,它不理解日期时间格式
这是我到目前为止所得到的:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from django.utils import timezone
from django.conf import settings
from myapp.models import Collection
sc = SparkContext("local", "DjangoApp")
sqlc = SQLContext(sc)
url = "jdbc:postgresql://%(HOST)s/%(NAME)s?user=%(USER)s&password=%(PASSWORD)s" % settings.DATABASES['default']
sf = sqlc.load(source="jdbc", url=url, dbtable='myapp_collection')
Run Code Online (Sandbox Code Playgroud)
时间戳字段的范围:
system_tz = timezone.pytz.timezone(settings.TIME_ZONE)
date_from = datetime.datetime(2014, 4, 16, 18, 30, 0, 0, tzinfo=system_tz)
date_to = datetime.datetime(2015, 6, 15, 18, 11, 59, 999999, tzinfo=system_tz)
Run Code Online (Sandbox Code Playgroud)
尝试1
date_filter = "my_col >= '%s' AND my_col <= '%s'" % (
date_from.isoformat(), date_to.isoformat()
)
sf …Run Code Online (Sandbox Code Playgroud) 我想通过替换子字符串在Spark Dataframe列上执行一些基本的词干.最快的方法是什么?
在我目前的用例中,我有一个我想要规范化的地址列表.例如,这个数据帧:
id address
1 2 foo lane
2 10 bar lane
3 24 pants ln
Run Code Online (Sandbox Code Playgroud)
会成为
id address
1 2 foo ln
2 10 bar ln
3 24 pants ln
Run Code Online (Sandbox Code Playgroud) response = "mi_or_chd_5"
outcome = sqlc.sql("""select eid,{response} as response
from outcomes
where {response} IS NOT NULL""".format(response=response))
outcome.write.parquet(response, mode="overwrite") # Success
print outcome.schema
StructType(List(StructField(eid,IntegerType,true),StructField(response,ShortType,true)))
Run Code Online (Sandbox Code Playgroud)
但是之后:
outcome2 = sqlc.read.parquet(response) # fail
Run Code Online (Sandbox Code Playgroud)
失败了:
AnalysisException: u'Unable to infer schema for Parquet. It must be specified manually.;'
Run Code Online (Sandbox Code Playgroud)
在
/usr/local/lib/python2.7/dist-packages/pyspark-2.1.0+hadoop2.7-py2.7.egg/pyspark/sql/utils.pyc in deco(*a, **kw)
Run Code Online (Sandbox Code Playgroud)
镶木地板的文档说格式是自我描述的,并且在保存镶木地板文件时可以使用完整的模式.是什么赋予了?
使用Spark 2.1.1.在2.2.0中也失败了.
发现此错误报告,但已在2.0.1,2.1.0中修复.
更新:当与master ="local"连接时,此工作,当连接到master ="mysparkcluster"时失败.
以下是关于我如何设置的一些要点:
通过重新运行作业,我在redshift中获得重复的行(正如预期的那样).但是,有没有办法在插入新数据之前替换或删除行,使用密钥或胶水中的分区设置?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields
from pyspark.sql.functions import lit
## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
columnMapping = [
("id", "int", "id", "int"),
("name", "string", "name", "string"),
]
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = …Run Code Online (Sandbox Code Playgroud) 我有一个与pyspark repartitionBy()函数相关的问题,我最初在这个问题的评论中发布了这个问题.我被要求将其作为一个单独的问题发布,所以这里是:
据我所知,df.partitionBy(COL)将每个值写入所有行COL到他们自己的文件夹,并且每个文件夹将(假设行以前通过其他键分布在所有分区上)具有与之前在文件中大致相同的文件数.整张桌子.我发现这种行为很烦人.如果我有一个包含500个分区的大表,并且我partitionBy(COL)在一些属性列上使用,我现在有100个文件夹,每个文件夹包含500个(现在非常小)文件.
我想要的是partitionBy(COL)行为,但文件大小和文件数量大致相同.
作为演示,上一个问题共享一个玩具示例,其中有一个包含10个分区的表,partitionBy(dayOfWeek)并且现在有70个文件,因为每个文件夹中有10个.我想要~10个文件,每天一个,可能需要2或3天,有更多的数据.
这可以轻松完成吗?喜欢的东西,df.write().repartition(COL).partitionBy(COL)好像它可能工作,但我担心,(在一个非常大的表,该表将被划分为多个文件夹的情况下),其首先将它结合到一些小的分区数之前做的partitionBy(COL)似乎是一个坏主意.
任何建议都非常感谢!
比方说,我的团队选择Python作为Spark开发的参考语言.但是后来由于性能原因,我们希望开发特定的Scala或Java特定的库,以便使用我们的Python代码(类似于使用Scala或Java骨架的Python存根)进行映射.
难道您不认为是否可以将新的自定义Python方法与一些Scala或Java用户定义函数联系起来?
我试图弄清楚你需要在哪里使用一个lit值,这个值literal column在文档中被定义为.
以此为例udf,它返回SQL列数组的索引:
def find_index(column, index):
return column[index]
Run Code Online (Sandbox Code Playgroud)
如果我要将一个整数传入此中,我会收到错误.我需要将lit(n)值传递给udf以获取数组的正确索引.
有没有我可以更好地学习的时候用硬性规定的地方lit和可能col呢?
阅读完文档之后,我不明白在YARN上运行的Spark是如何考虑Python内存消耗的.
是否计入spark.executor.memory,spark.executor.memoryOverhead还是在哪里?
特别是我有一个PySpark应用程序spark.executor.memory=25G,spark.executor.cores=4我遇到YARN因超出内存限制而被杀死的频繁容器.map在RDD上运行时出错.它运行在相当大量的复杂Python对象上,因此预计会占用一些非常重要的内存但不会占用25GB.我应该如何配置不同的内存变量以用于繁重的Python代码?
我试图获取数据框中的所有行,其中列值不在列表中(因此通过排除进行过滤).
举个例子:
df = sqlContext.createDataFrame([('1','a'),('2','b'),('3','b'),('4','c'),('5','d')]
,schema=('id','bar'))
Run Code Online (Sandbox Code Playgroud)
我得到了数据框:
+---+---+
| id|bar|
+---+---+
| 1| a|
| 2| b|
| 3| b|
| 4| c|
| 5| d|
+---+---+
Run Code Online (Sandbox Code Playgroud)
我只想排除bar所在的行('a'或'b').
使用SQL表达式字符串,它将是:
df.filter('bar not in ("a","b")').show()
Run Code Online (Sandbox Code Playgroud)
有没有办法在不使用SQL表达式的字符串或一次排除一个项目的情况下执行此操作?
编辑:
我可能有一个列表,['a','b'],我想要使用的排除值.
在纱线上运行火花时,我一直看到退出代码和退出状态:
以下是一些:
CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
...failed 2 times due to AM Container for application_1431523563856_0001_000002 exited with exitCode: 10...
...Exit status: 143. Diagnostics: Container killed on request
...Container exited with a non-zero exit code 52:...
...Container killed on request. Exit code is 137...
我从来没有发现任何这些消息是有用的......有没有机会解释这些消息究竟出了什么问题?我搜索了高低不一的表格来解释错误,但没有.
我能够从上面解释的唯一一个是退出代码52,但那是因为我在这里查看了源代码.这是说这是一个OOM.
我是否应该停止尝试解释其余的退出代码并退出状态?或者我错过了一些明显的方式,这些数字实际意味着什么?
即使有人能告诉我之间的差异exit code,exit status以及SIGNAL这将是有益的.但我现在只是随机猜测,而且我周围的其他所有人都使用了火花.
最后,为什么一些退出代码小于零以及如何解释这些?
例如 Exit status: -100. Diagnostics: Container released on a *lost* node
pyspark ×10
apache-spark ×9
python ×6
hadoop ×2
hadoop-yarn ×2
aws-glue ×1
java ×1
jdbc ×1
parquet ×1
pyspark-sql ×1
scala ×1