跳到主要内容

Java 批处理

在开发数据库应用程序时,我们常常需要执行多条SQL语句。如果这些语句是相似的,批处理(Batch Processing)技术可以显著提高执行效率,特别是在处理大量数据时。本文将详细介绍JDBC中的批处理机制,帮助初学者掌握这一重要技术。

什么是批处理?

批处理是指将多条SQL语句收集起来,一次性提交给数据库执行,而不是每条语句单独提交。这种方式可以:

  1. 减少网络通信的开销
  2. 降低数据库执行多条语句的时间
  3. 提高应用程序的整体性能
提示

批处理特别适合需要重复执行类似SQL语句的场景,如批量插入、更新或删除数据。

JDBC批处理的基本操作

JDBC提供了以下接口方法来支持批处理操作:

  • addBatch() - 向批处理中添加命令
  • executeBatch() - 执行批处理中的所有命令
  • clearBatch() - 清空批处理命令

批处理的实现方式

在JDBC中,有两种方式实现批处理:

  1. Statement批处理:用于执行不同的SQL语句
  2. PreparedStatement批处理:用于执行相同结构但参数不同的SQL语句

Statement批处理

当需要执行多条不同的SQL语句时,可以使用Statement对象的批处理功能。

java
import java.sql.*;

public class StatementBatchExample {
public static void main(String[] args) {
Connection conn = null;
Statement stmt = null;

try {
// 1. 注册JDBC驱动
Class.forName("com.mysql.jdbc.Driver");

// 2. 打开连接
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "username", "password");

// 3. 关闭自动提交
conn.setAutoCommit(false);

// 4. 创建Statement对象
stmt = conn.createStatement();

// 5. 添加批处理命令
stmt.addBatch("INSERT INTO employees VALUES(101, 'John', 'Developer')");
stmt.addBatch("INSERT INTO employees VALUES(102, 'Mary', 'Designer')");
stmt.addBatch("UPDATE employees SET salary = 50000 WHERE id = 100");
stmt.addBatch("DELETE FROM employees WHERE id = 103");

// 6. 执行批处理
int[] updateCounts = stmt.executeBatch();

// 7. 提交事务
conn.commit();

// 8. 处理结果
System.out.println("批处理执行结果:");
for (int i = 0; i < updateCounts.length; i++) {
System.out.println("命令 " + (i + 1) + ": " + updateCounts[i] + " 行受影响");
}

} catch (BatchUpdateException e) {
// 处理批处理异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
System.out.println("批处理错误代码: " + e.getErrorCode());
int[] updateCounts = e.getUpdateCounts();
for (int i = 0; i < updateCounts.length; i++) {
System.out.println("命令 " + (i + 1) + ": " + updateCounts[i]);
}
e.printStackTrace();
} catch (Exception e) {
// 处理其他异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
// 关闭资源
try {
if (stmt != null) stmt.close();
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}

输出:

批处理执行结果:
命令 1: 1 行受影响
命令 2: 1 行受影响
命令 3: 1 行受影响
命令 4: 0 行受影响

PreparedStatement批处理

当需要重复执行相同结构的SQL语句(仅参数不同)时,PreparedStatement的批处理更加高效。

java
import java.sql.*;

public class PreparedStatementBatchExample {
public static void main(String[] args) {
Connection conn = null;
PreparedStatement pstmt = null;

try {
// 1. 注册JDBC驱动
Class.forName("com.mysql.jdbc.Driver");

// 2. 打开连接
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "username", "password");

// 3. 关闭自动提交
conn.setAutoCommit(false);

// 4. 准备数据
int[] ids = {201, 202, 203, 204, 205};
String[] names = {"Alice", "Bob", "Charlie", "David", "Eve"};
String[] roles = {"Analyst", "Manager", "Developer", "Tester", "Designer"};

// 5. 创建PreparedStatement对象
pstmt = conn.prepareStatement("INSERT INTO employees VALUES(?, ?, ?)");

// 6. 添加批处理命令
for (int i = 0; i < ids.length; i++) {
pstmt.setInt(1, ids[i]);
pstmt.setString(2, names[i]);
pstmt.setString(3, roles[i]);
pstmt.addBatch();
}

// 7. 执行批处理
int[] updateCounts = pstmt.executeBatch();

// 8. 提交事务
conn.commit();

// 9. 处理结果
System.out.println("批处理执行结果:");
int totalInserted = 0;
for (int count : updateCounts) {
if (count > 0) {
totalInserted += count;
}
}
System.out.println("成功插入 " + totalInserted + " 条记录");

} catch (BatchUpdateException e) {
// 处理批处理异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
System.out.println("批处理错误代码: " + e.getErrorCode());
int[] updateCounts = e.getUpdateCounts();
for (int i = 0; i < updateCounts.length; i++) {
if (updateCounts[i] == Statement.EXECUTE_FAILED) {
System.out.println("命令 " + (i + 1) + " 执行失败");
}
}
e.printStackTrace();
} catch (Exception e) {
// 处理其他异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
// 关闭资源
try {
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}

输出:

批处理执行结果:
成功插入 5 条记录

批处理性能优化

为了进一步提升批处理性能,可以考虑以下几点:

1. 合理的批次大小

批处理虽然可以提高性能,但批次太大可能会导致内存问题或数据库负担过重。一般建议每批次处理100-1000条记录,具体数量需要根据实际情况测试决定。

java
// 示例:每500条记录执行一次批处理
int batchSize = 500;
int count = 0;

for (int i = 0; i < 10000; i++) {
pstmt.setInt(1, i);
pstmt.setString(2, "name" + i);
pstmt.addBatch();
count++;

if (count % batchSize == 0) {
pstmt.executeBatch();
conn.commit();
System.out.println("已提交 " + count + " 条记录");
}
}

// 处理剩余的批次
if (count % batchSize != 0) {
pstmt.executeBatch();
conn.commit();
}

2. 设置rewriteBatchedStatements参数(MySQL)

对于MySQL数据库,可以通过设置连接URL参数rewriteBatchedStatements=true来优化批处理:

java
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/testdb?rewriteBatchedStatements=true",
"username",
"password");

这个参数使JDBC驱动将多条INSERT语句合并为一个多值插入语句,显著提高性能。

3. 预设 Statement 的 Fetch Size

当查询大量数据时,可以通过设置 Fetch Size 来优化内存使用:

java
Statement stmt = conn.createStatement();
stmt.setFetchSize(100); // 每次从数据库获取100条记录

实际应用场景

场景1:数据导入

假设我们需要从CSV文件中导入数百万条产品数据到数据库:

java
import java.io.*;
import java.sql.*;

public class ProductImporter {
public static void main(String[] args) {
String csvFile = "products.csv";
Connection conn = null;
PreparedStatement pstmt = null;
BufferedReader br = null;
String line = "";
String cvsSplitBy = ",";
int batchSize = 1000;
int count = 0;

try {
// 建立数据库连接
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/inventory?rewriteBatchedStatements=true",
"username", "password");
conn.setAutoCommit(false);

// 准备SQL语句
pstmt = conn.prepareStatement(
"INSERT INTO products (id, name, category, price, quantity) VALUES (?, ?, ?, ?, ?)");

// 读取CSV文件
br = new BufferedReader(new FileReader(csvFile));

// 跳过标题行
br.readLine();

// 处理每一行数据
while ((line = br.readLine()) != null) {
String[] data = line.split(cvsSplitBy);

pstmt.setInt(1, Integer.parseInt(data[0])); // id
pstmt.setString(2, data[1]); // name
pstmt.setString(3, data[2]); // category
pstmt.setDouble(4, Double.parseDouble(data[3])); // price
pstmt.setInt(5, Integer.parseInt(data[4])); // quantity

pstmt.addBatch();
count++;

// 每1000条执行一次批处理
if (count % batchSize == 0) {
pstmt.executeBatch();
conn.commit();
System.out.println("已导入 " + count + " 条产品数据");
}
}

// 处理剩余数据
if (count % batchSize != 0) {
pstmt.executeBatch();
conn.commit();
}

System.out.println("数据导入完成,总共导入 " + count + " 条产品数据");

} catch (Exception e) {
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
try {
if (br != null) br.close();
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}

场景2:定期数据清理

假设我们需要定期清理过期的日志数据,按批次删除以避免锁定表太长时间:

java
import java.sql.*;
import java.time.LocalDate;

public class LogCleaner {
public static void main(String[] args) {
Connection conn = null;
PreparedStatement pstmt = null;

try {
// 建立数据库连接
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/logdb",
"username", "password");
conn.setAutoCommit(false);

// 计算30天前的日期
LocalDate thirtyDaysAgo = LocalDate.now().minusDays(30);
Date cutoffDate = Date.valueOf(thirtyDaysAgo);

// 分页参数
int batchSize = 5000;
int deletedTotal = 0;
int deletedInBatch;

// 准备SQL语句 - 使用LIMIT限制每批删除的数量
pstmt = conn.prepareStatement(
"DELETE FROM system_logs WHERE log_date < ? LIMIT ?");
pstmt.setDate(1, cutoffDate);
pstmt.setInt(2, batchSize);

// 循环删除,直到没有记录被删除
do {
deletedInBatch = pstmt.executeUpdate();
conn.commit();
deletedTotal += deletedInBatch;
System.out.println("已删除 " + deletedInBatch + " 条过期日志,总计: " + deletedTotal);

// 小暂停,避免数据库负载过高
Thread.sleep(100);
} while (deletedInBatch > 0);

System.out.println("清理完成,总共删除 " + deletedTotal + " 条过期日志");

} catch (Exception e) {
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
try {
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}

批处理的注意事项

  1. 异常处理:批处理操作可能会在中途失败。JDBC提供了BatchUpdateException类,通过getUpdateCounts()方法可以获取每条语句的执行结果。

  2. 事务控制:批处理通常与事务一起使用。如果批处理中的某些操作失败,可以回滚整个事务。

  3. 内存消耗:批处理会在内存中缓存多条SQL语句,所以批次太大可能导致内存问题。

  4. 数据库限制:不同数据库对批处理的支持和限制不同,需要查阅相应的文档。

  5. 返回值executeBatch()方法返回一个整型数组,表示每条语句影响的行数。对于不返回行数的语句,可能返回Statement.SUCCESS_NO_INFO(-2)。

警告

在某些数据库下,如果批处理中的一条语句失败,默认情况下会继续执行后续语句。若要改变这一行为,可以设置Connection对象的属性。例如,在某些驱动程序中:

java
conn.setProperty("continueBatchOnError", "false");

总结

批处理是JDBC提供的一种优化技术,通过将多条SQL语句打包一次性提交给数据库,可以大幅提高数据操作的效率。在处理大批量数据时,批处理几乎是必不可少的性能优化手段。

主要记住以下几点:

  1. 使用addBatch()添加SQL语句或参数到批处理中
  2. 使用executeBatch()执行批处理
  3. 选择合适的批处理大小
  4. 将批处理与事务结合使用
  5. 正确处理批处理可能出现的异常

掌握批处理技术,将帮助你开发出性能更高的数据库应用程序。

练习

  1. 编写一个程序,使用批处理向数据库中插入1000条学生记录,每100条提交一次。

  2. 修改上面的程序,故意在第500条记录中引入一个错误,然后处理可能出现的BatchUpdateException,查看哪些记录已成功插入,哪些记录失败。

  3. 编写一个程序,对"orders"表中状态为"已完成"的订单进行批量归档(从orders表移动到order_history表)。

扩展资源

通过学习批处理技术,你已经迈出了JDBC性能优化的第一步!在实际项目中,合理应用批处理可以显著提高数据处理效率。