我正在foreachBatch通过以下代码使用spark-structed-streaming从redis读取批记录(尝试通过设置batchSize stream.read.batch.size)
val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()
val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}
Run Code Online (Sandbox Code Playgroud)
目前我们设置stream.read.batch.size为 128,但似乎这不起作用。batchSize似乎是随机的,有时超过1000甚至10000。
但是我不想等待这么久(10000条记录),因为我有一些操作(在代码注释中// some operation)需要尽快完成,所以我想控制最大批量大小,以便当记录达到此限制时可以立即处理,怎么办?
我使用一些脏代码从这个问题的第二个答案在 Scala 中设置环境变量
我在 IDE(IDEA Intellij)中对此进行了测试,并OMP_NUM_THREADS在课程开始时进行了设置。
import org.scalatest.{FlatSpec, Matchers}
class MyTest extends FlatSpec with Matchers {
val m = Map("OMP_NUM_THREADS" -> "1")
EnvHacker.setEnv(m)
Run Code Online (Sandbox Code Playgroud)
设置后,我可以从 读取System.env,它有效。但是当我的程序运行时,它不使用这个。我尝试将其设置在静态块中,但仍然无法正常工作。
但是如果我在 IDE 运行配置中设置它(在 JVM 运行之前),它会按我的预期工作和运行。所以似乎它在我修改变量之前被读取。
或者换句话说,我有一段代码,在 Java/Scala 中最早的调用方式是什么。例如,在main 方法的第一行之前调用静态块。
我正在使用 tensorflow-mkl Java API,它会OMP_NUM_THREADS在某个时间读取系统环境变量,根据我的测试结果,此操作是在系统静态块之前。但是,我想在代码中进行控制,因为我不知道没有代码逻辑的预期配置。
我正在使用 spark 结构化流媒体,我想检查是否stop存在文件以退出我的程序。
我可以做这样的事情:
def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
if (stop file exist) exit(0)
// do some processing here
}.start()
// add Execute Listener here to listen query
query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)
但是,这只能在有新行附加到此查询表时触发。如果没有新行,该stop文件将没有任何影响。
实现这个触发器有什么更好的主意吗?
以上是问题,感谢下面接受的答案,这是我最终运行良好的代码。
object QueryManager {
def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
override def run() = {if(stop condition) query.stop()}
}
def listenTermination(query: StreamingQuery) = {
Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
queryTerminator(query), initialDelay=1, delay=1, SECONDS
)
}
}
// and in main method
def main(args: …Run Code Online (Sandbox Code Playgroud) 我正在使用 protobuf java,如下.proto
// service1.proto
option java_package = "package";
option java_outer_classname = "Proto1";
message M {
... // some definition
}
Run Code Online (Sandbox Code Playgroud)
和
// service2.proto
option java_package = "package";
option java_outer_classname = "Proto2";
message M {
... // some different definition
}
Run Code Online (Sandbox Code Playgroud)
编译时,会抛出service2.proto错误"M" is already defined in service1.proto
但从包和生成的代码来看,它们应该是package.Proto1.M和package.Proto2.M,这是否冲突?
我使用 VSCode 中集成的 Python unittest 进行测试。我有这样的目录
project_root/
src/
module/
__init__.py
a.py
test/
module/
__init__.py
test_a.py
Run Code Online (Sandbox Code Playgroud)
有test_a.py进口from module.a import SomeClass
我有参数
"-v",
"-s",
"./test",
"-p",
"test*.py"
Run Code Online (Sandbox Code Playgroud)
当运行测试发现时,它失败并引发ModuleNotFound或Can not import模块或类a.py
PS:我已经设置了PYTHONPATH,settings.json代码分析和程序启动都工作正常。但似乎对插件没有帮助unittest。一个问题是,两个src,test都有命名的模块module,我不确定这是否重要。
如何让它发挥作用?
更新:似乎这是一个名称冲突问题,unittest_discovery 无法处理此问题。在我更改 args 后,-s ./test/module它可以导入 src 模块。