Our Airflow installation uses the CeleryExecutor. The concurrency configuration is:
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 16
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# When not using pools, tasks are run in the "default pool",
# whose size is guided by this …

I have several large DataFrames (around 30 GB each) called as and bs, and one relatively small DataFrame (around 500 MB to 1 GB) called spp. I am trying to cache spp in memory to avoid reading it from the database or from files multiple times.
However, I found that if I cache spp, the physical plan shows it does not use a broadcast join, even though spp is wrapped in the broadcast function. If I unpersist spp, the plan does show a broadcast join.
Is anyone familiar with this behavior?
scala> spp.cache
res38: spp.type = [id: bigint, idPartner: int ... 41 more fields]
scala> val as = acs.join(broadcast(spp), $"idsegment" === $"idAdnetProductSegment")
as: org.apache.spark.sql.DataFrame = [idsegmentpartner: bigint, ssegmentsource: string ... 44 more fields]
scala> as.explain
== Physical Plan ==
*SortMergeJoin [idsegment#286L], [idAdnetProductSegment#91L], Inner
:- *Sort [idsegment#286L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(idsegment#286L, 200)
: +- *Filter isnotnull(idsegment#286L)
: +- HiveTableScan [idsegmentpartner#282L, …
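This matches a known Spark 2.x quirk: once spp is cached, the planner resolves the join against the InMemoryRelation, and the broadcast hint attached to the original plan can get lost, so the join falls back to SortMergeJoin. A hedged workaround, assuming the cached spp really is around 1 GB as described above, is to raise the auto-broadcast threshold so the planner picks a broadcast join from the cached relation's own size statistics, hint or no hint:

```
# Hedged sketch for spark-defaults.conf (or spark.conf.set at runtime):
# raise spark.sql.autoBroadcastJoinThreshold above spp's cached size so the
# planner can still choose BroadcastHashJoin; the default is only ~10 MB.
spark.sql.autoBroadcastJoinThreshold   1073741824
```

Equivalently in the shell: spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1073741824). Re-run as.explain after setting it; if the plan still shows SortMergeJoin, unpersisting spp before the join, as observed above, remains the fallback.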
Here is my use case.
The problem is that step 4 saves nothing. Does that mean df3 changes after step 3? I have called cache() on df1 through df5, and the Spark Web UI Storage tab shows all of the DataFrames as 100% cached. Even so, df5 appears to be recomputed if the source has changed.
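For what it's worth, cache() only keeps the computed blocks; the plan still carries the full lineage back to the source, so any recompute (for example after block eviction or executor loss) re-reads the source as it is now. A hedged sketch, assuming a SparkSession named spark and the df3 from the steps above, of truncating the lineage with checkpoint() instead:

```scala
// Sketch, not the asker's code: checkpoint() materializes the data to the
// checkpoint directory and cuts the lineage, so downstream DataFrames no
// longer depend on the (possibly changed) source.
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path
val df3Checkpointed = df3.checkpoint() // eager by default (Spark 2.1+)
```

Building df4 and df5 from df3Checkpointed rather than df3 would pin them to the data as of the checkpoint, at the cost of writing it out once.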
Kafka Connect 5.4 with only one connector and one worker, running connect-distributed.
Below is the error message:
[2020-06-22 19:09:58,700] ERROR [Worker clientId=connect-1, groupId=test-cluster]
Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:290)
org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'test-connect-offsets'
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:262)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:128)
at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:186)
at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:121)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:277)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:229)
... 11 more
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: …
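The root cause is in the last line: the worker tries to create its internal offsets topic 'test-connect-offsets' with replication factor 3, but only 1 broker is available. A hedged sketch of the usual fix, assuming the stock connect-distributed.properties and a deliberately single-broker cluster:

```
# With one broker, the worker's internal topics cannot be replicated 3x.
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
```

Alternatively, run at least three brokers and keep the defaults. Either way, restart the worker afterwards; if the topics were already part-created, they may need to be deleted first.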