使用boto3,我可以访问我的AWS S3存储桶:
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket-name')
Run Code Online (Sandbox Code Playgroud)
现在,存储桶包含文件夹first-level,该文件夹本身包含多个以时间戳命名的子文件夹1456753904534.我需要知道我正在做的另一个工作的这些子文件夹的名称,我想知道我是否可以让boto3为我检索这些.
所以我尝试过:
objs = bucket.meta.client.list_objects(Bucket='my-bucket-name')
Run Code Online (Sandbox Code Playgroud)
它提供了一个字典,其中的"内容"键为我提供了所有第三级文件而不是第二级时间戳目录,实际上我得到一个包含所有内容的列表
{u'ETag':'"etag"',u'Key':first-level/1456753904534/part-00014',u'LastModified':datetime.datetime(2016,2,29,13,52,24,tzinfo = tzutc()),
u'Owner':{u'DisplayName':'owner',u'ID':'id'},
u'Size':size,u'StorageClass':'storageclass'}
您可以看到在这种情况下part-00014检索特定文件,而我想单独获取目录的名称.原则上我可以从所有路径中删除目录名称,但是在第三级检索所有内容以获得第二级别是丑陋且昂贵的!
for o in bucket.objects.filter(Delimiter='/'):
print(o.key)
Run Code Online (Sandbox Code Playgroud)
但我没有得到所需级别的文件夹.
有办法解决这个问题吗?
我想用类似SQL的IN子句过滤Pyspark DataFrame ,如
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.sql('SELECT * from my_df WHERE field1 IN a')
Run Code Online (Sandbox Code Playgroud)
a元组在哪里(1, 2, 3).我收到此错误:
java.lang.RuntimeException:[1.67]失败:``('''',但是找到了标识符
这基本上是说它期待类似'(1,2,3)'而不是a.问题是我不能手动写入a中的值,因为它是从另一个作业中提取的.
在这种情况下我该如何过滤?
我这样读了我的S3桶中的文件名
objs = boto3.client.list_objects(Bucket='my_bucket')
while 'Contents' in objs.keys():
objs_contents = objs['Contents']
for i in range(len(objs_contents)):
filename = objs_contents[i]['Key']
Run Code Online (Sandbox Code Playgroud)
现在,我需要获取文件的实际内容,类似于open(filename).readlines().什么是最好的方法?
在Pyspark中,我可以从列表中创建RDD并确定要有多少分区:
sc = SparkContext()
sc.parallelize(xrange(0, 10), 4)
Run Code Online (Sandbox Code Playgroud)
我决定对RDD进行分区的分区数量如何影响性能?这取决于我的机器核心数量如何?
我有这个JSON文件
{
"a": 1,
"b": 2
}
Run Code Online (Sandbox Code Playgroud)
这是用Python json.dump方法获得的.现在,我想使用pyspark将此文件读入Spark中的DataFrame.以下文档,我正在这样做
sc = SparkContext()
sqlc = SQLContext(sc)
df = sqlc.read.json('my_file.json')
print df.show()
print语句虽然吐出了这个:
+---------------+
|_corrupt_record|
+---------------+
| {|
| "a": 1, |
| "b": 2|
| }|
+---------------+
Run Code Online (Sandbox Code Playgroud)
任何人都知道发生了什么以及为什么它没有正确解释文件?
可以使用操作collect或take仅打印DataFrame的给定列吗?
这个
df.col.collect()
Run Code Online (Sandbox Code Playgroud)
给出错误
TypeError:'Column'对象不可调用
还有这个:
df[df.col].take(2)
Run Code Online (Sandbox Code Playgroud)
给
pyspark.sql.utils.AnalysisException:u"类型为string的过滤器表达式'col'不是布尔值.;"
我的数据帧包含一个日期字段,它以字符串格式显示,例如
'2015-07-02T11:22:21.050Z'
Run Code Online (Sandbox Code Playgroud)
我需要在日期过滤DataFrame以仅获取上周的记录.所以,我正在尝试使用strptime将字符串日期转换为datetime对象的地图方法:
def map_to_datetime(row):
format_string = '%Y-%m-%dT%H:%M:%S.%fZ'
row.date = datetime.strptime(row.date, format_string)
df = df.map(map_to_datetime)
Run Code Online (Sandbox Code Playgroud)
然后我会应用过滤器作为
df.filter(lambda row:
row.date >= (datetime.today() - timedelta(days=7)))
Run Code Online (Sandbox Code Playgroud)
我设法让映射工作,但过滤器失败了
TypeError:condition应该是string或Column
有没有办法以一种有效的方式使用过滤,或者我应该改变方法以及如何?
我有一个通过pyspark从JSON文件构建的Spark DataFrame
sc = SparkContext()
sqlc = SQLContext(sc)
users_df = sqlc.read.json('users.json')
Run Code Online (Sandbox Code Playgroud)
现在,我想访问selected_user数据,这是它的_id字段.我可以
print users_df[users_df._id == chosen_user].show()
Run Code Online (Sandbox Code Playgroud)
这给了我完整的用户行.但是假设我只想在Row中有一个特定字段,比如用户性别,我该如何获得它?
我有一个DataFrame,这里有一个片段:
[['u1', 1], ['u2', 0]]
Run Code Online (Sandbox Code Playgroud)
基本上是一个名为的字符串字段f,对于第二个元素(is_fav)为1或0 .
我需要做的是分组第一个字段并计算1和0的出现次数.我希望做类似的事情
num_fav = count((col("is_fav") == 1)).alias("num_fav")
num_nonfav = count((col("is_fav") == 0)).alias("num_nonfav")
df.groupBy("f").agg(num_fav, num_nonfav)
Run Code Online (Sandbox Code Playgroud)
它不能正常工作,我在两种情况下都得到相同的结果,这相当于组中项目的计数,因此过滤器(无论是1还是0)似乎被忽略.这取决于count工作原理吗?
我可以通过获取和读取AWS S3存储桶中的所有对象
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
all_objs = bucket.objects.all()
for obj in all_objs:
pass
#filter only the objects I need
Run Code Online (Sandbox Code Playgroud)
然后
obj.key
Run Code Online (Sandbox Code Playgroud)
会给我一条路径.
有没有办法事先过滤那些尊重某个起始路径(存储桶中的目录)的文件,这样我就可以避免遍历所有对象并在以后过滤?