Posts by Hao*_*ang

concurrent.futures.ThreadPoolExecutor.map(): timeout has no effect

import concurrent.futures
import time

def process_one(i):
    try:
        print("dealing with {}".format(i))
        time.sleep(50)  # each task runs far longer than the 3-second timeout
        print("{} Done.".format(i))
    except Exception as e:
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        executor.map(process_one,
                range(100),
                timeout=3)  # the iterator returned by map() is discarded here


if __name__ == '__main__':
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:
        print(e)

The documentation says:

The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map().

But the script here never raises any exception and just keeps waiting. Any suggestions?
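
For reference, the quoted documentation already hints at the cause: map() returns a lazy iterator, and the TimeoutError is raised from that iterator's __next__(), so the timeout can only fire if the results are actually consumed; the code above discards the iterator. A minimal sketch of consuming it (the slow() helper is a hypothetical stand-in, not from the original post):

import concurrent.futures
import time

def slow(i):
    time.sleep(50)
    return i

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = executor.map(slow, range(100), timeout=3)
    try:
        for r in results:        # __next__() is where TimeoutError is raised
            print(r)
    except concurrent.futures.TimeoutError:
        print("timed out after 3 s")
    # NOTE: leaving the with block still calls shutdown(wait=True), so the
    # script waits for all submitted tasks anyway; running threads cannot
    # be forcibly cancelled, and the timeout does not stop them.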

python concurrency concurrent.futures

15 votes · 1 answer · 1306 views

Testing floating-point numbers for equality

I'm using gfortran from MinGW under Windows 7 (32-bit) to compile Fortran code. Here is the minimal code, contained in the file testequal.f:

      program testequal
      real*8 a1, a2

      a1 = 0.3d0
      a2 = 0.7d0

      write(*,*) 1.d0
      write(*,*) a1+a2
      write(*,*) a1+a2.eq.1.0
      write(*,*) a1+a2.eq.1.d0
      end

Compiled with:

gfortran testequal.f -std=legacy

The output is:

1.0000000000000000
1.0000000000000000
F
F

But I expected both logical results to be T (true). What is wrong here?
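
The underlying issue is not Fortran-specific: 0.3 and 0.7 have no exact binary representation, so the double-precision sum a1+a2 is slightly below 1.0; the list-directed WRITE rounds it for display (as the output above shows), while .eq. compares the exact stored values. The usual remedy is to compare against a tolerance, e.g. abs(a1+a2 - 1.d0) .lt. tol in Fortran. A quick sketch of the same experiment (written in Python for brevity; the tolerance 1e-12 is an arbitrary illustrative choice):

import math

a1 = 0.3
a2 = 0.7

print(a1 + a2)          # 0.9999999999999999 -- the sum is not exactly 1.0
print(a1 + a2 == 1.0)   # False, matching the Fortran .eq. result
tol = 1e-12             # illustrative tolerance; pick one suited to your problem
print(abs((a1 + a2) - 1.0) <= tol)              # True
print(math.isclose(a1 + a2, 1.0, rel_tol=tol))  # True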

fortran mingw

8 votes · 1 answer · 3914 views

Error using collect_list in Spark Structured Streaming

I have two versions of Spark code. The first uses Structured Streaming with a Kafka source:

dfStream.printSchema()
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

val dfWindowed = dfStream                                                
    .groupBy($"ip")
    .agg(concat_ws(",", collect_list($"device")).alias("devices"))
    .writeStream                                             
    .outputMode("complete")                                              
    .format("memory")                                                        
    .start()           

The second reads from files, but the data is really the same as above:

logDF.printSchema() 
//root
//|-- dt: string (nullable = true)
//|-- ip: string (nullable = true)
//|-- device: string (nullable = true)

logDF.repartition(32)                                                                                                   
    .groupBy("ip")
    .agg(concat_ws(",", collect_list($"device")).alias("devices")) 

The problem is that while the second one runs fine, the first keeps giving me the following error:

at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 1.0 …
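The trace as posted is cut off before the root cause; the interesting part is whatever follows "Caused by:" in the full driver and executor logs. One way to narrow things down is to run the same streaming aggregation against Spark's built-in rate source instead of Kafka. Below is a self-contained sketch (in PySpark, assuming Spark 2.2+; the ip/device columns are fabricated stand-ins for the schema in the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, collect_list

spark = SparkSession.builder.appName("collect-list-repro").getOrCreate()

# The rate source emits (timestamp, value) rows; derive fake "ip"/"device"
# columns so the shape matches the question's DataFrame.
stream = (spark.readStream.format("rate").option("rowsPerSecond", 10).load()
          .withColumn("ip", (col("value") % 5).cast("string"))
          .withColumn("device", col("value").cast("string")))

query = (stream.groupBy("ip")
         .agg(concat_ws(",", collect_list("device")).alias("devices"))
         .writeStream
         .outputMode("complete")   # complete mode, as in the question
         .format("memory")         # memory sink requires a query name
         .queryName("devices_by_ip")
         .start())

query.awaitTermination(30)         # let a few micro-batches run
spark.sql("SELECT * FROM devices_by_ip").show(truncate=False)
query.stop()

If this runs cleanly on the same cluster, the failure is more likely in the Kafka input or the environment than in collect_list itself.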

apache-spark spark-structured-streaming

5 votes · 1 answer · 1179 views