我已删除凭据sudo nano ~/.aws/config
但凭据仍在aws configure
.有没有办法以aws configure
清除状态重置?
我是Spark的新手,我一直在尝试将一个Dataframe转换为Spark中的镶木地板文件,但我还没有成功.该文件说,我可以使用write.parquet函数来创建该文件.但是,当我运行脚本时它向我显示:AttributeError:'RDD'对象没有属性'write'
from pyspark import SparkContext
sc = SparkContext("local", "Protob Conversion to Parquet ")
# spark is an existing SparkSession
df = sc.textFile("/temp/proto_temp.csv")
# Displays the content of the DataFrame to stdout
df.write.parquet("/output/proto.parquet")
Run Code Online (Sandbox Code Playgroud)
你知道怎么做这个吗?
我正在使用的spark版本是为Hadoop 2.7.3构建的Spark 2.0.1.
我想在传感器操作员中设置SLA.该文件是不是太清楚它的使用.所以我使用S3KeySensor运算符进行了测试,该运算符正在查找不存在的文件.我将sla设置为30秒,我希望在UI中看到30秒后的记录- 在SLA未命中 - 但它没有发生.我究竟做错了什么?
inputsensor = S3KeySensor(
task_id='check_for_files_in_s3',
bucket_key='adp/backload/20136585/',
wildcard_match=True,
bucket_name='weblogs-raw',
s3_conn_id='AWS_S3_CENTRAL',
timeout=120,
poke_interval=10,
sla=timedelta(seconds=30),
dag=dag)
inputsensor.set_downstream(next_step)
Run Code Online (Sandbox Code Playgroud) 假设我有一个事件的DataFrame,每行之间有时差,主要规则是如果只有事件在上一个或下一个事件的5分钟内,则计算一次访问:
+--------+-------------------+--------+
|userid |eventtime |timeDiff|
+--------+-------------------+--------+
|37397e29|2017-06-04 03:00:00|60 |
|37397e29|2017-06-04 03:01:00|60 |
|37397e29|2017-06-04 03:02:00|60 |
|37397e29|2017-06-04 03:03:00|180 |
|37397e29|2017-06-04 03:06:00|60 |
|37397e29|2017-06-04 03:07:00|420 |
|37397e29|2017-06-04 03:14:00|60 |
|37397e29|2017-06-04 03:15:00|1140 |
|37397e29|2017-06-04 03:34:00|540 |
|37397e29|2017-06-04 03:53:00|540 |
+--------+----------------- -+--------+
Run Code Online (Sandbox Code Playgroud)
挑战在于将最新事件时间的start_time和end_time分组,条件是在5分钟内.输出应该像这个表:
+--------+-------------------+--------------------+-----------+
|userid |start_time |end_time |events |
+--------+-------------------+--------------------+-----------+
|37397e29|2017-06-04 03:00:00|2017-06-04 03:07:00 |6 |
|37397e29|2017-06-04 03:14:00|2017-06-04 03:15:00 |2 |
+--------+-------------------+--------------------+-----------+
Run Code Online (Sandbox Code Playgroud)
到目前为止,我已经使用了窗口滞后函数和一些条件,但是,我不知道从哪里开始:
%spark.pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col
windowSpec = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"])
windowSpecDesc …
Run Code Online (Sandbox Code Playgroud) 当我在haystack和elasticsearch支持的应用程序中运行"python manage.py rebuild_index"时出现问题.
Python 2.7 Django版本1.6.2 Haystack 2.1.0 Elasticsearch 1.0
请查看出现的错误:
回溯(最近一次调用最后一次):文件"manage.py",第10行,在execute_from_command_line(sys.argv)文件"/usr/lib/python2.7/site-packages/django/core/management/ init .py" ,第399行,> execute_from_command_line utility.execute()文件"/usr/lib/python2.7/site-packages/django/core/management/ init .py",第392行,>执行self.fetch_command(子命令) .run_from_argv(self.argv)文件"/usr/lib/python2.7/site-packages/django/core/management/base.py",第242行,> run_from_argv self.execute(*args,**options.dict)文件"/usr/lib/python2.7/site-packages/django/core/management/base.py",第285行,执行输出= self.handle(*args,**options)文件"/ usr /lib/python2.7/site-packages/haystack/management/commands/rebuild_index.py",第15行,句柄call_command('clear_index',**options)文件"/usr/lib/python2.7/site- packages/django/core/management/init .py",第159行,在call_command中返回klass.execute(*args,**defaults)文件"/usr/lib/python2.7/site-packages/django/core/managem ent/base.py",第285行,执行输出= self.handle(*args,**options)文件"/usr/lib/python2.7/site-packages/haystack/management/commands/clear_index.py" ,第48行,句柄后端=连接[backend_name] .get_backend()文件"/usr/lib/python2.7/site-packages/haystack/utils/loading.py",第98行,getitem self._connections [key ] = load_backend(self.connections_info [key] ['ENGINE'])(using = key)文件"/usr/lib/python2.7/site-packages/haystack/utils/loading.py",第51行,在load_backend中返回import_class(full_backend_path)文件"/usr/lib/python2.7/site-packages/haystack/utils/loading.py",第18行,在import_class中模块_itself = importlib.import_module(module_path)文件"/ usr/lib/python2 .7/site-packages/django/utils/importlib.py",第40行,在import_module 导入(名称)文件"/usr/lib/python2.7/site-packages/haystack/backends/elasticsearch_backend.py",行21,在提出MissingDependency("'elasticsearch'后端需要安装'requests'.")haystack.exceptions.Mis singDependency:'elasticsearch'后端需要安装'requests'.
我已经安装了运行这些应用程序所需的所有软件包,但询问请求是什么,它是什么?
我的数据看起来像这样:
userid,eventtime,location_point
4e191908,2017-06-04 03:00:00,18685891
4e191908,2017-06-04 03:04:00,18685891
3136afcb,2017-06-04 03:03:00,18382821
661212dd,2017-06-04 03:06:00,80831484
40e8a7c3,2017-06-04 03:12:00,18825769
Run Code Online (Sandbox Code Playgroud)
我想添加一个新的布尔列,如果userid
在同一个5分钟窗口内有2个或更多,则标记为true location_point
.我有一个想法,使用lag
函数来查找由userid
当前时间戳和接下来的5分钟之间的范围分隔的窗口:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col
days = lambda i: i * 60*5
windowSpec = W.partitionBy(col("userid")).orderBy(col("eventtime").cast("timestamp").cast("long")).rangeBetween(0, days(5))
lastURN = F.lag(col("location_point"), 1).over(windowSpec)
visitCheck = (last_location_point == output.location_pont)
output.withColumn("visit_check", visitCheck).select("userid","eventtime", "location_pont", "visit_check")
Run Code Online (Sandbox Code Playgroud)
当我使用RangeBetween函数时,此代码给出了一个分析异常:
AnalysisException:u'Window当前行和1500行之间的帧范围必须匹配所需的帧前1和前1之间的行;
你知道解决这个问题的方法吗?
是否可以通过账户A的 Athena 接口直接访问账户B 的AWS Glue 数据目录?
有一个表(课程兴趣),其中包含一个单元格中的所有值.但这些值只是ids,我想加入另一个表(课程),所以我可以知道他们的名字.
课程兴趣:
MemberID MemberName CoursesInterested
-------------- --------------------- --------------
1 Al 1,4,5,6
2 A2 3,5,6
Run Code Online (Sandbox Code Playgroud)
课程表:
CourseId Course
-------------- ---------------------
1 MBA
2 Languages
3 English
4 French
5 Fashion
6 IT
Run Code Online (Sandbox Code Playgroud)
期望的输出:
MemberID MemberName CoursesInterested
-------------- --------------------- --------------
1 Al MBA,French,Fashion,IT
2 A2 English,Fashion,IT
Run Code Online (Sandbox Code Playgroud)
我想在MySql中做一个SQL查询,可以帮助我提取所需的输出.我知道如何以相反的方式做到这一点(将值连接到一个单元格),但我一直在努力寻找一种方法来分离ID并将交叉连接转换为另一个表.
我将感谢社区的任何帮助.谢谢
我是Spark的新手.我正在尝试读取EMR集群中的本地csv文件.该文件位于:/ home/hadoop /.我正在使用的脚本就是这个:
spark = SparkSession \
.builder \
.appName("Protob Conversion to Parquet") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()\
df = spark.read.csv('/home/hadoop/observations_temp.csv, header=True)
Run Code Online (Sandbox Code Playgroud)
当我运行该脚本时引发以下错误消息:
pyspark.sql.utils.AnalysisException:u'Path不存在:hdfs://ip-172-31-39-54.eu-west-1.compute.internal:8020/home/hadoop/observations_temp.csv
然后,我发现我必须在文件路径中添加file://,以便它可以在本地读取文件:
df = spark.read.csv('file:///home/hadoop/observations_temp.csv, header=True)
Run Code Online (Sandbox Code Playgroud)
但这一次,上述方法引发了一个不同的错误:
阶段0.0中丢失的任务0.3(TID 3,
ip-172-31-41-81.eu-west-1.compute.internal,executor 1):java.io.FileNotFoundException:文件文件:/ home/hadoop/observations_temp. csv不存在
我认为是因为文件//扩展只是在本地读取文件,而不是在其他节点上分发文件.
您知道如何读取csv文件并将其提供给所有其他节点吗?
基本上,我想使用SQL语句进行简单的删除,但是当我执行sql脚本时,它会抛出以下错误:
pyspark.sql.utils.ParseException:u"\'''''''''''''''''''''''''''''''''''''''''''=' --------------------- ^^^ \n"
这些是我正在使用的脚本:
sq = SparkSession.builder.config('spark.rpc.message.maxSize','1536').config("spark.sql.shuffle.partitions",str(shuffle_value)).getOrCreate()
adsquare = sq.read.csv(f, schema=adsquareSchemaDevice , sep=";", header=True)
adsquare_grid = adsqaureJoined.select("userid", "latitude", "longitude").repartition(1000).cache()
adsquare_grid.createOrReplaceTempView("adsquare")
sql = """
DELETE a.* FROM adsquare a
INNER JOIN codepoint c ON a.grid_id = c.grid_explode
WHERE dis2 > 1 """
sq.sql(sql)
Run Code Online (Sandbox Code Playgroud)
注意:代码点表是在执行期间创建的.
有没有其他方法可以删除具有上述条件的行?
pyspark ×5
apache-spark ×4
python ×2
airflow ×1
amazon-emr ×1
aws-cli ×1
aws-sdk ×1
django ×1
emr ×1
mysql ×1
pyspark-sql ×1
sql ×1