小编fig*_*uts的帖子

如何在 google dataproc 上运行 Spark 3.2.0?

目前,google dataproc 没有 Spark 3.2.0 作为图像。最新可用的是 3.1.2。我想使用 Spark 随 3.2.0 发布的 pandas on pyspark 功能。

我正在执行以下步骤来使用 Spark 3.2.0

  1. 在本地创建了一个环境“pyspark”,其中包含 pyspark 3.2.0
  2. 导出环境 yamlconda env export > environment.yaml
  3. 使用此environment.yaml创建了一个dataproc集群。集群已正确创建,并且环境在 master 和所有工作线程上可用
  4. 然后我更改环境变量。export SPARK_HOME=/opt/conda/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark(指向 pyspark 3.2.0), export SPARK_CONF_DIR=/usr/lib/spark/conf(使用 dataproc 的配置文件)和,export PYSPARK_PYTHON=/opt/conda/miniconda3/envs/pyspark/bin/python(使环境包可用)

现在,如果我尝试运行 pyspark shell,我会得到:

21/12/07 01:25:16 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener AppStatusListener threw an exception
java.lang.NumberFormatException: For input string: "null"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Integer.parseInt(Integer.java:580)
        at java.lang.Integer.parseInt(Integer.java:615)
        at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
        at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
        at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
        at org.apache.spark.util.Utils$.parseHostPort(Utils.scala:1126)
        at org.apache.spark.status.ProcessSummaryWrapper.<init>(storeTypes.scala:527)
        at org.apache.spark.status.LiveMiscellaneousProcess.doUpdate(LiveEntity.scala:924) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark google-cloud-dataproc

8
推荐指数
1
解决办法
1813
查看次数

在spark中spark.executor.pyspark.memory配置选项的含义是什么?

文档解释如下:

除非另有指定,否则在每个执行器中分配给 PySpark 的内存量(以 MiB 为单位)。如果设置,执行程序的 PySpark 内存将限制为此数量。如果未设置,Spark 将不会限制 Python 的内存使用,并且由应用程序来避免超出与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存将添加到执行器资源请求中。注意:该功能依赖于Python的resource模块;因此,行为和限制是遗传的。例如,Windows不支持资源限制,而macOS上实际资源不受限制。

还有其他两个配置选项。一个控制分配给每个执行器的内存量spark.executor.memory,另一个控制执行器中的每个 python 进程在开始将内存溢出到磁盘之前可以使用的内存量。spark.python.worker.memory

有人可以解释一下配置的行为和使用是什么以及它与andspark.executor.pyspark.memory有什么不同吗?spark.executor.memoryspark.python.worker.memory

apache-spark pyspark

7
推荐指数
1
解决办法
3252
查看次数

如果两个阶段使用相同的 DataFrame,spark 是否会读取同一文件两次?

以下代码读取相同的 csv 两次,即使只调用一个操作

端到端可运行示例:

import pandas as pd
import numpy as np

df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv("./df1.csv",index_label = "index")
############################################################################

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()

