小编mar*_*tin的帖子

从boto3中检索S3存储桶中的子文件夹名称

使用boto3,我可以访问我的AWS S3存储桶:

s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket-name')
Run Code Online (Sandbox Code Playgroud)

现在,存储桶包含文件夹first-level,该文件夹本身包含多个以时间戳命名的子文件夹1456753904534.我需要知道我正在做的另一个工作的这些子文件夹的名称,我想知道我是否可以让boto3为我检索这些.

所以我尝试过:

objs = bucket.meta.client.list_objects(Bucket='my-bucket-name')
Run Code Online (Sandbox Code Playgroud)

它提供了一个字典,其中的"内容"键为我提供了所有第三级文件而不是第二级时间戳目录,实际上我得到一个包含所有内容的列表

{u'ETag':'"etag"',u'Key':first-level/1456753904534/part-00014',u'LastModified':datetime.datetime(2016,2,29,13,52,24,tzinfo = tzutc()),
u'Owner':{u'DisplayName':'owner',u'ID':'id'},
u'Size':size,u'StorageClass':'storageclass'}

您可以看到在这种情况下part-00014检索特定文件,而我想单独获取目录的名称.原则上我可以从所有路径中删除目录名称,但是在第三级检索所有内容以获得第二级别是丑陋且昂贵的!

我也试过这里报道的事情:

for o in bucket.objects.filter(Delimiter='/'):
    print(o.key)
Run Code Online (Sandbox Code Playgroud)

但我没有得到所需级别的文件夹.

有办法解决这个问题吗?

python amazon-s3 amazon-web-services boto3

52
推荐指数
9
解决办法
8万
查看次数

使用类似SQL的IN子句过滤Pyspark DataFrame

我想用类似SQL的IN子句过滤Pyspark DataFrame ,如

sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
Run Code Online (Sandbox Code Playgroud)

a元组在哪里(1, 2, 3).我收到此错误:

