小编ebe*_*tbm的帖子

如何清除aws configure中的凭据?

我已删除凭据sudo nano ~/.aws/config但凭据仍在aws configure.有没有办法以aws configure清除状态重置?

amazon-web-services aws-cli aws-sdk

24
推荐指数
3
解决办法
3万
查看次数

如何使用Spark(pyspark)编写镶木地板文件?

我是Spark的新手,我一直在尝试将一个Dataframe转换为Spark中的镶木地板文件,但我还没有成功.该文件说,我可以使用write.parquet函数来创建该文件.但是,当我运行脚本时它向我显示:AttributeError:'RDD'对象没有属性'write'

from pyspark import SparkContext
sc = SparkContext("local", "Protob Conversion to Parquet ")

# spark is an existing SparkSession
df = sc.textFile("/temp/proto_temp.csv")

# Displays the content of the DataFrame to stdout
df.write.parquet("/output/proto.parquet")
Run Code Online (Sandbox Code Playgroud)

你知道怎么做这个吗?

我正在使用的spark版本是为Hadoop 2.7.3构建的Spark 2.0.1.

python pyspark spark-dataframe

13
推荐指数
2
解决办法
5万
查看次数

如何在Airflow中设置SLA?

我想在传感器操作员中设置SLA.该文件是不是太清楚它的使用.所以我使用S3KeySensor运算符进行了测试,该运算符正在查找不存在的文件.我将sla设置为30秒,我希望在UI中看到30秒后的记录- 在SLA未命中 - 但它没有发生.我究竟做错了什么?

inputsensor = S3KeySensor(
    task_id='check_for_files_in_s3',
    bucket_key='adp/backload/20136585/',
    wildcard_match=True,
    bucket_name='weblogs-raw',
    s3_conn_id='AWS_S3_CENTRAL',
    timeout=120,
    poke_interval=10,
    sla=timedelta(seconds=30),
    dag=dag)

inputsensor.set_downstream(next_step)
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow airflow-scheduler

7
推荐指数
2
解决办法
8101
查看次数

具有条件的Pyspark窗口功能

假设我有一个事件的DataFrame,每行之间有时差,主要规则是如果只有事件在上一个或下一个事件的5分钟内,则计算一次访问:

+--------+-------------------+--------+
|userid  |eventtime          |timeDiff|
+--------+-------------------+--------+
|37397e29|2017-06-04 03:00:00|60      |
|37397e29|2017-06-04 03:01:00|60      |
|37397e29|2017-06-04 03:02:00|60      |
|37397e29|2017-06-04 03:03:00|180     |
|37397e29|2017-06-04 03:06:00|60      |
|37397e29|2017-06-04 03:07:00|420     |
|37397e29|2017-06-04 03:14:00|60      |
|37397e29|2017-06-04 03:15:00|1140    |
|37397e29|2017-06-04 03:34:00|540     |
|37397e29|2017-06-04 03:53:00|540     |
+--------+----------------- -+--------+
Run Code Online (Sandbox Code Playgroud)

挑战在于将最新事件时间的start_time和end_time分组,条件是在5分钟内.输出应该像这个表:

+--------+-------------------+--------------------+-----------+
|userid  |start_time         |end_time            |events     |
+--------+-------------------+--------------------+-----------+
|37397e29|2017-06-04 03:00:00|2017-06-04 03:07:00 |6          |
|37397e29|2017-06-04 03:14:00|2017-06-04 03:15:00 |2          |
+--------+-------------------+--------------------+-----------+
Run Code Online (Sandbox Code Playgroud)

到目前为止,我已经使用了窗口滞后函数和一些条件,但是,我不知道从哪里开始:

%spark.pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

windowSpec = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"])
windowSpecDesc …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

7
推荐指数
2
解决办法
8480
查看次数

错误:'elasticsearch'后端需要安装'requests'.我如何解决它?

当我在haystack和elasticsearch支持的应用程序中运行"python manage.py rebuild_index"时出现问题.

Python 2.7 Django版本1.6.2 Haystack 2.1.0 Elasticsearch 1.0

请查看出现的错误:

回溯(最近一次调用最后一次):文件"manage.py",第10行,在execute_from_command_line(sys.argv)文件"/usr/lib/python2.7/site-packages/django/core/management/ init .py" ,第399行,> execute_from_command_line utility.execute()文件"/usr/lib/python2.7/site-packages/django/core/management/ init .py",第392行,>执行self.fetch_command(子命令) .run_from_argv(self.argv)文件"/usr/lib/python2.7/site-packages/django/core/management/base.py",第242行,> run_from_argv self.execute(*args,**options.dict)文件"/usr/lib/python2.7/site-packages/django/core/management/base.py",第285行,执行输出= self.handle(*args,**options)文件"/ usr /lib/python2.7/site-packages/haystack/management/commands/rebuild_index.py",第15行,句柄call_command('clear_index',**options)文件"/usr/lib/python2.7/site- packages/django/core/management/init .py",第159行,在call_command中返回klass.execute(*args,**defaults)文件"/usr/lib/python2.7/site-packages/django/core/managem ent/base.py",第285行,执行输出= self.handle(*args,**options)文件"/usr/lib/python2.7/site-packages/haystack/management/commands/clear_index.py" ,第48行,句柄后端=连接[backend_name] .get_backend()文件"/usr/lib/python2.7/site-packages/haystack/utils/loading.py",第98行,getitem self._connections [key ] = load_backend(self.connections_info [key] ['ENGINE'])(using = key)文件"/usr/lib/python2.7/site-packages/haystack/utils/loading.py",第51行,在load_backend中返回import_class(full_backend_path)文件"/usr/lib/python2.7/site-packages/haystack/utils/loading.py",第18行,在import_class中模块_itself = importlib.import_module(module_path)文件"/ usr/lib/python2 .7/site-packages/django/utils/importlib.py",第40行,在import_module 导入(名称)文件"/usr/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py",行21,在提出MissingDependency("'elasticsearch'后端需要安装'requests'.")haystack.exceptions.Mis singDependency:'elasticsearch'后端需要安装'requests'.

