小编Cho*_*eat的帖子

Spark Streaming - 读写Kafka主题

我正在使用Spark Streaming处理两个Kafka队列之间的数据,但我似乎找不到从Spark写Kafka的好方法.我试过这个:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
Run Code Online (Sandbox Code Playgroud)

并且它按预期工作,但是为每个消息实例化一个新的KafkaProducer在真实环境中显然是不可行的,我正在尝试解决它.

我想为每个进程保留一个实例的引用,并在需要发送消息时访问它.如何从Spark Streaming写入Kafka?

scala apache-kafka apache-spark spark-streaming spark-streaming-kafka

33
推荐指数
5
解决办法
4万
查看次数

PySpark从执行程序登录

在执行程序上使用pyspark访问Spark的log4j记录器的正确方法是什么?

在驱动程序中这样做很容易,但我似乎无法理解如何访问执行程序上的日志记录功能,以便我可以在本地登录并让YARN收集本地日志.

有没有办法访问本地记录器?

标准的日志记录过程是不够的,因为我无法从执行程序访问spark上下文.

python log4j apache-spark pyspark

15
推荐指数
2
解决办法
1万
查看次数

在cakephp中呈现一个空白视图

我需要阻止视图在指定的情况下呈现,但我无法理解如何防止它呈现.

我试过了

$this->autoRender=false
Run Code Online (Sandbox Code Playgroud)

但没有任何反应,可能是因为我使用的API引擎管理渲染与常规控制器不同.有人知道这样做的诀窍吗?

cakephp view

9
推荐指数
4
解决办法
1万
查看次数

将图像置于css圆圈中心

在此输入图像描述

这是CSS圈子中的图像.我希望圆圈围绕图像,因此图像应该位于中心.我怎样才能做到这一点?

HTML:

<div class="circletag" id="nay">
    <img src="/images/no.png">
</div>
Run Code Online (Sandbox Code Playgroud)

CSS:

div.circletag {
    display: block;
    width: 40px;
    height: 40px;
    background: #E6E7ED;
    -moz-border-radius: 20px;
    -webkit-border-radius: 20px;
}
div.circletag.img {

}
Run Code Online (Sandbox Code Playgroud)

html css

8
推荐指数
1
解决办法
4万
查看次数

BadArgumentError:带游标的_MultiQuery在ndb中需要__key__顺序

我无法理解这个错误意味着什么,显然,没有人在互联网上得到同样的错误

BadArgumentError:带游标的_MultiQuery需要__key__订购

这发生在这里:

return SocialNotification.query().order(-SocialNotification.date).filter(SocialNotification.source_key.IN(nodes_list)).fetch_page(10)
Run Code Online (Sandbox Code Playgroud)

该属性source_key显然是一个键,nodes_list是以前检索的实体键列表.

我需要的是找到所有SocialNotifications具有source_key与列表中的一个键匹配的字段的字段.

google-app-engine google-cloud-datastore

8
推荐指数
1
解决办法
3934
查看次数

使用播放SBT自动重载,编译时间极慢!和scala-js

我遇到SBT问题.我的项目是这个的一个分支:https://github.com/vmunier/play-with-scalajs-example,目前几乎完全相同.我只添加了几行代码和导入的scalatags.

当我用sbt启动服务器时,我运行该项目,这需要很多时间.自动重载功能也是如此.对于代码的任何更改,我每次都在谈论2到3分钟.即使对于SBT,这也太慢了.

SBT版本0.13.0播放版本2.1 Scala版本2.9.1 scalatags版本2.10 scalajs 0.2.4

这是SBT控制台中的输出

    [info] Compiling 1 Scala source to /home/chobeat/git/2048-in-scala-js/scalajs/target/scala-2.10/classes...
[info] Preoptimizing /home/chobeat/git/2048-in-scala-js/scalajvm/public/javascripts/scalajs/scalajs-example-preopt.js ...
[warn] Referring to non-existent class scala_scalajs_test_JasmineTest
[warn] Referring to non-existent method example_test_ScalaJSExampleTest$.expect__Lscala_scalajs_js_Any__Lorg_scalajs_jasmine_JasmineExpectation
[warn]   called from example_test_ScalaJSExampleTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp__V
[warn]   called from example_test_ScalaJSExampleTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply__V
[warn]   called from example_test_ScalaJSExampleTest$$anonfun$1$$anonfun$apply$mcV$sp$1.apply__O
[warn]   called from scala_Predef$.require__Z__Lscala_Function0__V
[warn]   called from scala_collection_Iterator$class.scala_collection_Iterator$class__copyToArray__Lscala_collection_Iterator__O__I__I__V
[warn]   called from scala_collection_AbstractIterator.copyToArray__O__I__I__V
[warn]   called from scala_collection_TraversableOnce$class.scala_collection_TraversableOnce$class__copyToArray__Lscala_collection_TraversableOnce__O__I__V
[warn]   called from scala_collection_AbstractTraversable.copyToArray__O__I__V
[warn]   called from scala_collection_TraversableOnce$class.scala_collection_TraversableOnce$class__toArray__Lscala_collection_TraversableOnce__Lscala_reflect_ClassTag__O
[warn]   called from scala_collection_AbstractTraversable.toArray__Lscala_reflect_ClassTag__O
[warn]   called from scala_collection_immutable_StringLike$class.scala_collection_immutable_StringLike$class__format__Lscala_collection_immutable_StringLike__Lscala_collection_Seq__T
[warn] …
Run Code Online (Sandbox Code Playgroud)

