小编Mal*_*ath的帖子

如何从多个 API 调用更新 Pandas 数据帧

我需要做一个python脚本来

  1. 读取包含列 ( person_id, name, flag)的 csv 文件。该文件有 3000 行。
  2. 基于person_id来自 csv 文件,我需要调用一个 URL 传递person_idGET http://api.myendpoint.intranet/get-data/1234 该 URL 将返回 的一些信息person_id,如下例所示。我需要获取所有租金对象并保存在我的 csv 中。我的输出需要是这样的
import pandas as pd
import requests

ids = pd.read_csv(f"{path}/data.csv", delimiter=';')
person_rents = df = pd.DataFrame([], columns=list('person_id','carId','price','rentStatus'))

for id in ids:
    response = request.get(f'endpoint/{id["person_id"]}')
    json = response.json()
    person_rents.append( [person_id, rent['carId'], rent['price'], rent['rentStatus'] ] )
    pd.read_csv(f"{path}/data.csv", delimiter=';' )
Run Code Online (Sandbox Code Playgroud)
person_id;name;flag;cardId;price;rentStatus
1000;Joseph;1;6638;1000;active
1000;Joseph;1;5566;2000;active
Run Code Online (Sandbox Code Playgroud)

响应示例

{
    "active": false,
    "ctodx": false,
    "rents": [{
            "carId": 6638, …
Run Code Online (Sandbox Code Playgroud)

python python-3.x pandas json-normalize

6
推荐指数
1
解决办法
1491
查看次数

使用本地 kafka-connect 集群连接远程数据库的连接超时

我正在尝试使用 docker-compose 运行本地 kafka-connect 集群。我需要连接远程数据库,而且我还使用远程 kafka 和模式注册表。我已启用从我的机器访问这些远程资源。

要启动集群,在我的 Ubuntu WSL2 终端的项目文件夹中,我正在运行

docker build -t my-connect:1.0.0

docker-compose up

应用程序成功运行,但是当我尝试创建新连接器时,返回错误 500 并超时。

我的 Dockerfile

FROM confluentinc/cp-kafka-connect-base:5.5.0

RUN cat /etc/confluent/docker/log4j.properties.template

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
ARG JDBC_DRIVER_DIR=/usr/share/java/kafka/

RUN   confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.0 \
   && confluent-hub install --no-prompt confluentinc/connect-transforms:1.3.2

ADD java/kafka-connect-jdbc /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
COPY java/kafka-connect-jdbc/ojdbc8.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/

ENTRYPOINT ["sh","-c","export CONNECT_REST_ADVERTISED_HOST_NAME=$(hostname -I);/etc/confluent/docker/run"] 
Run Code Online (Sandbox Code Playgroud)

我的 docker-compose.yaml

services:
  connect:
    image: my-connect:1.0.0
    ports:
     - 8083:8083
    environment:
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http=//schema-registry:8081
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http=//schema-registry:8081
      - CONNECT_BOOTSTRAP_SERVERS=broker1.intranet:9092
      - CONNECT_GROUP_ID=kafka-connect
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_OFFSET_STORAGE_TOPIC=kafka-connect.offset …
Run Code Online (Sandbox Code Playgroud)

docker docker-compose apache-kafka-connect debezium

5
推荐指数
1
解决办法
192
查看次数

使用 pyspark 按日期元素读取 json 文件并对其进行分组

我的 S3 存储桶上有多个 JSON 文件(10 TB ~),我需要按每个 json 文档中存在的日期元素来组织这些文件。

我认为我的代码需要做什么

  • 读取s3存储桶中的所有json文件。
  • 保留 2022-01-01 和 2022-04-01 之间包含元素“creation_date”的所有文档
  • 将它们以镶木地板格式保存在另一个存储桶中。

考虑到我正在处理的规模,我不确定这样做是否正确。

这是 json 文档的示例。每个文件都有多个这样的文档。

  {
    "id": 123456,
    "creation_date": "2022-01-01T23:35:16",
    "params": {
      "doc_info": "AXBD",
      "return_date": "20/05/2021",
      "user_name": "XXXXXXXX",
      "value": "40,00"
    },
    "user_id": "1234567",
    "type": "TEST"
  }
]
Run Code Online (Sandbox Code Playgroud)

这是我已经在DB笔记本上尝试过的,但实际上,我不能直接在笔记本上使用代码。我必须编写 Spark 代码并在气流 dag 上运行,因为我没有直接从笔记本使用存储桶的写入权限。

# Trying to read all the json files
df_test = spark.read.json("s3://my-bucket/**/**" + "/*.json")

# Filtering all documents that has the creation_date period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))

# Write parquet on …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark airflow databricks

4
推荐指数
1
解决办法
894
查看次数