PySpark seems to be losing precision when I multiply decimals.
For example, multiplying two decimal(38,10) columns returns a decimal(38,6) result, i.e. the value is rounded to six decimal places, which is not the exact product.
from decimal import Decimal
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructType, StructField

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("amount", DecimalType(38, 10)), StructField("fx", DecimalType(38, 10))])
# Build the Decimals from strings, not floats, so the inputs themselves are exact.
df = spark.createDataFrame([(Decimal("233.00"), Decimal("1.1403218880"))], schema=schema)
df.printSchema()
df = df.withColumn("amount_usd", df.amount * df.fx)
df.printSchema()
df.show()
Result:
>>> df.printSchema()
root
|-- amount: decimal(38,10) (nullable = true)
|-- fx: decimal(38,10) (nullable = true)
|-- amount_usd: decimal(38,6) (nullable = true)
>>> df = df.withColumn("amount_usd", df.amount * df.fx)
>>> df.printSchema()
root
|-- amount: decimal(38,10) (nullable = true)
|-- fx: decimal(38,10) …
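The decimal(38,6) type is not a rounding bug in the multiplication itself; it matches Spark's decimal promotion rule for binary arithmetic. The nominal result of multiplying decimal(p1,s1) by decimal(p2,s2) is decimal(p1+p2+1, s1+s2), and when that precision exceeds 38, Spark (with the default spark.sql.decimalOperations.allowPrecisionLoss=true) caps the precision at 38 and shrinks the scale, keeping at least 6 fractional digits. A minimal sketch of that adjustment, mirroring DecimalType.adjustPrecisionScale in the Spark source:

MAX_PRECISION = 38
MIN_ADJUSTED_SCALE = 6

def multiply_result_type(p1, s1, p2, s2):
    # Nominal result type for decimal multiplication.
    precision, scale = p1 + p2 + 1, s1 + s2
    if precision <= MAX_PRECISION:
        return precision, scale
    # Over the cap: keep all integral digits and sacrifice scale,
    # but never drop below 6 fractional digits.
    int_digits = precision - scale
    adjusted_scale = max(MAX_PRECISION - int_digits, min(scale, MIN_ADJUSTED_SCALE))
    return MAX_PRECISION, adjusted_scale

print(multiply_result_type(38, 10, 38, 10))  # -> (38, 6)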
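A commonly suggested workaround is to declare (or cast to) the smallest precision the data actually needs, so the nominal result type already fits within 38 digits and no scale is lost. For example, with DecimalType(18, 10) inputs (an assumption: each value must fit in 8 integral digits), the product type is decimal(37, 20) and is computed exactly:

from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

# Sketch: cast both operands down before multiplying.
df_exact = df.withColumn(
    "amount_usd",
    col("amount").cast(DecimalType(18, 10)) * col("fx").cast(DecimalType(18, 10)),
)
df_exact.printSchema()  # amount_usd: decimal(37,20)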
I'm running a PySpark job that calls Python UDFs. I know UDFs are memory-hungry and slow because of serialization/deserialization, but given the circumstances we have to use them.

The dataset is 60GB and well partitioned, and the cluster has 240GB of memory. The job reads the data and runs the built-in Spark functions fine, but it always fails with the error below once it starts calling the Python UDFs. At first I assumed it was a memory problem, so I increased node and executor memory, but the issue persists. What does this error mean, and how do I fix it?
Executor logs
19/11/03 05:05:52 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at …
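For context, this stack trace says the JVM's socket to its Python worker was reset because the worker process itself died. A frequent cause is the OS OOM killer: Python UDF workers run outside the JVM heap, so raising spark.executor.memory does not give them any more room. A hedged sketch of commonly suggested settings, assuming the workers are being killed for memory (the values are illustrative, not tuned for this cluster):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Off-heap headroom per executor; Python UDF workers live here,
    # outside the JVM heap controlled by spark.executor.memory.
    .config("spark.executor.memoryOverhead", "4g")
    # Memory a single Python worker may use before spilling to disk.
    .config("spark.python.worker.memory", "2g")
    .getOrCreate()
)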