标签: spark-koalas

这是什么意思 ?警告:root:未设置“PYARROW_IGNORE_TIMEZONE”环境变量

我在 Jupyter Notebook 上使用 Python 工作,收到以下警告:

WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set.

我试图将其删除,但我做不到。我尝试将 PYARROW_IGNORE_TIMEZONE 设置为 1,正如我在一些论坛上看到的那样,但它不起作用。

这是我的代码:

PYARROW_IGNORE_TIMEZONE=1
import databricks.koalas as ks
import pyspark
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import *
import datetime
Run Code Online (Sandbox Code Playgroud)

它出什么问题了 ?

我正在使用火花和考拉。

python apache-spark pyspark jupyter-notebook spark-koalas

10
推荐指数
1
解决办法
6993
查看次数

进行简单的 head() 调用时,考拉会在 <module 'pyspark.cloudpickle' 上抛出 ' Can't get attribute _fill_function'

当我在 python 脚本中运行以下代码并直接使用 python 运行它时,出现以下错误。当我启动 pyspark 会话,然后导入 koalas、创建数据帧并调用 head() 时,它运行良好并给出了预期的输出。

是否需要设置 SparkSession 才能使考拉工作的特定方式?

from pyspark.sql import SparkSession
import pandas as pd
import databricks.koalas as ks


spark = SparkSession.builder \
        .master("local[*]") \
        .appName("Pycedro Spark Application") \
        .getOrCreate()


kdf = ks.DataFrame({"a" : [4 ,5, 6],
                    "b" : [7, 8, 9],
                    "c" : [10, 11, 12]})

print(kdf.head())
Run Code Online (Sandbox Code Playgroud)

在python脚本中运行时出错:

    File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line …
Run Code Online (Sandbox Code Playgroud)

spark-koalas

6
推荐指数
1
解决办法
3711
查看次数

Koalas/pyspark 找不到数据源:delta

当我尝试在本地使用 koalas.DataFrame.to_delta() 将 Koalas DataFrame 直接写入增量表时,出现以下 Pyspark 异常:
java.lang.ClassNotFoundException: Failed to find data source: delta
编辑:忽略下面,直接调用 Pyspark 也会出现问题。

如果我将 Koalas DataFrame 转换为 Spark DataFrame 然后写入 delta,我似乎没有问题。是否存在 Koalas 不知道但 Pyspark 知道的底层库?看起来很奇怪,因为我认为在幕后使用相同的 Pyspark 模块...我应该注意到 Koalas to_delta() 方法似乎确实在 Databricks 上工作,这表明我的本地设置缺少与 Delta 相关的库。

失败的考拉代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789],
                        'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

编辑:不让考拉 Spark 到 Delta 代码:

kdf = ks.DataFrame({'eid': [1, 2, 3],
                        'contigName': ['chr1', 'chr2', 'chr3'],
                        'phen1': [0.123, 0.456, 0.789], …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks delta-lake spark-koalas

6
推荐指数
1
解决办法
1万
查看次数

为什么 Pandas-API-on-Spark 在组上的应用比 pyspark API 慢得多?

在比较 pyspark 3.2.1 中的两个 API 时,我得到了奇怪的性能结果,这两个 API 提供了在 Spark Dataframe 的分组结果上运行 pandas UDF 的能力:

首先,我在本地 Spark 模式(Spark 3.2.1)下运行以下输入生成器代码:

import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", True) \
    .getOrCreate()

ps.set_option("compute.default_index_type", "distributed")

spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
    .write.parquet('/tmp/sample_input', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)

然后我测试applyInPandas

def getsum(pdf):
    pdf['sum_in_group'] = pdf['id'].sum()
    return pdf

df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
    df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-koalas

6
推荐指数
0
解决办法
2155
查看次数

如何在 pyspark groupby 上将 UDF 与 pandas 一起使用?

我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
                   'B': [1, 2, 3],
                   'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
    return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
    return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)

这导致了我很难理解的异常:

AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-koalas

1
推荐指数
1
解决办法
3694
查看次数