我很难在Spark文档操作中找到导致shuffle和操作的操作.在这个列表中,哪些确实会导致混乱而哪些不会导致混乱?
地图和过滤器没有.但是,我不确定其他人.
map(func)
filter(func)
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
Run Code Online (Sandbox Code Playgroud) 我正在编写一个需要在特定HDFS路径中获取新二进制文件的组件,以便我可以根据这些数据进行一些在线学习.所以,我想在流中读取Flume从HDFS创建的二进制文件.我找到了spark API提供的几个函数,比如
public JavaDStream<byte[]> binaryRecordsStream(String directory,int recordLength)
Run Code Online (Sandbox Code Playgroud)
和
public <K,V,F extends org.apache.hadoop.mapreduce.InputFormat<K,V>>
JavaPairInputDStream<K,V> fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass)
Run Code Online (Sandbox Code Playgroud)
但是,我真的不知道如何使用这些功能.我试过binaryRecordStream
,但它定义了文件的具体长度,所以它并不好.
对于fileStream
功能,我用过:
SparkConf sparkConf = new SparkConf().setAppName("SparkFileStreamTest").setMaster("local[2]");
// Create the context with the specified batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(durationInMillis));
JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class);
//**********************************************************************
JavaPairInputDStream<LongWritable, BytesWritable> inputDStream = jssc.fileStream(
hdfsPath, LongWritable.class, BytesWritable.class, CustomInputFormat.class);
JavaDStream<byte[]> content = inputDStream.map(new Function<Tuple2<LongWritable, BytesWritable>, byte[]>() {
@Override
public byte[] call(Tuple2<LongWritable, BytesWritable> tuple2) …
Run Code Online (Sandbox Code Playgroud) 使用Apache Kafka创建者提供的Confluent Platform平台,我有一个问题:
在Schema Registry API Reference的文档中,他们提到了"Subject"的抽象.您在"subject"下注册了一个模式,其主题为topicName-key或topicName-value,但是没有解释为什么需要(因为它暗示)一个单独的模式来表示消息的键和值.给定主题.也没有任何直接的声明,即注册"主题"必然将模式与该主题相关联,而不是助记.
进一步令人困惑的是,该页面上的后续示例("获取主题的模式版本"和"在主题下注册新模式")不使用该主题名称的格式,而是仅使用主题名称作为"主题"值.如果有人对a)为什么每个主题有这两个"主题",以及b)正确用法是什么有任何见解,那将非常感激.
有没有办法获得弹性搜索文档的日期和时间?
我正在通过spark运行es查询,并且不希望查看我已经处理过的所有文档.相反,我想阅读上次程序运行和现在之间摄取的唯一文档.
最有效的方法是什么?
我看过了;
Elasticsearch版本5.6
当我尝试使用我的相应模式使用Avro运行Kafka Consumer时,它返回错误"AvroRuntimeException:格式错误的数据.长度为负:-40".我看到其他人有类似的问题,将字节数组转换为json,Avro写入和读取,以及Kafka Avro Binary*编码器.我也引用了这个消费者组示例,它们都很有帮助,但到目前为止这个错误没有任何帮助..它可以工作到这部分代码(第73行)
解码器解码器= DecoderFactory.get().binaryDecoder(byteArrayInputStream,null);
我已经尝试了其他解码器并打印出byteArrayInputStream变量的内容,它看起来我相信你会期望序列化的avro数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回594.我无法理解为什么会发生此错误.Apache Nifi用于生成具有来自hdfs的相同模式的Kafka流.我将不胜感激任何帮助.
java ×3
apache-kafka ×2
apache-spark ×2
avro ×2
apache-nifi ×1
hadoop ×1
python ×1
scala ×1
streaming ×1