目前,google dataproc 没有 Spark 3.2.0 作为图像。最新可用的是 3.1.2。我想使用 Spark 随 3.2.0 发布的 pandas on pyspark 功能。
我正在执行以下步骤来使用 Spark 3.2.0
conda env export > environment.yamlexport SPARK_HOME=/opt/conda/miniconda3/envs/pyspark/lib/python3.9/site-packages/pyspark(指向 pyspark 3.2.0), export SPARK_CONF_DIR=/usr/lib/spark/conf(使用 dataproc 的配置文件)和,export PYSPARK_PYTHON=/opt/conda/miniconda3/envs/pyspark/bin/python(使环境包可用)现在,如果我尝试运行 pyspark shell,我会得到:
21/12/07 01:25:16 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener AppStatusListener threw an exception
java.lang.NumberFormatException: For input string: "null"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
at org.apache.spark.util.Utils$.parseHostPort(Utils.scala:1126)
at org.apache.spark.status.ProcessSummaryWrapper.<init>(storeTypes.scala:527)
at org.apache.spark.status.LiveMiscellaneousProcess.doUpdate(LiveEntity.scala:924) …Run Code Online (Sandbox Code Playgroud) 文档解释如下:
除非另有指定,否则在每个执行器中分配给 PySpark 的内存量(以 MiB 为单位)。如果设置,执行程序的 PySpark 内存将限制为此数量。如果未设置,Spark 将不会限制 Python 的内存使用,并且由应用程序来避免超出与其他非 JVM 进程共享的开销内存空间。当 PySpark 在 YARN 或 Kubernetes 中运行时,此内存将添加到执行器资源请求中。注意:该功能依赖于Python的
resource模块;因此,行为和限制是遗传的。例如,Windows不支持资源限制,而macOS上实际资源不受限制。
还有其他两个配置选项。一个控制分配给每个执行器的内存量spark.executor.memory,另一个控制执行器中的每个 python 进程在开始将内存溢出到磁盘之前可以使用的内存量。spark.python.worker.memory
有人可以解释一下配置的行为和使用是什么以及它与andspark.executor.pyspark.memory有什么不同吗?spark.executor.memoryspark.python.worker.memory
以下代码读取相同的 csv 两次,即使只调用一个操作
端到端可运行示例:
import pandas as pd
import numpy as np
df1= pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv("./df1.csv",index_label = "index")
############################################################################
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField
spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()
schema = StructType([StructField('index', StringType(), True),
StructField('0', StringType(), True)])
df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')
df3.explain()
df3.count()
Run Code Online (Sandbox Code Playgroud)
Web UI 中的 sql 选项卡显示以下内容:
如您所见,df1 文件被读取了两次。这是预期的行为吗?为什么会发生这种情况?我只有一项操作,因此管道的同一部分不应运行多次。
我已经在这里阅读了答案。问题几乎是相同的,但是在该问题中使用了 RDD,并且我在 pyspark API 中使用了数据帧。在这个问题中,建议如果要避免多个文件扫描,那么 DataFrames API 会有所帮助,这就是 …
读取 .csv 时 pyspark-sql 将创建多少个分区?
我对此的理解是
number of partitions = math.ceil(file_size/spark.conf.get('spark.sql.files.maxPartitionBytes'))
在我的机器上:
spark.conf.get('spark.sql.files.maxPartitionBytes')
output:
'134217728b' #128MBs
Run Code Online (Sandbox Code Playgroud)
但是,我没有观察到这种行为。我创建了一个在磁盘上占用 96 MB 的文件。我在本地模式下运行 Spark。我有一台 8 核笔记本电脑。我认为它应该读入 1 个分区。但是,该文件在 8 个分区中被读取。以下是我使用过的代码库:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#creating a small DataFrame. This will occupy 96 MBs on disk
pd.DataFrame({'id':np.arange(10000000),'b':np.random.choice(['a','b','c','d'],size=(10000000,),p=[0.25,0.25,0.25,0.25])}).to_csv('df_s.csv',index=None)
sd=spark.read.csv('df_s.csv',schema="id int, b string")
sd.rdd.getNumPartitions()
output: 8
Run Code Online (Sandbox Code Playgroud)
您能帮我理解为什么无论文件大小如何我都会看到 8 个分区吗?
tf.gfile.GFile() 不接受“编码”参数。从这里我了解到 gfile 仅返回字节流,但现在似乎已更改为:
with tf.gfile.GFile("./data/squad/test1.txt", mode = "rb") as file1:
print(file1.read(n = 2), type(file1.read(n = 2)))
with tf.gfile.GFile("./data/squad/test1.txt", mode = "r") as file1:
print(file1.read(n = 2), type(file1.read(n = 2)))
Run Code Online (Sandbox Code Playgroud)
输出:
b'as' <class 'bytes'>
as <class 'str'>
Run Code Online (Sandbox Code Playgroud)
那么它在读取这些字符串时使用的编码到底是什么?是 utf8 还是像 python 中的开放协议那样依赖于平台?
我对使用种子生成随机数有一个非常高的了解。因此,在生成随机数之前设置特定的种子将导致每次调用时生成相同的数字。
我假设通过以下方式恢复状态
import random
test123 = random.getstate()
random.setstate(test123)
Run Code Online (Sandbox Code Playgroud)
也会通过一些类似的过程导致随机数的复制
我正在寻求对它们的相似点和差异的理解。例如:是否将种子设置为某个值并通过 random.setstate(random.getstate()) 恢复生成器的内部状态在每种情况下复制随机数的某些方法?
我能找到的关于它们的记录不多
Layers.embedding 有一个参数(input_length),文档描述为:
input_length :输入序列的长度(当它是常数时)。如果要连接 Flatten 然后是 Dense 层上游(没有它,则无法计算密集输出的形状),则需要此参数。
为什么密集输出的形状无法计算。对我来说,Flatten似乎很容易做到。它tf.rehshape(input,(-1,1))后面只是一个密集层,具有我们选择的任何输出形状。
你能帮我指出我对整个逻辑的理解上的失误吗?
根据文档:
洗牌溢出(内存)是内存中洗牌数据的反序列化形式的大小。
Shuffle 溢出(磁盘)是磁盘上数据的序列化形式的大小。
我对shuffle的理解是这样的:
For each existing partition: new_partition = hash(partitioning_id)%200; target_executor = new_partition%num_executors其中%是模运算符,num_executors 是集群上执行程序的数量。我对shuffle操作的理解是否正确?
您能帮我将 shuffle 溢出(内存)和 shuffle 溢出(磁盘)的定义放在 shuffle 机制的上下文中(如果正确的话,上面描述的)?例如(也许):“shuffle溢出(磁盘)是上面提到的第2点中发生的部分,其中200个分区被转储到各自节点的磁盘上”(我不知道这样说是否正确;只是举个例子)
apache-spark apache-spark-sql pyspark spark-ui spark-shuffle
我正在尝试了解 pytorch 以及 autograd 在其中的工作原理。我尝试通过用其他张量的值填充它来创建一个张量,然后检查梯度。RuntimeError: leaf variable has been moved into the graph interior但是,如果我不设置requires_gradequal to ,我就会遇到问题False。
代码:
x = torch.ones(3,5,requires_grad=True)
y = x+2
z = y*y*3
out1 = z.mean()
out2 = 2*z.mean()
outi = torch.empty(2,requires_grad=True)
outi[0] = out1
outi[1] = out2
outi.backward(torch.tensor([0.,1.]))
Run Code Online (Sandbox Code Playgroud)
输出:
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-22-1000fc52a64c> in <module>
13 outi[1] = out2
14
---> 15 outi.backward(torch.tensor([0.,1.]))
~/anaconda3/envs/pytorch/lib/python3.8/site-packages/torch/tensor.py in backward(self, gradient, retain_graph, create_graph)
183 products. Defaults to ``False``.
184 """ …Run Code Online (Sandbox Code Playgroud) 我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
'B': [1, 2, 3],
'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)
这导致了我很难理解的异常:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …Run Code Online (Sandbox Code Playgroud) apache-spark ×6
pyspark ×6
python ×2
tensorflow ×2
encoding ×1
keras ×1
python-3.x ×1
pytorch ×1
random ×1
reshape ×1
spark-koalas ×1
spark-ui ×1
tf.keras ×1