我有数千万行数据.是否可以使用火花流在一周或一天内分析所有这些?在数据量方面,火花流的限制是什么?我不确定什么是上限,什么时候我应该将它们放入我的数据库,因为Stream可能无法再处理它们了.我也有不同的时间窗口1,3,6小时等,我使用窗口操作来分隔数据.
请在下面找到我的代码:
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,300)
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc","channels")
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}
category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
category_join_stream = category_stream.map(lambda x:read_json(x[1])).filter(lambda x:x!=0).map(lambda x:categoryTransform(x)).filter(lambda x:x!=0).map(lambda x:(x['id'],x))
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))
#axes topic integration the article and the axes
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.filter(lambda x:'delete' not …Run Code Online (Sandbox Code Playgroud) 我有6个节点,1个Solr,5个Spark节点,使用数据传输.我的群集位于与Amazon EC2类似的服务器上,具有EBS卷.每个节点有3个EBS卷,使用LVM组成逻辑数据磁盘.在我的OPS中心,同一节点经常无响应,这导致我的数据系统连接超时.我的数据量约为400GB,包含3个副本.我有20个流媒体作业,每分钟有一个批处理间隔.这是我的错误消息:
/var/log/cassandra/output.log:WARN 13:44:31,868 Not marking nodes down due to local pause of 53690474502 > 5000000000
/var/log/cassandra/system.log:WARN [GossipTasks:1] 2016-09-25 16:40:34,944 FailureDetector.java:258 - Not marking nodes down due to local pause of 64532052919 > 5000000000
/var/log/cassandra/system.log:WARN [GossipTasks:1] 2016-09-25 16:59:12,023 FailureDetector.java:258 - Not marking nodes down due to local pause of 66027485893 > 5000000000
/var/log/cassandra/system.log:WARN [GossipTasks:1] 2016-09-26 13:44:31,868 FailureDetector.java:258 - Not marking nodes down due to local pause of 53690474502 > 5000000000
Run Code Online (Sandbox Code Playgroud)
编辑:
这些是我更具体的配置.我想知道我做错了什么,如果是的话,我怎么能详细了解它是什么以及如何解决它?
out heap设置为
MAX_HEAP_SIZE="16G"
HEAP_NEWSIZE="4G"
Run Code Online (Sandbox Code Playgroud)
当前堆:
[root@iZ11xsiompxZ ~]# jstat …Run Code Online (Sandbox Code Playgroud) 目前,当一个新用户到来时,我无法更新我的推荐系统,这与未添加用户和项目矩阵有关.我在哪里可以找到这个以及如何做到这一点?谢谢
model.userFactors model.itemFactors
recommendation-engine machine-learning collaborative-filtering apache-spark
我有一个 cassandra 表“文章”,有 400,000 行
primary key (source,created_at desc)
Run Code Online (Sandbox Code Playgroud)
当我使用以下方式查询数据时:
select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'
Run Code Online (Sandbox Code Playgroud)
读取 110,000 行需要 8 分钟。
这非常慢,我不知道错误出在哪里。
我想在 10 秒内读取 100,000 行。不确定这是否可能?
这里有更多细节:
I have 3 nodes, replication factor =2, stragegy=SimpleStrategy, 4CPU, 32G RAM
I am using Cassandra-driver-3.0.0.
Run Code Online (Sandbox Code Playgroud)
我不确定它是来自 python 还是 Cassandra,因为我们也在使用 python。
这是我的 CQL 架构:
CREATE TABLE crawler.articles (
source text,
created_at timestamp,
id text,
category text,
channel text,
last_crawled timestamp,
text text,
thumbnail text,
title …Run Code Online (Sandbox Code Playgroud)