小编Bra*_*don的帖子

使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方法是什么?

我正在尝试编写一个 Spark Structured Streaming 作业,该作业从多个 Kafka 主题(可能是 100 个)读取并根据主题名称将结果写入 S3 上的不同位置。我开发了这段代码,它当前从多个主题中读取并将结果输出到控制台(基于循环),它按预期工作。但是,我想了解性能影响是什么。这是推荐的方法吗?不建议有多个 readStream 和 writeStream 操作吗?如果是这样,推荐的方法是什么?

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark pyspark spark-structured-streaming

7
推荐指数
2
解决办法
750
查看次数

有没有办法使用ReadFromText转换(Python)读取Apache Beam中的多行csv文件?

有没有办法ReadFromText在Python中使用转换读取多行csv文件?我有一个文件,其中包含一行,我试图让Apache Beam将输入读作一行,但无法让它工作.

def print_each_line(line):
    print line

path = './input/testfile.csv'
# Here are the contents of testfile.csv
# foo,bar,"blah blah
# more blah blah",baz

p = apache_beam.Pipeline()

(p
 | 'ReadFromFile' >> apache_beam.io.ReadFromText(path)
 | 'PrintEachLine' >> apache_beam.FlatMap(lambda line: print_each_line(line))
 )

# Here is the output:
# foo,bar,"blah blah
# more blah blah",baz
Run Code Online (Sandbox Code Playgroud)

上面的代码将输入解析为两行,即使多行csv文件的标准是将多行元素包装在双引号内.

python google-cloud-platform google-cloud-dataflow apache-beam apache-beam-io

5
推荐指数
1
解决办法
889
查看次数

有没有办法在 Apache Airflow 中将 RBAC 与 LDAP 结合起来?

我正在尝试在 Airflow 中针对 Active Directory 中的用户强制实施细化权限。是否可以通过LDAP对 Active Directory 进行身份验证并通过RBAC实现安全/权限(通过将 RBAC 角色映射到 AD 组/用户)?我知道 LDAP 集成提供了通过过滤器配置(LDAP 文档)将组映射到超级用户和数据分析器的能力。但我对通过 RBAC 提供的更细粒度的控制感兴趣。

我已经能够将我的 Active Directory 连接到 Airflow。但是,当我尝试添加 RBAC 时,我无法登录。似乎 RBAC 配置覆盖了 LDAP 配置。有没有人能够做到这一点?

python ldap rbac airflow

5
推荐指数
1
解决办法
1771
查看次数

如何在单个 Spark 作业中调用多个 writeStream 操作?

我正在尝试编写一个 Spark 结构化流作业,该作业从 Kafka 主题读取并通过操作写入单独的路径(在执行一些转换之后)writeStream。但是,当我运行以下代码时,仅writeStream执行第一个代码,而第二个代码被忽略。

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() \
  .awaitTermination()

// transform df to df2

write_two = df2.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_two(x,y)) \
  .start() \
  .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我最初认为我的问题与这篇文章有关,但是,将我的代码更改为以下内容后:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()

write_one = df.writeStream \
  .foreachBatch(lambda x, y: transform_and_write_to_zone_one(x,y)) \
  .start() …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-structured-streaming

2
推荐指数
1
解决办法
1645
查看次数