HikariCP 的 Postgresql 性能问题

bkr*_*ish 2 postgresql jdbc hikaricp

我正在尝试将大数据小批量加载到 PostgreSQL 服务器中的一个表中(总共 4000 万行)(每个 csv 中有 6000 行)。我认为 HikariCP 非常适合此目的。

这是我使用 Java 8 (1.8.0_65)、Postgres JDBC 驱动程序 9.4.1211 和 HikariCP 2.4.3 从数据插入中获得的吞吐量。

4 分 42 秒内 6000 行。

我做错了什么以及如何提高插入速度?

关于我的设置再多说几句:

  • 程序在我的笔记本电脑上运行在公司网络后面。
  • Postgres 服务器 9.4 是具有 db.m4.large 和 50 GB SSD 的 Amazon RDS。
  • 尚未在表上创建定义的显式索引或主键。
  • 程序使用大线程池异步插入每一行来容纳请求,如下所示:

    private static ExecutorService executorService = new ThreadPoolExecutor(5, 1000, 30L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
    
    Run Code Online (Sandbox Code Playgroud)

数据源配置为:

        private DataSource getDataSource() {
                if (datasource == null) {
                    LOG.info("Establishing dataSource");
                    HikariConfig config = new HikariConfig();
                    config.setJdbcUrl(url);
                    config.setUsername(userName);
                    config.setPassword(password);
                    config.setMaximumPoolSize(600);// M4.large 648 connections tops
                    config.setAutoCommit(true); //I tried autoCommit=false and manually committed every 1000 rows but it only increased 2 minute and half for 6000 rows
                    config.addDataSourceProperty("dataSourceClassName","org.postgresql.ds.PGSimpleDataSource");
                    config.addDataSourceProperty("dataSource.logWriter", new PrintWriter(System.out));
                    config.addDataSourceProperty("cachePrepStmts", "true");
                    config.addDataSourceProperty("prepStmtCacheSize", "1000");
                    config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
                    config.setConnectionTimeout(1000);

                    datasource = new HikariDataSource(config);
                }
                return datasource;
            }
Run Code Online (Sandbox Code Playgroud)

这是我读取源数据的地方:

    private void readMetadata(String inputMetadata, String source) {
            BufferedReader br = null;
            FileReader fr = null;
            try {
                br = new BufferedReader(new FileReader(inputMetadata));
                String sCurrentLine = br.readLine();// skip header;
                if (!sCurrentLine.startsWith("xxx") && !sCurrentLine.startsWith("yyy")) {
                    callAsyncInsert(sCurrentLine, source);
                }
                while ((sCurrentLine = br.readLine()) != null) {
                    callAsyncInsert(sCurrentLine, source);
                }
            } catch (IOException e) {
                LOG.error(ExceptionUtils.getStackTrace(e));
            } finally {
                try {
                    if (br != null)
                        br.close();

                    if (fr != null)
                        fr.close();

                } catch (IOException ex) {
                    LOG.error(ExceptionUtils.getStackTrace(ex));
                }
            }
    }
Run Code Online (Sandbox Code Playgroud)

我正在异步插入数据(或尝试使用 jdbc!):

            private void callAsyncInsert(final String line, String source) {
                    Future<?> future = executorService.submit(new Runnable() {
                        public void run() {
                            try {
                                dataLoader.insertRow(line, source);
                            } catch (SQLException e) {
                                LOG.error(ExceptionUtils.getStackTrace(e));
                                try {
                                    errorBufferedWriter.write(line);
                                    errorBufferedWriter.newLine();
                                    errorBufferedWriter.flush();
                                } catch (IOException e1) {
                                    LOG.error(ExceptionUtils.getStackTrace(e1));
                                }
                            }
                        }
                    });
                    try {
                        if (future.get() != null) {
                            LOG.info("$$$$$$$$" + future.get().getClass().getName());
                        }
                    } catch (InterruptedException e) {
                        LOG.error(ExceptionUtils.getStackTrace(e));
                    } catch (ExecutionException e) {
                        LOG.error(ExceptionUtils.getStackTrace(e));
                    }
                }
Run Code Online (Sandbox Code Playgroud)

我的 DataLoader.insertRow 如下:

            public void insertRow(String row, String source) throws SQLException {
                    String[] splits = getRowStrings(row);
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        if (splits.length == 15) {
                            String ... = splits[0];
                            //blah blah blah

                            String insertTableSQL = "insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ";
                            conn = getConnection();
                            preparedStatement = conn.prepareStatement(insertTableSQL);
                            preparedStatement.setString(1, column1);
                            //blah blah blah
                            preparedStatement.executeUpdate();
                            counter.incrementAndGet();
                            //if (counter.get() % 1000 == 0) {
                                //conn.commit();
                            //}
                        } else {
                            LOG.error("Invalid row:" + row);
                        }
                    } finally {
                        /*if (conn != null) {
                            conn.close();   //Do preparedStatement.close(); rather connection.close
                        }*/
                        if (preparedStatement != null) {
                            preparedStatement.close();
                        }
                    }
                }
Run Code Online (Sandbox Code Playgroud)

在 pgAdmin4 中进行监控时,我注意到了一些事情:

  • 最高每秒交易数接近50笔。
  • 活动数据库会话只有 1 个,会话总数为 15 个。
  • 块 I/O 过多(达到约 500,不确定这是否应该成为问题)

pgAdmin 的屏幕截图

bre*_*ttw 5

您绝对希望使用批量插入,在循环外部准备语句,并关闭自动提交。在伪代码中:

PreparedStatement stmt = conn.prepareStatement("insert into xyz(...) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
while ( <data> ) {
   stmt.setString(1, column1);
   //blah blah blah
   stmt.addBatch();
}
stmt.executeBatch();
conn.commit();
Run Code Online (Sandbox Code Playgroud)

即使单个连接上的单个线程也应该能够插入 > 5000 行/秒。

更新:如果你想多线程,连接数应该是数据库CPU核心数x1.5或2。处理线程数应该匹配,并且每个处理线程应该使用以下模式处理一个CSV文件多于。但是,您可能会发现对同一个表的许多并发插入会在数据库中产生过多的锁争用,在这种情况下,您需要减少处理线程的数量,直到找到最佳并发性。

适当大小的池和并发性应该很容易达到 >20K 行/秒。

另外,请升级到 HikariCP v2.6.0。