小编Kyr*_*Kyr的帖子

如何使用Avro二进制编码器对Kafka消息进行编码/解码?

我正在尝试使用Avro来读取/写入Kafka的消息.有没有人有一个使用Avro二进制编码器编码/解码将被放在消息队列中的数据的例子?

我需要Avro部件而不是Kafka部件.或者,也许我应该看一个不同的解决方案?基本上,我正试图在空间方面找到更有效的JSON解决方案.刚刚提到Avro,因为它比JSON更紧凑.

java avro apache-kafka

27
推荐指数
3
解决办法
5万
查看次数

Spark Java错误:大小超过Integer.MAX_VALUE

我正在尝试使用spark进行一些简单的机器学习任务.我使用pyspark和spark 1.2.0来做一个简单的逻辑回归问题.我有120万条培训记录,我记录了记录的功能.当我将散列函数的数量设置为1024时,程序运行正常,但是当我将散列函数的数量设置为16384时,程序会多次失败并出现以下错误:

Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) …
Run Code Online (Sandbox Code Playgroud)

python java distributed-computing logistic-regression apache-spark

16
推荐指数
2
解决办法
1万
查看次数

Java中的装饰器

我在Python中看到了decorator 示例:

def makebold(fn):
    def wrapped():
        return "<b>" + fn() + "</b>"
    return wrapped

def makeitalic(fn):
    def wrapped():
        return "<i>" + fn() + "</i>"
    return wrapped

@makebold
@makeitalic
def hello():
    return "hello world"

print hello() ## returns <b><i>hello world</i></b>
Run Code Online (Sandbox Code Playgroud)

并且有些好奇它如何在Java中实现,所以我搜索并使用Decorator Design Pattern获得了一些示例.

public class Main {

    public static void main(String[] args) {
        Wrapper word = new BoldWrapper(new ItalicWrapper());
        // display <b><i>hello world</i></b>
        System.out.println(word.make("Hello World"));
    }
}

public interface Wrapper {

    public String make(String str);

}

public class …
Run Code Online (Sandbox Code Playgroud)

python java annotations

15
推荐指数
3
解决办法
1万
查看次数

Avro解码提供了java.io.EOFException

我使用Apache avro架构与Kafka 0.0.8V.我在生产者/消费者端使用相同的模式.目前暂无任何变化的模式.但是当我尝试使用消息时,我在消费者处得到了一些例外.为什么我会收到此错误?

制片人

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
    BinaryEncoder encoder = null;
    ByteArrayOutputStream out = null;
    try {
        DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
        out = new ByteArrayOutputStream();
        encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(payload, encoder);
        encoder.flush();

        byte[] serializedBytes = out.toByteArray();

        KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);

            producer.send(message);
        }
Run Code Online (Sandbox Code Playgroud)

消费者

