小编Ale*_*rov的帖子

使用模拟获取应用的函数输入数据框

我有以下功能

def main():
    (
        pd.DataFrame({'a': [1, 2, float('NaN')], 'b': [1.0, 2, 3]})
        .dropna(subset=['a'])
        .assign(
            b=lambda x: x['b'] * 2
        )
        .apply(do_something_with_each_row, axis='columns')
    )

def do_something_with_each_row(one_row):
    # do_something_with_row
    print(one_row)
Run Code Online (Sandbox Code Playgroud)

在我的测试中,我想查看在所有链接操作之后构建的数据框,并在调用do_something_with_each_row. 最后一个函数不返回数据帧(它只是迭代所有行,类似于iterrow)。

我试图apply像这样模拟这个函数:

# need pytest-mock and pytest
import pandas as pd


def test_not_working(mocker):
    mocked_apply = mocker.patch.object(pd.Dataframe, 'apply')
    main()
Run Code Online (Sandbox Code Playgroud)

但在这种情况下,我无法访问输入到的数据帧apply以测试其内容。

我还试图嘲笑do_something_with_each_row

# need pytest-mock and pytest
import pandas as pd


def test_not_working_again(mocker):
    mocked_to_something = mocker.patch('path.to.file.do_something_with_each_row')
    main()
Run Code Online (Sandbox Code Playgroud)

但这次我有所有带有行参数的调用,但它们都有None值。

我如何获取apply调用函数的数据帧并检查它是否确实与以下内容相同: …

pytest python-3.x pandas pytest-mock

8
推荐指数
1
解决办法
325
查看次数

在pyspark中高效地以分布式方式生成大型DataFrame(无需pyspark.sql.Row)

问题归结为以下几点:我想使用现有的并行输入集合和一个函数在 pyspark 中生成一个 DataFrame,该函数给定一个输入可以生成一批相对较大的行。在下面的示例中,我想使用 1000 个执行器生成 10^12 行数据帧:

def generate_data(one_integer):
  import numpy as np
  from pyspark.sql import Row
  M = 10000000 # number of values to generate per seed, e.g. 10M
  np.random.seed(one_integer)
  np_array = np.random.random_sample(M) # generates an array of M random values
  row_type = Row("seed", "n", "x")
  return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]

N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark apache-arrow pyarrow

5
推荐指数
1
解决办法
5417
查看次数

如何以编程方式获取有关 PySpark 中执行程序的信息

当一个新的 pyspark 应用程序启动时,它会创建一个漂亮的 Web UI,其中包含作业、阶段、执行程序等选项卡。如果我转到执行程序选项卡,我可以看到执行程序的完整列表以及有关每个执行程序的一些信息 - 例如核心数,使用的存储内存与总数等。

我的问题是我是否可以以某种方式从应用程序本身以编程方式访问相同的信息(或至少其中的一部分),例如看起来像spark.sparkContext.<function_name_to_get_info_about_executors>()

我找到了一些以类似于 webUI 的方式执行 url 请求的解决方法,但我认为我可能缺少一个更简单的解决方案。

我正在使用 Spark 3.0.0

apache-spark pyspark

5
推荐指数
1
解决办法
334
查看次数