Spark SQL UDF任务不可序列化

ger*_*dev 1 scala datastax apache-spark apache-spark-sql

Cassandra和DataStax社区,我有一个问题,我希望有人可以帮助我.

我们正在将我们的分析代码从Hadoop迁移到运行在Cassandra之上的Spark(通过DataStax Enterprise).DSE 4.7正在生产中,但4.8正在开发中.

Java 7正在开发中,正在开发Java 7/8.

我们需要一些DataFrame转换,我们认为通过Spark SQLContext对内存DataFrame编写的UDF可以完成这项工作.主要是:

  1. 我们数据的每个文本值都带有前缀,后缀为".即"一些数据"这是非常烦人的,所以我们想清理这些中的每一个.
  2. 我们想添加一个包含由多个其他列组成的散列键的列.

我们的代码如下.这运行良好,没有在sqlContext中包含UDF调用,但是一旦添加它们我们得到"任务不可序列化"错误

线程"main"中的异常org.apache.spark.SparkException:任务不可序列化

我已经尝试将"implements Serializable"作为此类(以及许多其他类)的基类,它将错误类更改为链中的下一个类,但是这会导致异常类上的失败不可序列化...可能意味着我正走向错误的方向.

我也尝试将UDF实现为lambda,并且还会导致相同的错误.

如果有人能指出我做错了什么,将不胜感激!

public class entities implements Serializable{
    private spark_context m_spx = null;
    private DataFrame m_entities = null;
    private String m_timekey = null;

    public entities(spark_context _spx, String _timekey){
        m_spx = _spx;
        m_timekey = _timekey;
    }


    public DataFrame get_dimension(){
        if(m_entities == null) {

            DataFrame df = m_spx.get_flat_data(m_timekey).select("event", "url");

            //UDF to generate hashed ids
            UDF2 get_hashed_id = new UDF2<String, String, String>() {
                public String call(String o, String o2) throws Exception {
                    return o.concat(o2);
                }
            };


            //UDF to clean the " from strings
            UDF1 clean_string = new UDF1<String, String>() {
                public String call(String o) throws Exception {
                    return o.replace("\"","");
                }
            };


            //Get the Spark SQL Context from SC.
            SQLContext sqlContext = new SQLContext(m_spx.sc());


            //Register the UDFs
            sqlContext.udf().register("getid", get_hashed_id, DataTypes.StringType);
            sqlContext.udf().register("clean_string", clean_string, DataTypes.StringType);


            //Register the DF as a table.
            sqlContext.registerDataFrameAsTable(df, "entities");
            m_entities = sqlContext.sql("SELECT getid(event, url) as event_key, clean_string(event) as event_cleaned, clean_string(url) as url_cleaned FROM entities");
        }

        return m_entities;
    }
}
Run Code Online (Sandbox Code Playgroud)

Tza*_*har 7

您的entities类包含一个SparkContext成员 - 因此它不能序列化(SparkContexts在国际上不可序列化,您不应该序列化它们).

因为entities它不是可序列化的,所以它的任何非静态方法/成员/匿名内部类都不可序列化(因为它们会尝试序列化entities保存它们的实例).

在这种情况下,最好的解决方法是将匿名UDF提取到类的静态成员中:

private final static UDF2 get_hashed_id = new UDF2<String, String, String>() {
   public String call(String o, String o2) throws Exception {
       return o.concat(o2);
   }
};

private final static UDF1 clean_string = new UDF1<String, String>() {
   public String call(String o) throws Exception {
       return o.replace("\"","");
   }
};
Run Code Online (Sandbox Code Playgroud)

然后你就可以使用它们了get_dimension.