小编Lit*_*chy的帖子

如何设置一个微批次中的最大行数?

我正在foreachBatch通过以下代码使用spark-structed-streaming从redis读取批记录(尝试通过设置batchSize stream.read.batch.size

val data = spark.readStream.format("redis")
  .option("stream.read.batch.size").load()

val query = data.writeStream.foreachBatch { 
  (batchDF: DataFrame, batchId: Long) => ...
  // we count size of batchDF here, we want to limit its size
  // some operation
}
Run Code Online (Sandbox Code Playgroud)

目前我们设置stream.read.batch.size为 128,但似乎这不起作用。batchSize似乎是随机的,有时超过1000甚至10000。

但是我不想等待这么久(10000条记录),因为我有一些操作(在代码注释中// some operation)需要尽快完成,所以我想控制最大批量大小,以便当记录达到此限制时可以立即处理,怎么办?

redis apache-spark spark-structured-streaming

6
推荐指数
1
解决办法
8225
查看次数

调用 Java/Scala 方法的最早方法是什么?

我使用一些脏代码从这个问题的第二个答案在 Scala 中设置环境变量

我在 IDE(IDEA Intellij)中对此进行了测试,并OMP_NUM_THREADS在课程开始时进行了设置。

import org.scalatest.{FlatSpec, Matchers}
class MyTest extends FlatSpec with Matchers {
  val m = Map("OMP_NUM_THREADS" -> "1")
  EnvHacker.setEnv(m)
Run Code Online (Sandbox Code Playgroud)

设置后,我可以从 读取System.env,它有效。但是当我的程序运行时,它不使用这个。我尝试将其设置在静态块中,但仍然无法正常工作。

但是如果我在 IDE 运行配置中设置它(在 JVM 运行之前),它会按我的预期工作和运行。所以似乎它在我修改变量之前被读取。

或者换句话说,我有一段代码,在 Java/Scala 中最早的调用方式是什么。例如,在main 方法的第一行之前调用静态块。

更新了一些细节:

我正在使用 tensorflow-mkl Java API,它会OMP_NUM_THREADS在某个时间读取系统环境变量,根据我的测试结果,此操作是在系统静态块之前。但是,我想在代码中进行控制,因为我不知道没有代码逻辑的预期配置。

java scala environment-variables

5
推荐指数
1
解决办法
209
查看次数

如何使用外部触发器停止结构化流式查询?

我正在使用 spark 结构化流媒体,我想检查是否stop存在文件以退出我的程序。

我可以做这样的事情:

def main(args: Array[String]) = {
    val query = SparkSession...load.writeStream.foreachBatch{
      if (stop file exist) exit(0)
      // do some processing here
    }.start()
    // add Execute Listener here to listen query
    query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)

但是,这只能在有新行附加到此查询表时触发。如果没有新行,该stop文件将没有任何影响。

实现这个触发器有什么更好的主意吗?


以上是问题,感谢下面接受的答案,这是我最终运行良好的代码。

object QueryManager {
  def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
    override def run() = {if(stop condition) query.stop()}
  }
  def listenTermination(query: StreamingQuery) = {
    Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
      queryTerminator(query), initialDelay=1, delay=1, SECONDS
    )
  }
}
// and in main method
def main(args: …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming

4
推荐指数
1
解决办法
684
查看次数

Protobuf:对不同的包使用相同的消息名称

我正在使用 protobuf java,如下.proto

// service1.proto
option java_package = "package";
option java_outer_classname = "Proto1";
message M {
   ... // some definition
}
Run Code Online (Sandbox Code Playgroud)

// service2.proto
option java_package = "package";
option java_outer_classname = "Proto2";
message M {
   ... // some different definition
}
Run Code Online (Sandbox Code Playgroud)

编译时,会抛出service2.proto错误"M" is already defined in service1.proto

但从包和生成的代码来看,它们应该是package.Proto1.Mpackage.Proto2.M,这是否冲突?

protocol-buffers protobuf-java

3
推荐指数
1
解决办法
2242
查看次数

Pythonunittest发现无法导入源代码

我使用 VSCode 中集成的 Python unittest 进行测试。我有这样的目录

project_root/
  src/
    module/
      __init__.py
      a.py
  test/
    module/
      __init__.py
      test_a.py
Run Code Online (Sandbox Code Playgroud)

test_a.py进口from module.a import SomeClass

我有参数

"-v",
"-s",
"./test",
"-p",
"test*.py"
Run Code Online (Sandbox Code Playgroud)

当运行测试发现时,它失败并引发ModuleNotFoundCan not import模块或类a.py

PS:我已经设置了PYTHONPATHsettings.json代码分析和程序启动都工作正常。但似乎对插件没有帮助unittest。一个问题是,两个src,test都有命名的模块module,我不确定这是否重要。

如何让它发挥作用?

更新:似乎这是一个名称冲突问题,unittest_discovery 无法处理此问题。在我更改 args 后,-s ./test/module它可以导入 src 模块。

python python-unittest visual-studio-code

2
推荐指数
1
解决办法
2685
查看次数