小编Ass*_*son的帖子

从json架构表示创建spark数据帧架构

有没有办法将数据帧模式序列化为json并在以后反序列化?

用例很简单:我有一个json配置文件,其中包含我需要读取的数据帧的模式.我希望能够从现有模式(在数据框中)创建默认配置,并且我希望能够通过从json字符串中读取它来生成稍后要使用的相关模式.

apache-spark apache-spark-sql

19
推荐指数
2
解决办法
2万
查看次数

实现java UDF并从pyspark调用它

我需要创建一个在pyspark python中使用的UDF,它使用java对象进行内部计算.

如果它是一个简单的python,我会做类似的事情:

def f(x):
    return 7
fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType())
Run Code Online (Sandbox Code Playgroud)

并使用以下方式调用:

df = sqlContext.range(0,5)
df2 = df.withColumn("a",fudf(df.id)).show()
Run Code Online (Sandbox Code Playgroud)

但是,我需要的函数的实现是在java而不是在python中.我需要以某种方式包装它,所以我可以从python中以类似的方式调用它.

我的第一个尝试是实现java对象,然后将其包装在pyspark中的python中并将其转换为UDF.因序列化错误而失败.

Java代码:

package com.test1.test2;

public class TestClass1 {
    Integer internalVal;
    public TestClass1(Integer val1) {
        internalVal = val1;
    }
    public Integer do_something(Integer val) {
        return internalVal;
    }    
}
Run Code Online (Sandbox Code Playgroud)

pyspark代码:

from py4j.java_gateway import java_import
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
java_import(sc._gateway.jvm, "com.test1.test2.TestClass1")
a = sc._gateway.jvm.com.test1.test2.TestClass1(7)
audf = udf(a,IntegerType())
Run Code Online (Sandbox Code Playgroud)

错误:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-2-9756772ab14f> in <module>()
      4 java_import(sc._gateway.jvm, "com.test1.test2.TestClass1") …
Run Code Online (Sandbox Code Playgroud)

python java py4j apache-spark pyspark

9
推荐指数
1
解决办法
3598
查看次数

如何为 python 2.7 安装 openssl 1.1.1?

我在 Windows 10 机器上安装了 python 2.7.17。然后我想通过在 python 中运行以下命令来测试它的 openssl 版本:

import ssl
print ssl.OPENSSL_VERSION_INFO
Run Code Online (Sandbox Code Playgroud)

(1, 0, 2, 20, 15) 想升级到 1.1.1 版。做点冻结我得到:

import ssl
print ssl.OPENSSL_VERSION_INFO
Run Code Online (Sandbox Code Playgroud)

这些似乎是 pyOpenSSL 和密码学的最新 pip。

我安装的 openssl(作为 git bash 的一部分)是 1.1.1,但是,这与 python 中使用的版本不同。

如何将 python 中包含的 openssl 版本升级到 1.1.1 或更高版本?

编辑: 为了回应评论,以下是 python -m OpenSSL.debug 的结果:

C:\Users\assaf>python -m OpenSSL.debug
pyOpenSSL: 19.1.0
cryptography: 2.8
cffi: 1.14.0
cryptography's compiled against OpenSSL: OpenSSL 1.1.1d  10 Sep 2019
cryptography's linked OpenSSL: OpenSSL 1.1.1d  10 Sep 2019
Pythons's OpenSSL: …
Run Code Online (Sandbox Code Playgroud)

python openssl pyopenssl python-2.7 windows-10

9
推荐指数
1
解决办法
6016
查看次数

在pyspark中包装java函数

我正在尝试创建一个用户定义的聚合函数,我可以从python中调用它.我试着按照这个问题的答案.我基本上实现了以下(取自这里):

package com.blu.bla;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.Row;

public class MySum extends UserDefinedAggregateFunction {
    private StructType _inputDataType;
    private StructType _bufferSchema;
    private DataType _returnDataType;

    public MySum() {
        List<StructField> inputFields = new ArrayList<StructField>();
        inputFields.add(DataTypes.createStructField("inputDouble", DataTypes.DoubleType, true));
        _inputDataType = DataTypes.createStructType(inputFields);

        List<StructField> bufferFields = new ArrayList<StructField>();
        bufferFields.add(DataTypes.createStructField("bufferDouble", DataTypes.DoubleType, true));
        _bufferSchema = DataTypes.createStructType(bufferFields);

        _returnDataType = DataTypes.DoubleType;
    }

    @Override public StructType inputSchema() {
        return _inputDataType;
    }

    @Override public StructType …
Run Code Online (Sandbox Code Playgroud)

python java apache-spark pyspark

6
推荐指数
1
解决办法
5794
查看次数