我是使用Python的AWS新手,我正在尝试学习boto API但是我注意到Python有两个主要的版本/包.那将是boto和boto3.
AWS boto和boto3库有什么区别?
我正在开发自己的PHP库,我想从我的API调用RESTful Web服务.这可以在PHP中完成,如果是这样,这样做的基础是什么?
在这个Stackoverflow帖子的帮助下,我刚刚创建了一个程序(帖子中显示的那个),当一个文件放在S3存储桶中时,我的一个正在运行的DAG中的任务被触发,然后我使用BashOperator执行一些工作.一旦完成,虽然DAG不再处于运行状态,而是进入成功状态,如果我想让它拿起另一个文件,我需要清除所有"过去","未来","上游",下游'活动.我想制作这个程序,以便它始终在运行,并且只要在S3存储桶中放置一个新文件,程序就会启动任务.
我可以继续使用S3KeySenor执行此操作,还是需要设置一种设置外部触发器来运行DAG的方法?截至目前,我的S3KeySensor如果只运行一次就毫无意义.
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
task_id='create_emr_cluster_1', …
Run Code Online (Sandbox Code Playgroud) 我需要使用JavaScript中的jar文件中的类.我正在通过Java ScriptEngine使用JavaScript,并希望做类似于我在这里使用Jython做的事情,
import org.python.core.Py;
import org.python.core.PySystemState;
...
PySystemState engineSys = new PySystemState();
engineSys.path.append(Py.newString("C:/GMSEC_API/bin/gmsecapi.jar"));
Py.setSystemState(engineSys);
ScriptEngine engine = new ScriptEngineManager().getEngineByName("python");
Run Code Online (Sandbox Code Playgroud)
当我使用Jython执行此操作时,它工作正常,python文件可以使用jar文件中的api类.
所有 AWS IAM 角色都有关联的角色名称和角色 ID。通常看不到角色 ID,因为 AWS 控制台仅显示角色名称。在 S3 事件的 JSON 消息中,有一个 PrimaryID 值,其中包含用于执行 S3 操作的角色的角色 ID,例如"principalId":"AWS:AROAKJDKSDKF93HSA:123456789
。
从这份文件我们看到,
\n\n\n\n\n每个 IAM 实体(用户、组或角色)都有一个已定义的 aws:userid\n 变量。您将需要在存储桶策略中使用此变量\n,以将角色或用户指定为条件元素中的例外。\n 假定角色\xe2\x80\x99s aws:userId 值定义为\n UNIQUE- ROLE-ID:ROLE-SESSION-NAME(例如,\n AROAEXAMPLEID:userdefinesessionname)。
\n
所以我们知道S3事件消息中的PrincipalId是IAM角色的角色ID。如何使用该角色 ID 来获取角色名称?我已经搜索了 IAM 和 STS 库,但没有看到任何允许我传递角色 ID 并获取角色名称的 API。STS GetCallerIdentity没有帮助,IAM GetRole仅接受角色名称作为输入。
\n\n任何帮助将不胜感激。我只是使用S3 Events ,从S3 Event 的消息 json 中读取PrincipalID 值,从PrincipalID 中提取IAM 角色ID,并且我需要一种使用IAM 角色ID 获取IAM 角色名称的方法。
\n我有一个简单的Java应用程序,可以使用Hive或Impala使用像这样的代码连接和查询我的集群
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
...
Class.forName("com.cloudera.hive.jdbc41.HS2Driver");
Connection con = DriverManager.getConnection("jdbc:hive2://myHostIP:10000/mySchemaName;hive.execution.engine=spark;AuthMech=1;KrbRealm=myHostIP;KrbHostFQDN=myHostIP;KrbServiceName=hive");
Statement stmt = con.createStatement();
ResultSet rs = stmt.executeQuery("select * from foobar");
Run Code Online (Sandbox Code Playgroud)
但现在我想尝试使用Spark SQL进行相同的查询.我很难搞清楚如何使用Spark SQL API.具体如何设置连接.我看到了如何设置Spark会话的示例,但是我不清楚我需要提供哪些值
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
Run Code Online (Sandbox Code Playgroud)
如何告诉Spark SQL使用什么主机和端口,使用什么架构,以及如何告诉Spark SQL我正在使用哪种身份验证技术?例如,我正在使用Kerberos进行身份验证.
上面的Spark SQL代码来自https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
更新:
我能够取得一些进展,我想我想出了如何告诉Spark SQL连接使用什么主机和端口.
...
SparkSession spark = SparkSession
.builder()
.master("spark://myHostIP:10000")
.appName("Java Spark Hive Example")
.enableHiveSupport()
.getOrCreate();
Run Code Online (Sandbox Code Playgroud)
我在我的pom.xml文件中添加了以下依赖项
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.0.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
通过此更新,我可以看到连接正在进一步发展,但现在它似乎失败了,因为我没有通过身份验证.我需要弄清楚如何使用Kerberos进行身份验证.这是相关的日志数据
2017-12-19 11:17:55.717 INFO 11912 --- [o-auto-1-exec-1] …
Run Code Online (Sandbox Code Playgroud) 我正在使用集群Airflow环境,其中我有四个用于服务器的AWS ec2实例.
EC2-实例
我的设置已经完美地工作了三个月了,但偶尔每周一次,当Airflow试图记录某些东西时,我得到了一个断管异常.
*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log
[2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
[2018-07-16 00:00:15,720] {base_task_runner.py:115} …
Run Code Online (Sandbox Code Playgroud) 我在两个 AWS EC2 实例上运行的集群环境中运行 Airflow。一份给主人,一份给工人。不过,工作节点在运行“$airflowworker”时会定期抛出此错误:
[2018-08-09 16:15:43,553] {jobs.py:2574} WARNING - The recorded hostname ip-1.2.3.4 does not match this instance's hostname ip-1.2.3.4.eco.tanonprod.comanyname.io
Traceback (most recent call last):
File "/usr/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 387, in run
run_job.run()
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 198, in run
self._execute()
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 2527, in _execute
self.heartbeat()
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 182, in heartbeat
self.heartbeat_callback(session=session)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/jobs.py", line 2575, in heartbeat_callback
raise AirflowException("Hostname of …
Run Code Online (Sandbox Code Playgroud) 是否可以限制Airflow中用户组对DAG的可见性和可访问性?
例如,我希望为整个公司提供一个大型Airflow环境,不同的团队将使用此Airflow环境来实现其团队的工作流程.假设我们的团队A和团队B都属于他们各自的AD/LDAP组,A组和B组.是否可以让A组只看到属于他们团队的DAG,反之亦然B组?
基于我的研究和理解,我认为这不可能在单个Airflow环境中实现.我认为为了让我这样做,我需要为每个团队创建一个单独的Airflow环境,以便每个团队都有自己的Airflow Dags文件夹,其中包含各自的DAG.
我刚装了1.10的气流在单个服务器上使用
sudo -E pip-3.6 install apache-airflow[celery,devel,postgres]
Run Code Online (Sandbox Code Playgroud)
在那之后我可能也跑过了
sudo -E pip-3.6 install apache-airflow[all]
Run Code Online (Sandbox Code Playgroud)
但是无论如何,我运行时airflow version
都会得到以下输出
[ec2-user@ip-1-2-3-4 ~]$ airflow version
[2018-08-29 16:09:59,088] {{__init__.py:51}} INFO - Using executor SequentialExecutor
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.10.0
Run Code Online (Sandbox Code Playgroud)
所以我知道我已经安装了Airflow 1.10。我能够运行airflow initdb
,airflow …
airflow ×5
boto3 ×2
java ×2
amazon-iam ×1
amazon-s3 ×1
apache-spark ×1
aws-sts ×1
boto ×1
broken-pipe ×1
javascript ×1
jython ×1
kerberos ×1
logging ×1
php ×1
python ×1
python-3.x ×1
rest ×1
scriptengine ×1