Google Cloud Data Fusion——从 REST API 端点源构建管道

Kor*_*ain 5 rest pagination endpoint google-cloud-data-fusion

尝试构建管道以从 3rd 方 REST API 端点数据源读取。

我正在使用 Hub 中的 HTTP(1.2.0 版)插件。

响应请求 URL 是: https://api.example.io/v2/somedata?return_count=false

响应体示例:

{
  "paging": {
    "token": "12456789",
    "next": "https://api.example.io/v2/somedata?return_count=false&__paging_token=123456789"
  },
  "data": [
    {
      "cID": "aerrfaerrf",
      "first": true,
      "_id": "aerfaerrfaerrf",
      "action": "aerrfaerrf",
      "time": "1970-10-09T14:48:29+0000",
      "email": "example@aol.com"
    },
    {...}
  ]
}
Run Code Online (Sandbox Code Playgroud)

日志中的主要错误是:

java.lang.NullPointerException: null
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.getNextPage(BaseHttpPaginationIterator.java:118) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.ensurePageIterable(BaseHttpPaginationIterator.java:161) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.hasNext(BaseHttpPaginationIterator.java:203) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.batch.HttpRecordReader.nextKeyValue(HttpRecordReader.java:60) ~[1580429892615-0/:na]
    at io.cdap.cdap.etl.batch.preview.LimitingRecordReader.nextKeyValue(LimitingRecordReader.java:51) ~[cdap-etl-core-6.1.1.jar:na]
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:214) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
Run Code Online (Sandbox Code Playgroud)

可能的问题

在尝试解决此问题一段时间后,我认为问题可能出在

分页

  • Data Fusion HTTP插件有很多处理分页的方法
    • 根据上面的响应正文,分页类型的最佳选择似乎是Link in Response Body
    • 对于所需的下一页 JSON/XML 字段路径参数,我已经尝试过$.paging.nextpaging/next. 都不工作。
    • 我已验证/paging/next在 Chrome 中打开时链接有效

验证

  • 当只是尝试在 Chrome 中查看响应 URL 时,会弹出一个提示,要求输入用户名和密码
    • 只需要输入用户名的 API 密钥即可在 Chrome 中通过此提示
    • 为此,在 Data Fusion HTTP 插件中,API 密钥用于基本身份验证部分中的用户名

有没有人在 Google Cloud Data Fusion 中创建数据源是 REST API 的管道方面取得成功?

Pad*_*eye 1

在回答

有人在 Google Cloud Data Fusion 中成功创建数据源为 REST API 的管道吗?

这不是实现此目标的最佳方法,最好的方法是将数据服务 API 概述摄取 到 pub/sub,然后使用 pub/sub 作为管道的源,这将为您的数据提供一个简单且可靠的暂存位置用于处理、存储和分析,请参阅 pub/sub API 的文档。为了将其与 Dataflow 结合使用,要遵循的步骤位于官方文档中使用 Pub/Sub 与 Dataflow