Posts by user Sat*_*uri

Converting a JavaRDD to a DataFrame in Spark (Java)

I am trying to process a log file. First I read the log file, split each line as required, and save each column into its own JavaRDD. Now I need to convert these JavaRDDs into DataFrames for further processing. This is the code I have tried so far:

         SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
         JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
         JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
         System.out.println(urlrdd.take(1));
         SQLContext sql = new SQLContext(sc);

This is how I tried to convert the JavaRDD into a DataFrame:

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);

But the line above does not work; I am confused about what Model.class should be.

Can anyone suggest how to do this?

Thanks.
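
For reference, a minimal sketch of the JavaBean-based conversion, assuming Spark 1.x as in the snippet above; UrlRecord is a hypothetical bean standing in for Model.class, and its fields define the DataFrame columns:

    // Hypothetical JavaBean; field names become the DataFrame column names.
    public static class UrlRecord implements java.io.Serializable {
        private String url;
        public String getUrl() { return url; }
        public void setUrl(String url) { this.url = url; }
    }

    // Wrap each raw string in the bean, then let SQLContext derive the schema from it.
    JavaRDD<UrlRecord> beanRdd = urlrdd.map(u -> {
        UrlRecord record = new UrlRecord();
        record.setUrl(u);
        return record;
    });
    DataFrame urlDF = sql.createDataFrame(beanRdd, UrlRecord.class);
    urlDF.printSchema();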

java hadoop apache-spark apache-spark-sql

9 votes · 2 answers · 20k views

Writing a DataFrame to Phoenix

I am trying to write a DataFrame to a Phoenix table, but I am getting an exception.

Here is my code:

df.write.format("org.apache.phoenix.spark").mode(SaveMode.Overwrite).options(collection.immutable.Map(
                "zkUrl" -> "localhost:2181/hbase-unsecure",
                "table" -> "TEST")).save();

The exception is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: 
Lost task 0.3 in stage 3.0 (TID 411, ip-xxxxx-xx-xxx.ap-southeast-1.compute.internal):
java.lang.RuntimeException: java.sql.SQLException: No suitable driver found for jdbc:phoenix:localhost:2181:/hbase-unsecure;
            at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
            at org.apache.spark.rdd.PairRDDFunctions$anonfun$saveAsNewAPIHadoopDataset$1$anonfun$12.apply(PairRDDFunctions.scala:1030)
            at org.apache.spark.rdd.PairRDDFunctions$anonfun$saveAsNewAPIHadoopDataset$1$anonfun$12.apply(PairRDDFunctions.scala:1014)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:88)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
Run Code Online (Sandbox Code Playgroud)

I have already added the phoenix-spark and phoenix-core jars to my pom.xml.
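
One thing worth noting: the "No suitable driver" failure is raised on the executors, so having the jars in pom.xml only covers the driver-side build; a common workaround is to ship the Phoenix client jar with the job so the executors can load the JDBC driver. A sketch, with an assumed jar path and main class:

    # Ship the Phoenix client jar to the executors as well
    # (the jar path and main class below are assumptions — adjust to your installation).
    spark-submit --class com.example.PhoenixWriter --master yarn --deploy-mode client \
      --jars /usr/hdp/current/phoenix-client/phoenix-client.jar \
      /home/hadoop/myjar.jar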

hadoop hbase phoenix apache-spark

9 votes · 1 answer · 683 views

How to pass external arguments through spark-submit

In my application I need to connect to a database, so I need to pass the IP address and the database name while submitting the application.

I submit the application as follows:

./spark-submit --class class name --master spark://localhost:7077 \
--deploy-mode client /home/hadoop/myjar.jar
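
One straightforward pattern (a sketch, not the only option) is to append the values after the application jar; everything that follows the jar on the spark-submit line is handed to the program as main() arguments. The sample host and database name below are assumptions:

    ./spark-submit --class <main class> --master spark://localhost:7077 \
    --deploy-mode client /home/hadoop/myjar.jar 192.168.1.10 mydatabase

Reading them inside the application:

    public class MyApp {
        public static void main(String[] args) {
            // Arguments arrive in the same order they were typed after the jar.
            String dbHost = args[0];   // e.g. "192.168.1.10" (assumed sample value)
            String dbName = args[1];   // e.g. "mydatabase"   (assumed sample value)
            System.out.println("Connecting to " + dbName + " at " + dbHost);
        }
    }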

java apache-spark spark-submit

8 votes · 1 answer · 10k views

auxService: mapreduce_shuffle does not exist, on Hive 1.2.0

I am using Hive 1.2.0 and Hadoop 2.6.0. Whenever I run Hive on my machine, select queries work fine, but a count(*) query shows the following error:

Diagnostic Messages for this Task:
Container launch failed for container_1434646588807_0001_01_000005 : org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:mapreduce_shuffle does not exist
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
            at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
            at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
            at org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl$Container.launch(ContainerLauncherImpl.java:155)
            at org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl$EventProcessor.run(ContainerLauncherImpl.java:369)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched: Stage-Stage-1: Map: 1 Reduce: 1 HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
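
For reference, the InvalidAuxServiceException above usually points at the YARN NodeManager configuration rather than Hive itself: the MapReduce shuffle auxiliary service has to be registered in yarn-site.xml on every NodeManager (and the NodeManagers restarted afterwards). A sketch of the usual properties, assuming a stock Hadoop 2.6.0 setup:

    <!-- yarn-site.xml: register the MapReduce shuffle auxiliary service -->
    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
      <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>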

hadoop hive hdfs hadoop-yarn

5 votes · 1 answer · 8,968 views

Spark SQL: data type mismatch error "argument 2 requires integral type" when parsing JSON data

I am trying to parse JSON data and have written a custom schema for it. Whether I parse the data with the schema or without it, I get the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`queryResults`.`searchResponse`.`response`.`docs`.`transactions`['code']' due to data type mismatch: argument 2 requires integral type, however, ''code'' is of string type.;;

Here is my sample data:

{
    "queryResults": {
        "searchResponse": {
            "response": {
                "docs": [{
                    "transactions": [{
                        "recordDate": "2010-02-02 00:00:00",
                        "code": "PGM/",
                        "description": "Recordation of Patent Grant Mailed"
                    }, {
                        "recordDate": "2010-01-13 00:00:00",
                        "code": "WPIR",
                        "description": "Issue Notification Mailed"
                    }, {
                        "recordDate": "2009-12-17 00:00:00",
                        "code": "R1021",
                        "description": "Receipt into Pubs"
                    }]
                }]
            }
        }
    }
}

Here is my schema:

val …
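
For what it's worth, the error suggests that transactions is parsed as an array of structs, so indexing it with a string key (transactions['code']) is what triggers the "argument 2 requires integral type" complaint; flattening the array first avoids the problem. A minimal Java sketch, assuming schema inference, a placeholder input path, and a Spark 1.6-style sqlContext:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.explode;

    // Read the nested JSON and let Spark infer the schema.
    // Note: the path is a placeholder, and this reader expects one JSON document
    // per line (on Spark 2.2+ a multiLine option is available for pretty-printed files).
    DataFrame json = sqlContext.read().json("/path/to/sample.json");

    // Explode the docs array, then the transactions array, and only then select struct fields.
    DataFrame transactions = json
        .select(explode(col("queryResults.searchResponse.response.docs")).as("doc"))
        .select(explode(col("doc.transactions")).as("txn"))
        .select(col("txn.recordDate"), col("txn.code"), col("txn.description"));

    transactions.show(false);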

json apache-spark apache-spark-sql databricks

5 votes · 1 answer · 20k views