小编man*_*ato的帖子

Spark 1.6-无法在hadoop二进制路径中找到winutils二进制文件

我知道有一个非常相似的帖子(无法在hadoop二进制路径中找到winutils二进制文件),但是,我已经尝试了建议的每一步,但仍然出现相同的错误.

我正在尝试在Windows 7上使用Apache Spark版本1.6.0来执行此页面上的教程http://spark.apache.org/docs/latest/streaming-programming-guide.html,特别是使用此代码:

./bin/run-example streaming.JavaNetworkWordCount localhost 9999
Run Code Online (Sandbox Code Playgroud)

但是,此错误一直出现: 在此输入图像描述

阅读本文后, 无法在hadoop二进制路径中找到winutils二进制文件

我意识到我需要winutils.exe文件,所以我用它下载了一个hadoop二进制2.6.0,定义了一个名为HADOOP_HOME的环境变量:

 with value C:\Users\GERAL\Desktop\hadoop-2.6.0\bin  
Run Code Online (Sandbox Code Playgroud)

并将其放在路径上,如下所示:%HADOOP_HOME%

但是当我尝试代码时仍会出现相同的错误.有谁知道如何解决这个问题?

java hadoop apache-spark

12
推荐指数
1
解决办法
3万
查看次数

如何在Spark中按分区对键/值进行分组?

我有一个Spark Streaming应用程序,它每秒接收几条JSON消息,每个消息都有一个标识其源的ID.

使用此ID作为键,我能够执行a MapPartitionsToPair,从而创建一个JavaPairDStream,其中包含键/值对的RDD,每个分区一个键值对(因此,如果我收到5个JSON消息,例如,我得到一个RDD与5个分区,每个分区都将消息的ID作为密钥,并将JSON消息本身作为值).

我现在想做的是,我想将具有相同键的所有值分组到同一个分区中.因此,例如,如果我有3个带有键'a'的分区和2个带有键'b'的分区,我想创建一个带有2个分区而不是5个分区的新RDD,每个分区包含一个键所具有的所有值,一个用于'a'和一个'b'.

我怎么能做到这一点?到目前为止这是我的代码:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {

            ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();

            while (stringIterator.hasNext()){
                String c=stringIterator.next();
                if(c==null){
                    return null;

                }

                JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
                String key= retMap.getSid();
                Tuple2<String,String> b= new Tuple2<String,String>(key,c);
                a.add(b);

                System.out.print(b._1+"_"+b._2);
                // }
                //break;
            }


            return a;
        }
    });
Run Code Online (Sandbox Code Playgroud)

//我创建了一个JavaPairDStream,其中每个分区包含一个键/值对.

我尝试使用grouByKey(),但无论消息的数量是多少,我的分区号都是2.

我该怎么做?非常感谢.

parallel-processing apache-spark spark-streaming

4
推荐指数
1
解决办法
3714
查看次数

SQL查询返回Apache Ignite缓存的空结果

我正在尝试从Spark RDD执行插入到Ignite缓存中.我使用的是Ignite的2.2版和Spark的2.1版.

我采取的第一步是在单独的scala脚本中创建缓存,如下所示:

object Create_Ignite_Cache {

case class Custom_Class(
                   @(QuerySqlField @field)(index = true)   a: String,
                   @(QuerySqlField @field)(index = true)  b: String,
                   @(QuerySqlField @field)(index = true)  c: String,
                   @(QuerySqlField @field)(index = true)  d: String,
                   @(QuerySqlField @field)(index = true)  e: String,
                   @(QuerySqlField @field)(index = true)  f: String,
                   @(QuerySqlField @field)(index = true)  g: String,
                   @(QuerySqlField @field)(index = true)  h: String

                 )
def main(args: Array[String]): Unit = {
 val spi = new TcpDiscoverySpi
 val ipFinder = new TcpDiscoveryMulticastIpFinder
 val adresses = new util.ArrayList[String]
 adresses.add("127.0.0.1:48500..48520") …
Run Code Online (Sandbox Code Playgroud)

sql caching scala apache-spark ignite

2
推荐指数
1
解决办法
393
查看次数

无法使用 SQL 工具查询现有的 Ignite 缓存

我正在尝试查询我通过 Java 脚本创建的 Apache Ignite 缓存(版本 2.2):

    TcpDiscoverySpi spi = new TcpDiscoverySpi();
    TcpDiscoveryVmIpFinder ipFinder=new TcpDiscoveryMulticastIpFinder();
    List<String> adresses=new ArrayList<String>();
    adresses.add("127.0.0.1:48500..48520");
    ipFinder.setAddresses(adresses);
    spi.setIpFinder(ipFinder);

    IgniteConfiguration cfg=new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true);

    CacheConfiguration cache_conf=new CacheConfiguration<String,Custom_Class>().setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(1).
            setIndexedTypes(String.class,Custom_Class.class).setName("Spark_Ignite");

    Ignite ignite=Ignition.getOrStart(cfg);

    ignite.getOrCreateCache(cache_conf);

    System.out.println("[INFO] CACHE CREATED");
    ignite.close();
Run Code Online (Sandbox Code Playgroud)

我正在使用 DBeaver 对该缓存执行简单的 SQL 查询。

问题是,当我尝试进行查询时,出现此错误:

SELECT * FROM Custom_Class;

 Table "Custom_Class" not found; SQL statement:SELECT * FROM Custom_Class
Run Code Online (Sandbox Code Playgroud)

如果我运行此查询,则相同:

SELECT * FROM Spark_Ignite;

 Table "Spark_Ignite" not found; SQL statement:SELECT * FROM Spark_Ignite
Run Code Online (Sandbox Code Playgroud)

但是,如果我按照此处提到的说明进行操作:https://apacheignite-sql.readme.io/docs/sql-tooling,我可以毫无问题地获得查询结果。

我运行了 ignitevisor.sh,确实所有的缓存都在那里,而且都有记录:

在此处输入图片说明

这里可能有什么问题?

谢谢你。

更新

使用答案中提到的引号,我能够查询表,但它不显示任何记录,而 ignitevisor …

java sql caching dbeaver ignite

0
推荐指数
1
解决办法
1521
查看次数