我需要做一个python脚本来
person_id
, name
, flag
)的 csv 文件。该文件有 3000 行。person_id
来自 csv 文件,我需要调用一个 URL 传递person_id
GET
http://api.myendpoint.intranet/get-data/1234
该 URL 将返回 的一些信息person_id
,如下例所示。我需要获取所有租金对象并保存在我的 csv 中。我的输出需要是这样的import pandas as pd
import requests
ids = pd.read_csv(f"{path}/data.csv", delimiter=';')
person_rents = df = pd.DataFrame([], columns=list('person_id','carId','price','rentStatus'))
for id in ids:
response = request.get(f'endpoint/{id["person_id"]}')
json = response.json()
person_rents.append( [person_id, rent['carId'], rent['price'], rent['rentStatus'] ] )
pd.read_csv(f"{path}/data.csv", delimiter=';' )
Run Code Online (Sandbox Code Playgroud)
person_id;name;flag;cardId;price;rentStatus
1000;Joseph;1;6638;1000;active
1000;Joseph;1;5566;2000;active
Run Code Online (Sandbox Code Playgroud)
响应示例
{
"active": false,
"ctodx": false,
"rents": [{
"carId": 6638, …
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 docker-compose 运行本地 kafka-connect 集群。我需要连接远程数据库,而且我还使用远程 kafka 和模式注册表。我已启用从我的机器访问这些远程资源。
要启动集群,在我的 Ubuntu WSL2 终端的项目文件夹中,我正在运行
docker build -t my-connect:1.0.0
docker-compose up
应用程序成功运行,但是当我尝试创建新连接器时,返回错误 500 并超时。
我的 Dockerfile
FROM confluentinc/cp-kafka-connect-base:5.5.0
RUN cat /etc/confluent/docker/log4j.properties.template
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
ARG JDBC_DRIVER_DIR=/usr/share/java/kafka/
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.0 \
&& confluent-hub install --no-prompt confluentinc/connect-transforms:1.3.2
ADD java/kafka-connect-jdbc /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
COPY java/kafka-connect-jdbc/ojdbc8.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
ENTRYPOINT ["sh","-c","export CONNECT_REST_ADVERTISED_HOST_NAME=$(hostname -I);/etc/confluent/docker/run"]
Run Code Online (Sandbox Code Playgroud)
我的 docker-compose.yaml
services:
connect:
image: my-connect:1.0.0
ports:
- 8083:8083
environment:
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http=//schema-registry:8081
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http=//schema-registry:8081
- CONNECT_BOOTSTRAP_SERVERS=broker1.intranet:9092
- CONNECT_GROUP_ID=kafka-connect
- CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- CONNECT_OFFSET_STORAGE_TOPIC=kafka-connect.offset …
Run Code Online (Sandbox Code Playgroud) 我的 S3 存储桶上有多个 JSON 文件(10 TB ~),我需要按每个 json 文档中存在的日期元素来组织这些文件。
我认为我的代码需要做什么
考虑到我正在处理的规模,我不确定这样做是否正确。
这是 json 文档的示例。每个文件都有多个这样的文档。
{
"id": 123456,
"creation_date": "2022-01-01T23:35:16",
"params": {
"doc_info": "AXBD",
"return_date": "20/05/2021",
"user_name": "XXXXXXXX",
"value": "40,00"
},
"user_id": "1234567",
"type": "TEST"
}
]
Run Code Online (Sandbox Code Playgroud)
这是我已经在DB笔记本上尝试过的,但实际上,我不能直接在笔记本上使用代码。我必须编写 Spark 代码并在气流 dag 上运行,因为我没有直接从笔记本使用存储桶的写入权限。
# Trying to read all the json files
df_test = spark.read.json("s3://my-bucket/**/**" + "/*.json")
# Filtering all documents that has the creation_date period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Write parquet on …
Run Code Online (Sandbox Code Playgroud) python ×2
airflow ×1
apache-spark ×1
databricks ×1
debezium ×1
docker ×1
pandas ×1
pyspark ×1
python-3.x ×1