我已经安装了运行这些应用程序所需的所有软件包,但询问请求是什么,它是什么?

python django django-haystack elasticsearch

6
推荐指数
2
解决办法
3221
查看次数

如何在时间戳值上使用lag和rangeBetween函数?

我的数据看起来像这样:

userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769
Run Code Online (Sandbox Code Playgroud)

我想添加一个新的布尔列,如果userid在同一个5分钟窗口内有2个或更多,则标记为true location_point.我有一个想法,使用lag函数来查找由userid当前时间戳和接下来的5分钟之间的范围分隔的窗口:

from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

days = lambda i: i * 60*5 

windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))

lastURN = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_pont)
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check")
Run Code Online (Sandbox Code Playgroud)

当我使用RangeBetween函数时,此代码给出了一个分析异常:

AnalysisException:u'Window当前行和1500行之间的帧范围必须匹配所需的帧前1和前1之间的行;

你知道解决这个问题的方法吗?

window-functions apache-spark apache-spark-sql pyspark

5
推荐指数
2
解决办法
5535
查看次数

通过 Athena 跨账户访问 AWS Glue 数据目录

是否可以通过账户A的 Athena 接口直接访问账户B 的AWS Glue 数据目录?

amazon-web-services amazon-athena

5
推荐指数
1
解决办法
3518
查看次数

SQL查询一个单元格中的多个值

有一个表(课程兴趣),其中包含一个单元格中的所有值.但这些值只是ids,我想加入另一个表(课程),所以我可以知道他们的名字.

课程兴趣:

MemberID          MemberName              CoursesInterested
--------------    ---------------------   --------------
1                  Al                     1,4,5,6
2                  A2                     3,5,6
Run Code Online (Sandbox Code Playgroud)

课程表:

CourseId          Course
--------------    ---------------------
1                 MBA 
2                 Languages
3                 English
4                 French
5                 Fashion
6                 IT
Run Code Online (Sandbox Code Playgroud)

期望的输出:

MemberID          MemberName              CoursesInterested
--------------    ---------------------   --------------
1                  Al                     MBA,French,Fashion,IT
2                  A2                     English,Fashion,IT
Run Code Online (Sandbox Code Playgroud)

我想在MySql中做一个SQL查询,可以帮助我提取所需的输出.我知道如何以相反的方式做到这一点(将值连接到一个单元格),但我一直在努力寻找一种方法来分离ID并将交叉连接转换为另一个表.

我将感谢社区的任何帮助.谢谢

mysql sql

4
推荐指数
1
解决办法
1万
查看次数

Pyspark - 加载文件:路径不存在

我是Spark的新手.我正在尝试读取EMR集群中的本地csv文件.该文件位于:/ home/hadoop /.我正在使用的脚本就是这个:

spark = SparkSession \
    .builder \
    .appName("Protob Conversion to Parquet") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()\

df = spark.read.csv('/home/hadoop/observations_temp.csv, header=True)
Run Code Online (Sandbox Code Playgroud)

当我运行该脚本时引发以下错误消息:

pyspark.sql.utils.AnalysisException:u'Path不存在:hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv

然后,我发现我必须在文件路径中添加file://,以便它可以在本地读取文件:

df = spark.read.csv('file:///home/hadoop/observations_temp.csv, header=True)
Run Code Online (Sandbox Code Playgroud)

但这一次,上述方法引发了一个不同的错误:

阶段0.0中丢失的任务0.3(TID 3,
ip-172-31-41-81.eu-west-1.compute.internal,executor 1):java.io.FileNotFoundException:文件文件:/ home/hadoop/observations_temp. csv不存在

我认为是因为文件//扩展只是在本地读取文件,而不是在其他节点上分发文件.

您知道如何读取csv文件并将其提供给所有其他节点吗?

amazon-emr emr apache-spark pyspark pyspark-sql

4
推荐指数
2
解决办法
2万
查看次数

如何删除从Spark数据帧创建的表中的行?

基本上,我想使用SQL语句进行简单的删除,但是当我执行sql脚本时,它会抛出以下错误:

pyspark.sql.utils.ParseException:u"\'''''''''''''''''''''''''''''''''''''''''''=' --------------------- ^^^ \n"

这些是我正在使用的脚本:

sq = SparkSession.builder.config('spark.rpc.message.maxSize','1536').config("spark.sql.shuffle.partitions",str(shuffle_value)).getOrCreate()
adsquare = sq.read.csv(f, schema=adsquareSchemaDevice , sep=";", header=True)
adsquare_grid = adsqaureJoined.select("userid", "latitude", "longitude").repartition(1000).cache()
adsquare_grid.createOrReplaceTempView("adsquare")   

sql = """
    DELETE a.* FROM adsquare a
    INNER JOIN codepoint c ON a.grid_id = c.grid_explode
    WHERE dis2 > 1 """

sq.sql(sql)
Run Code Online (Sandbox Code Playgroud)

注意:代码点表是在执行期间创建的.

有没有其他方法可以删除具有上述条件的行?

apache-spark apache-spark-sql pyspark

4
推荐指数
2
解决办法
2万
查看次数