Java 批处理
在开发数据库应用程序时,我们常常需要执行多条SQL语句。如果这些语句是相似的,批处理(Batch Processing)技术可以显著提高执行效率,特别是在处理大量数据时。本文将详细介绍JDBC中的批处理机制,帮助初学者掌握这一重要技术。
什么是批处理?
批处理是指将多条SQL语句收集起来,一次性提交给数据库执行,而不是每条语句单独提交。这种方式可以:
- 减少网络通信的开销
- 降低数据库执行多条语句的时间
- 提高应用程序的整体性能
批处理特别适合需要重复执行类似SQL语句的场景,如批量插入、更新或删除数据。
JDBC批处理的基本操作
JDBC提供了以下接口方法来支持批处理操作:
addBatch()
- 向批处理中添加命令executeBatch()
- 执行批处理中的所有命令clearBatch()
- 清空批处理命令
批处理的实现方式
在JDBC中,有两种方式实现批处理:
- Statement批处理:用于执行不同的SQL语句
- PreparedStatement批处理:用于执行相同结构但参数不同的SQL语句
Statement批处理
当需要执行多条不同的SQL语句时,可以使用Statement对象的批处理功能。
import java.sql.*;
public class StatementBatchExample {
public static void main(String[] args) {
Connection conn = null;
Statement stmt = null;
try {
// 1. 注册JDBC驱动
Class.forName("com.mysql.jdbc.Driver");
// 2. 打开连接
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "username", "password");
// 3. 关闭自动提交
conn.setAutoCommit(false);
// 4. 创建Statement对象
stmt = conn.createStatement();
// 5. 添加批处理命令
stmt.addBatch("INSERT INTO employees VALUES(101, 'John', 'Developer')");
stmt.addBatch("INSERT INTO employees VALUES(102, 'Mary', 'Designer')");
stmt.addBatch("UPDATE employees SET salary = 50000 WHERE id = 100");
stmt.addBatch("DELETE FROM employees WHERE id = 103");
// 6. 执行批处理
int[] updateCounts = stmt.executeBatch();
// 7. 提交事务
conn.commit();
// 8. 处理结果
System.out.println("批处理执行结果:");
for (int i = 0; i < updateCounts.length; i++) {
System.out.println("命令 " + (i + 1) + ": " + updateCounts[i] + " 行受影响");
}
} catch (BatchUpdateException e) {
// 处理批处理异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
System.out.println("批处理错误代码: " + e.getErrorCode());
int[] updateCounts = e.getUpdateCounts();
for (int i = 0; i < updateCounts.length; i++) {
System.out.println("命令 " + (i + 1) + ": " + updateCounts[i]);
}
e.printStackTrace();
} catch (Exception e) {
// 处理其他异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
// 关闭资源
try {
if (stmt != null) stmt.close();
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}
输出:
批处理执行结果:
命令 1: 1 行受影响
命令 2: 1 行受影响
命令 3: 1 行受影响
命令 4: 0 行受影响
PreparedStatement批处理
当需要重复执行相同结构的SQL语句(仅参数不同)时,PreparedStatement的批处理更加高效。
import java.sql.*;
public class PreparedStatementBatchExample {
public static void main(String[] args) {
Connection conn = null;
PreparedStatement pstmt = null;
try {
// 1. 注册JDBC驱动
Class.forName("com.mysql.jdbc.Driver");
// 2. 打开连接
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "username", "password");
// 3. 关闭自动提交
conn.setAutoCommit(false);
// 4. 准备数据
int[] ids = {201, 202, 203, 204, 205};
String[] names = {"Alice", "Bob", "Charlie", "David", "Eve"};
String[] roles = {"Analyst", "Manager", "Developer", "Tester", "Designer"};
// 5. 创建PreparedStatement对象
pstmt = conn.prepareStatement("INSERT INTO employees VALUES(?, ?, ?)");
// 6. 添加批处理命令
for (int i = 0; i < ids.length; i++) {
pstmt.setInt(1, ids[i]);
pstmt.setString(2, names[i]);
pstmt.setString(3, roles[i]);
pstmt.addBatch();
}
// 7. 执行批处理
int[] updateCounts = pstmt.executeBatch();
// 8. 提交事务
conn.commit();
// 9. 处理结果
System.out.println("批处理执行结果:");
int totalInserted = 0;
for (int count : updateCounts) {
if (count > 0) {
totalInserted += count;
}
}
System.out.println("成功插入 " + totalInserted + " 条记录");
} catch (BatchUpdateException e) {
// 处理批处理异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
System.out.println("批处理错误代码: " + e.getErrorCode());
int[] updateCounts = e.getUpdateCounts();
for (int i = 0; i < updateCounts.length; i++) {
if (updateCounts[i] == Statement.EXECUTE_FAILED) {
System.out.println("命令 " + (i + 1) + " 执行失败");
}
}
e.printStackTrace();
} catch (Exception e) {
// 处理其他异常
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
// 关闭资源
try {
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}
输出:
批处理执行结果:
成功插入 5 条记录
批处理性能优化
为了进一步提升批处理性能,可以考虑以下几点:
1. 合理的批次大小
批处理虽然可以提高性能,但批次太大可能会导致内存问题或数据库负担过重。一般建议每批次处理100-1000条记录,具体数量需要根据实际情况测试决定。
// 示例:每500条记录执行一次批处理
int batchSize = 500;
int count = 0;
for (int i = 0; i < 10000; i++) {
pstmt.setInt(1, i);
pstmt.setString(2, "name" + i);
pstmt.addBatch();
count++;
if (count % batchSize == 0) {
pstmt.executeBatch();
conn.commit();
System.out.println("已提交 " + count + " 条记录");
}
}
// 处理剩余的批次
if (count % batchSize != 0) {
pstmt.executeBatch();
conn.commit();
}
2. 设置rewriteBatchedStatements参数(MySQL)
对于MySQL数据库,可以通过设置连接URL参数rewriteBatchedStatements=true
来优化批处理:
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/testdb?rewriteBatchedStatements=true",
"username",
"password");
这个参数使JDBC驱动将多条INSERT语句合并为一个多值插入语句,显著提高性能。
3. 预设 Statement 的 Fetch Size
当查询大量数据时,可以通过设置 Fetch Size 来优化内存使用:
Statement stmt = conn.createStatement();
stmt.setFetchSize(100); // 每次从数据库获取100条记录
实际应用场景
场景1:数据导入
假设我们需要从CSV文件中导入数百万条产品数据到数据库:
import java.io.*;
import java.sql.*;
public class ProductImporter {
public static void main(String[] args) {
String csvFile = "products.csv";
Connection conn = null;
PreparedStatement pstmt = null;
BufferedReader br = null;
String line = "";
String cvsSplitBy = ",";
int batchSize = 1000;
int count = 0;
try {
// 建立数据库连接
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/inventory?rewriteBatchedStatements=true",
"username", "password");
conn.setAutoCommit(false);
// 准备SQL语句
pstmt = conn.prepareStatement(
"INSERT INTO products (id, name, category, price, quantity) VALUES (?, ?, ?, ?, ?)");
// 读取CSV文件
br = new BufferedReader(new FileReader(csvFile));
// 跳过标题行
br.readLine();
// 处理每一行数据
while ((line = br.readLine()) != null) {
String[] data = line.split(cvsSplitBy);
pstmt.setInt(1, Integer.parseInt(data[0])); // id
pstmt.setString(2, data[1]); // name
pstmt.setString(3, data[2]); // category
pstmt.setDouble(4, Double.parseDouble(data[3])); // price
pstmt.setInt(5, Integer.parseInt(data[4])); // quantity
pstmt.addBatch();
count++;
// 每1000条执行一次批处理
if (count % batchSize == 0) {
pstmt.executeBatch();
conn.commit();
System.out.println("已导入 " + count + " 条产品数据");
}
}
// 处理剩余数据
if (count % batchSize != 0) {
pstmt.executeBatch();
conn.commit();
}
System.out.println("数据导入完成,总共导入 " + count + " 条产品数据");
} catch (Exception e) {
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
try {
if (br != null) br.close();
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
场景2:定期数据清理
假设我们需要定期清理过期的日志数据,按批次删除以避免锁定表太长时间:
import java.sql.*;
import java.time.LocalDate;
public class LogCleaner {
public static void main(String[] args) {
Connection conn = null;
PreparedStatement pstmt = null;
try {
// 建立数据库连接
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/logdb",
"username", "password");
conn.setAutoCommit(false);
// 计算30天前的日期
LocalDate thirtyDaysAgo = LocalDate.now().minusDays(30);
Date cutoffDate = Date.valueOf(thirtyDaysAgo);
// 分页参数
int batchSize = 5000;
int deletedTotal = 0;
int deletedInBatch;
// 准备SQL语句 - 使用LIMIT限制每批删除的数量
pstmt = conn.prepareStatement(
"DELETE FROM system_logs WHERE log_date < ? LIMIT ?");
pstmt.setDate(1, cutoffDate);
pstmt.setInt(2, batchSize);
// 循环删除,直到没有记录被删除
do {
deletedInBatch = pstmt.executeUpdate();
conn.commit();
deletedTotal += deletedInBatch;
System.out.println("已删除 " + deletedInBatch + " 条过期日志,总计: " + deletedTotal);
// 小暂停,避免数据库负载过高
Thread.sleep(100);
} while (deletedInBatch > 0);
System.out.println("清理完成,总共删除 " + deletedTotal + " 条过期日志");
} catch (Exception e) {
try {
if (conn != null) conn.rollback();
} catch (SQLException se) {
se.printStackTrace();
}
e.printStackTrace();
} finally {
try {
if (pstmt != null) pstmt.close();
if (conn != null) conn.close();
} catch (SQLException se) {
se.printStackTrace();
}
}
}
}
批处理的注意事项
-
异常处理:批处理操作可能会在中途失败。JDBC提供了
BatchUpdateException
类,通过getUpdateCounts()
方法可以获取每条语句的执行结果。 -
事务控制:批处理通常与事务一起使用。如果批处理中的某些操作失败,可以回滚整个事务。
-
内存消耗:批处理会在内存中缓存多条SQL语句,所以批次太大可能导致内存问题。
-
数据库限制:不同数据库对批处理的支持和限制不同,需要查阅相应的文档。
-
返回值:
executeBatch()
方法返回一个整型数组,表示每条语句影响的行数。对于不返回行数的语句,可能返回Statement.SUCCESS_NO_INFO
(-2)。
在某些数据库下,如果批处理中的一条语句失败,默认情况下会继续执行后续语句。若要改变这一行为,可以设置Connection
对象的属性。例如,在某些驱动程序中:
conn.setProperty("continueBatchOnError", "false");
总结
批处理是JDBC提供的一种优化技术,通过将多条SQL语句打包一次性提交给数据库,可以大幅提高数据操作的效率。在处理大批量数据时,批处理几乎是必不可少的性能优化手段。
主要记住以下几点:
- 使用
addBatch()
添加SQL语句或参数到批处理中 - 使用
executeBatch()
执行批处理 - 选择合适的批处理大小
- 将批处理与事务结合使用
- 正确处理批处理可能出现的异常
掌握批处理技术,将帮助你开发出性能更高的数据库应用程序。
练习
-
编写一个程序,使用批处理向数据库中插入1000条学生记录,每100条提交一次。
-
修改上面的程序,故意在第500条记录中引入一个错误,然后处理可能出现的
BatchUpdateException
,查看哪些记录已成功插入,哪些记录失败。 -
编写一个程序,对"orders"表中状态为"已完成"的订单进行批量归档(从orders表移动到order_history表)。
扩展资源
通过学习批处理技术,你已经迈出了JDBC性能优化的第一步!在实际项目中,合理应用批处理可以显著提高数据处理效率。