Posts by Kau*_*hal

Operation timed out error in Cassandra's cqlsh console

I have a three-node Cassandra cluster and I have created a table with more than 2,000,000 rows.

When I execute this query in cqlsh, select count(*) from userdetails, I get this error:

OperationTimedOut: errors={}, last_host=192.168.1.2

When I run the count for fewer rows, or with a limit of 50,000, it works fine.
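A common workaround (my assumption, not something stated in the original post) is to push the count down to the cluster with Spark and the spark-cassandra-connector instead of running count(*) through cqlsh; cassandraCount() issues one small count per token range, so no single request has to scan the whole table in one go. The keyspace name mykeyspace below is hypothetical.

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Connect through the spark-cassandra-connector (keyspace name is hypothetical).
val conf = new SparkConf()
  .setAppName("CountUserDetails")
  .set("spark.cassandra.connection.host", "192.168.1.2")
val sc = new SparkContext(conf)

// cassandraCount() pushes per-token-range counts to the Cassandra nodes,
// avoiding the single long-running request that times out in cqlsh.
val total = sc.cassandraTable("mykeyspace", "userdetails").cassandraCount()
println(s"Total rows: $total")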

java bigdata cassandra cqlsh datastax

14 votes · 2 answers · 20k views

Spark ssc.textFileStream does not stream any files from the directory

I am trying to execute the code below from Eclipse (with a Maven configuration) with 2 workers, each with 2 cores; I have also tried it with spark-submit.

import java.io.Serializable;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingWorkCount implements Serializable {

    public static void main(String[] args) {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        // Streaming context against the standalone master, 1-second batches.
        JavaStreamingContext jssc = new JavaStreamingContext(
                "spark://192.168.1.19:7077", "JavaWordCount",
                new Duration(1000));
        // Monitor the directory for newly arriving text files.
        JavaDStream<String> trainingData = jssc.textFileStream(
                "/home/bdi-user/kaushal-drive/spark/data/training").cache();
        // foreach is the deprecated Spark 1.x alias of foreachRDD.
        trainingData.foreach(new Function<JavaRDD<String>, Void>() {

            public Void call(JavaRDD<String> rdd) throws Exception {
                List<String> output = rdd.collect();
                System.out.println("Sentences Collected from files " + output);
                return null;
            }
        });

        trainingData.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

and this is the log for that code:

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms:

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 …
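One frequent cause of this symptom (my assumption, not stated in the question) is that textFileStream only picks up files that appear in the monitored directory after the streaming context has started, and only if they arrive atomically with a fresh modification time. A minimal sketch of staging a file elsewhere and moving it into the watched directory while the job is running:

import java.nio.file.{Files, Paths, StandardCopyOption}

// Hypothetical staging path; the destination is the directory passed to textFileStream.
val src = Paths.get("/tmp/staging/sentences-001.txt")
val dst = Paths.get("/home/bdi-user/kaushal-drive/spark/data/training/sentences-001.txt")

// Move (rename) the file in atomically so the stream sees it as a brand-new file.
Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE)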

filesystems data-stream apache-spark spark-streaming

13 votes · 2 answers · 10k views

Unable to change authentication in spark-cassandra-connector

I am building a Spark-Cassandra application (Spark 1.6.0 and spark-cassandra-connector 1.6.0-M1) in which I need multiple users to enter their Cassandra properties such as host, username, password, keyspace, table, etc.

To change the above properties dynamically and create a DataFrame from a Cassandra table, I googled and found some information:

http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#setting-cluster-and-keyspace-level-options

val csc = new CassandraSQLContext(SparkConnection._sc)

csc.setConf(s"${cluster}/spark.cassandra.connection.host", host)
csc.setConf(s"${cluster}/spark.cassandra.connection.port", port)
csc.setConf(s"${cluster}/spark.cassandra.auth.username", username)
csc.setConf(s"${cluster}/spark.cassandra.auth.password", password)

csc.read.format("org.apache.spark.sql.cassandra")
                  .options(Map("cluster" -> cluster, "keyspace" -> keySpace, "table" -> table))
                  .load()

I tried the properties mentioned there; clusters that do not require authentication connect successfully, but when I try to connect to a secured cluster with the username and password properties, I get this error:

Exception in thread "Thread-10" java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.17}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:184)
    at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:267)
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:57)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at com.bdbizviz.pa.spark.util.ServiceUtil$.readData(ServiceUtil.scala:97)
    at com.bdbizviz.pa.spark.services.SparkServices$$anon$1.run(SparkServices.scala:114)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error …
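One way to keep per-user credentials out of shared SQLContext state is sketched below; it is my own suggestion, assuming the connector also honours its spark.cassandra.* keys when they are passed directly in the options map of an individual read, so each user's host and credentials travel with that one read instead of being set globally.

import org.apache.spark.sql.SQLContext

// Sketch: per-read connection and auth settings for one secured cluster.
def readTable(sqlContext: SQLContext,
              host: String, port: String,
              username: String, password: String,
              keySpace: String, table: String) =
  sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map(
      "spark.cassandra.connection.host" -> host,
      "spark.cassandra.connection.port" -> port,
      "spark.cassandra.auth.username"   -> username,
      "spark.cassandra.auth.password"   -> password,
      "keyspace" -> keySpace,
      "table"    -> table))
    .load()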

cassandra apache-spark apache-spark-sql spark-cassandra-connector

7 votes · 1 answer · 1,220 views

Why does DataFrame.saveAsTable("df") save the table to a different HDFS host?

I have configured Hive (1.13.1) with Spark (1.4.0) and I can access all databases and tables from Hive; my warehouse directory is hdfs://192.168.1.17:8020/user/hive/warehouse

But when I try to save a DataFrame to Hive through spark-shell (connected to the master) with the df.saveAsTable("df") function, I get this error:

15/07/03 14:48:59 INFO audit: ugi=user  ip=unknown-ip-addr  cmd=get_database: default   
15/07/03 14:48:59 INFO HiveMetaStore: 0: get_table : db=default tbl=df
15/07/03 14:48:59 INFO audit: ugi=user  ip=unknown-ip-addr  cmd=get_table : db=default tbl=df   
java.net.ConnectException: Call From bdiuser-Vostro-3800/127.0.1.1 to 192.168.1.19:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
    at org.apache.hadoop.ipc.Client.call(Client.java:1414)
    at org.apache.hadoop.ipc.Client.call(Client.java:1363)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
    at com.sun.proxy.$Proxy14.getFileInfo(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at …
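The stack trace shows Spark calling a namenode at 192.168.1.19 even though the warehouse lives on 192.168.1.17, which usually means the Hive metastore recorded an older fs.defaultFS as the table location. A hedged sketch of one way around it (my assumption, not a confirmed fix): pin the table location explicitly when saving, so the write goes to the intended namenode regardless of what the metastore remembers.

// Write the table to an explicit HDFS location on the intended namenode.
df.write
  .format("parquet")
  .option("path", "hdfs://192.168.1.17:8020/user/hive/warehouse/df")
  .saveAsTable("df")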

hadoop hdfs apache-spark apache-spark-sql

6 votes · 3 answers · 40k views

How to connect an external HDFS to standalone Spark

I am using a 3-node standalone Spark (1.6.0) cluster for my application.

It fetches data from an external Hadoop source. Without Hadoop authentication the application works fine, but when I enable Kerberos security on my external Hadoop cluster, it fails with this error:

org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1764)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1124)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1120)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1398)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:73)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at 

I did not find any example of how to configure Kerberos in Spark.

EDIT: Here is a code snippet I tried in spark-shell; in this code I am not using any Spark API, so it works fine.

val config = new Configuration();
config.set("fs.default.name", "hdfs://192.168.1.1:8020");
config.set("fs.file.impl", …
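For plain HDFS client code like the snippet above, the usual Kerberos setup is sketched below under my own assumptions (the principal and keytab path are hypothetical): log in from a keytab through UserGroupInformation before touching the filesystem. Note that standalone Spark does not distribute delegation tokens the way YARN does, so the same login has to happen wherever the HDFS access actually runs.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

val config = new Configuration()
config.set("fs.defaultFS", "hdfs://192.168.1.1:8020")
config.set("hadoop.security.authentication", "kerberos")

// Authenticate with a keytab (principal and keytab path are placeholders).
UserGroupInformation.setConfiguration(config)
UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/etc/security/keytabs/user.keytab")

val fs = FileSystem.get(config)
println(fs.exists(new Path("/user/user/data")))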

security hadoop kerberos hdfs apache-spark

5 votes · 0 answers · 3,042 views

Cross-validation with a pipeline in Spark

Cross-validation outside the pipeline:

val naiveBayes
val indexer
val pipeLine = new Pipeline().setStages(Array(indexer, naiveBayes))

val paramGrid = new ParamGridBuilder()
   .addGrid(naiveBayes.smoothing, Array(1.0, 0.1, 0.3, 0.5))
   .build()
val crossValidator = new CrossValidator().setEstimator(pipeLine)
   .setEvaluator(new MulticlassClassificationEvaluator)
   .setNumFolds(2).setEstimatorParamMaps(paramGrid)

val crossValidatorModel = crossValidator.fit(trainData)

val predictions = crossValidatorModel.transform(testData)

Cross-validation inside the pipeline:

val naiveBayes
val indexer

// param grid for multiple parameter
val paramGrid = new ParamGridBuilder()
   .addGrid(naiveBayes.smoothing, Array(0.35, 0.1, 0.2, 0.3, 0.5))
   .build()

// validator for naive bayes
val crossValidator = new CrossValidator().setEstimator(naiveBayes)
   .setEvaluator(new MulticlassClassificationEvaluator)
   .setNumFolds(2).setEstimatorParamMaps(paramGrid)

// pipeline to execute compound transformation …
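For reference, here is a minimal sketch (my reading of the truncated snippet, not the original code) of what the "cross-validation inside the pipeline" variant typically looks like: CrossValidator is itself an Estimator, so it can be placed as a stage of the Pipeline after the indexer.

// CrossValidator wraps only the naive Bayes estimator; the pipeline runs the
// indexer first and then the tuned model.
val pipeLine = new Pipeline().setStages(Array(indexer, crossValidator))

val pipelineModel = pipeLine.fit(trainData)
val predictions   = pipelineModel.transform(testData)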

pipeline cross-validation apache-spark apache-spark-ml apache-spark-mllib

5 votes · 1 answer · 714 views

Parquet filter pushdown is not working with the Spark Dataset API

Here is the sample code I am running.

Create a test Parquet dataset partitioned by the mod column:

scala> val test = spark.range(0 , 100000000).withColumn("mod", $"id".mod(40))
test: org.apache.spark.sql.DataFrame = [id: bigint, mod: bigint]

scala> test.write.partitionBy("mod").mode("overwrite").parquet("test_pushdown_filter")

After that, I read this data back as a DataFrame and apply a filter on the partition column, i.e. mod:

scala> val df = spark.read.parquet("test_pushdown_filter").filter("mod = 5")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, mod: int]

scala> df.queryExecution.executedPlan
res1: org.apache.spark.sql.execution.SparkPlan =
*FileScan parquet [id#16L,mod#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/kprajapa/WorkSpace/places/test_pushdown_filter], PartitionCount: 1, PartitionFilters: [
isnotnull(mod#17), (mod#17 = 5)], PushedFilters: [], ReadSchema: struct<id:bigint>

You can see in the execution plan that it reads only 1 partition.

But if you apply the same filter to a Dataset, it reads all the partitions and then applies the filter:

scala> case class Test(id: Long, mod: Long)
defined class Test

scala> val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter(_.mod==5)
ds: …
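The difference comes from the filter itself: _.mod == 5 is an opaque Scala lambda that Catalyst cannot inspect, while "mod = 5" is a Column expression it can use for partition pruning. A small sketch (my suggestion, not from the original post) that keeps the typed Dataset but states the filter as a column so pruning still applies:

// Column-based filter on a typed Dataset; Catalyst can still see the predicate.
import spark.implicits._

val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter($"mod" === 5)
ds.queryExecution.executedPlan   // should again show a PartitionFilters entry on mod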

apache-spark apache-spark-sql apache-spark-dataset catalyst-optimizer

5 votes · 0 answers · 2,342 views

How to evaluate a Column expression in Spark without a DataFrame

Is there any way to evaluate my Column expression if I am only using literals (no DataFrame columns)?

For example, something like:

val result: Int = someFunction(lit(3) * lit(5))
//result: Int = 15

or

import org.apache.spark.sql.function.sha1
val result: String = someFunction(sha1(lit("5")))
//result: String = ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4

I can evaluate it using a DataFrame:

val result = Seq(1).toDF.select(sha1(lit("5"))).as[String].first
//result: String = ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4

But is there any way to get the same result without using a DataFrame?
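One possibility is sketched below; it relies on Catalyst internals, so treat it as an assumption rather than a supported API. For expressions built purely from literals, Column.expr is already a concrete expression tree, and a foldable tree can be evaluated directly.

import org.apache.spark.sql.functions.lit

// Evaluate a literal-only arithmetic expression without a DataFrame.
val result = (lit(3) * lit(5)).expr.eval()   // Any = 15

// Caveat: expressions that need the analyzer to insert casts first
// (e.g. sha1 over a string literal, which must become binary) will not
// evaluate this way and may throw instead.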

apache-spark apache-spark-sql

5 votes · 1 answer · 2,104 views

Spark SQL is throwing a PermGen space error

I have configured a 3-node Spark cluster and started the Spark Thrift service using the start-thriftserver.sh script with some custom properties. I have also added the property spark.executor.extraJavaOptions -XX:MaxPermSize=1024m -XX:PermSize=256m in the spark-default.sh file on every node of the cluster.

Using the Hive JDBC driver I am able to connect to spark-sql, and I have tried some queries.

But after a while it throws a PermGen space error, and after restarting the Thrift service many times it still throws the same error.

[Screenshot: PermGen space error stack trace]
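One detail worth checking (my assumption, not confirmed by the post): the Thrift server itself runs inside the driver JVM, so spark.executor.extraJavaOptions alone does not raise its PermGen limit; the driver needs the same flags, for example passed when starting the service:

./sbin/start-thriftserver.sh \
  --driver-java-options "-XX:MaxPermSize=1024m -XX:PermSize=256m" \
  --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=1024m -XX:PermSize=256m"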

jvm permgen apache-spark apache-spark-sql

3 votes · 1 answer · 3,390 views

scala.MatchError in DataFrames

I have a Spark (version 1.3.1) application in which I am trying to convert an RDD of Java beans, JavaRDD<Message>, into a DataFrame; the bean has many fields with different data types (Integer, String, List, Map, Double).

But when I execute my code:

messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
            @Override
            public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
                SQLContext sqlContext = SparkConnection.getSqlContext();
                DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
                df.registerTempTable("messages");

I get this error:

15/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
scala.MatchError: interface java.util.List (of class java.lang.Class)
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193)
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) …
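The MatchError names java.util.List, which suggests the bean-reflection path in Spark 1.3 cannot map List (or Map) fields to Catalyst types. Below is a hedged sketch, in Scala and with hypothetical field names, of the usual workaround: declare the schema explicitly and convert each Message to a Row yourself instead of passing the bean class.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Explicit schema covering the mixed field types (field names are hypothetical).
val schema = StructType(Seq(
  StructField("id",    IntegerType, nullable = false),
  StructField("text",  StringType,  nullable = true),
  StructField("tags",  ArrayType(StringType), nullable = true),
  StructField("props", MapType(StringType, StringType), nullable = true),
  StructField("score", DoubleType,  nullable = true)))

// rowRdd would be built from the JavaRDD<Message>, e.g. rdd.map(m => Row(m.getId, ...)),
// and then: val df = sqlContext.createDataFrame(rowRdd, schema)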

java scala apache-spark spark-streaming apache-spark-sql

0 votes · 1 answer · 3,673 views