小编blu*_*blu的帖子

PySpark; DecimalType 乘法精度损失

使用 PySpark 进行乘法运算时,PySpark 似乎正在失去精度。

例如,当精度为 38,10 的两位小数相乘时,它返回 38,6 并四舍五入到三位小数,这是不正确的结果。

from decimal import Decimal
from pyspark.sql.types import DecimalType, StructType, StructField

schema = StructType([StructField("amount", DecimalType(38,10)), StructField("fx", DecimalType(38,10))])
df = spark.createDataFrame([(Decimal(233.00), Decimal(1.1403218880))], schema=schema)

df.printSchema()
df = df.withColumn("amount_usd", df.amount * df.fx)
df.printSchema()
df.show()
Run Code Online (Sandbox Code Playgroud)

结果

>>> df.printSchema()
root
 |-- amount: decimal(38,10) (nullable = true)
 |-- fx: decimal(38,10) (nullable = true)
 |-- amount_usd: decimal(38,6) (nullable = true)

>>> df = df.withColumn("amount_usd", df.amount * df.fx)
>>> df.printSchema()
root
 |-- amount: decimal(38,10) (nullable = true)
 |-- fx: decimal(38,10) …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

9
推荐指数
1
解决办法
7564
查看次数

错误PythonUDFRunner:Python工作程序意外退出(崩溃)

我正在运行一个调用 udfs 的 PySpark 作业。我知道 udf 的内存很差,并且由于序列化/反序列化而速度缓慢,但由于情况,我们必须使用。

数据集大小为 60GB,分区良好,集群内存为 240GB。该作业运行良好,读取它并执行 Spark 函数,但当它开始调用 python udfs 时总是失败并出现以下错误。起初我以为这是内存问题,所以我增加了节点和执行器的内存,但问题仍然存在。此错误是什么意思以及如何解决?

执行者日志

19/11/03 05:05:52 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:71)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage9.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

6
推荐指数
1
解决办法
1万
查看次数

标签 统计

apache-spark ×2

pyspark ×2

python ×2