小编Spa*_*oat的帖子

什么是引起随机播放的Spark转换?

我很难在Spark文档操作中找到导致shuffle和操作的操作.在这个列表中,哪些确实会导致混乱而哪些不会导致混乱?

地图和过滤器没有.但是,我不确定其他人.

map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
Run Code Online (Sandbox Code Playgroud)

python java scala apache-spark

35
推荐指数
3
解决办法
1万
查看次数

如何使用spark Java API从HDFS读取二进制文件流?

我正在编写一个需要在特定HDFS路径中获取新二进制文件的组件,以便我可以根据这些数据进行一些在线学习.所以,我想在流中读取Flume从HDFS创建的二进制文件.我找到了spark API提供的几个函数,比如

public JavaDStream<byte[]> binaryRecordsStream(String directory,int recordLength)
Run Code Online (Sandbox Code Playgroud)

public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>> 
     JavaPairInputDStream<K,V> fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass)
Run Code Online (Sandbox Code Playgroud)

但是,我真的不知道如何使用这些功能.我试过binaryRecordStream,但它定义了文件的具体长度,所以它并不好.

对于fileStream功能,我用过:

SparkConf sparkConf = new   SparkConf().setAppName("SparkFileStreamTest").setMaster("local[2]");

// Create the context with the specified batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(durationInMillis));

JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class);

//**********************************************************************
JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(
            hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class);

JavaDStream<byte[]> content = inputDStream.map(new Function<Tuple2<LongWritable, BytesWritable>, byte[]>() {
    @Override
    public byte[] call(Tuple2<LongWritable, BytesWritable> tuple2) …
Run Code Online (Sandbox Code Playgroud)

java streaming hadoop apache-spark

13
推荐指数
1
解决办法
2264
查看次数

汇合平台:架构注册表主题

使用Apache Kafka创建者提供的Confluent Platform平台,我有一个问题:

Schema Registry API Reference的文档中,他们提到了"Subject"的抽象.您在"subject"下注册了一个模式,其主题为topicName-keytopicName-value,但是没有解释为什么需要(因为它暗示)一个单独的模式来表示消息的键和值.给定主题.也没有任何直接的声明,即注册"主题"必然将模式与该主题相关联,而不是助记.

进一步令人困惑的是,该页面上的后续示例("获取主题的模式版本"和"在主题下注册新模式")使用该主题名称的格式,而是仅使用主题名称作为"主题"值.如果有人对a)为什么每个主题有这两个"主题",以及b)正确用法是什么有任何见解,那将非常感激.

avro apache-kafka

9
推荐指数
1
解决办法
4992
查看次数

如何在Elasticsearch中处理新的(上次运行后的索引)数据?

有没有办法获得弹性搜索文档的日期和时间?

我正在通过spark运行es查询,并且希望查看我已经处理过的所有文档.相反,我想阅读上次程序运行和现在之间摄取的唯一文档.

最有效的方法是什么?

我看过了;

  • 如果已经通过哪个分析查看,则更新以添加具有布尔数组的字段.否定是等待更新发生.
  • 每个时间帧方法的索引,它将按小时将当前索引分解为较小的索引.我看到的负数是打开文件描述符的数量.
  • ??

Elasticsearch版本5.6

elasticsearch

9
推荐指数
1
解决办法
134
查看次数

带解码器问题的Kafka Avro Consumer

当我尝试使用我的相应模式使用Avro运行Kafka Consumer时,它返回错误"AvroRuntimeException:格式错误的数据.长度为负:-40".我看到其他人有类似的问题,将字节数组转换为json,Avro写入和读取,以及Kafka Avro Binary*编码器.我也引用了这个消费者组示例,它们都很有帮助,但到目前为止这个错误没有任何帮助..它可以工作到这部分代码(第73行)

解码器解码器= DecoderFactory.get().binaryDecoder(byteArrayInputStream,null);

我已经尝试了其他解码器并打印出byteArrayInputStream变量的内容,它看起来我相信你会期望序列化的avro数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回594.我无法理解为什么会发生此错误.Apache Nifi用于生成具有来自hdfs的相同模式的Kafka流.我将不胜感激任何帮助.

java avro apache-kafka kafka-consumer-api apache-nifi

8
推荐指数
1
解决办法
7499
查看次数