小编pet*_*ter的帖子

在数据量方面,火花流的限制是什么?

我有数千万行数据.是否可以使用火花流在一周或一天内分析所有这些?在数据量方面,火花流的限制是什么?我不确定什么是上限,什么时候我应该将它们放入我的数据库,因为Stream可能无法再处理它们了.我也有不同的时间窗口1,3,6小时等,我使用窗口操作来分隔数据.

请在下面找到我的代码:

conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,300)
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc","channels")
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}

category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))


article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))

#axes topic  integration the article and the axes
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not …
Run Code Online (Sandbox Code Playgroud)

datastax-enterprise apache-spark spark-streaming

8
推荐指数
1
解决办法
997
查看次数

Cassandra错误消息:由于本地暂停而未将节点标记为关闭.为什么?

我有6个节点,1个Solr,5个Spark节点,使用数据传输.我的群集位于与Amazon EC2类似的服务器上,具有EBS卷.每个节点有3个EBS卷,使用LVM组成逻辑数据磁盘.在我的OPS中心,同一节点经常无响应,这导致我的数据系统连接超时.我的数据量约为400GB,包含3个副本.我有20个流媒体作业,每分钟有一个批处理间隔.这是我的错误消息:

/var/log/cassandra/output.log:WARN 13:44:31,868 Not marking nodes down due to local pause of 53690474502 > 5000000000
/var/log/cassandra/system.log:WARN [GossipTasks:1] 2016-09-25 16:40:34,944 FailureDetector.java:258 - Not marking nodes down due to local pause of 64532052919 > 5000000000 
/var/log/cassandra/system.log:WARN [GossipTasks:1] 2016-09-25 16:59:12,023 FailureDetector.java:258 - Not marking nodes down due to local pause of 66027485893 > 5000000000 
/var/log/cassandra/system.log:WARN [GossipTasks:1] 2016-09-26 13:44:31,868 FailureDetector.java:258 - Not marking nodes down due to local pause of 53690474502 > 5000000000
Run Code Online (Sandbox Code Playgroud)

编辑:

这些是我更具体的配置.我想知道我做错了什么,如果是的话,我怎么能详细了解它是什么以及如何解决它?

out heap设置为

MAX_HEAP_SIZE="16G"
HEAP_NEWSIZE="4G"
Run Code Online (Sandbox Code Playgroud)

当前堆:

[root@iZ11xsiompxZ ~]# jstat …
Run Code Online (Sandbox Code Playgroud)

amazon-ec2 cassandra datastax apache-spark datastax-startup

8
推荐指数
1
解决办法
6367
查看次数

如何处理Spark ALS从MLlib生成的模型中的新用户/项目?

目前,当一个新用户到来时,我无法更新我的推荐系统,这与未添加用户和项目矩阵有关.我在哪里可以找到这个以及如何做到这一点?谢谢

model.userFactors model.itemFactors

recommendation-engine machine-learning collaborative-filtering apache-spark

5
推荐指数
1
解决办法
2728
查看次数

为什么我的 Cassandra 数据库读取数据太慢?想要在 10 秒内读取 100,000 行

我有一个 cassandra 表“文章”,有 400,000 行

primary key (source,created_at desc)
Run Code Online (Sandbox Code Playgroud)

当我使用以下方式查询数据时:

select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'
Run Code Online (Sandbox Code Playgroud)

读取 110,000 行需要 8 分钟。

这非常慢,我不知道错误出在哪里。

我想在 10 秒内读取 100,000 行。不确定这是否可能?

这里有更多细节:

I have 3 nodes, replication factor =2, stragegy=SimpleStrategy, 4CPU, 32G RAM
I am using Cassandra-driver-3.0.0. 
Run Code Online (Sandbox Code Playgroud)

我不确定它是来自 python 还是 Cassandra,因为我们也在使用 python。

这是我的 CQL 架构:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title …
Run Code Online (Sandbox Code Playgroud)

mysql cassandra

4
推荐指数
1
解决办法
6598
查看次数