我正在使用以下 docker-compose 图像,我从以下位置获取此图像: https: //github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on: &airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379 …
Run Code Online (Sandbox Code Playgroud) 我想用一些数据运行机器学习模型。在用这些数据训练模型之前,我需要处理它,所以我一直在阅读一些方法来做到这一点。
首先创建一个 Dataflow 管道将其上传到 Bigquery 或 Google Cloud Storage,然后使用 Google Dataprep 创建一个数据管道来清理它。
我想这样做的另一种方法是使用数据融合,它可以更轻松地创建数据管道,但我不知道,这是我的疑问,数据融合只是创建像 Dataflow 这样的管道,然后我必须使用DataPrep 来清理数据,或者 Data Fusion 是否可以清理数据并准备将其放入我的机器学习模型中。
如果 Data Fusion 可以将数据清理为 DataPrep,那么我应该什么时候使用 DataPrep?
google-cloud-platform google-cloud-dataflow google-cloud-dataprep google-cloud-data-fusion
我正在 pandas 中运行一个很长的 ETL 管道。我必须创建不同的 pandas 数据帧,并且我想释放某些数据帧的内存。
我一直在阅读如何释放内存,我发现运行此命令不会释放内存:
del dataframe
Run Code Online (Sandbox Code Playgroud)
点击此链接:如何从内存中删除多个 pandas (python) 数据帧以节省 RAM?,答案之一说 del 语句不会删除实例,它只是删除一个名称。
在答案中,他们说将数据帧放入列表中,然后删除列表:
lst = [pd.DataFrame(), pd.DataFrame(), pd.DataFrame()]
del lst
Run Code Online (Sandbox Code Playgroud)
如果我只想释放一个数据帧,我需要将其放入一个列表中,然后删除一个列表,如下所示:
lst = [pd.DataFrame()]
del lst
Run Code Online (Sandbox Code Playgroud)
我也看到了这个问题:How do Irelease memoryused by a pandas dataframe?
有不同的答案,例如:
import gc
del df_1
gc.collect()
Run Code Online (Sandbox Code Playgroud)
或者
就在数据框使用结束时
df = ""
Run Code Online (Sandbox Code Playgroud)
或者有更好的方法来实现这一目标?
我在 python 中有一个函数 for firestore,我为一个集合的所有用户执行 for 循环,然后我进入另一个集合以获取一些指标,并在第一个集合中更新这些指标。
我运行了该函数,但在执行过程中的某个时刻,该函数中断了,给了我这个错误:
_Rendezvous Traceback (most recent call last)
~\Anaconda3\envs\work\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
78 try:
---> 79 return six.next(self._wrapped)
80 except grpc.RpcError as exc:
~\Anaconda3\envs\work\lib\site-packages\grpc\_channel.py in __next__(self)
363 def __next__(self):
--> 364 return self._next()
365
~\Anaconda3\envs\work\lib\site-packages\grpc\_channel.py in _next(self)
346 else:
--> 347 raise self
348 while True:
_Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1570660422.708000000","description":"Error received from peer ipv4:216.58.202.234:443","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
The above exception was the direct …
Run Code Online (Sandbox Code Playgroud) 我正在使用 Apache 气流,但我对预定日期和开始日期有疑问。
我希望 dag 每天在 8:00 AM UTC 运行。所以,我所做的是:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 12, 7, 10, 0,0),
'email': ['example@emaiil.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(hours=5)
}
#never run
dag = DAG(dag_id='id', default_args=default_args, schedule_interval='0 8 * * *',catchup=True)
Run Code Online (Sandbox Code Playgroud)
我上传 dag 的那天是 2020-12-07,我想在 2020-12-08 的 08:00:00 运行它
我将 start_date 设置在 2020-12-07 的 10:00:00 以避免在 2020-12-07 的 08:00:00 运行它,并且只在第二天触发它,但它不起作用。
然后我所做的是修改开始日期:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 12, 7, 7, 59,0),
'email': ['example@emaiil.com'], …
Run Code Online (Sandbox Code Playgroud) 我正在使用 Python 从客户端连接到 firestore 数据库。
问题是我不知道如何查看他在数据库中有哪些集合:
from google.cloud import firestore
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
cred = credentials.Certificate('credentials/credentials.json')
app = firebase_admin.initialize_app(cred)
db = firestore.client()
users_ref = db.collection(u'name_of_colection')
docs = users_ref.stream()
for doc in docs:
print(u'{} => {}'.format(doc.id, doc.to_dict()))
Run Code Online (Sandbox Code Playgroud)
我一直在寻找如何获取他拥有的收藏的名称,但我没有找到任何对我有用的东西。我也尝试过这个:
cols = db.collections()
list_col = []
for col in cols:
list_col.append(col)
len(list_col)
Run Code Online (Sandbox Code Playgroud)
我得到了 len = 6
然后我对生成的列表中的不同列执行了此操作:
docs = list_col[5].stream()
data = []
for doc in docs:
data.append(doc.to_dict())
print(data)
Run Code Online (Sandbox Code Playgroud)
该数据打印带有键和值的字典,但我不知道只获取带有集合名称的列表,
我从以下位置构建了一个图像: https: //archive.apache.org/dist/spark/spark-$2.4.4/spark-$2.4.4-bin-hadoop2.7.tgz
下载后,我会:
cd spark-2.4.4-bin-hadoop2.7 && bin/docker-image-tool.sh build
Run Code Online (Sandbox Code Playgroud)
然后我的形象spark-py:latest
就建立了。
我想使用这个 docker 文件在其中安装 pyarrow:
FROM spark-py:latest
COPY *.jar /opt/spark/jars/
RUN rm /opt/spark/jars/kubernetes-*-4.1.2.jar
RUN apk add --no-cache \
build-base \
cmake \
bash \
boost-dev \
autoconf \
zlib-dev \
flex \
bison \
g++
RUN wget -q https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py && rm -f get-pip.py
RUN apk update
RUN apk add --update --no-cache py3-arrow
Run Code Online (Sandbox Code Playgroud)
但我有一个错误:
> [8/8] RUN apk add --update --no-cache py3-arrow:
#12 0.552 fetch http://dl-cdn.alpinelinux.org/alpine/v3.9/main/x86_64/APKINDEX.tar.gz
#12 …
Run Code Online (Sandbox Code Playgroud) python ×3
airflow ×2
docker ×2
firebase ×2
kubernetes ×1
memory ×1
pandas ×1
pyarrow ×1
python-3.x ×1