我正在尝试使用Avro来读取/写入Kafka的消息.有没有人有一个使用Avro二进制编码器编码/解码将被放在消息队列中的数据的例子?
我需要Avro部件而不是Kafka部件.或者,也许我应该看一个不同的解决方案?基本上,我正试图在空间方面找到更有效的JSON解决方案.刚刚提到Avro,因为它比JSON更紧凑.
我正在尝试使用spark进行一些简单的机器学习任务.我使用pyspark和spark 1.2.0来做一个简单的逻辑回归问题.我有120万条培训记录,我记录了记录的功能.当我将散列函数的数量设置为1024时,程序运行正常,但是当我将散列函数的数量设置为16384时,程序会多次失败并出现以下错误:
Py4JJavaError: An error occurred while calling o84.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 4 times, most recent failure: Lost task 1.3 in stage 4.0 (TID 9, workernode0.sparkexperience4a7.d5.internal.cloudapp.net): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) …Run Code Online (Sandbox Code Playgroud) python java distributed-computing logistic-regression apache-spark
我在Python中看到了decorator 示例:
def makebold(fn):
def wrapped():
return "<b>" + fn() + "</b>"
return wrapped
def makeitalic(fn):
def wrapped():
return "<i>" + fn() + "</i>"
return wrapped
@makebold
@makeitalic
def hello():
return "hello world"
print hello() ## returns <b><i>hello world</i></b>
Run Code Online (Sandbox Code Playgroud)
并且有些好奇它如何在Java中实现,所以我搜索并使用Decorator Design Pattern获得了一些示例.
public class Main {
public static void main(String[] args) {
Wrapper word = new BoldWrapper(new ItalicWrapper());
// display <b><i>hello world</i></b>
System.out.println(word.make("Hello World"));
}
}
public interface Wrapper {
public String make(String str);
}
public class …Run Code Online (Sandbox Code Playgroud) 我使用Apache avro架构与Kafka 0.0.8V.我在生产者/消费者端使用相同的模式.目前暂无任何变化的模式.但是当我尝试使用消息时,我在消费者处得到了一些例外.为什么我会收到此错误?
制片人
public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
BinaryEncoder encoder = null;
ByteArrayOutputStream out = null;
try {
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
out = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(payload, encoder);
encoder.flush();
byte[] serializedBytes = out.toByteArray();
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
producer.send(message);
}
Run Code Online (Sandbox Code Playgroud)
消费者
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = stream.iterator();
while (itr.hasNext()) {
byte[] data = itr.next().message();
Schema schema = …Run Code Online (Sandbox Code Playgroud) 我正在编写一个消费者,一旦将一系列记录提交给Mongo,就会手动提交偏移量.
在发生Mongo错误或任何其他错误的情况下,尝试将记录持久化到错误处理集合以便在以后重播.如果Mongo失败,那么我希望消费者在尝试从Kakfa的非公开偏移中读取记录之前停止处理一段时间.
以下示例有效,但我想知道这种情况的最佳做法是什么?
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics); …Run Code Online (Sandbox Code Playgroud) 我正在尝试对从Kafka读取的(假)apache web服务器日志运行有状态Spark Streaming计算.目标是"会话化"类似于此博客文章的网络流量
唯一的区别是我希望"会话化"IP命中的每个页面,而不是整个会话.我能够在批处理模式下使用Spark从假网络流量文件中读取此内容,但现在我想在流式上下文中执行此操作.
日志文件从Kafka读取并解析为K/V成对(String, (String, Long, Long))或
(IP, (requestPage, time, time)).
然后我打电话groupByKey()给这个K/V pair.在批处理模式下,这将产生:
(String, CollectionBuffer((String, Long, Long), ...) 要么
(IP, CollectionBuffer((requestPage, time, time), ...)
在StreamingContext中,它产生一个:
(String, ArrayBuffer((String, Long, Long), ...) 像这样:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
Run Code Online (Sandbox Code Playgroud)
但是,随着下一个微分类(DStream)的到来,该信息被丢弃.
最终我想要的是ArrayBuffer随着时间的推移填充,因为给定的IP继续交互并对其数据运行一些计算以"会话化"页面时间.
我认为实现这一目标的运营商是" updateStateByKey." 我在使用这个操作符时遇到了一些麻烦(我是Spark和Scala的新手);
任何帮助表示赞赏.
迄今:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}
Run Code Online (Sandbox Code Playgroud) Class ProdsTransformer:
def __init__(self):
self.products_lookup_hmap = {}
self.broadcast_products_lookup_map = None
def create_broadcast_variables(self):
self.broadcast_products_lookup_map = sc.broadcast(self.products_lookup_hmap)
def create_lookup_maps(self):
// The code here builds the hashmap that maps Prod_ID to another space.
pt = ProdsTransformer ()
pt.create_broadcast_variables()
pairs = distinct_users_projected.map(lambda x: (x.user_id,
pt.broadcast_products_lookup_map.value[x.Prod_ID]))
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
"例外:您似乎尝试从广播变量,操作或转换中引用SparkContext.SparkContext只能用于驱动程序,而不能用于在工作程序上运行的代码.有关更多信息,请参阅SPARK-5063."
任何有关如何处理广播变量的帮助都会很棒!
当我等到我的火花apache工作完成时,我试图避免"while(true)"解决方案,但没有成功.
我有一个spark应用程序,它假设处理一些数据并将结果放到数据库中,我确实从我的spring服务中调用它,并希望等到作业完成.
例:
启动器方法:
@Override
public void run(UUID docId, String query) throws Exception {
launcher.addAppArgs(docId.toString(), query);
SparkAppHandle sparkAppHandle = launcher.startApplication();
sparkAppHandle.addListener(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println(handle.getState() + " new state");
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.println(handle.getState() + " new state");
}
});
System.out.println(sparkAppHandle.getState().toString());
}
Run Code Online (Sandbox Code Playgroud)
如何正确等待,直到处理程序状态为"已完成".
我正在尝试在 Windows 10 上设置 kafka 事务生产者。我从 CLI 下载并作为单节点运行 kafka。运行如下:
(仅更改了 server.properties -> log.dirs )
一切都很好,卡夫卡已经启动并运行。默认测试:kafka-console- Producer 、 kafka-console-consumer 效果良好。
public class SampleTest {
private final static Logger logger = LoggerFactory.getLogger(SampleTest.class);
public static void main(String[] args) {
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-trx-id123"); // set transaction id
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Producer<String, String> producer = new KafkaProducer<>(producerConfig);
producer.initTransactions();
try {
producer.beginTransaction();
String firstMsg = "Hello 1";
producer.send(new …Run Code Online (Sandbox Code Playgroud) 根据我的理解,Kafka消费者按顺序从指定的分区读取消息...
我们计划拥有多个Kafka使用者(Java),它们具有相同的组I ..如果它从指定的分区顺序读取,那么我们如何实现高吞吐量..例如,Producer发布消息,如每秒40个.消费者流程每秒消息1 ..虽然我们可以有多个消费者,但不能有40 rt ??? 如我错了请纠正我...
在我们的情况下,消费者只有在消息成功处理后才能提交偏移..这些消息将被重新处理......有没有更好的解决方案???
我正在使用Spark和cassandra.我正在从我的表中读取一些行,以便使用PrimaryKey删除主题.这是我的代码:
val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
select("a","b","c","d").
where("d=?", d).cache()
lines.foreach(r => {
val session: Session = connector.openSession
val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
session.execute(delete)
session.close()
})
Run Code Online (Sandbox Code Playgroud)
但是这种方法为每一行创建一个会话,这需要很多时间.那么是否可以使用sc.CassandraTable或其他更好的方法删除我的行.
谢谢
java ×8
apache-kafka ×5
apache-spark ×5
python ×3
avro ×2
scala ×2
annotations ×1
pyspark ×1
spring-kafka ×1