spark弹性搜索抛出403禁止错误

Nag*_*tal 1 scala elasticsearch apache-spark

当我尝试使用基本身份验证从 spark 连接 elasticsearch 以创建新索引时,出现以下错误。

来自弹性搜索的错误没有提供完整的错误信息来调试更多

 org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: [HEAD] on [devl_test_index] failed; server[https://<elasticServerHost>:9200] returned [403|Forbidden:]
            at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
            at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:447)
            at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:539)
            at org.elasticsearch.hadoop.rest.RestClient.indexExists(RestClient.java:534)
            at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:545)
            at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:364)
            at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:660)
            at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:636)
            at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:65)
            at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
            at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
            at org.apache.spark.scheduler.Task.run(Task.scala:109)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

用于连接的代码:

Sbt dependency: "org.elasticsearch" % "elasticsearch-hadoop" % "7.5.0"

import org.elasticsearch.spark.sql._

 val spark = SparkSession.builder().appName("SparkJDBC")
    .enableHiveSupport()
    .config("spark.es.port","9200")
    .config("spark.es.nodes", "<elasticServerHost>")
    .config("spark.es.nodes.wan.only","true")
    .config("spark.es.net.ssl","true")
    .config("spark.es.net.http.auth.user","USERNAME")
    .config("spark.es.net.http.auth.pass","PASSWRD")
    .master("local[*]")
    .getOrCreate()

val df = spark.sql("select * from employee")

df.saveToEs("devl_test_index")
Run Code Online (Sandbox Code Playgroud)

Nag*_*tal 5

当用户尝试访问索引而不是分配的组时,将从弹性搜索服务器抛出此错误。

就我而言,我的用户组可以访问以employee* 开头的索引,但我尝试访问以devl* 开头的索引

如果您在调试模式下运行 spark,您将获得实际的错误堆栈跟踪,其中包含更多信息,如下所示:

20/02/19 10:38:57 DEBUG wire.header: << "HTTP/1.1 403 Forbidden[\r][\n]"
20/02/19 10:38:57 DEBUG wire.header: << "HTTP/1.1 403 Forbidden[\r][\n]"
20/02/19 10:38:57 DEBUG wire.header: << "content-type: application/json; charset=UTF-8[\r][\n]"
20/02/19 10:38:57 DEBUG wire.header: << "content-length: 259[\r][\n]"
20/02/19 10:38:57 DEBUG wire.header: << "[\r][\n]"
20/02/19 10:38:57 DEBUG wire.content: << "{"error":{"root_cause":[{"type":"security_exception","reason":"action [indices:admin/aliases/get] is unauthorized for user [rdsuser]"}],"type":"security_exception","reason":"action [indices:admin/aliases/get] is unauthorized for user [USERNAME]"},"status":403}"
20/02/19 10:38:57 DEBUG sql.EsDataFrameWriter: Provided index name [devl_test_index] is not an alias. Reason: [org.elasticsearch.hadoop.rest.EsHadoopRemoteException: security_exception: action [indices:admin/aliases/get] is unauthorized for user [USERNAME]
null]
Run Code Online (Sandbox Code Playgroud)

在创建新索引之前,spark 会在内部检查索引名称是否存在。在这个过程中,会命中别名API /_all/_alias/devl_test_index

错误响应:

{
    "error": {
        "root_cause": [
            {
                "type": "security_exception",
                "reason": "action [indices:admin/aliases/get] is unauthorized for user [USERNAME]"
            }
        ],
        "type": "security_exception",
        "reason": "action [indices:admin/aliases/get] is unauthorized for user [USERNAME]"
    },
    "status": 403
}
Run Code Online (Sandbox Code Playgroud)