java.lang.RuntimeException:[1.67]失败:``('''',但是找到了标识符

这基本上是说它期待类似'(1,2,3)'而不是a.问题是我不能手动写入a中的值,因为它是从另一个作业中提取的.

在这种情况下我该如何过滤?

python sql dataframe apache-spark pyspark

37
推荐指数
4
解决办法
5万
查看次数

使用boto3从S3存储桶中读取文件内容

我这样读了我的S3桶中的文件名

objs = boto3.client.list_objects(Bucket='my_bucket')
    while 'Contents' in objs.keys():
        objs_contents = objs['Contents']
        for i in range(len(objs_contents)):
            filename = objs_contents[i]['Key']
Run Code Online (Sandbox Code Playgroud)

现在,我需要获取文件的实际内容,类似于open(filename).readlines().什么是最好的方法?

python amazon-s3 amazon-web-services boto3

35
推荐指数
5
解决办法
8万
查看次数

RDD中的分区数和Spark中的性能

在Pyspark中,我可以从列表中创建RDD并确定要有多少分区:

sc = SparkContext()
sc.parallelize(xrange(0, 10), 4)
Run Code Online (Sandbox Code Playgroud)

我决定对RDD进行分区的分区数量如何影响性能?这取决于我的机器核心数量如何?

performance apache-spark rdd pyspark

31
推荐指数
2
解决办法
3万
查看次数

将JSON文件读入Spark时出现_corrupt_record错误

我有这个JSON文件

{
    "a": 1, 
    "b": 2
}
Run Code Online (Sandbox Code Playgroud)

这是用Python json.dump方法获得的.现在,我想使用pyspark将此文件读入Spark中的DataFrame.以下文档,我正在这样做

sc = SparkContext()

sqlc = SQLContext(sc)

df = sqlc.read.json('my_file.json')

print df.show()

print语句虽然吐出了这个:

+---------------+
|_corrupt_record|
+---------------+
|              {|
|       "a": 1, |
|         "b": 2|
|              }|
+---------------+
Run Code Online (Sandbox Code Playgroud)

任何人都知道发生了什么以及为什么它没有正确解释文件?

python json dataframe pyspark

29
推荐指数
3
解决办法
2万
查看次数

如何在PySpark中只打印某一列DataFrame?

可以使用操作collecttake仅打印DataFrame的给定列吗?

这个

df.col.collect()
Run Code Online (Sandbox Code Playgroud)

给出错误

TypeError:'Column'对象不可调用

还有这个:

df[df.col].take(2)
Run Code Online (Sandbox Code Playgroud)

pyspark.sql.utils.AnalysisException:u"类型为string的过滤器表达式'col'不是布尔值.;"

python dataframe apache-spark pyspark

10
推荐指数
1
解决办法
4万
查看次数

PySpark:在日期为字符串的范围内按日期字段过滤DataFrame

我的数据帧包含一个日期字段,它以字符串格式显示,例如

'2015-07-02T11:22:21.050Z'
Run Code Online (Sandbox Code Playgroud)

我需要在日期过滤DataFrame以仅获取上周的记录.所以,我正在尝试使用strptime将字符串日期转换为datetime对象的地图方法:

def map_to_datetime(row):
     format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
     row.date = datetime.strptime(row.date, format_string)

df = df.map(map_to_datetime)
Run Code Online (Sandbox Code Playgroud)

然后我会应用过滤器作为

df.filter(lambda row:
    row.date >= (datetime.today() - timedelta(days=7)))
Run Code Online (Sandbox Code Playgroud)

我设法让映射工作,但过滤器失败了

TypeError:condition应该是string或Column

有没有办法以一种有效的方式使用过滤,或者我应该改变方法以及如何?

python datetime date dataframe pyspark

10
推荐指数
2
解决办法
2万
查看次数

从Pyspark DataFrame中的选定行获取特定字段

我有一个通过pyspark从JSON文件构建的Spark DataFrame

sc = SparkContext()
sqlc = SQLContext(sc)

users_df = sqlc.read.json('users.json')
Run Code Online (Sandbox Code Playgroud)

现在,我想访问selected_user数据,这是它的_id字段.我可以

print users_df[users_df._id == chosen_user].show()
Run Code Online (Sandbox Code Playgroud)

这给了我完整的用户行.但是假设我只想在Row中有一个特定字段,比如用户性别,我该如何获得它?

python dataframe apache-spark apache-spark-sql pyspark

9
推荐指数
1
解决办法
3万
查看次数

PySpark按条件计算值

我有一个DataFrame,这里有一个片段:

[['u1', 1], ['u2', 0]]
Run Code Online (Sandbox Code Playgroud)

基本上是一个名为的字符串字段f,对于第二个元素(is_fav)为1或0 .

我需要做的是分组第一个字段并计算1和0的出现次数.我希望做类似的事情

num_fav = count((col("is_fav") == 1)).alias("num_fav")

num_nonfav = count((col("is_fav") == 0)).alias("num_nonfav")

df.groupBy("f").agg(num_fav, num_nonfav)
Run Code Online (Sandbox Code Playgroud)

它不能正常工作,我在两种情况下都得到相同的结果,这相当于组中项目的计数,因此过滤器(无论是1还是0)似乎被忽略.这取决于count工作原理吗?

python apache-spark pyspark

8
推荐指数
1
解决办法
2万
查看次数

Boto3:仅从S3资源中获取所选对象

我可以通过获取和读取AWS S3存储桶中的所有对象

s3 = boto3.resource('s3')
    bucket = s3.Bucket('my-bucket')
    all_objs = bucket.objects.all()
    for obj in all_objs:
        pass
        #filter only the objects I need
Run Code Online (Sandbox Code Playgroud)

然后

obj.key
Run Code Online (Sandbox Code Playgroud)

会给我一条路径.

有没有办法事先过滤那些尊重某个起始路径(存储桶中的目录)的文件,这样我就可以避免遍历所有对象并在以后过滤?

python amazon-s3 amazon-web-services boto3

8
推荐指数
2
解决办法
1万
查看次数