我对 Spark 的缓存机制有点困惑。
假设我有一个 Spark 应用程序,在多次转换结束时只有一个操作。假设我有一个数据帧 A,并对其应用了 2-3 次转换,创建了多个数据帧,这最终有助于创建将保存到磁盘的最后一个数据帧。
例子 :
val A=spark.read() // large size
val B=A.map()
val C=A.map()
.
.
.
val D=B.join(C)
D.save()
Run Code Online (Sandbox Code Playgroud)
那么我是否需要缓存数据帧 A 以增强性能?
提前致谢。
我得到以下异常:
Exception in thread "main" com.google.cloud.datastore.DatastoreException: no matching index found. recommended index is:
- kind: cp_outbox
properties:
- name: format
- name: occasion_name
- name: sent_datetime
- name: status
- name: send_date
Run Code Online (Sandbox Code Playgroud)
在运行以下查询时:
SELECT * FROM cp_outbox where send_date <= '2018-03-14' and sent_datetime is null and format='test1' and status=0 and occasion_name='test'
所以我在查询中使用了不等式运算符,并且我有复合索引:
但我仍然得到例外。
通过查看错误,我认为属性的顺序是问题所在。如果这是真的,那么为什么这个顺序很重要。
提前致谢。
我正在编写一个数据流流管道。在其中一个转换中,DoFn 我想要访问外部服务 - 在本例中,它是数据存储区。
这种初始化步骤有没有最佳实践?我不想为每个 processElement 方法调用创建数据存储连接对象。
java google-cloud-datastore google-cloud-dataflow apache-beam
我需要在 sql 文件中访问 BigqueryOperator 传递的参数,但是我ERROR - queryParameters argument must have a type <class 'dict'> not <class 'list'>
使用以下代码时出现错误:
t2 = bigquery_operator.BigQueryOperator(
task_id='bq_from_source_to_clean',
sql='prepare.sql',
use_legacy_sql=False,
allow_large_results=True,
query_params=[{ 'name': 'threshold_date', 'parameterType': { 'type': 'STRING' },'parameterValue': { 'value': '2020-01-01' } }],
destination_dataset_table="{}.{}.{}".format('xxxx',
'xxxx',
'temp_airflow_test'),
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
dag=dag
Run Code Online (Sandbox Code Playgroud)
)
查询:
select cast(DATE_ADD(a.dt_2, interval 7 day) as DATE) as dt_1
,a.dt_2
,cast('2010-01-01' as DATE) as dt_3
from (select cast(@threshold_date as date) as dt_2) a
Run Code Online (Sandbox Code Playgroud)
我正在使用 Google 作曲家版本composer-1.7.0-airflow-1.10.2
提前致谢。