我正在使用气流来触发数据块上的作业。我有许多运行数据块作业的 DAG,我希望只使用一个集群而不是多个集群,因为据我所知,这将降低这些任务产生的成本。
使用DatabricksSubmitRunOperator有两种方法可以在数据块上运行作业。使用正在运行的集群通过 id 调用它
'existing_cluster_id' : '1234-567890-word123',
Run Code Online (Sandbox Code Playgroud)
或者启动一个新集群
'new_cluster': {
'spark_version': '2.1.0-db3-scala2.11',
'num_workers': 2
},
Run Code Online (Sandbox Code Playgroud)
现在我想尽量避免为每个任务启动一个新集群,但是集群在停机期间关闭,因此它不再通过它的 id 可用,我会得到一个错误,所以我认为唯一的选择是新集群。
1) 有没有办法让集群即使在关闭时也可以通过 id 调用?
2)人们是否只是让集群保持活力?
3)还是我完全错了,为每个任务启动集群不会产生更多成本?
4)有什么我完全错过的吗?
为什么我们只能将图像放在 Docker hub 上,而没有 docker-compose 文件。我的意思是有很多应用程序使用多个容器,这些容器可能是可重用的,可能只需稍加配置。
或者有没有办法做到这一点?现在,我将 Docker hub 用于我的图像,并为撰写文件使用 git 存储库。但是我觉得只有一个地方来存储所有这些会更好。
所以问题是,可以像存储图像一样存储 docker-compose 文件吗?如果不是,是否可以解释为什么 Docker 的人认为这是一个坏主意?最后,是否有 docker-compose 文件库?我的意思是可以在 docker hub 上找到高质量的图像,但是我在 github 上找到的 docker-compose 文件不是很可靠。
如果我在 python 3 中运行以下代码
from io import BytesIO
import csv
from io import TextIOWrapper
def fill_into_stringio(input_io):
writer = csv.DictWriter(TextIOWrapper(input_io, encoding='utf-8'),fieldnames=['ids'])
for i in range(100):
writer.writerow({'ids': str(i)})
with BytesIO() as input_i:
fill_into_stringio(input_i)
input_i.seek(0)
Run Code Online (Sandbox Code Playgroud)
我收到一个错误:
ValueError: I/O operation on closed file.
Run Code Online (Sandbox Code Playgroud)
如果我不使用 TextIOWrapper,io 流将保持打开状态。例如,如果我将函数修改为
def fill_into_stringio(input_io):
for i in range(100):
input_io.write(b'erwfewfwef')
Run Code Online (Sandbox Code Playgroud)
我不再收到任何错误,因此出于某种原因,TestIOWrapper 正在关闭我想稍后阅读的流。这是打算像这样吗,是否有一种方法可以在不自己编写 csv 编写器的情况下实现我的尝试?
我正在尝试将数据从 AWS Redshift 卸载到 s3 存储桶。Redshift 集群还受到密码保护。我设置了 aws cli 以使用适当的密钥对并进行了成功测试。我可以使用我的凭据通过 DataGrip 访问 Redshift 集群,但现在当我尝试在 python3 中使用以下脚本卸载时
import json
import os
import psycopg2
def run(config_json, sql_query):
conn = psycopg2.connect(**config_json['db'])
cursor = conn.cursor()
query = """
UNLOAD ($${}$$)
to \'{}\'
parallel off
delimiter ','
allowoverwrite;
""".format(sql_query, config_json['s3bucket_path_to_file'])
print("The following UNLOAD query is being run: \n" + query)
cursor.execute(query)
print('Completed write to {}'.format(config_json['s3bucket_path_to_file']))
if __name__ == '__main__':
config_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.json')
with open(config_path, 'r') as f:
config = json.loads(f.read())
query_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'query.sql') …Run Code Online (Sandbox Code Playgroud)