I am trying to process a log file. First I read the file, split each line as required, and save each column into a separate JavaRDD. Now I need to convert those JavaRDDs into DataFrames for further operations. This is the code I have tried so far:
SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
JavaRDD<String> urlrdd = diskfile.map(line -> line.split("\t")[0]); // first tab-separated column; map suffices since each line yields one value
System.out.println(urlrdd.take(1));
SQLContext sqlContext = new SQLContext(sc);
This is how I am trying to convert the JavaRDD into a DataFrame:
DataFrame fileDF = sqlContext.createDataFrame(urlrdd, Model.class);
But the line above does not work, and I am confused about what Model.class is supposed to be.
Can anyone advise me?
Thanks.
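For reference, a minimal sketch of the usual approach with the Spark 1.x DataFrame API: the class passed to createDataFrame must be a serializable JavaBean, and Spark builds the schema by reflecting on its getter/setter pairs. UrlRecord below is a hypothetical one-field bean standing in for Model:

import java.io.Serializable;

// A JavaBean with one String property yields a one-column DataFrame.
public class UrlRecord implements Serializable {
    private String url;
    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
}

// Map the raw strings onto the bean, then let Spark infer the schema:
JavaRDD<UrlRecord> beanRdd = urlrdd.map(u -> {
    UrlRecord r = new UrlRecord();
    r.setUrl(u);
    return r;
});
DataFrame fileDF = sqlContext.createDataFrame(beanRdd, UrlRecord.class);
fileDF.show();

If defining a bean per log column feels heavy, the RowFactory/StructType overload of createDataFrame builds the same DataFrame without an extra class.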
I am trying to write a DataFrame to a Phoenix table, but I am getting an exception.
Here is my code:
df.write.format("org.apache.phoenix.spark").mode(SaveMode.Overwrite).options(collection.immutable.Map(
"zkUrl" -> "localhost:2181/hbase-unsecure",
"table" -> "TEST")).save();
Run Code Online (Sandbox Code Playgroud)
The exception is:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 3.0 (TID 411, ip-xxxxx-xx-xxx.ap-southeast-1.compute.internal):
java.lang.RuntimeException: java.sql.SQLException: No suitable driver found for jdbc:phoenix:localhost:2181:/hbase-unsecure;
at org.apache.phoenix.mapreduce.PhoenixOutputFormat.getRecordWriter(PhoenixOutputFormat.java:58)
at org.apache.spark.rdd.PairRDDFunctions$anonfun$saveAsNewAPIHadoopDataset$1$anonfun$12.apply(PairRDDFunctions.scala:1030)
at org.apache.spark.rdd.PairRDDFunctions$anonfun$saveAsNewAPIHadoopDataset$1$anonfun$12.apply(PairRDDFunctions.scala:1014)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
Run Code Online (Sandbox Code Playgroud)
I have already added the phoenix-spark and phoenix-core jars to my pom.xml.
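One thing worth checking: "No suitable driver found for jdbc:phoenix:…" is thrown on the executors, which never see the jars that pom.xml compiled against. A hedged fix, with the Phoenix client jar path as a placeholder for your installation, is to ship the jar with the job and put it on the driver and executor classpaths:

./spark-submit --class <class name> --master spark://localhost:7077 \
  --jars /path/to/phoenix-client.jar \
  --conf spark.driver.extraClassPath=/path/to/phoenix-client.jar \
  --conf spark.executor.extraClassPath=/path/to/phoenix-client.jar \
  /home/hadoop/myjar.jar

Some setups instead put these extraClassPath entries in spark-defaults.conf so every job picks them up.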
In my application I need to connect to a database, so I need to pass the IP address and the database name when submitting the application.
I submit the application as follows:
./spark-submit --class <class name> --master spark://localhost:7077 \
--deploy-mode client /home/hadoop/myjar.jar
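Anything placed after the application jar on the spark-submit line is passed straight through as arguments to main. A minimal sketch, with 192.168.0.10 and mydb as hypothetical stand-ins for the real IP and database name:

./spark-submit --class <class name> --master spark://localhost:7077 \
  --deploy-mode client /home/hadoop/myjar.jar 192.168.0.10 mydb

public static void main(String[] args) {
    String dbHost = args[0]; // first argument after the jar: the database IP
    String dbName = args[1]; // second argument: the database name
    // build the connection URL from dbHost and dbName here
}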
I am using Hive 1.2.0 and Hadoop 2.6.0. Whenever I run Hive on my machine, select queries work fine, but count(*) shows the following error:
Diagnostic Messages for this Task:
Container launch failed for container_1434646588807_0001_01_000005 : org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:mapreduce_shuffle does not exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168)
at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106)
at org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl$Container.launch(ContainerLauncherImpl.java:155)
at org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl$EventProcessor.run(ContainerLauncherImpl.java:369)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
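The InvalidAuxServiceException means the YARN NodeManagers are not running the MapReduce shuffle auxiliary service. Plain selects succeed because Hive can serve them as fetch tasks without MapReduce, while count(*) launches a real job that needs the shuffle. The usual fix, assuming the stock ShuffleHandler, is to declare the service in yarn-site.xml on every node and restart YARN:

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
  <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>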
I am trying to parse JSON data, and I have written a custom schema for it. Whether I parse the data with or without the schema, I get the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`queryResults`.`searchResponse`.`response`.`docs`.`transactions`['code']' due to data type mismatch: argument 2 requires integral type, however, ''code'' is of string type.;;
Here is my sample data:
{
"queryResults": {
"searchResponse": {
"response": {
"docs": [{
"transactions": [{
"recordDate": "2010-02-02 00:00:00",
"code": "PGM/",
"description": "Recordation of Patent Grant Mailed"
}, {
"recordDate": "2010-01-13 00:00:00",
"code": "WPIR",
"description": "Issue Notification Mailed"
}, {
"recordDate": "2009-12-17 00:00:00",
"code": "R1021",
"description": "Receipt into Pubs"
}]
}]
}
}
}
}
Here is my schema:
val …
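Although the schema itself is cut off above, the AnalysisException points at the query rather than the schema: transactions is an array of structs, so transactions['code'] is parsed as an array index lookup, and the index argument must be integral, not the string 'code'. Extracting the field by name instead of indexing avoids the error. A sketch in Java (matching the earlier examples in this post), assuming Spark 2.2+ for the multiLine JSON option and a placeholder input path:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("JsonParse").master("local").getOrCreate();

// multiLine is needed because the sample document spans several lines
Dataset<Row> df = spark.read().option("multiLine", true).json("/path/to/sample.json");

// Explode the docs array, then the transactions array, then pull struct fields by name
Dataset<Row> transactions = df
    .select(explode(col("queryResults.searchResponse.response.docs")).as("doc"))
    .select(explode(col("doc.transactions")).as("t"))
    .select(col("t.code"), col("t.recordDate"), col("t.description"));
transactions.show(false);

Alternatively, col("queryResults.searchResponse.response.docs.transactions.code") selects the codes as nested arrays without exploding.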