schema = StructType([StructField('index', StringType(), True),
                     StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema = schema)

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')

df3.explain()
df3.count()
Run Code Online (Sandbox Code Playgroud)

Web UI 中的 sql 选项卡显示以下内容:

在此输入图像描述

如您所见,df1 文件被读取了两次。这是预期的行为吗?为什么会发生这种情况?我只有一项操作,因此管道的同一部分不应运行多次。

我已经在这里阅读了答案。问题几乎是相同的,但是在该问题中使用了 RDD,并且我在 pyspark API 中使用了数据帧。在这个问题中,建议如果要避免多个文件扫描,那么 DataFrames API 会有所帮助,这就是 …

apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
710
查看次数

了解 Spark 创建的分区数量

读取 .csv 时 pyspark-sql 将创建多少个分区?

我对此的理解是 number of partitions = math.ceil(file_size/spark.conf.get('spark.sql.files.maxPartitionBytes'))

在我的机器上:

spark.conf.get('spark.sql.files.maxPartitionBytes')
output: 
'134217728b' #128MBs
Run Code Online (Sandbox Code Playgroud)

但是,我没有观察到这种行为。我创建了一个在磁盘上占用 96 MB 的文件。我在本地模式下运行 Spark。我有一台 8 核笔记本电脑。我认为它应该读入 1 个分区。但是,该文件在 8 个分区中被读取。以下是我使用过的代码库:

import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#creating a small DataFrame. This will occupy 96 MBs on disk
pd.DataFrame({'id':np.arange(10000000),'b':np.random.choice(['a','b','c','d'],size=(10000000,),p=[0.25,0.25,0.25,0.25])}).to_csv('df_s.csv',index=None)
sd=spark.read.csv('df_s.csv',schema="id int, b string")
sd.rdd.getNumPartitions()
output: 8
Run Code Online (Sandbox Code Playgroud)

您能帮我理解为什么无论文件大小如何我都会看到 8 个分区吗?

apache-spark apache-spark-sql pyspark

6
推荐指数
1
解决办法
1884
查看次数

tf.gfile.GFile() 中使用的编码是什么?

tf.gfile.GFile() 不接受“编码”参数。从这里我了解到 gfile 仅返回字节流,但现在似乎已更改为:

with tf.gfile.GFile("./data/squad/test1.txt", mode = "rb") as file1:
    print(file1.read(n = 2), type(file1.read(n = 2)))
with tf.gfile.GFile("./data/squad/test1.txt", mode = "r") as file1:
    print(file1.read(n = 2), type(file1.read(n = 2)))
Run Code Online (Sandbox Code Playgroud)

输出:

b'as' <class 'bytes'>
as <class 'str'>
Run Code Online (Sandbox Code Playgroud)

那么它在读取这些字符串时使用的编码到底是什么?是 utf8 还是像 python 中的开放协议那样依赖于平台?

python encoding python-3.x tensorflow

5
推荐指数
1
解决办法
1333
查看次数

random.setstate() 和 random.seed() 之间差异或相似之处的确切性质是什么

我对使用种子生成随机数有一个非常高的了解。因此,在生成随机数之前设置特定的种子将导致每次调用时生成相同的数字。

我假设通过以下方式恢复状态

import random
test123 = random.getstate()
random.setstate(test123)
Run Code Online (Sandbox Code Playgroud)

也会通过一些类似的过程导致随机数的复制

我正在寻求对它们的相似点和差异的理解。例如:是否将种子设置为某个值并通过 random.setstate(random.getstate()) 恢复生成器的内部状态在每种情况下复制随机数的某些方法?

我能找到的关于它们的记录不多

python random

4
推荐指数
1
解决办法
285
查看次数

为什么keras张量流中的layers.Embedding需要input_length?

Layers.embedding 有一个参数(input_length),文档描述为:

input_length :输入序列的长度(当它是常数时)。如果要连接 Flatten 然后是 Dense 层上游(没有它,则无法计算密集输出的形状),则需要此参数。

为什么密集输出的形状无法计算。对我来说,Flatten似乎很容易做到。它tf.rehshape(input,(-1,1))后面只是一个密集层,具有我们选择的任何输出形状。

你能帮我指出我对整个逻辑的理解上的失误吗?

reshape keras tensorflow tf.keras tensorflow2.0

4
推荐指数
1
解决办法
3726
查看次数

什么是火花溢出(磁盘和内存)?

根据文档:

洗牌溢出(内存)是内存中洗牌数据的反序列化形式的大小。

Shuffle 溢出(磁盘)是磁盘上数据的序列化形式的大小。

我对shuffle的理解是这样的:

  1. 每个执行器都会获取其上的所有分区,并将它们哈希分区为 200 个新分区(这 200 个可以更改)。每个新分区都与一个稍后将转到的执行程序相关联。例如:For each existing partition: new_partition = hash(partitioning_id)%200; target_executor = new_partition%num_executors其中%是模运算符,num_executors 是集群上执行程序的数量。
  2. 这些新分区被转储到其初始执行器的每个节点的磁盘上。每个新分区稍后都会被 target_executor 读取
  3. 目标执行器选取各自的新分区(在生成的 200 个分区中)

我对shuffle操作的理解是否正确?

您能帮我将 shuffle 溢出(内存)和 shuffle 溢出(磁盘)的定义放在 shuffle 机制的上下文中(如果正确的话,上面描述的)?例如(也许):“shuffle溢出(磁盘)是上面提到的第2点中发生的部分,其中200个分区被转储到各自节点的磁盘上”(我不知道这样说是否正确;只是举个例子)

apache-spark apache-spark-sql pyspark spark-ui spark-shuffle

4
推荐指数
1
解决办法
9834
查看次数

了解“运行时错误:叶变量已移至图形内部”背后的原因

我正在尝试了解 pytorch 以及 autograd 在其中的工作原理。我尝试通过用其他张量的值填充它来创建一个张量,然后检查梯度。RuntimeError: leaf variable has been moved into the graph interior但是,如果我不设置requires_gradequal to ,我就会遇到问题False

代码:

x = torch.ones(3,5,requires_grad=True)

y = x+2

z = y*y*3

out1 = z.mean()
out2 = 2*z.mean()

outi = torch.empty(2,requires_grad=True)

outi[0] = out1
outi[1] = out2

outi.backward(torch.tensor([0.,1.]))
Run Code Online (Sandbox Code Playgroud)

输出:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-22-1000fc52a64c> in <module>
     13 outi[1] = out2
     14 
---> 15 outi.backward(torch.tensor([0.,1.]))

~/anaconda3/envs/pytorch/lib/python3.8/site-packages/torch/tensor.py in backward(self, gradient, retain_graph, create_graph)
    183                 products. Defaults to ``False``.
    184         """ …
Run Code Online (Sandbox Code Playgroud)

pytorch

3
推荐指数
1
解决办法
2621
查看次数

如何在 pyspark groupby 上将 UDF 与 pandas 一起使用?

我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
                   'B': [1, 2, 3],
                   'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
    return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
    return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)

这导致了我很难理解的异常:

AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-koalas

1
推荐指数
1
解决办法
3694
查看次数