我正在使用Spark Streaming处理两个Kafka队列之间的数据,但我似乎找不到从Spark写Kafka的好方法.我试过这个:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)
Run Code Online (Sandbox Code Playgroud)
并且它按预期工作,但是为每个消息实例化一个新的KafkaProducer在真实环境中显然是不可行的,我正在尝试解决它.
我想为每个进程保留一个实例的引用,并在需要发送消息时访问它.如何从Spark Streaming写入Kafka?
scala apache-kafka apache-spark spark-streaming spark-streaming-kafka
在执行程序上使用pyspark访问Spark的log4j记录器的正确方法是什么?
在驱动程序中这样做很容易,但我似乎无法理解如何访问执行程序上的日志记录功能,以便我可以在本地登录并让YARN收集本地日志.
有没有办法访问本地记录器?
标准的日志记录过程是不够的,因为我无法从执行程序访问spark上下文.
我需要阻止视图在指定的情况下呈现,但我无法理解如何防止它呈现.
我试过了
$this->autoRender=false
Run Code Online (Sandbox Code Playgroud)
但没有任何反应,可能是因为我使用的API引擎管理渲染与常规控制器不同.有人知道这样做的诀窍吗?

这是CSS圈子中的图像.我希望圆圈围绕图像,因此图像应该位于中心.我怎样才能做到这一点?
HTML:
<div class="circletag" id="nay">
<img src="/images/no.png">
</div>
Run Code Online (Sandbox Code Playgroud)
CSS:
div.circletag {
display: block;
width: 40px;
height: 40px;
background: #E6E7ED;
-moz-border-radius: 20px;
-webkit-border-radius: 20px;
}
div.circletag.img {
}
Run Code Online (Sandbox Code Playgroud) 我无法理解这个错误意味着什么,显然,没有人在互联网上得到同样的错误
BadArgumentError:带游标的_MultiQuery需要
__key__订购
这发生在这里:
return SocialNotification.query().order(-SocialNotification.date).filter(SocialNotification.source_key.IN(nodes_list)).fetch_page(10)
Run Code Online (Sandbox Code Playgroud)
该属性source_key显然是一个键,nodes_list是以前检索的实体键列表.
我需要的是找到所有SocialNotifications具有source_key与列表中的一个键匹配的字段的字段.
我遇到SBT问题.我的项目是这个的一个分支:https://github.com/vmunier/play-with-scalajs-example,目前几乎完全相同.我只添加了几行代码和导入的scalatags.
当我用sbt启动服务器时,我运行该项目,这需要很多时间.自动重载功能也是如此.对于代码的任何更改,我每次都在谈论2到3分钟.即使对于SBT,这也太慢了.
SBT版本0.13.0播放版本2.1 Scala版本2.9.1 scalatags版本2.10 scalajs 0.2.4
这是SBT控制台中的输出
[info] Compiling 1 Scala source to /home/chobeat/git/2048-in-scala-js/scalajs/target/scala-2.10/classes...
[info] Preoptimizing /home/chobeat/git/2048-in-scala-js/scalajvm/public/javascripts/scalajs/scalajs-example-preopt.js ...
[warn] Referring to non-existent class scala_scalajs_test_JasmineTest
[warn] Referring to non-existent method example_test_ScalaJSExampleTest$.expect__Lscala_scalajs_js_Any__Lorg_scalajs_jasmine_JasmineExpectation
[warn] called from example_test_ScalaJSExampleTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp__V
[warn] called from example_test_ScalaJSExampleTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply__V
[warn] called from example_test_ScalaJSExampleTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply__O
[warn] called from scala_Predef$.require__Z__Lscala_Function0__V
[warn] called from scala_collection_Iterator$class.scala_collection_Iterator$class__copyToArray__Lscala_collection_Iterator__O__I__I__V
[warn] called from scala_collection_AbstractIterator.copyToArray__O__I__I__V
[warn] called from scala_collection_TraversableOnce$class.scala_collection_TraversableOnce$class__copyToArray__Lscala_collection_TraversableOnce__O__I__V
[warn] called from scala_collection_AbstractTraversable.copyToArray__O__I__V
[warn] called from scala_collection_TraversableOnce$class.scala_collection_TraversableOnce$class__toArray__Lscala_collection_TraversableOnce__Lscala_reflect_ClassTag__O
[warn] called from scala_collection_AbstractTraversable.toArray__Lscala_reflect_ClassTag__O
[warn] called from scala_collection_immutable_StringLike$class.scala_collection_immutable_StringLike$class__format__Lscala_collection_immutable_StringLike__Lscala_collection_Seq__T
[warn] …Run Code Online (Sandbox Code Playgroud) 正如标题所说,IntelliJ一直要求在每次与存储库合并时设置我的sbt项目的Scala SDK(它们在多项目中).就像某个设置文件被覆盖特定项目但我似乎无法找到问题所在.任何关于在哪里看的见解?
我需要在 kafka 应用程序上执行单元测试,避免使用第三方库。
我现在的问题是我想清除测试之间的所有主题,但我不知道如何。
这是我的临时解决方案:提交每次测试后产生的每条消息,并将所有测试消费者放在同一个消费者组中。
override protected def afterEach():Unit={
val cleanerConsumer= newConsumer(Seq.empty)
val topics=cleanerConsumer.listTopics()
println("pulisco")
cleanerConsumer.subscribe(topics.keySet())
cleanerConsumer.poll(100)
cleanerConsumer.commitSync()
cleanerConsumer.close()
}
Run Code Online (Sandbox Code Playgroud)
但这不起作用,我不知道为什么。
例如,当我在测试中创建一个新的消费者时,它messages包含在前一个测试中产生的消息。
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
Run Code Online (Sandbox Code Playgroud)
我该如何解决这个问题?
我需要从存储在树中的出版物列表中计算h-index.
我所做的是以低位顺序遍历树,获得引用位置数列表
看起来像:
line 1 10
line 2 5
line 3 4
line 4 0
Run Code Online (Sandbox Code Playgroud)
我应该停在第3行并返回3.问题在于给出的例子,在这种情况下
line 1 4
line 2 0
line 3 0
Run Code Online (Sandbox Code Playgroud)
它停在2,因为4> 1但0> 3是假的.它应该返回1.你能解释一下为什么吗?我知道这更像是一个数学家的问题,但在那之后我可能需要重新实现它,如果出现严重错误的话.
这是代码
int index_h_calc(rbtree_node n, int *i){
if (n == NULL) {
fputs("<empty tree>\n", stdout);
return 0;
}
if (n->right != NULL)
index_h_calc(n->right,i);
graduat *grad;
grad=n->value;
if(DEBUG)
printf("linea %d %d %s\n ",*i,(int)grad->tot,grad->name);
if(*i+1>=(int)grad->tot) {
return *i;
} else
*i+=1;
if (n->left != NULL)
index_h_calc(n->left,i);
return *i;
}
Run Code Online (Sandbox Code Playgroud) I see an unusual behaviour with PyKafka, a client that I just recently began to use.
The error is the following:
Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}
Run Code Online (Sandbox Code Playgroud)
The source of the error is in this lines:
self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
consumer_group=bytes(self.consumer_group,"UTF-8"),
auto_commit_enable=True
)
Run Code Online (Sandbox Code Playgroud)
Debugging I saw that the client use the correct string IP to connect to the seed broker but when the list of brokers is retrieved, their …