小编gia*_*dau的帖子

如何保存Spark消耗给ZK或Kafka的最新偏移量,并在重启后可以回读

Kafka 0.8.2用来从AdExchange接收数据然后我Spark Streaming 1.4.1用来存储数据MongoDB.

我的问题是当我重新启动我的Spark StreamingJob时,例如更新新版本,修复bug,添加新功能.它将继续阅读最新offsetkafka重启作业期间在当时那么我将数据丢失的AdX推卡夫卡.

我尝试类似的东西,auto.offset.reset -> smallest但它会从0 - >收到最后数据是巨大的,并在数据库中重复.

我也尝试设置具体的group.idconsumer.idSpark却是相同的.

如何保存最新的offset消耗,火花zookeeperkafka然后可以从读回最新的offset

apache-kafka apache-spark spark-streaming kafka-consumer-api

14
推荐指数
1
解决办法
1万
查看次数

如何修复Python中破坏的utf-8编码?

我的字符串是Niệm Bồ Tát (Thiá»n sư Nhất Hạnh),我想解码它Ni?m B? Tát (Thi?n s? Nh?t H?nh).我看到该网站可以做到这一点http://www.enderminh.com/minh/utf8-to-unicode-converter.aspx

我开始尝试使用Python

mystr = '09. Bát Nhã Tâm Kinh'
mystr.decode('utf-8')
Run Code Online (Sandbox Code Playgroud)

但实际上它不正确,因为原始字符串是utf-8但字符串显示不是我期待的结果.

注意:它是越南字符.

如何解决这个案子?那是Windows Unicode还是什么?如何在这里检测编码.

python unicode utf-8 character-encoding

9
推荐指数
2
解决办法
1万
查看次数

来自Kafka的Spark Streaming有错误,numRecords不能为负数

它有点奇怪的错误,因为我仍然将数据推送到kafka并消费来自kafka的消息,Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative 也有点奇怪.我搜索并且没有得到任何相关的资源.

让我解释一下我的集群.我有1个服务器是主服务器和从服务器运行mesos,在那上我设置了3个像这样的kafka经纪人.然后我在该集群上运行spark-job.经纪商 Mesos 我在用 spark 1.5.2

brokers:
  id: 0
  active: true
  state: running
  resources: cpus:1.00, mem:1024, heap:512, port:31000
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:test-master
  task:
    id: broker-0-c32082d0-a544-4260-b7c4-0239d99f0972
    state: running
    endpoint: test-master:31000
  metrics:
    collected: 2016-01-25 17:46:47+08
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 1

  id: 1
  active: true
  state: running
  resources: cpus:1.00, mem:1024, heap:512, port:31001
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:test-master
  task:
    id: broker-1-7b30d6ad-6b19-4420-b743-c6f7f1adfb07
    state: running
    endpoint: test-master:31001
  metrics:
    collected: 2016-01-25 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming

8
推荐指数
2
解决办法
9322
查看次数

在Db上使用Spark Mongo-Hadoop读写时,应该打开错误状态

场景:我在maping之后收集了kafka的RDD并且减少了我想把它写入数据库db_aggregate集合date_xx.

映射时我需要从该db获取以获得先前的结果.

就像我写AI需要B(之前写过)的结果来计算然后写A到db.

我认为我面临的问题是当我在写入新记录时读取db_aggregate,db游标可能会被一个动作写入或读取关闭.

我在用 Spark 1.4.1 mongo-hadoop.1.4.1 mongo 2.6

功能:

def getPreviousAggregate(campaignId: String, publisher: String, width: Int, height: Int,
                           date: Int, month: Int, year: Int): BasicBSONObject = {
    findLatestAggregate(campaignId, publisher, width, height, date, month, year) match {
      case Some(toReturn) => return toReturn
      case None => {
        println("Not found previous date ....")
        val previousDate = Calendar.getInstance();
        previousDate.set(year, month, date)
        previousDate.add(Calendar.DATE, -1)
        val _date = previousDate.get(Calendar.DATE)
        val _month = previousDate.get(Calendar.MONTH)
        val _year = previousDate.get(Calendar.YEAR)
        findLatestAggregate(campaignId, publisher, width, …
Run Code Online (Sandbox Code Playgroud)

hadoop scala mongodb apache-spark

5
推荐指数
0
解决办法
861
查看次数

使用System.IO.SearchOption.AllDirectories时出错

我的代码

 DirectoryInfo[] dirstring = dir.GetDirectories(SearchTextbox.Text, System.IO.SearchOption.AllDirectories);
            FileInfo[] fileinfo = dir.GetFiles(SearchTextbox.Text, System.IO.SearchOption.AllDirectories);
Run Code Online (Sandbox Code Playgroud)

DIR = @ "d:\"; 有一个错误的预期UnauthorizedAccessException 如何捕获它并继续搜索其他文件夹和文件.

.net c#

4
推荐指数
1
解决办法
1782
查看次数

什么MIME可以读取.eml文件?

我使用ImapX库将电子邮件保存到.eml文件。我可以使用什么MIME来阅读它?MIME可以读取大多数Content-Type。

c# email mime imap eml

2
推荐指数
1
解决办法
7671
查看次数

MissingSchemaError:填充时尚未为模型"用户"注册架构

我为创建用户以及关注者和关注者编写模型

    var UserSchema = new Schema({
created_at : { type: Date, default: Date.now }
, name : String
,   hashedPass : String
,   salt: String
,   email : Email
,   avatar : {type: String, default: "images/default_profile.png" }
,   statuses_count : Number
,   screen_name : String
,   location : String
,   about : String
,   followers: [{ type: ObjectId, ref: 'Users' }]
,   following: [{ type: ObjectId, ref: 'Users' }]
,   followers_count : Number
,   following_count : Number
});
Run Code Online (Sandbox Code Playgroud)

并在创建跟随用户我做

UserSchema.statics.createFollowing …
Run Code Online (Sandbox Code Playgroud)

populate mongoose node.js

2
推荐指数
1
解决办法
5261
查看次数

越南电子邮件主题编码?

Subject: Re:
 =?UTF-8?Q?Th=E1=BA=A7y_g=E1=BB=ADi_b=C3=A0i_t=E1=BA=ADp_cho_em_v=E1=BB?=
 =?UTF-8?Q?=9Bi.?=
Run Code Online (Sandbox Code Playgroud)

我收到了一封有此主题标题的电子邮件.怎么解码?

c# email mime imaplib

1
推荐指数
2
解决办法
757
查看次数

使用node.io进行抓取时如何添加像Tor这样的代理?

我正在使用node.io来构建一个web scraper但是在找到方法的时候,我已经请求了很多,而且这个网站阻止了我.我不知道如何添加代理,比如使用Tor来向这个站点发出请求.

node.js web-scraping node.io

1
推荐指数
1
解决办法
3169
查看次数

toolStripComboBox设置字体样式?

我用comboBox 阅读了这个主题http://technicalsol.blogspot.com/2009/03/combobox-set-font-style.html但是在toolstripComboBox中不存在事件draw_item我需要你的帮助.我正在用C#编写简单的wordpad.

.net c# toolstrip

0
推荐指数
1
解决办法
1651
查看次数