使用python脚本批量更新或删除数据
日常有些需要全表更新某个条件的字段的需求, 直接使用update会产生大事务造成主从延迟等一些列影响稳定性的事件。
因此一般是基于主键去滚动更新,下面是一个例子。
代码语言:python代码运行次数:0运行复制# -*- coding: utf-8 -*-
import time
import mysql.connector
import logging
logging.basicConfig(filename='batch_sql.log', level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
# MySQL 连接信息
my_user = 'dts'
my_pass = '123456'
my_host = '192.168.31.181'
my_port = 3306
my_db = 'sbtest'
table_name = 'sbtest113'
# 获取id范围
get_id_range_sql = f"SELECT MIN(id), MAX(id) FROM {table_name} "
# 实际用的是类似下面2种
batch_sql_base = f"UPDATE {table_name} SET k='111111' WHERE pad like '%0%' "
#batch_sql_base = f"DELETE FROM {table_name} WHERE pad like '%0%' "
# 确定更新的步长
step = 2000
# 初始化总的影响行数
total_affected_rows = 0
# 倒计时时长(秒)
countdown = 5
# 记录开始时间
logging.info(f"------------------------------------------------------------")
logging.info(f"脚本开始执行,执行参数:用户={my_user},主机={my_host},端口={my_port},数据库={my_db},表名={table_name}")
# 倒计时
print(f"【即将执行的SQL样例】: {batch_sql_base} AND id between 1 and 1000 ;\n")
print(f"【准备开始刷数据(你有 {countdown} 秒钟时间可以按ctrl+c取消)】")
while countdown > 0:
print(f"倒计时: {countdown} 秒", end='\r')
time.sleep(1)
countdown -= 1
start_time = time.time()
try:
# 建立数据库连接
connection = mysql.connector.connect(
user=my_user,
password=my_pass,
host=my_host,
port=my_port,
database=my_db,
charset='utf8mb4',
ssl_disabled=True,
connect_timeout=10,
autocommit=True,
)
cursor = connection.cursor()
# 探活
cursor.execute("SELECT 1;")
result = cursor.fetchone()
if result is None:
print("连接失败,请检查")
exit(10)
# 查询 id 的最小值和最大值
cursor.execute(get_id_range_sql)
min_id, max_id = cursor.fetchone()
# 按步长循环更新数据
for start_id in range(min_id, max_id + 10000, step):
end_id = start_id + step - 1
# 构建 SQL 语句
batch_sql = batch_sql_base + f" AND id BETWEEN {start_id} AND {end_id} ;"
print(batch_sql)
#time.sleep(0.1)
# 执行 SQL 语句
cursor.execute(batch_sql)
# 累加每次更新操作影响的行数
total_affected_rows += cursor.rowcount
logging.info(f"执行 SQL: {batch_sql},影响行数: {cursor.rowcount}")
print("数据跑批完成")
print(f"数据跑批完成,总共影响了 {total_affected_rows} 行。")
logging.info(f"数据更新完成,总共影响了 {total_affected_rows} 行。")
except mysql.connector.Error as err:
print(f"数据库操作出错: {err}")
logging.error(f"数据库操作出错: {err}")
finally:
if 'cursor' in locals():
cursor.close()
if 'connection' in locals():
connection.close()
# 记录结束时间
end_time = time.time()
execution_time = end_time - start_time
logging.info(f"脚本执行结束,总执行时间: {execution_time:.2f} 秒")
效果如下图所示:
正常脚本跑完后,batch_sql.log 日志文件如下: