Persistent timeouts in Cassandra after adding a second node

aro*_*oth 5 java timeout cassandra

I'm trying to migrate a fairly large amount of data (~41 million rows) from a SQL database into Cassandra. I previously did a trial run with half the dataset, and everything worked exactly as expected.

The problem is that now, attempting the full migration, Cassandra keeps throwing timeout errors. For example:

[INFO] [talledLocalContainer] com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:10112 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response))
[INFO] [talledLocalContainer]   at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
[INFO] [talledLocalContainer]   at com.mycompany.tasks.CassandraMigrationTask.execute(CassandraMigrationTask.java:164)
[INFO] [talledLocalContainer]   at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
[INFO] [talledLocalContainer]   at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
[INFO] [talledLocalContainer] Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:10112 (com.datastax.driver.core.exceptions.DriverException: Timed out waiting for server response))
[INFO] [talledLocalContainer]   at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:108)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.RequestHandler$1.run(RequestHandler.java:179)
[INFO] [talledLocalContainer]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[INFO] [talledLocalContainer]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[INFO] [talledLocalContainer]   at java.lang.Thread.run(Thread.java:745)

I've tried increasing the timeout values in cassandra.yaml, which increased how long the migration could run before timing out (roughly in proportion to the increase in the timeouts).

Before changing the timeout settings, my stack traces looked more like:

[INFO] [talledLocalContainer] com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:54)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:289)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:205)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
[INFO] [talledLocalContainer]   at com.mycompany.tasks.CassandraMigrationTask.execute(CassandraMigrationTask.java:164)
[INFO] [talledLocalContainer]   at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
[INFO] [talledLocalContainer]   at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
[INFO] [talledLocalContainer] Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency ONE (1 replica were required but only 0 acknowledged the write)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.exceptions.WriteTimeoutException.copy(WriteTimeoutException.java:54)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.Responses$Error.asException(Responses.java:99)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:140)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:249)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:433)
[INFO] [talledLocalContainer]   at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:697)
[INFO] [talledLocalContainer]   at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
[INFO] [talledLocalContainer]   at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
[INFO] [talledLocalContainer]   at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
[INFO] [talledLocalContainer]   at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296)
[INFO] [talledLocalContainer]   at com.datastax.shaded.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)

My current timeout settings are:

# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 30000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 30000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 30000
# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 30000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 20000

...which let me insert roughly 1.5 million rows before a timeout occurred. The original timeout settings were:

# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 5000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 10000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 2000
# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 5000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 10000

...which caused a timeout roughly every 300,000 rows.

The only significant change between my successful run and now is that I added a second node to the Cassandra deployment. So intuitively I assume the problem is related to data propagating from the first node to the second (e.g. <some process> that scales linearly with the amount of data inserted, and which isn't used when there's only one node). But I don't see any obvious options that might be useful for configuring/mitigating this.

In case it's relevant, I use batch statements during the migration, typically with at most 100 to 200 statements/rows per batch.
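For illustration, the chunking side of that batching approach can be sketched as a small helper (a hypothetical sketch of my own; `BatchChunker` and `partition` are made-up names, not part of the datastax driver):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // Split a list of rows into batches of at most maxBatchSize elements,
    // mirroring the 100-200 statements-per-batch approach described above.
    public static <T> List<List<T>> partition(List<T> rows, int maxBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += maxBatchSize) {
            batches.add(rows.subList(i, Math.min(i + maxBatchSize, rows.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 450; i++) rows.add(i);
        List<List<Integer>> batches = partition(rows, 200);
        System.out.println(batches.size());          // 3 batches: 200 + 200 + 50
        System.out.println(batches.get(2).size());   // 50
    }
}
```

Each resulting sub-list would then be turned into one `BatchStatement` and executed against the session.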

My keyspace was originally set up WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 }, but I changed it to WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } to see if that would make any difference. It did not.

I also tried explicitly setting ConsistencyLevel.ANY on all the insert statements (and on the enclosing batch statements). That made no difference either.

The Cassandra logs on both nodes don't seem to contain anything interesting, although the first node definitely shows far more "ops" than the second:

First node - 454317 ops

INFO  [SlabPoolCleaner] 2016-01-25 19:46:08,806 ColumnFamilyStore.java:905 - Enqueuing flush of assetproperties_flat: 148265302 (14%) on-heap, 0 (0%) off-heap
INFO  [MemtableFlushWriter:15] 2016-01-25 19:46:08,807 Memtable.java:347 - Writing Memtable-assetproperties_flat@350387072(20.557MiB serialized bytes, 454317 ops, 14%/0% of on/off-heap limit)
INFO  [MemtableFlushWriter:15] 2016-01-25 19:46:09,393 Memtable.java:382 - Completed flushing /var/cassandra/data/itb/assetproperties_flat-e83359a0c34411e593abdda945619e28/itb-assetproperties_flat-tmp-ka-32-Data.db (5.249MiB) for commitlog position ReplayPosition(segmentId=1453767930194, position=15188257)

Second node - 2020 ops

INFO  [BatchlogTasks:1] 2016-01-25 19:46:33,961 ColumnFamilyStore.java:905 - Enqueuing flush of batchlog: 4923957 (0%) on-heap, 0 (0%) off-heap
INFO  [MemtableFlushWriter:22] 2016-01-25 19:46:33,962 Memtable.java:347 - Writing Memtable-batchlog@796821497(4.453MiB serialized bytes, 2020 ops, 0%/0% of on/off-heap limit)
INFO  [MemtableFlushWriter:22] 2016-01-25 19:46:33,963 Memtable.java:393 - Completed flushing /var/cassandra/data/system/batchlog-0290003c977e397cac3efdfdc01d626b/system-batchlog-tmp-ka-11-Data.db; nothing needed to be retained.  Commitlog position was ReplayPosition(segmentId=1453767955411, position=18567563)

Has anyone run into a similar problem, and if so, what was the fix?

Would it be advisable to just take the second node offline, run the migration against the first node only, and then run nodetool repair afterwards to bring the second node back in sync?

Edit

Answering questions from the comments:

  1. I'm using the datastax Java driver, and there's a server-side task (a Quartz job) that uses an ORM layer (Hibernate) to find the next chunk of data to migrate, writes it to Cassandra, and then purges it from the SQL database. I connect to Cassandra using the following code:

    public static Session getCassandraSession(String keyspace) {
        Session session = clusterSessions.get(keyspace);
        if (session != null && ! session.isClosed()) {
            //can use the cached session
            return session;
        }
    
        //create a new session for the specified keyspace
        Cluster cassandraCluster = getCluster();
        session = cassandraCluster.connect(keyspace);
    
        //cache and return the session
        clusterSessions.put(keyspace, session);
        return session;
    }
    
    private static Cluster getCluster() {
        if (cluster != null && ! cluster.isClosed()) {
            //can use the cached cluster
            return cluster;
        }
    
        //configure socket options
        SocketOptions options = new SocketOptions();
        options.setConnectTimeoutMillis(30000);
        options.setReadTimeoutMillis(300000);
        options.setTcpNoDelay(true);
    
        //spin up a fresh connection
        cluster = Cluster.builder().addContactPoint(Configuration.getCassandraHost()).withPort(Configuration.getCassandraPort())
                    .withCredentials(Configuration.getCassandraUser(), Configuration.getCassandraPass()).withSocketOptions(options).build();
    
        //log the cluster details for confirmation
        Metadata metadata = cluster.getMetadata();
        LOG.debug("Connected to Cassandra cluster: " + metadata.getClusterName());
        for ( Host host : metadata.getAllHosts() ) {
            LOG.debug("Datacenter:  " + host.getDatacenter() + "; Host:  " + host.getAddress() + "; Rack: " + host.getRack());
        }
    
        return cluster;
    }
    

    The section with the SocketOptions was added recently, since the latest timeout errors sounded like they were coming from Java/the client side rather than from Cassandra itself.

  2. No more than 200 records are inserted per batch. Typical values are closer to 100.

  3. Both nodes have identical specs:

    • Intel(R) Xeon(R) CPU E3-1230 V2 @ 3.30GHz
    • 32GB RAM
    • 256GB SSD (primary), 2TB HDD (backup), both in a RAID-1 configuration
  4. First node:

    Pool Name                    Active   Pending      Completed   Blocked  All time blocked
    CounterMutationStage              0         0              0         0                 0
    ReadStage                         0         0          58155         0                 0
    RequestResponseStage              0         0         655104         0                 0
    MutationStage                     0         0         259151         0                 0
    ReadRepairStage                   0         0              0         0                 0
    GossipStage                       0         0          58041         0                 0
    CacheCleanupExecutor              0         0              0         0                 0
    AntiEntropyStage                  0         0              0         0                 0
    MigrationStage                    0         0              0         0                 0
    Sampler                           0         0              0         0                 0
    ValidationExecutor                0         0              0         0                 0
    CommitLogArchiver                 0         0              0         0                 0
    MiscStage                         0         0              0         0                 0
    MemtableFlushWriter               0         0             80         0                 0
    MemtableReclaimMemory             0         0             80         0                 0
    PendingRangeCalculator            0         0              3         0                 0
    MemtablePostFlush                 0         0            418         0                 0
    CompactionExecutor                0         0           8979         0                 0
    InternalResponseStage             0         0              0         0                 0
    HintedHandoff                     0         0              2         0                 0
    Native-Transport-Requests         1         0        1175338         0                 0
    
    Message type           Dropped
    RANGE_SLICE                  0
    READ_REPAIR                  0
    PAGED_RANGE                  0
    BINARY                       0
    READ                         0
    MUTATION                     0
    _TRACE                       0
    REQUEST_RESPONSE             0
    COUNTER_MUTATION             0
    

    Second node:

    Pool Name                    Active   Pending      Completed   Blocked  All time blocked
    CounterMutationStage              0         0              0         0                 0
    ReadStage                         0         0          55803         0                 0
    RequestResponseStage              0         0              1         0                 0
    MutationStage                     0         0         733828         0                 0
    ReadRepairStage                   0         0              0         0                 0
    GossipStage                       0         0          56623         0                 0
    CacheCleanupExecutor              0         0              0         0                 0
    AntiEntropyStage                  0         0              0         0                 0
    MigrationStage                    0         0              0         0                 0
    Sampler                           0         0              0         0                 0
    ValidationExecutor                0         0              0         0                 0
    CommitLogArchiver                 0         0              0         0                 0
    MiscStage                         0         0              0         0                 0
    MemtableFlushWriter               0         0            394         0                 0
    MemtableReclaimMemory             0         0            394         0                 0
    PendingRangeCalculator            0         0              2         0                 0
    MemtablePostFlush                 0         0            428         0                 0
    CompactionExecutor                0         0           8883         0                 0
    InternalResponseStage             0         0              0         0                 0
    HintedHandoff                     0         0              1         0                 0
    Native-Transport-Requests         0         0             70         0                 0
    
    Message type           Dropped
    RANGE_SLICE                  0
    READ_REPAIR                  0
    PAGED_RANGE                  0
    BINARY                       0
    READ                         0
    MUTATION                     0
    _TRACE                       0
    REQUEST_RESPONSE             0
    COUNTER_MUTATION             0
    
  5. The output of nodetool ring is very long. Here's nodetool status as an alternative:

    Datacenter: DC1
    ===============
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    --  Address         Load       Tokens  Owns    Host ID                               Rack
    UN  204.11.xxx.1  754.66 MB  1024    ?       8cf373d8-0b3e-4fd3-9e63-fdcdd8ce8cd4  RAC1
    UN  208.66.xxx.2  767.78 MB  1024    ?       42e1f336-84cb-4260-84df-92566961a220  RAC2
    
  6. I increased all of Cassandra's timeout values 10x, and also set the Java driver's read timeout to match, and I'm now 8m 29.4m inserts in without any problems. In theory, if the problem scales linearly with the timeout values, I should be good for roughly 15m inserts (which is at least good enough that I don't have to constantly babysit the migration process, waiting on each new error).
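The "linear scaling" estimate above is simple arithmetic; as a sketch (my own hypothetical helper, not from the post's code):

```java
public class TimeoutProjection {
    // If rows-inserted-before-timeout scales linearly with the timeout values,
    // then projected rows = baseline rows * timeout multiplier.
    public static long projectedRows(long baselineRows, double timeoutMultiplier) {
        return Math.round(baselineRows * timeoutMultiplier);
    }

    public static void main(String[] args) {
        // ~1.5m rows at the previous settings, timeouts then increased 10x
        System.out.println(projectedRows(1_500_000, 10.0)); // 15000000
    }
}
```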

aro*_*oth 1

OK, so I was able to stop the timeout errors by doing two things. First, I increased Cassandra's timeout values on both hosts, like so:

# How long the coordinator should wait for read operations to complete
read_request_timeout_in_ms: 30000
# How long the coordinator should wait for seq or index scans to complete
range_request_timeout_in_ms: 30000
# How long the coordinator should wait for writes to complete
write_request_timeout_in_ms: 30000
# How long the coordinator should wait for counter writes to complete
counter_write_request_timeout_in_ms: 30000
# How long a coordinator should continue to retry a CAS operation
# that contends with other proposals for the same row
cas_contention_timeout_in_ms: 1000
# How long the coordinator should wait for truncates to complete
# (This can be much longer, because unless auto_snapshot is disabled
# we need to flush first so we can snapshot before removing the data.)
truncate_request_timeout_in_ms: 60000
# The default timeout for other, miscellaneous operations
request_timeout_in_ms: 20000

I suspect those values are unnecessarily large, but they're what I had in place when everything started working.

The second part of the solution was to adjust the client-side timeouts in my Java code, like so:

//configure socket options
SocketOptions options = new SocketOptions();
options.setConnectTimeoutMillis(30000);
options.setReadTimeoutMillis(300000);
options.setTcpNoDelay(true);

//spin up a fresh connection (using the SocketOptions set up above)
cluster = Cluster.builder().addContactPoint(Configuration.getCassandraHost()).withPort(Configuration.getCassandraPort())
            .withCredentials(Configuration.getCassandraUser(), Configuration.getCassandraPass()).withSocketOptions(options).build();

With those two changes in place, the timeout errors stopped and the data migration completed without incident.

As @MarcintheCloud correctly pointed out in the comments above, increasing the timeout values may only serve to mask the underlying problem. But that's good enough for my case, because 1) the underlying issue only manifests under very heavy load, 2) I only need to run the migration process once, and 3) once the data is migrated, the actual load levels will be several orders of magnitude lower than what was experienced during the migration.
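If masking the problem with larger timeouts weren't acceptable, another purely client-side option (my own sketch, not something from the post; the `Retryable` interface and the parameter values are hypothetical) would be to retry timed-out batches with exponential backoff, giving an overloaded node time to catch up:

```java
public class RetryWithBackoff {
    @FunctionalInterface
    public interface Retryable<T> {
        T run() throws Exception;
    }

    // Run an operation (e.g. a batch insert that may time out), retrying up to
    // maxAttempts times with exponentially increasing sleeps between attempts.
    public static <T> T execute(Retryable<T> op, int maxAttempts, long initialDelayMs)
            throws Exception {
        Exception last = null;
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.run();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay *= 2; // back off so an overloaded node can catch up
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice, then succeeds -- simulating transient write timeouts
        String result = execute(() -> {
            if (++calls[0] < 3) throw new RuntimeException("timeout");
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The trade-off is longer total migration time instead of larger server-side timeouts.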

Still, it seems worthwhile to understand the root cause. So what was it? Well, I have two theories:

  1. As @MarcintheCloud suggested, perhaps 1024 tokens is too many for Cassandra to reasonably work with, and perhaps as a result the deployment becomes somewhat unstable under heavy load.

  2. My alternative theory relates to network chatter between the two nodes. In my deployment, the first node runs an application server instance, the first Cassandra instance, and the primary SQL database. The second node runs the second Cassandra instance, plus an SQL database replica that is kept in near-real-time sync with the primary.

    Now, the migration process is essentially doing two things at once: it writes data to Cassandra, and it deletes data from the SQL database. Both operations generate changesets that need to propagate over the network to the second node.

    So my theory is that if changes happen quickly enough on the first node (and the SSDs do allow very high IO throughput), the network transfers of the SQL and Cassandra change logs (and/or the subsequent IO operations on the second node) may sometimes compete with each other, introducing additional latency into replication and potentially causing timeouts. It seems plausible that under enough contention, one process or the other could end up blocked for several seconds at a time, which would be enough to trigger timeout errors under Cassandra's default settings.

Those are the plausible theories I've been able to come up with, though I have no real way of testing which of them (if either) is correct.