我在 Jupyter Notebook 上使用 Python 工作,收到以下警告:
WARNING:root:'PYARROW_IGNORE_TIMEZONE' environment variable was not set.
我试图将其删除,但我做不到。我尝试将 PYARROW_IGNORE_TIMEZONE 设置为 1,正如我在一些论坛上看到的那样,但它不起作用。
这是我的代码:
PYARROW_IGNORE_TIMEZONE=1
import databricks.koalas as ks
import pyspark
from pyspark.sql import SparkSession, functions
from pyspark.sql.types import *
import datetime
Run Code Online (Sandbox Code Playgroud)
它出什么问题了 ?
我正在使用火花和考拉。
当我在 python 脚本中运行以下代码并直接使用 python 运行它时,出现以下错误。当我启动 pyspark 会话,然后导入 koalas、创建数据帧并调用 head() 时,它运行良好并给出了预期的输出。
是否需要设置 SparkSession 才能使考拉工作的特定方式?
from pyspark.sql import SparkSession
import pandas as pd
import databricks.koalas as ks
spark = SparkSession.builder \
.master("local[*]") \
.appName("Pycedro Spark Application") \
.getOrCreate()
kdf = ks.DataFrame({"a" : [4 ,5, 6],
"b" : [7, 8, 9],
"c" : [10, 11, 12]})
print(kdf.head())
Run Code Online (Sandbox Code Playgroud)
在python脚本中运行时出错:
File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 586, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
command = serializer._read_with_length(file)
File "/usr/local/Cellar/apache-spark/3.1.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line …Run Code Online (Sandbox Code Playgroud) 当我尝试在本地使用 koalas.DataFrame.to_delta() 将 Koalas DataFrame 直接写入增量表时,出现以下 Pyspark 异常:
java.lang.ClassNotFoundException: Failed to find data source: delta
编辑:忽略下面,直接调用 Pyspark 也会出现问题。
如果我将 Koalas DataFrame 转换为 Spark DataFrame 然后写入 delta,我似乎没有问题。是否存在 Koalas 不知道但 Pyspark 知道的底层库?看起来很奇怪,因为我认为在幕后使用相同的 Pyspark 模块...我应该注意到 Koalas to_delta() 方法似乎确实在 Databricks 上工作,这表明我的本地设置缺少与 Delta 相关的库。
失败的考拉代码:
kdf = ks.DataFrame({'eid': [1, 2, 3],
'contigName': ['chr1', 'chr2', 'chr3'],
'phen1': [0.123, 0.456, 0.789],
'phen2': [0.987, 0.654, 0.321]})
kdf.to_delta(path='tmp/test.delta', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
编辑:不让考拉 Spark 到 Delta 代码:
kdf = ks.DataFrame({'eid': [1, 2, 3],
'contigName': ['chr1', 'chr2', 'chr3'],
'phen1': [0.123, 0.456, 0.789], …Run Code Online (Sandbox Code Playgroud) 在比较 pyspark 3.2.1 中的两个 API 时,我得到了奇怪的性能结果,这两个 API 提供了在 Spark Dataframe 的分组结果上运行 pandas UDF 的能力:
首先,我在本地 Spark 模式(Spark 3.2.1)下运行以下输入生成器代码:
import pyspark.sql.types as types
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
import pyspark.pandas as ps
spark = SparkSession.builder \
.config("spark.sql.execution.arrow.pyspark.enabled", True) \
.getOrCreate()
ps.set_option("compute.default_index_type", "distributed")
spark.range(1000000).withColumn('group', (col('id') / 10).cast('int')) \
.write.parquet('/tmp/sample_input', mode='overwrite')
Run Code Online (Sandbox Code Playgroud)
然后我测试applyInPandas:
def getsum(pdf):
pdf['sum_in_group'] = pdf['id'].sum()
return pdf
df = spark.read.parquet(f'/tmp/sample_input')
output_schema = types.StructType(
df.schema.fields + [types.StructField('sum_in_group', types.FloatType())]
)
df.groupBy('group').applyInPandas(getsum, …Run Code Online (Sandbox Code Playgroud) 我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
'B': [1, 2, 3],
'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)
这导致了我很难理解的异常:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …Run Code Online (Sandbox Code Playgroud)