Why are reads from my Cassandra database so slow? I want to read 100,000 rows within 10 seconds

pet*_*ter 4 mysql cassandra

I have a Cassandra table "articles" with 400,000 rows:

primary key (source,created_at desc)

When I query the data with:

select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'

it takes 8 minutes to read 110,000 rows.

That is extremely slow, and I don't know where the problem is.

I want to read 100,000 rows within 10 seconds. Not sure if that is even possible?

Here are some more details:

I have 3 nodes, replication factor = 2, strategy = SimpleStrategy, 4 CPUs, 32 GB RAM
I am using Cassandra-driver-3.0.0. 

I'm not sure whether the slowness comes from Python or from Cassandra, because we are also using Python.
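One way to narrow that down (a sketch to try, not part of the original question) is to time the first page of the query separately from iterating the full result set, which shows whether the time is spent on the Cassandra side or in the Python loop:

# Sketch: time the server round trip and the client-side iteration separately.
# Node addresses and the source value are placeholders.
import time

from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

t0 = time.time()
rows = session.execute(
    "SELECT * FROM articles WHERE source = %s AND created_at >= %s",
    ('abc', '2016-01-08 00:00:00'))
t1 = time.time()

count = sum(1 for _ in rows)   # iterating pulls the remaining pages over the network
t2 = time.time()

print('first page: %.2fs, full iteration: %.2fs, rows: %d' % (t1 - t0, t2 - t1, count))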

Here is my CQL schema:

CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
AND comment = ''
AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 604800
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);

Edit:

I want to query the new articles from the last few days, so my query is:

SELECT * FROM articles WHERE source = 'any source' 
AND created_at >= '2016-01-08 00:00:00'

A sample insert looks like this:

INSERT INTO articles (source,created_at,id,category,channel,last_crawled,text,thumbnail,title,url) 
VALUES ('money',1452417991000,'1290141063','news_video_top','',1452418260000,'','http://inews.gtimg.com/newsapp_ls/0/143487758_150120/0','article title','http://view.inews.qq.com/a/VID2016011002195801');

Client code:

import sys
import logging
from cassandra import ConsistencyLevel

timespan = int(sys.argv[1])
source = str(sys.argv[2])

logging.basicConfig(filename='statistics-%d.log' % (timespan), format='%(asctime)-15s %(filename)s %(name)-8s %(message)s', level=logging.INFO)

class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('statistics'))

log = logging.getLogger('statistics')

try:
    from datetime import datetime, timedelta

    if __name__ == '__main__':
        pass

    from cassandra.cluster import Cluster

    log.info('[%d] connecting cassandra...' % (timespan))
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session = cluster.connect('crawler')

    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session_statis = cluster.connect('statistics')

    created_at = datetime.utcnow() + timedelta(hours=-timespan)

    print "[%s] FINDING ..." % (datetime.utcnow().isoformat())
    statuses = {}

    stmt = session.prepare("select * from articles where source = ? and created_at >= ? ")
    category_stmt = session.prepare('SELECT category FROM channels WHERE source = ? and id = ?')

    rows = session.execute(stmt, [source, created_at])

    for row in rows:
        try:
            if row.channel and source != 'toutiao':
                category = session.execute(category_stmt, ['zhihu' if row.source=='zhihuzero' else row.source, row.channel])
                statuses[row.id] = {'source':row.source, 'timespan': str(timespan), 'id': row.id, 'title':row.title, 'thumbnail':row.thumbnail, 'url':row.url, 'text':row.text, 'created_at':row.created_at, 'category': category[0].category, 'author':'', 'genre':row.category }
            else:
                statuses[row.id] = {'source':row.source, 'timespan': str(timespan), 'id': row.id, 'title':row.title, 'thumbnail':row.thumbnail, 'url':row.url, 'text':row.text, 'created_at':row.created_at, 'category': row.category, 'author':'', 'genre':'' }

        except Exception, e:
            continue

    print "%s weibos ..." % (len(statuses))
    print "[%s] CACULATING ..." % (datetime.utcnow().isoformat())
    stmt = session.prepare('SELECT article, MAX(comments) AS comments,MAX(likes) AS likes,MAX(reads) AS reads,MAX(shares) AS shares FROM axes WHERE article = ? AND at >= ?')

    for statuses_id, status in statuses.iteritems():
        rows = session.execute(stmt, [statuses_id, datetime.utcnow() + timedelta(hours=-timespan)])
        for row in rows:
            if source == 'toutiao':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'weibohao':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.shares
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'tencent':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihu':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'buluo':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.reads
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihuzero':
                if not row.article is None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0

    statuses = sorted(statuses.iteritems(), key=lambda (k, v): (v['speed'], k), reverse=True)[:1000]

    print "[%s] TRUNCATING ..." % (datetime.utcnow().isoformat())
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source, str(timespan))) #, consistency_level=ConsistencyLevel.QUORUM

    print "[%s] UPDATING ..." % (datetime.utcnow().isoformat())
    for i, status in statuses:
        if status['speed'] > 0:
            session_statis.execute('insert into statistics.statistics(source,timespan,id,title,thumbnail,url,text,created_at,category,genre,author,reads,likes,comments,shares,speed) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', (status['source'], status['timespan'], status['id'], status['title'], status['thumbnail'], status['url'], status['text'], status['created_at'], status['category'], status['genre'], status['author'], status['reads'], status['likes'], status['comments'], status['shares'], status['speed']))
        else:
            print status['id'], status['url']

    print "[%s] DONE ..." % (datetime.utcnow().isoformat())
    log.info('[%d] done' % (timespan))

except Exception, e:
    print 'except ===:', e

Thanks for your replies!

Jim*_*yer 5

Your use case is a little unusual. Cassandra is geared more toward transactional operations on a small number of rows, rather than the kind of bulk processing you would do in Hadoop.

The way you are executing the query, you hit a single partition on one node and transfer 100K rows to your client. That is a lot of data to move over the network, and I'm not sure why you want to do it. Everything runs sequentially, so you get no parallelism and no benefit from having three nodes.
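To illustrate, here is a minimal sketch (not from the answer; node addresses and the source value are placeholders) that splits the time window into hourly slices and fetches them concurrently with the driver's execute_concurrent_with_args. Note that this only overlaps client round trips; the whole partition still lives on one replica set, so changing the data model (for example, adding a time bucket to the partition key) is the more fundamental fix.

# Sketch: fetch hourly slices of the partition concurrently instead of one
# long sequential stream. Placeholders: node addresses and the 'money' source.
from datetime import datetime, timedelta

from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

stmt = session.prepare(
    "SELECT * FROM articles WHERE source = ? AND created_at >= ? AND created_at < ?")

end = datetime.utcnow()
start = end - timedelta(days=3)

# One (source, slice_start, slice_end) parameter tuple per hour in the window.
params = []
t = start
while t < end:
    params.append(('money', t, t + timedelta(hours=1)))
    t += timedelta(hours=1)

rows = []
for success, result in execute_concurrent_with_args(session, stmt, params, concurrency=16):
    if success:
        rows.extend(result)

print(len(rows))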

Generally, if you want to do bulk processing over many rows in Cassandra, you would use Spark to do distributed processing on each node, rather than sequentially fetching large amounts of data back to the client.
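A rough sketch of what that could look like with PySpark and the spark-cassandra-connector (neither is part of the question's setup; the connection host is a placeholder):

# Sketch only: requires Spark plus the spark-cassandra-connector package.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("articles-batch")
         .config("spark.cassandra.connection.host", "xxx")
         .getOrCreate())

articles = (spark.read
            .format("org.apache.spark.sql.cassandra")
            .options(table="articles", keyspace="crawler")
            .load())

# The connector pushes the partition-key filter down to Cassandra and processes
# the matching rows on the executors instead of in a single Python client.
recent = articles.filter("source = 'money' AND created_at >= '2016-01-08 00:00:00'")
print(recent.count())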

Also, the two indexes you created do not look like they will work well. Cassandra secondary indexes are intended for low-cardinality fields, but you appear to be indexing high-cardinality fields. Cassandra indexes are very different from indexes in a relational database.
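As an alternative to the index on url, a common pattern is a separate lookup table that the application keeps in sync on every write. A minimal sketch (the table name articles_by_url is hypothetical, not from the question):

# Sketch: look up articles by url through a dedicated table rather than a
# high-cardinality secondary index. The application writes to both tables.
from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

session.execute("""
    CREATE TABLE IF NOT EXISTS articles_by_url (
        url text PRIMARY KEY,
        source text,
        created_at timestamp,
        id text
    )""")

insert_by_url = session.prepare(
    "INSERT INTO articles_by_url (url, source, created_at, id) VALUES (?, ?, ?, ?)")

def index_article(url, source, created_at, article_id):
    # Call this alongside the normal INSERT INTO articles (...).
    session.execute(insert_by_url, (url, source, created_at, article_id))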

I would have to look at your client code to know whether you are doing something inefficient there. Normally fetching that many rows triggers paging, so I'm not sure how you are handling that.
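For reference, with cassandra-driver 3.x the page size is controlled by fetch_size, and iterating the result set fetches pages transparently; a small sketch (the values are illustrative):

# Sketch: let the driver page through the partition instead of buffering one
# huge response. 5000 is an arbitrary page size chosen for illustration.
from datetime import datetime, timedelta

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')
session.default_fetch_size = 5000   # also applies to prepared statements

query = SimpleStatement(
    "SELECT * FROM articles WHERE source = %s AND created_at >= %s",
    fetch_size=5000)

count = 0
for row in session.execute(query, ('money', datetime.utcnow() - timedelta(days=3))):
    # Each new page is requested transparently as iteration reaches it.
    count += 1
print(count)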