public void run() {
        try {
            ConsumerIterator<byte[], byte[]> itr = stream.iterator();
            while (itr.hasNext()) {

                byte[] data = itr.next().message();

                Schema schema = …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
2121
查看次数

Kafka 0.9如何在使用KafkaConsumer手动提交偏移时重新使用消息

我正在编写一个消费者,一旦将一系列记录提交给Mongo,就会手动提交偏移量.
在发生Mongo错误或任何其他错误的情况下,尝试将记录持久化到错误处理集合以便在以后重播.如果Mongo失败,那么我希望消费者在尝试从Kakfa的非公开偏移中读取记录之前停止处理一段时间.
以下示例有效,但我想知道这种情况的最佳做法是什么?

while (true) {
    boolean commit = false;
    try {
        ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
        kafkaMessageProcessor.processRecords(records);
        commit = true;
    }
    catch (Exception e) {
        logger.error("Unable to consume closing consumer and restarting", e);
        try {
           consumer.close();
        }
        catch (Exception consumerCloseError) {
            logger.error("Unable to close consumer", consumerCloseError);
        }
        logger.error(String.format("Attempting recovery in  [%d] milliseconds.", recoveryInterval), e);
        Thread.sleep(recoveryInterval);
        consumer = createConsumer(properties);
    }
    if (commit) {
        consumer.commitSync();
    }

}

private KafkaConsumer<K, V> createConsumer(Properties properties) {
    KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
    consumer.subscribe(topics); …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-consumer-api

8
推荐指数
2
解决办法
1万
查看次数

Spark Streaming groupByKey和updateStateByKey实现

我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是"会话化"类似于此博客文章的网络流量

唯一的区别是我希望"会话化"IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取此内容,但现在我想在流式上下文中执行此操作.

日志文件从Kafka读取并解析为K/V成对(String, (String, Long, Long))

(IP, (requestPage, time, time)).

然后我打电话groupByKey()给这个K/V pair.在批处理模式下,这将产生:

(String, CollectionBuffer((String, Long, Long), ...) 要么

(IP, CollectionBuffer((requestPage, time, time), ...)

在StreamingContext中,它产生一个:

(String, ArrayBuffer((String, Long, Long), ...) 像这样:

(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
Run Code Online (Sandbox Code Playgroud)

但是,随着下一个微分类(DStream)的到来,该信息被丢弃.

最终我想要的是ArrayBuffer随着时间的推移填充,因为给定的IP继续交互并对其数据运行一些计算以"会话化"页面时间.

我认为实现这一目标的运营商是" updateStateByKey." 我在使用这个操作符时遇到了一些麻烦(我是Spark和Scala的新手);

任何帮助表示赞赏.

迄今:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
                          a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          b: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

  }
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-streaming

7
推荐指数
1
解决办法
2885
查看次数

Spark:广播变量:您似乎尝试从广播变量,操作或转换引用SparkContext

Class ProdsTransformer:

    def __init__(self):  
      self.products_lookup_hmap = {}
      self.broadcast_products_lookup_map = None

    def create_broadcast_variables(self):
      self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)

    def create_lookup_maps(self):
    // The code here builds the hashmap that maps Prod_ID to another space.

pt = ProdsTransformer ()
pt.create_broadcast_variables()  

pairs = distinct_users_projected.map(lambda x: (x.user_id,    
                         pt.broadcast_products_lookup_map.value[x.Prod_ID]))
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

"例外:您似乎尝试从广播变量,操作或转换中引用SparkContext.SparkContext只能用于驱动程序,而不能用于在工作程序上运行的代码.有关更多信息,请参阅SPARK-5063."

任何有关如何处理广播变量的帮助都会很棒!

python apache-spark pyspark

7
推荐指数
1
解决办法
1万
查看次数

如何从另一个应用程序启动它时正确等待apache spark启动器作业?

当我等到我的火花apache工作完成时,我试图避免"while(true)"解决方案,但没有成功.

我有一个spark应用程序,它假设处理一些数据并将结果放到数据库中,我确实从我的spring服务中调用它,并希望等到作业完成.

例:

启动器方法:

@Override
public void run(UUID docId, String query) throws Exception {
    launcher.addAppArgs(docId.toString(), query);

    SparkAppHandle sparkAppHandle = launcher.startApplication();

    sparkAppHandle.addListener(new SparkAppHandle.Listener() {
        @Override
        public void stateChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new  state");
        }

        @Override
        public void infoChanged(SparkAppHandle handle) {
            System.out.println(handle.getState() + " new  state");
        }
    });

    System.out.println(sparkAppHandle.getState().toString());
}
Run Code Online (Sandbox Code Playgroud)

如何正确等待,直到处理程序状态为"已完成".

java apache-spark spark-launcher

6
推荐指数
1
解决办法
4362
查看次数

如何设置kafka事务生产者

我正在尝试在 Windows 10 上设置 kafka 事务生产者。我从 CLI 下载并作为单节点运行 kafka。运行如下:

  • .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • .\bin\windows\kafka-server-start.bat .\config\server.properties

(仅更改了 server.properties -> log.dirs )

一切都很好,卡夫卡已经启动并运行。默认测试:kafka-console- Producer 、 kafka-console-consumer 效果良好。

public class SampleTest {

    private final static Logger logger = LoggerFactory.getLogger(SampleTest.class);

    public static void main(String[] args) {

        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
        producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-trx-id123"); // set transaction id
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new KafkaProducer<>(producerConfig);

        producer.initTransactions();
        try {
            producer.beginTransaction();
            String firstMsg = "Hello 1";
            producer.send(new …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api spring-kafka

6
推荐指数
0
解决办法
6512
查看次数

Kafka消费者(0.8.2.2)可以批量阅读消息吗?

根据我的理解,Kafka消费者按顺序从指定的分区读取消息...

我们计划拥有多个Kafka使用者(Java),它们具有相同的组I ..如果它从指定的分区顺序读取,那么我们如何实现高吞吐量..例如,Producer发布消息,如每秒40个.消费者流程每秒消息1 ..虽然我们可以有多个消费者,但不能有40 rt ??? 如我错了请纠正我...

在我们的情况下,消费者只有在消息成功处理后才能提交偏移..这些消息将被重新处理......有没有更好的解决方案???

java apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
1万
查看次数

从Spark中的cassandra表中删除

我正在使用Spark和cassandra.我正在从我的表中读取一些行,以便使用PrimaryKey删除主题.这是我的代码:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
    session.execute(delete)
    session.close()
})
Run Code Online (Sandbox Code Playgroud)

但是这种方法为每一行创建一个会话,这需要很多时间.那么是否可以使用sc.CassandraTable或其他更好的方法删除我的行.

谢谢

scala cassandra-2.0 apache-spark

4
推荐指数
1
解决办法
5832
查看次数