I have a three-node Cassandra cluster, and I created a table with more than 2,000,000 rows.
When I execute this query, select count(*) from userdetails, in cqlsh, I get this error:
OperationTimedOut:errors = {},last_host = 192.168.1.2
When I run the count over fewer rows, or with a limit of 50,000, it works fine.
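As a point of comparison, here is a minimal sketch (not from the original post) of counting the same table through Spark with the spark-cassandra-connector instead of cqlsh, so the scan is split across executors per token range; it assumes the connector is on the classpath, that sc is a SparkContext already configured with spark.cassandra.connection.host, and the keyspace name is a placeholder:
import com.datastax.spark.connector._

// Push the count down to Cassandra one token range at a time
// ("mykeyspace" is a placeholder for the real keyspace).
val rowCount = sc.cassandraTable("mykeyspace", "userdetails").cassandraCount()
println(s"userdetails rows: $rowCount")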
I am trying to run the code below from Eclipse (with a Maven configuration) against 2 workers, each with 2 cores; I have also tried spark-submit.
import java.io.Serializable;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingWorkCount implements Serializable {
    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        // Streaming context against the standalone master, 1 second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        // Monitor the directory for new text files
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });
        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
And this is the log from that code:
15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:
15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 …

I am creating a Spark-Cassandra application (Spark 1.6.0 and spark-cassandra-connector 1.6.0-M1) in which multiple users enter their own Cassandra properties, such as host, username, password, keyspace, table, and so on.
To change these properties dynamically and create a DataFrame from a Cassandra table, I searched around and found some information:
http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/
import org.apache.spark.sql.cassandra.CassandraSQLContext

val csc = new CassandraSQLContext(SparkConnection._sc)
csc.setConf(s"${cluster}/spark.cassandra.connection.host", host)
csc.setConf(s"${cluster}/spark.cassandra.connection.port", port)
csc.setConf(s"${cluster}/spark.cassandra.auth.username", username)
csc.setConf(s"${cluster}/spark.cassandra.auth.password", password)
csc.read.format("org.apache.spark.sql.cassandra")
.options(Map("cluster" -> cluster, "keyspace" -> keySpace, "table" -> table))
.load()
I tried the properties mentioned there. Clusters that do not require authentication connect successfully, but when I try to connect to a secured cluster using the username and password properties, I get the following error.
Exception in thread "Thread-10" java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.17}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:184)
at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:267)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:57)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at com.bdbizviz.pa.spark.util.ServiceUtil$.readData(ServiceUtil.scala:97)
at com.bdbizviz.pa.spark.services.SparkServices$$anon$1.run(SparkServices.scala:114)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error …

cassandra apache-spark apache-spark-sql spark-cassandra-connector
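For comparison, here is a rough sketch (not the original code; host, port, username, password, keyspace, and table below are placeholders) of authenticating against a single default cluster by putting the same properties directly on the SparkConf instead of the cluster-prefixed CassandraSQLContext settings:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext

// Placeholder connection details for the secured cluster
val host = "192.168.1.17"
val port = "9042"
val username = "cassandra_user"
val password = "cassandra_pass"

val conf = new SparkConf()
  .setAppName("cassandra-auth-check")
  .set("spark.cassandra.connection.host", host)
  .set("spark.cassandra.connection.port", port)
  .set("spark.cassandra.auth.username", username)
  .set("spark.cassandra.auth.password", password)

val sc = new SparkContext(conf)
val csc = new CassandraSQLContext(sc)
val df = csc.read.format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "mykeyspace", "table" -> "mytable"))   // placeholders
  .load()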
I have configured Hive (1.13.1) with Spark (1.4.0) and can access all databases and tables from Hive; my warehouse directory is hdfs://192.168.1.17:8020/user/hive/warehouse.
However, when I try to save a DataFrame to Hive from spark-shell (connected to the master) using the df.saveAsTable("df") function, I get this error:
15/07/03 14:48:59 INFO audit: ugi=user ip=unknown-ip-addr cmd=get_database: default
15/07/03 14:48:59 INFO HiveMetaStore: 0: get_table : db=default tbl=df
15/07/03 14:48:59 INFO audit: ugi=user ip=unknown-ip-addr cmd=get_table : db=default tbl=df
java.net.ConnectException: Call From bdiuser-Vostro-3800/127.0.1.1 to 192.168.1.19:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
at org.apache.hadoop.ipc.Client.call(Client.java:1414)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at …
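As a hedged debugging sketch (not from the original post): the trace shows Spark calling 192.168.1.19:8020 while the warehouse lives on 192.168.1.17:8020, so one thing to try from spark-shell is pointing the HiveContext explicitly at the intended warehouse location before saving; the small DataFrame below is purely hypothetical:
import org.apache.spark.sql.hive.HiveContext

// spark-shell already provides sc; build a HiveContext on top of it
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._

// Point explicitly at the warehouse directory that Hive itself uses
hiveContext.setConf("hive.metastore.warehouse.dir",
  "hdfs://192.168.1.17:8020/user/hive/warehouse")

// A small hypothetical DataFrame, just to exercise saveAsTable
val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "name")
df.write.saveAsTable("df")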
I am using a 3-node standalone Spark (1.6.0) cluster for my application. It fetches data from an external Hadoop source. Without Hadoop authentication the application works fine, but when I enable Kerberos security on the external Hadoop cluster, the application fails with this error:
org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1764)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:73)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at
I could not find any documentation or example of how to configure Kerberos in Spark.
EDIT: Here is a code snippet I tried in spark-shell. It does not use any Spark API, so it works fine.
val config = new Configuration();
config.set("fs.default.name", "hdfs://192.168.1.1:8020");
config.set("fs.file.impl", …Run Code Online (Sandbox Code Playgroud) 管道外部的交叉验证。
Cross-validation outside the pipeline:
val naiveBayes = ...   // NaiveBayes estimator (definition omitted)
val indexer = ...      // indexer (definition omitted)
val pipeLine = new Pipeline().setStages(Array(indexer, naiveBayes))
val paramGrid = new ParamGridBuilder()
.addGrid(naiveBayes.smoothing, Array(1.0, 0.1, 0.3, 0.5))
.build()
val crossValidator = new CrossValidator().setEstimator(pipeLine)
.setEvaluator(new MulticlassClassificationEvaluator)
.setNumFolds(2).setEstimatorParamMaps(paramGrid)
val crossValidatorModel = crossValidator.fit(trainData)
val predictions = crossValidatorModel.transform(testData)
Cross-validation inside the pipeline:
val naiveBayes = ...   // NaiveBayes estimator (definition omitted)
val indexer = ...      // indexer (definition omitted)
// param grid for multiple parameter
val paramGrid = new ParamGridBuilder()
.addGrid(naiveBayes.smoothing, Array(0.35, 0.1, 0.2, 0.3, 0.5))
.build()
// validator for naive bayes
val crossValidator = new CrossValidator().setEstimator(naiveBayes)
.setEvaluator(new MulticlassClassificationEvaluator)
.setNumFolds(2).setEstimatorParamMaps(paramGrid)
// pipeline to execute compound transformation …

pipeline cross-validation apache-spark apache-spark-ml apache-spark-mllib
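The snippet above is truncated; a plausible completion of the "inside the pipeline" variant (an assumption, not the original code) makes the CrossValidator itself a pipeline stage, so the indexer runs once per pipeline fit while only the NaiveBayes smoothing is tuned:
import org.apache.spark.ml.Pipeline

// CrossValidator is an Estimator, so it can sit inside a Pipeline as a stage
val pipeLineWithCv = new Pipeline().setStages(Array(indexer, crossValidator))
val pipeLineModel = pipeLineWithCv.fit(trainData)
val predictions = pipeLineModel.transform(testData)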
Here is the sample code I am running.
Create a test Parquet dataset partitioned by the mod column:
scala> val test = spark.range(0 , 100000000).withColumn("mod", $"id".mod(40))
test: org.apache.spark.sql.DataFrame = [id: bigint, mod: bigint]
scala> test.write.partitionBy("mod").mode("overwrite").parquet("test_pushdown_filter")
After that, I read the data back as a DataFrame and apply a filter on the partition column, i.e. mod:
scala> val df = spark.read.parquet("test_pushdown_filter").filter("mod = 5")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, mod: int]
scala> df.queryExecution.executedPlan
res1: org.apache.spark.sql.execution.SparkPlan =
*FileScan parquet [id#16L,mod#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/kprajapa/WorkSpace/places/test_pushdown_filter], PartitionCount: 1, PartitionFilters: [
isnotnull(mod#17), (mod#17 = 5)], PushedFilters: [], ReadSchema: struct<id:bigint>
You can see in the execution plan that it reads only 1 partition.
However, if you apply the same filter to a Dataset, it reads all partitions and only then applies the filter:
scala> case class Test(id: Long, mod: Long)
defined class Test
scala> val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter(_.mod==5)
ds: …

apache-spark apache-spark-sql apache-spark-dataset catalyst-optimizer
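As a side-by-side sketch (same hypothetical dataset as above, not part of the original question): a Column-based filter on the typed Dataset keeps the partition pruning, because Catalyst can inspect the expression, whereas the Scala lambda in filter(_.mod == 5) is opaque to the optimizer:
import spark.implicits._

// Column expression instead of a typed lambda: Catalyst can push this into the scan
val dsPruned = spark.read.parquet("test_pushdown_filter").as[Test].filter($"mod" === 5)
dsPruned.queryExecution.executedPlan   // expected to show PartitionFilters: [... (mod = 5)]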
Is there any way to evaluate my Column expression if it only uses Literals (no DataFrame columns)?
For example, something like:
val result: Int = someFunction(lit(3) * lit(5))
//result: Int = 15
Or:
import org.apache.spark.sql.functions.sha1
val result: String = someFunction(sha1(lit("5")))
//result: String = ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4
I can evaluate it using a DataFrame:
val result = Seq(1).toDF.select(sha1(lit("5"))).as[String].first
//result: String = ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4
But is there any way to get the same result without using a DataFrame?
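One hedged possibility, relying on Spark's internal Catalyst API rather than a stable public interface: a Column wraps a Catalyst Expression, and an expression built only from literals is foldable, so it can be evaluated directly without any DataFrame:
import org.apache.spark.sql.functions.{lit, sha1}

// Evaluate the underlying Catalyst expression with no input row
val product = (lit(3) * lit(5)).expr.eval()          // 15
val digest  = sha1(lit("5")).expr.eval().toString    // ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4 (returned as a UTF8String)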
I have configured a 3-node Spark cluster and start the Spark Thrift service with the start-thriftserver.sh script plus some custom properties. I have also added the property spark.executor.extraJavaOptions -XX:MaxPermSize=1024m -XX:PermSize=256m to the spark-default.sh file on every node of the cluster.
Using the Hive JDBC driver I am able to connect to spark-sql and have tried some queries.
But after a while it throws a PermGen space error, and even after restarting the Thrift service many times it throws the same error.
I have a Spark (version 1.3.1) application in which I am trying to convert a Java bean RDD, JavaRDD&lt;Message&gt;, into a DataFrame; the bean has many fields with different data types (Integer, String, List, Map, Double).
But when I execute my code:
messages.foreachRDD(new Function2<JavaRDD<Message>, Time, Void>() {
    @Override
    public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
        SQLContext sqlContext = SparkConnection.getSqlContext();
        DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
        df.registerTempTable("messages");
        return null;
    }
});
I get this error:
/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
scala.MatchError: interface java.util.List (of class java.lang.Class)
at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193)
at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) …
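The scala.MatchError on interface java.util.List is thrown while Spark infers the schema from the bean's fields, which does not handle the List and Map members here. As a hedged workaround sketch (written in Scala for brevity, with a hypothetical simplified Message shape; messageRdd and sqlContext are assumed to exist), the schema can be declared explicitly with ArrayType/MapType and the rows built by hand instead of relying on bean reflection:
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical, simplified shape of the Message bean
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("text", StringType),
  StructField("tags", ArrayType(StringType)),                  // the java.util.List field
  StructField("attributes", MapType(StringType, StringType)),  // the java.util.Map field
  StructField("score", DoubleType)
))

// Map each bean to a Row in the same field order as the schema above
val rowRdd = messageRdd.map { m =>
  Row(m.getId, m.getText, m.getTags.asScala, m.getAttributes.asScala.toMap, m.getScore)
}
val df = sqlContext.createDataFrame(rowRdd, schema)
df.registerTempTable("messages")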