小编nst*_*ski的帖子

选项上的Scala贴图方法

我是scala的新手,请在下面的问题中帮助我.

我们可以在Option上调用map方法吗?(例如Option [Int] .map()?).

如果是,那么可以用例子来帮助我.

scala

8
推荐指数
2
解决办法
1万
查看次数

将附加参数传递给 pyspark 中的 foreachBatch

我在 pyspark 结构化流中使用 foreachBatch 使用 JDBC 将每个微批处理写入 SQL Server。我需要对多个表使用相同的过程,并且我想通过为表名添加一个额外的参数来重用相同的编写器函数,但我不确定如何传递表名参数。

这里的示例非常有用,但在 python 示例中,表名是硬编码的,看起来在 scala 示例中他们引用了一个全局变量(?),我想将表名传递给函数。

上面链接的python示例中给出的函数是:

def writeToSQLWarehose(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()
Run Code Online (Sandbox Code Playgroud)

我想使用这样的东西:

def writeToSQLWarehose(df, epochId, tableName):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", tableName) \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()
Run Code Online (Sandbox Code Playgroud)

但我不确定如何通过 foreachBatch 传递附加参数。

apache-spark pyspark databricks spark-structured-streaming

6
推荐指数
1
解决办法
2122
查看次数