到底如何创建一个高效且可重用的 Databricks 工作流程来将原始 SQL 数据库转储到 Delta Lake 中。这里的一些混淆是为了实现以下目的的最佳方法:
人们可能会想象以下过程:
table_names = spark.read.jdbc(url=jdbcUrl, table="information_schema.tables",
properties=connectionProperties) \
.filter("table_schema = 'public'") \
.select("table_name") \
.rdd.flatMap(lambda x: x) \
.collect()
for table in table_names:
...
Run Code Online (Sandbox Code Playgroud)
像 Airbyte 和其他公司这样的第三方供应商提供了这项服务——并不是因为它确实应该如此难以实施。但更有可能的是,由于 Databricks DLT/Delta Lake 方面此通用流程的文档或参考实现乏善可陈。
令人满意的答案将是(I)对 OP 中包含的(错误?)假设的一些背景/验证,(II)此工作流程缺少的代码,以及(III)对提出的 3 点的答案/澄清。
apache-spark pyspark databricks delta-lake delta-live-tables
我首先尝试用 python 发送交易:
from web3 import Web3
transaction = {
'chainId': 97, # 97: Testnet. 56: main.
'to': '0xmyaddress',
'value': 1,
'gas': 2000000,
'gasPrice': 13,
'nonce': 0,
}
infura_url = "https://mainnet.infura.io/v3/my-api-key"
w3 = Web3(Web3.HTTPProvider(infura_url))
key = '0xmykey'
signed = w3.eth.account.signTransaction(transaction, key)
w3.eth.sendRawTransaction(signed.rawTransaction)
Run Code Online (Sandbox Code Playgroud)
给我以下错误:ValueError: {'code': -32000, 'message': 'invalid sender'}
现在,我正在尝试与合同进行交互 - 调用方法并提供输入,但我不确定如何完成此操作。
我想要一个“active_in”属性作为时间范围。我假设 DBMS 针对 postgresql tsrange 字段进行了优化,因此最好使用 DateTimeRangeField 而不是 2 个单独的字段作为 start_date 和 end_date。
这样做我希望该字段有一个默认值。
active_in = models.DateTimeRangeField(default=timezone.now+'-'+timezone.now+10YEARS)
Run Code Online (Sandbox Code Playgroud)
我可能的解决方案:
使用字符串操作的代码:
active_in = models.DateTimeRangeField(default=timezone.now+'-'+timezone.now[:-2]+'30')
Run Code Online (Sandbox Code Playgroud)
使用自定义函数对象的代码:(从此处调整: https: //stackoverflow.com/a/27491426/7458018)
def today_years_ahead():
return timezone.now + '-' timezone.now() + timezone.timedelta(years=10)
class MyModel(models.Model):
...
active_in = models.DateTimeRangeField(default=today_years_ahead)
Run Code Online (Sandbox Code Playgroud) apache-spark ×1
binance ×1
databricks ×1
delta-lake ×1
django ×1
postgresql ×1
pyspark ×1
transactions ×1
web3py ×1