scala sbt playframework

8
推荐指数
1
解决办法
5745
查看次数

每次与存储库同步后,IntelliJ都会继续询问"Setup Scala SDK"

正如标题所说,IntelliJ一直要求在每次与存储库合并时设置我的sbt项目的Scala SDK(它们在多项目中).就像某个设置文件被覆盖特定项目但我似乎无法找到问题所在.任何关于在哪里看的见解?

scala intellij-idea sbt

8
推荐指数
1
解决办法
4495
查看次数

清除单元测试的 kafka 主题

我需要在 kafka 应用程序上执行单元测试,避免使用第三方库。

我现在的问题是我想清除测试之间的所有主题,但我不知道如何。

这是我的临时解决方案:提交每次测试后产生的每条消息,并将所有测试消费者放在同一个消费者组中。

override protected def afterEach():Unit={
    val cleanerConsumer= newConsumer(Seq.empty)
    val topics=cleanerConsumer.listTopics()
    println("pulisco")
    cleanerConsumer.subscribe(topics.keySet())
    cleanerConsumer.poll(100)
    cleanerConsumer.commitSync()
    cleanerConsumer.close()
}
Run Code Online (Sandbox Code Playgroud)

但这不起作用,我不知道为什么。

例如,当我在测试中创建一个新的消费者时,它messages包含在前一个测试中产生的消息。

val consumerProbe = newConsumer(SMSGatewayTopic)

val messages = consumerProbe.poll(1000)
Run Code Online (Sandbox Code Playgroud)

我该如何解决这个问题?

java unit-testing scala apache-kafka

8
推荐指数
1
解决办法
6416
查看次数

计算h指数

我需要从存储在树中的出版物列表中计算h-index.

我所做的是以低位顺序遍历树,获得引用位置数列表

看起来像:

line 1 10
line 2 5
line 3 4
line 4 0
Run Code Online (Sandbox Code Playgroud)

我应该停在第3行并返回3.问题在于给出的例子,在这种情况下

line 1 4
line 2 0
line 3 0
Run Code Online (Sandbox Code Playgroud)

它停在2,因为4> 1但0> 3是假的.它应该返回1.你能解释一下为什么吗?我知道这更像是一个数学家的问题,但在那之后我可能需要重新实现它,如果出现严重错误的话.

这是代码

  int index_h_calc(rbtree_node n, int *i){
    if (n == NULL) {
        fputs("<empty tree>\n", stdout);
        return 0;
    }
    if (n->right != NULL)
      index_h_calc(n->right,i);


    graduat *grad;
    grad=n->value;

    if(DEBUG)
      printf("linea %d %d %s\n ",*i,(int)grad->tot,grad->name);

    if(*i+1>=(int)grad->tot) {
      return *i;
    } else
      *i+=1;

    if (n->left != NULL)
      index_h_calc(n->left,i);

    return *i;
  }
Run Code Online (Sandbox Code Playgroud)

c algorithm tree list sorted

7
推荐指数
1
解决办法
1735
查看次数

PyKafka metadata in bytes instead of strings

I see an unusual behaviour with PyKafka, a client that I just recently began to use.

The error is the following:

Failed to connect newly created broker for b'4758e4ee1af6':9092
{0: <pykafka.broker.Broker at 0x7f319e19be10 (host=b'4758e4ee1af6',port=9092, id=0)>}
Run Code Online (Sandbox Code Playgroud)

The source of the error is in this lines:

    self.client = KafkaClient(hosts=BROKER_ADDRESS, broker_version="0.10.1.0")
consumer = self.client.topics[bytes(self.input_topic,"UTF-8")].get_balanced_consumer(
        consumer_group=bytes(self.consumer_group,"UTF-8"),
        auto_commit_enable=True
    )
Run Code Online (Sandbox Code Playgroud)

Debugging I saw that the client use the correct string IP to connect to the seed broker but when the list of brokers is retrieved, their …

python apache-kafka pykafka

5
推荐指数
1
解决办法
320
查看次数