最强总结!数据库脏数据清理完整指南!!

大家好,我是岳哥。

数据质量是数据库管理中的关键问题。本文将系统地介绍数据库中脏数据的识别、清理和预防方法,帮助您提升数据质量!

一、脏数据类型识别

1. 常见脏数据类型(代码语言:SQL)

-- 1. 重复数据

-- Each (name, email) pair that occurs on more than one row is a duplicate set.
SELECT
    u.name,
    u.email,
    COUNT(*) AS count
FROM users AS u
GROUP BY u.name, u.email
HAVING COUNT(*) > 1;

-- 2. Null anomalies: real NULLs plus placeholder strings that stand for "no phone".
SELECT *
FROM customers AS c
WHERE c.phone IS NULL
   OR TRIM(c.phone) = ''
   OR c.phone IN ('null', 'undefined');

-- 3. Inconsistent formats: classify every distinct email value.
--    '[.]' matches a literal dot. The original '\.' does not: MySQL drops the
--    backslash of an unrecognized escape in a string literal, leaving a bare '.'
--    that matches any character (so 'a@bcom' was accepted).
--    NULL emails get their own label; previously `NULL NOT REGEXP ...` evaluated
--    to NULL and fell through to the "valid" branch.
SELECT DISTINCT
    email,
    CASE
        WHEN email IS NULL THEN '邮箱为空'
        WHEN email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$' THEN '格式无效'
        ELSE '格式有效'
    END AS validation
FROM users;

-- 4. Out-of-range values: prices outside [0, 999999] or negative quantities.
SELECT *
FROM products AS p
WHERE p.price NOT BETWEEN 0 AND 999999
   OR p.quantity < 0;

-- 5. Orphaned rows (invalid foreign key): products whose category_id points at a
--    category that does not exist. Products with no category at all
--    (category_id IS NULL) are a different issue and are not reported here.
--    The original also selected c.name, which is always NULL for these rows.
SELECT p.*
FROM products AS p
WHERE p.category_id IS NOT NULL
  AND NOT EXISTS (
      SELECT 1
      FROM categories AS c
      WHERE c.id = p.category_id
  );

-- 6. Inconsistent data: orders whose stored total differs from the sum of their
--    line items. `<=>` is MySQL's NULL-safe equality, so an order whose
--    total_amount is NULL is reported instead of being skipped (with `!=`,
--    NULL != x evaluates to NULL and the group was silently dropped).
SELECT
    o.order_id,
    o.total_amount,
    SUM(oi.quantity * oi.unit_price) AS calculated_total
FROM orders AS o
INNER JOIN order_items AS oi
    ON oi.order_id = o.order_id
GROUP BY o.order_id, o.total_amount
HAVING NOT (o.total_amount <=> SUM(oi.quantity * oi.unit_price));

2. 脏数据识别工具:数据质量分析 SQL(代码语言:SQL)

-- 创建数据质量分析存储过程

DELIMITER $$

-- Profiles every column of a table in the current schema.
-- Returns two result sets:
--   1. total_records for the table;
--   2. one row per column (in column order): null count / percentage, distinct
--      count, duplicate percentage, and min / max / avg / stddev for numeric columns.
-- Why it was rewritten: MySQL prepares exactly ONE statement per PREPARE, the old
-- text referenced columns (`column_name`, `column_value`) that do not exist in the
-- profiled table, and the "null" query counted information_schema rows instead of
-- NULL values. The table name is now checked against information_schema and
-- backtick-quoted before it is spliced into dynamic SQL.
-- NOTE(review): COUNT(DISTINCT ...) may be rejected for some column types
-- (e.g. JSON/spatial) — confirm against the tables you profile.
CREATE PROCEDURE analyze_table_quality(IN table_name VARCHAR(100))
BEGIN
    -- Copy of the parameter with an unambiguous name (the parameter shares its
    -- name with information_schema's TABLE_NAME column).
    DECLARE v_table VARCHAR(100) DEFAULT table_name;
    DECLARE v_table_q VARCHAR(210);
    DECLARE v_column VARCHAR(64);
    DECLARE v_column_q VARCHAR(130);
    DECLARE v_data_type VARCHAR(64);
    DECLARE v_done BOOLEAN DEFAULT FALSE;
    DECLARE column_cursor CURSOR FOR
        SELECT c.COLUMN_NAME, c.DATA_TYPE
        FROM information_schema.COLUMNS AS c
        WHERE c.TABLE_SCHEMA = DATABASE()
          AND c.TABLE_NAME = v_table
        ORDER BY c.ORDINAL_POSITION;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET v_done = TRUE;

    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.TABLES AS t
        WHERE t.TABLE_SCHEMA = DATABASE()
          AND t.TABLE_NAME = v_table
    ) THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'analyze_table_quality: table not found in current schema';
    END IF;

    SET v_table_q = CONCAT('`', REPLACE(v_table, '`', '``'), '`');

    -- 1. Total record count.
    SET @sql = CONCAT('SELECT COUNT(*) AS total_records FROM ', v_table_q);
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    -- 2-4. Per-column statistics, one INSERT ... SELECT per column, collected in a
    --      session-local temporary table and returned as a single result set.
    DROP TEMPORARY TABLE IF EXISTS tmp_column_quality;
    CREATE TEMPORARY TABLE tmp_column_quality (
        id INT AUTO_INCREMENT PRIMARY KEY,
        column_name VARCHAR(64),
        data_type VARCHAR(64),
        total_count BIGINT,
        null_count BIGINT,
        distinct_count BIGINT,
        min_value DOUBLE,
        max_value DOUBLE,
        avg_value DOUBLE,
        std_dev DOUBLE
    );

    OPEN column_cursor;
    column_loop: LOOP
        FETCH column_cursor INTO v_column, v_data_type;
        IF v_done THEN
            LEAVE column_loop;
        END IF;

        SET v_column_q = CONCAT('`', REPLACE(v_column, '`', '``'), '`');
        SET @sql = CONCAT(
            'INSERT INTO tmp_column_quality ',
            '(column_name, data_type, total_count, null_count, distinct_count, ',
            'min_value, max_value, avg_value, std_dev) ',
            'SELECT ', QUOTE(v_column), ', ', QUOTE(v_data_type), ', ',
            'COUNT(*), ',
            'COALESCE(SUM(', v_column_q, ' IS NULL), 0), ',
            'COUNT(DISTINCT ', v_column_q, '), ',
            -- Range/spread statistics only make sense for numeric columns.
            IF(
                v_data_type IN ('tinyint', 'smallint', 'mediumint', 'int', 'bigint',
                                'decimal', 'float', 'double'),
                CONCAT('MIN(', v_column_q, '), MAX(', v_column_q, '), ',
                       'AVG(', v_column_q, '), STDDEV(', v_column_q, ')'),
                'NULL, NULL, NULL, NULL'
            ),
            ' FROM ', v_table_q
        );
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END LOOP;
    CLOSE column_cursor;

    SELECT
        q.column_name,
        q.data_type,
        q.null_count,
        ROUND(q.null_count * 100.0 / NULLIF(q.total_count, 0), 2) AS null_percentage,
        q.distinct_count,
        -- Non-NULL rows that repeat a value already seen, as a share of all rows.
        ROUND((q.total_count - q.null_count - q.distinct_count) * 100.0
              / NULLIF(q.total_count, 0), 2) AS duplicate_percentage,
        q.min_value,
        q.max_value,
        q.avg_value,
        q.std_dev
    FROM tmp_column_quality AS q
    ORDER BY q.id;

    DROP TEMPORARY TABLE IF EXISTS tmp_column_quality;
END $$

DELIMITER ;

-- Usage example
CALL analyze_table_quality('customers');

二、脏数据清理方法

1. 基础清理操作(代码语言:SQL)

-- 1. 删除重复数据(保留最新记录)

-- Keep the newest row per email and delete the rest.
-- MySQL cannot DELETE from a CTE (CTEs are not updatable), so the ranking is
-- joined back to the base table. The derived table contains a window function,
-- which prevents MySQL from merging it into the outer query; it is materialized
-- first, so reading `users` while deleting from it is allowed.
-- Rows with a NULL email are excluded: all NULLs would otherwise land in one
-- partition and every NULL-email user but one would be deleted.
-- `id DESC` breaks ties between rows with the same created_at.
DELETE u
FROM users AS u
INNER JOIN (
    SELECT
        id,
        ROW_NUMBER() OVER (
            PARTITION BY email
            ORDER BY created_at DESC, id DESC
        ) AS row_num
    FROM users
    WHERE email IS NOT NULL
) AS ranked
    ON ranked.id = u.id
WHERE ranked.row_num > 1;

-- 2. Normalize placeholder values ('', 'null', 'undefined') to real NULLs.
-- MySQL has no per-column WHERE inside SET (the original form is a syntax
-- error), so each column uses CASE; the outer WHERE restricts the update to rows
-- where at least one of the two columns needs fixing.
UPDATE customers
SET
    phone = CASE
                WHEN TRIM(phone) = '' OR phone IN ('null', 'undefined') THEN NULL
                ELSE phone
            END,
    email = CASE
                WHEN TRIM(email) = '' OR email IN ('null', 'undefined') THEN NULL
                ELSE email
            END
WHERE TRIM(phone) = '' OR phone IN ('null', 'undefined')
   OR TRIM(email) = '' OR email IN ('null', 'undefined');

-- 3. Unify formats.
-- Emails are trimmed and lowercased. Names are only trimmed: lowercasing would
-- destroy meaningful capitalization, and the users insert trigger in this script
-- likewise only trims names.
-- Values are compared as binary strings so that case-only or whitespace-only
-- differences are detected even under a case-insensitive collation; only rows
-- that actually change are written.
UPDATE users
SET
    name = TRIM(name),
    email = LOWER(TRIM(email))
WHERE CAST(name AS BINARY) <> CAST(TRIM(name) AS BINARY)
   OR CAST(email AS BINARY) <> CAST(LOWER(TRIM(email)) AS BINARY);

-- 4. Clamp out-of-range values: negative price/quantity -> 0, price above
--    999999 -> 999999.
-- MySQL has no per-column WHERE inside SET (the original form is a syntax
-- error), so each column is clamped with CASE; the WHERE limits the update to
-- rows that need a change.
UPDATE products
SET
    price = CASE
                WHEN price < 0 THEN 0
                WHEN price > 999999 THEN 999999
                ELSE price
            END,
    quantity = CASE
                   WHEN quantity < 0 THEN 0
                   ELSE quantity
               END
WHERE price < 0
   OR price > 999999
   OR quantity < 0;

-- 5. Remove order items whose product no longer exists (invalid foreign key).
-- NOT EXISTS instead of NOT IN: if the subquery ever yielded a NULL id,
-- `NOT IN (subquery)` would be NULL for every row and nothing would be deleted.
-- Items with a NULL product_id are kept, exactly as the NOT IN form kept them.
DELETE FROM order_items
WHERE product_id IS NOT NULL
  AND NOT EXISTS (
      SELECT 1
      FROM products AS p
      WHERE p.id = order_items.product_id
  );

-- 6. Repair inconsistent order totals.
-- Recomputes total_amount from order_items and writes it only where it differs
-- (NULL-safe `<=>`). The original correlated UPDATE had no WHERE clause, so every
-- order without line items had its total_amount overwritten with NULL
-- (SUM over zero rows is NULL); such orders are now left untouched.
UPDATE orders AS o
INNER JOIN (
    SELECT
        order_id,
        SUM(quantity * unit_price) AS calculated_total
    FROM order_items
    GROUP BY order_id
) AS totals
    ON totals.order_id = o.order_id
SET o.total_amount = totals.calculated_total
WHERE NOT (o.total_amount <=> totals.calculated_total);

2. 高级清理方案(代码语言:SQL)

-- 1. 创建数据清理日志表

-- Log of cleaning steps: one row per (table, column, cleaning type) step that
-- changed rows. IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS data_cleaning_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    column_name VARCHAR(100),
    cleaning_type VARCHAR(50),
    old_value TEXT,
    new_value TEXT,
    affected_rows INT,
    cleaned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Serves report queries filtering by table_name and grouping by cleaning_type.
    INDEX idx_cleaning_log_table_type (table_name, cleaning_type)
);

-- 2. Stored procedure that cleans the customers table and logs each step.
DELIMITER $$

-- Cleans customers.phone, customers.email and customers.age in one transaction,
-- writes one data_cleaning_log row for every step that changed rows, and finally
-- returns the cumulative totals logged for 'customers' (all runs, not just this one).
-- On any SQL error the transaction is rolled back and the error is re-raised.
CREATE PROCEDURE clean_customer_data()
BEGIN
    -- Named v_affected_rows, not affected_rows: inside a routine a local variable
    -- shadows a column of the same name, so the report's SUM(affected_rows) would
    -- otherwise sum this variable instead of data_cleaning_log.affected_rows.
    DECLARE v_affected_rows INT;

    DECLARE EXIT HANDLER FOR SQLEXCEPTION
    BEGIN
        ROLLBACK;
        RESIGNAL;
    END;

    START TRANSACTION;

    -- 2.1 Phone numbers: keep digits only. A value containing no digits becomes
    --     NULL rather than an empty string.
    UPDATE customers
    SET phone = NULLIF(REGEXP_REPLACE(phone, '[^0-9]', ''), '')
    WHERE phone IS NOT NULL AND phone REGEXP '[^0-9]';

    SET v_affected_rows = ROW_COUNT();

    IF v_affected_rows > 0 THEN
        INSERT INTO data_cleaning_log
            (table_name, column_name, cleaning_type, affected_rows)
        VALUES
            ('customers', 'phone', 'format_standardization', v_affected_rows);
    END IF;

    -- 2.2 Emails: trim and lowercase. Compared as binary strings because under a
    --     case-insensitive collation 'A@X.COM' != 'a@x.com' is false and such
    --     rows would never be selected.
    UPDATE customers
    SET email = LOWER(TRIM(email))
    WHERE email IS NOT NULL
      AND CAST(email AS BINARY) <> CAST(LOWER(TRIM(email)) AS BINARY);

    SET v_affected_rows = ROW_COUNT();

    IF v_affected_rows > 0 THEN
        INSERT INTO data_cleaning_log
            (table_name, column_name, cleaning_type, affected_rows)
        VALUES
            ('customers', 'email', 'format_standardization', v_affected_rows);
    END IF;

    -- 2.3 Ages outside 0..120 are discarded.
    UPDATE customers
    SET age = NULL
    WHERE age < 0 OR age > 120;

    SET v_affected_rows = ROW_COUNT();

    IF v_affected_rows > 0 THEN
        INSERT INTO data_cleaning_log
            (table_name, column_name, cleaning_type, affected_rows)
        VALUES
            ('customers', 'age', 'invalid_value_removal', v_affected_rows);
    END IF;

    COMMIT;

    -- Cleaning report (cumulative over every logged run for 'customers').
    SELECT
        cleaning_type,
        SUM(affected_rows) AS total_cleaned,
        MAX(cleaned_at) AS cleaning_time
    FROM data_cleaning_log
    WHERE table_name = 'customers'
    GROUP BY cleaning_type;
END $$

DELIMITER ;

-- 3. Trigger that normalizes customer rows on INSERT.
--    NOTE(review): UPDATEs are not covered; add a BEFORE UPDATE trigger if needed.
DELIMITER $$

CREATE TRIGGER before_customer_insert
BEFORE INSERT ON customers
FOR EACH ROW
BEGIN
    -- 3.1 Keep digits only; a phone with no digits at all becomes NULL, not ''.
    IF NEW.phone IS NOT NULL THEN
        SET NEW.phone = NULLIF(REGEXP_REPLACE(NEW.phone, '[^0-9]', ''), '');
    END IF;

    -- 3.2 Trim and lowercase the email.
    IF NEW.email IS NOT NULL THEN
        SET NEW.email = LOWER(TRIM(NEW.email));
    END IF;

    -- 3.3 Discard ages outside 0..120 (a NULL age makes the condition NULL, so
    --     it is left unchanged).
    IF NEW.age < 0 OR NEW.age > 120 THEN
        SET NEW.age = NULL;
    END IF;
END $$

DELIMITER ;

-- 4. Daily scheduled cleaning job.
-- The original statement was missing CREATE, and its BEGIN ... END body would be
-- split at the first ';' by the mysql client (no DELIMITER change). A single
-- CALL needs neither.
-- Events only fire while the server's event_scheduler is ON.
CREATE EVENT clean_data_daily
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_TIMESTAMP
DO CALL clean_customer_data();

三、数据质量管理

1. 预防措施(代码语言:SQL)

-- 1. 创建强制约束

-- Enforce email / phone / age formats in the schema (CHECK constraints are
-- enforced from MySQL 8.0.16; older versions parse and ignore them).
-- '[.]' matches a literal dot: in '\.' MySQL's string-literal parser drops the
-- backslash, leaving '.', which matches any character.
-- A NULL value passes a CHECK (only FALSE is rejected), so the columns stay optional.
-- The ALTER fails if existing rows violate a constraint; clean the data first.
ALTER TABLE users
    ADD CONSTRAINT check_email
        CHECK (email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'),
    ADD CONSTRAINT check_phone
        CHECK (phone REGEXP '^[0-9]{11}$'),
    ADD CONSTRAINT check_age
        CHECK (age BETWEEN 0 AND 120);

-- 2. Unique index on users.email so duplicate emails are rejected on write.
--    (Index creation fails while duplicate emails still exist.)
ALTER TABLE users
    ADD UNIQUE INDEX idx_unique_email (email);

-- 3. Column defaults for products: status 'active', created_at = current time.
--    NOTE(review): some MySQL versions reject CURRENT_TIMESTAMP in
--    ALTER ... SET DEFAULT; if so, use MODIFY COLUMN with the full column definition.
ALTER TABLE products
    ALTER status SET DEFAULT 'active',
    ALTER created_at SET DEFAULT CURRENT_TIMESTAMP;

-- 4. Validation trigger for new users.
DELIMITER $$

CREATE TRIGGER before_user_insert
BEFORE INSERT ON users
FOR EACH ROW
BEGIN
    -- 4.1 Normalize: trimmed, lowercased email and trimmed name.
    SET NEW.email = LOWER(TRIM(NEW.email));
    SET NEW.name = TRIM(NEW.name);

    -- 4.2 Minimum password length.
    --     CHAR_LENGTH counts characters; LENGTH counts bytes, so three 3-byte CJK
    --     characters (9 bytes) passed the old check. A NULL password is rejected
    --     too: LENGTH(NULL) < 8 is NULL, which used to let it through.
    --     NOTE(review): only meaningful if the raw password reaches this column;
    --     a stored hash is always longer than 8 characters.
    IF NEW.password IS NULL OR CHAR_LENGTH(NEW.password) < 8 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'Password must be at least 8 characters long';
    END IF;

    -- 4.3 Default status.
    IF NEW.status IS NULL THEN
        SET NEW.status = 'active';
    END IF;
END $$

DELIMITER ;

-- 5. Audit log table: one row per audited change, old/new values stored as JSON.
--    IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS audit_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    action_type ENUM('INSERT', 'UPDATE', 'DELETE'),
    record_id BIGINT,
    old_value JSON,
    new_value JSON,
    user_id BIGINT,
    action_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Looking up the history of one record.
    INDEX idx_audit_log_record (table_name, record_id)
);

-- 6. Audit trigger: after every UPDATE on users, log the old and new
--    email / name / status as JSON objects.
DELIMITER $$

CREATE TRIGGER after_user_change
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
    -- user_id is taken from the session variable @current_user_id, presumably set
    -- by the application; it is NULL when the session never set it.
    INSERT INTO audit_log
        (table_name, action_type, record_id, old_value, new_value, user_id)
    SELECT
        'users',
        'UPDATE',
        NEW.id,
        JSON_OBJECT('email', OLD.email, 'name', OLD.name, 'status', OLD.status),
        JSON_OBJECT('email', NEW.email, 'name', NEW.name, 'status', NEW.status),
        @current_user_id;
END $$

DELIMITER ;

2. 监控方案(代码语言:SQL)

-- 1. 创建数据质量监控表

-- Metrics recorded by the monitoring procedure, one row per metric per check.
-- IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS data_quality_metrics (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    metric_name VARCHAR(100),
    metric_value DECIMAL(10,2),
    threshold DECIMAL(10,2),
    check_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status ENUM('OK', 'WARNING', 'ERROR') DEFAULT 'OK',
    -- Reports filter by table and time range.
    INDEX idx_dq_metrics_table_time (table_name, check_time)
);

-- 2. Monitoring procedure.
DELIMITER $$

-- Records three percentages for the users table in data_quality_metrics, each with
-- an OK / WARNING / ERROR status:
--   null_percentage      rows missing email or phone        (WARNING > 5, ERROR > 10)
--   duplicate_percentage rows that repeat another row's email (WARNING > 1, ERROR > 2)
--   invalid_percentage   rows with a malformed email/phone/age (WARNING > 1, ERROR > 2)
-- An empty users table records 0 for every metric.
CREATE PROCEDURE monitor_data_quality()
BEGIN
    DECLARE v_total_records BIGINT;
    DECLARE v_null_pct DECIMAL(10,2);
    DECLARE v_duplicate_pct DECIMAL(10,2);
    DECLARE v_invalid_pct DECIMAL(10,2);

    -- Count once; NULLIF turns an empty table into a NULL divisor (result NULL,
    -- then COALESCE -> 0) instead of a division by zero.
    SELECT COUNT(*)
    INTO v_total_records
    FROM users;

    -- 2.1 Share of rows missing email or phone.
    SELECT COALESCE(COUNT(*) * 100 / NULLIF(v_total_records, 0), 0)
    INTO v_null_pct
    FROM users
    WHERE email IS NULL OR phone IS NULL;

    INSERT INTO data_quality_metrics
        (table_name, metric_name, metric_value, threshold, status)
    VALUES
        ('users', 'null_percentage', v_null_pct, 5.00,
         CASE
             WHEN v_null_pct > 10 THEN 'ERROR'
             WHEN v_null_pct > 5 THEN 'WARNING'
             ELSE 'OK'
         END);

    -- 2.2 Share of rows that are extra copies of another row's email: a group of
    --     k rows with the same email contributes k - 1. (The old query counted
    --     duplicated email *values*, so 50 copies of one address counted as 1.)
    --     NULL emails are not duplicates of each other.
    SELECT COALESCE(SUM(d.copies - 1) * 100 / NULLIF(v_total_records, 0), 0)
    INTO v_duplicate_pct
    FROM (
        SELECT email, COUNT(*) AS copies
        FROM users
        WHERE email IS NOT NULL
        GROUP BY email
        HAVING COUNT(*) > 1
    ) AS d;

    INSERT INTO data_quality_metrics
        (table_name, metric_name, metric_value, threshold, status)
    VALUES
        ('users', 'duplicate_percentage', v_duplicate_pct, 1.00,
         CASE
             WHEN v_duplicate_pct > 2 THEN 'ERROR'
             WHEN v_duplicate_pct > 1 THEN 'WARNING'
             ELSE 'OK'
         END);

    -- 2.3 Share of rows with a malformed email, phone or age. '[.]' is a literal
    --     dot ('\.' loses its backslash inside a MySQL string literal).
    SELECT COALESCE(COUNT(*) * 100 / NULLIF(v_total_records, 0), 0)
    INTO v_invalid_pct
    FROM users
    WHERE email NOT REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'
       OR phone NOT REGEXP '^[0-9]{11}$'
       OR age NOT BETWEEN 0 AND 120;

    INSERT INTO data_quality_metrics
        (table_name, metric_name, metric_value, threshold, status)
    VALUES
        ('users', 'invalid_percentage', v_invalid_pct, 1.00,
         CASE
             WHEN v_invalid_pct > 2 THEN 'ERROR'
             WHEN v_invalid_pct > 1 THEN 'WARNING'
             ELSE 'OK'
         END);
END $$

DELIMITER ;

-- 3. Alerting trigger.
-- alert_log is written by this trigger (and read by the issue-summary view) but is
-- not created anywhere else in this script, so every ERROR metric insert would fail.
-- Create it first; created_at is the alert time.
CREATE TABLE IF NOT EXISTS alert_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    metric_id BIGINT,
    alert_message TEXT,
    alert_level VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_alert_log_metric (metric_id)
);

DELIMITER $$

CREATE TRIGGER after_metric_insert
AFTER INSERT ON data_quality_metrics
FOR EACH ROW
BEGIN
    -- Every metric stored with status ERROR raises a HIGH alert.
    IF NEW.status = 'ERROR' THEN
        INSERT INTO alert_log (
            metric_id,
            alert_message,
            alert_level
        )
        VALUES (
            NEW.id,
            -- CONCAT returns NULL if any argument is NULL, so NULLs are spelled out.
            CONCAT(
                'Data quality alert for table ',
                COALESCE(NEW.table_name, 'NULL'),
                ': ',
                COALESCE(NEW.metric_name, 'NULL'),
                ' = ',
                COALESCE(NEW.metric_value, 'NULL'),
                ' (threshold: ',
                COALESCE(NEW.threshold, 'NULL'),
                ')'
            ),
            'HIGH'
        );
    END IF;
END $$

DELIMITER ;

-- 4. Hourly job that records the data-quality metrics.
CREATE EVENT monitor_data_quality_job
    ON SCHEDULE EVERY 1 HOUR
    DO
        CALL monitor_data_quality();

3. 报告和分析(代码语言:SQL)

-- 1. 创建数据质量报告视图

-- Daily report: per table, metric and calendar day, the average / max / min
-- metric value together with error, warning and total check counts.
CREATE VIEW data_quality_report AS
SELECT
    m.table_name,
    m.metric_name,
    AVG(m.metric_value) AS avg_value,
    MAX(m.metric_value) AS max_value,
    MIN(m.metric_value) AS min_value,
    COUNT(CASE WHEN m.status = 'ERROR' THEN 1 END) AS error_count,
    COUNT(CASE WHEN m.status = 'WARNING' THEN 1 END) AS warning_count,
    COUNT(*) AS total_checks,
    DATE(m.check_time) AS check_date
FROM data_quality_metrics AS m
GROUP BY
    m.table_name,
    m.metric_name,
    DATE(m.check_time);

-- 2. Weekly trend view.
-- Weeks are bucketed with YEARWEEK(), which uses the same Sunday-start weeks as
-- WEEK()'s default mode but assigns early-January days to the previous year's
-- last week. Grouping by WEEK() and YEAR() separately split a week that spans
-- New Year into "week 52/53 of year Y" and "week 0 of year Y+1".
-- The GROUP BY repeats the exact select-list expressions so the view is valid
-- under ONLY_FULL_GROUP_BY.
CREATE VIEW data_quality_trends AS
SELECT
    table_name,
    metric_name,
    YEARWEEK(check_time) % 100 AS week_number,
    YEARWEEK(check_time) DIV 100 AS year,
    AVG(metric_value) AS avg_value,
    MAX(metric_value) AS max_value,
    MIN(metric_value) AS min_value
FROM data_quality_metrics
GROUP BY
    table_name,
    metric_name,
    YEARWEEK(check_time) % 100,
    YEARWEEK(check_time) DIV 100;

-- 3. Issue summary: every WARNING / ERROR metric with its alert, newest first.
--    Metrics that raised no alert keep NULL alert columns (LEFT JOIN).
CREATE VIEW data_quality_issues AS
SELECT
    dq.table_name,
    dq.metric_name,
    dq.metric_value,
    dq.threshold,
    dq.status,
    dq.check_time,
    al.alert_message,
    al.alert_level,
    al.created_at AS alert_time
FROM data_quality_metrics AS dq
LEFT JOIN alert_log AS al
    ON al.metric_id = dq.id
WHERE dq.status IN ('WARNING', 'ERROR')
ORDER BY dq.check_time DESC;

-- 4. Daily data-quality score function.
DELIMITER $$

-- Returns a 0-100 score for one table on one calendar day:
--     100 - (errors + 0.5 * warnings) / checks * 100
-- An ERROR check costs a full check's share and a WARNING half of it, which keeps
-- the original 10 : 5 weighting. The original multiplied those weights by 100
-- again, so a single ERROR out of a single check scored -900.
-- Returns 100 when the table has no checks on p_date.
-- Declared READS SQL DATA instead of DETERMINISTIC: the result depends on table
-- contents, and a false DETERMINISTIC declaration can mislead the optimizer.
CREATE FUNCTION calculate_quality_score(
    p_table_name VARCHAR(100),
    p_date DATE
)
RETURNS DECIMAL(5,2)
READS SQL DATA
BEGIN
    DECLARE quality_score DECIMAL(5,2);

    SELECT
        100 - (
            COUNT(CASE WHEN status = 'ERROR' THEN 1 END)
            + COUNT(CASE WHEN status = 'WARNING' THEN 1 END) * 0.5
        ) / NULLIF(COUNT(*), 0) * 100
    INTO quality_score
    FROM data_quality_metrics
    WHERE table_name = p_table_name
      -- Half-open day range instead of DATE(check_time) = p_date, so an index on
      -- check_time can be used.
      AND check_time >= p_date
      AND check_time < p_date + INTERVAL 1 DAY;

    RETURN COALESCE(quality_score, 100);
END $$

DELIMITER ;

-- 5. Monthly report procedure.
DELIMITER $$

-- Returns three result sets for month p_month of year p_year:
--   5.1 per table: a quality score for the whole month plus the number of days
--       with errors / warnings;
--   5.2 per table and metric: count, average and maximum of WARNING/ERROR checks;
--   5.3 per table: up to three most frequent problem metrics with a recommendation.
-- Raises SQLSTATE 45000 when p_month is not 1..12.
CREATE PROCEDURE generate_monthly_report(IN p_year INT, IN p_month INT)
BEGIN
    -- [v_month_start, v_month_end) covers the requested month. Range predicates on
    -- check_time replace YEAR()/MONTH() calls, which prevent index use.
    DECLARE v_month_start DATETIME DEFAULT MAKEDATE(p_year, 1) + INTERVAL (p_month - 1) MONTH;
    DECLARE v_month_end DATETIME DEFAULT MAKEDATE(p_year, 1) + INTERVAL p_month MONTH;

    IF p_month IS NULL OR p_month NOT BETWEEN 1 AND 12 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'generate_monthly_report: p_month must be between 1 and 12';
    END IF;

    -- 5.1 Overall quality for the whole month: ERROR checks count fully, WARNING
    --     checks half. (The original scored only the month's last day.)
    SELECT
        table_name,
        CAST(
            100 - (
                COUNT(CASE WHEN status = 'ERROR' THEN 1 END)
                + COUNT(CASE WHEN status = 'WARNING' THEN 1 END) * 0.5
            ) / COUNT(*) * 100
            AS DECIMAL(5,2)
        ) AS quality_score,
        COUNT(DISTINCT CASE WHEN status = 'ERROR' THEN DATE(check_time) END) AS days_with_errors,
        COUNT(DISTINCT CASE WHEN status = 'WARNING' THEN DATE(check_time) END) AS days_with_warnings
    FROM data_quality_metrics
    WHERE check_time >= v_month_start
      AND check_time < v_month_end
    GROUP BY table_name
    ORDER BY table_name;

    -- 5.2 Main problems.
    SELECT
        table_name,
        metric_name,
        COUNT(*) AS issue_count,
        AVG(metric_value) AS avg_value,
        MAX(metric_value) AS max_value
    FROM data_quality_metrics
    WHERE check_time >= v_month_start
      AND check_time < v_month_end
      AND status IN ('WARNING', 'ERROR')
    GROUP BY table_name, metric_name
    ORDER BY issue_count DESC, table_name, metric_name;

    -- 5.3 Recommendations for the top three problem metrics of each table
    --     (ties broken by metric name so the ranking is deterministic).
    WITH ranked_issues AS (
        SELECT
            table_name,
            metric_name,
            COUNT(*) AS issue_count,
            ROW_NUMBER() OVER (
                PARTITION BY table_name
                ORDER BY COUNT(*) DESC, metric_name
            ) AS rn
        FROM data_quality_metrics
        WHERE check_time >= v_month_start
          AND check_time < v_month_end
          AND status IN ('WARNING', 'ERROR')
        GROUP BY table_name, metric_name
    )
    SELECT
        table_name,
        metric_name,
        issue_count,
        CASE
            WHEN metric_name LIKE '%null%' THEN '建议添加非空约束或默认值'
            WHEN metric_name LIKE '%duplicate%' THEN '建议添加唯一索引'
            WHEN metric_name LIKE '%invalid%' THEN '建议加强数据验证'
            ELSE '建议进行深入分析'
        END AS recommendation
    FROM ranked_issues
    WHERE rn <= 3
    ORDER BY table_name, rn;
END $$

DELIMITER ;

四、自动化清理方案

1. 增量清理作业(代码语言:SQL)

-- 1. 创建清理任务表

-- One row per incremental cleaning run of a table.
-- IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS cleaning_tasks (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    last_processed_id BIGINT,
    batch_size INT DEFAULT 1000,
    status ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Rows deleted/updated by the run. clean_data_incrementally writes this column;
    -- without it every batch failed with "Unknown column 'affected_rows'".
    affected_rows INT DEFAULT 0,
    -- Resume lookup: MAX(last_processed_id) per table for COMPLETED runs.
    INDEX idx_cleaning_tasks_table_status (table_name, status)
);

-- 2. Incremental cleaning procedure.
DELIMITER $$

-- Cleans p_table_name in batches of p_batch_size consecutive ids, resuming after
-- the highest last_processed_id recorded by this table's COMPLETED tasks.
-- Each batch runs in its own transaction:
--   * rows whose non-NULL email repeats inside the batch are deleted, keeping the
--     row with the newest updated_at (ties: highest id);
--   * emails failing the format check and phones that are not exactly 11 digits
--     become NULL; the remaining emails are trimmed and lowercased.
-- A failing batch is rolled back and the task is marked FAILED with the SQL error
-- message; batches committed before it stay committed.
-- NOTE(review): assumes the target table has id, email, phone and updated_at
-- columns; duplicates are only detected within one id window, not across batches.
-- The procedure writes cleaning_tasks.affected_rows, so that column must exist.
CREATE PROCEDURE clean_data_incrementally(
    IN p_table_name VARCHAR(100),
    IN p_batch_size INT
)
BEGIN
    DECLARE v_task_id BIGINT;
    DECLARE v_last_id BIGINT;
    DECLARE v_max_id BIGINT;
    DECLARE v_batch_end BIGINT;
    -- A NULL or non-positive batch size would never advance v_last_id (endless loop).
    DECLARE v_batch_size INT DEFAULT IF(COALESCE(p_batch_size, 0) > 0, p_batch_size, 1000);
    DECLARE v_table_q VARCHAR(210);
    DECLARE v_step_rows INT DEFAULT 0;
    DECLARE v_batch_rows INT DEFAULT 0;
    DECLARE v_affected_rows INT DEFAULT 0;
    DECLARE v_error_occurred BOOLEAN DEFAULT FALSE;
    DECLARE v_error_message TEXT;

    -- Remember the failure and its message; every step below checks the flag.
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION
    BEGIN
        GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
        SET v_error_occurred = TRUE;
    END;

    -- Register the run.
    INSERT INTO cleaning_tasks (table_name, batch_size, status, start_time)
    VALUES (p_table_name, v_batch_size, 'RUNNING', NOW());

    SET v_task_id = LAST_INSERT_ID();

    -- Accept only an existing table of the current schema and backtick-quote it,
    -- so the name cannot inject SQL into the dynamic statements below.
    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.TABLES AS t
        WHERE t.TABLE_SCHEMA = DATABASE()
          AND t.TABLE_NAME = p_table_name
    ) THEN
        SET v_error_occurred = TRUE;
        SET v_error_message = CONCAT('Unknown table: ', COALESCE(p_table_name, 'NULL'));
    ELSE
        SET v_table_q = CONCAT('`', REPLACE(p_table_name, '`', '``'), '`');
    END IF;

    -- Highest id currently in the table (NULL for an empty table, which skips the loop).
    IF NOT v_error_occurred THEN
        -- Reset first so a value left over from an earlier call is never reused.
        SET @max_id = NULL;
        SET @sql = CONCAT('SELECT MAX(id) INTO @max_id FROM ', v_table_q);
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
        SET v_max_id = @max_id;
    END IF;

    -- Resume point: the last id processed by a completed run on this table.
    SELECT COALESCE(MAX(last_processed_id), 0)
    INTO v_last_id
    FROM cleaning_tasks
    WHERE table_name = p_table_name
      AND status = 'COMPLETED';

    WHILE NOT v_error_occurred AND v_last_id < v_max_id DO
        SET v_batch_end = v_last_id + v_batch_size;
        SET v_batch_rows = 0;

        START TRANSACTION;

        -- Delete in-batch duplicates. A join with a derived table is used because
        -- MySQL cannot delete through a CTE; the window function forces the derived
        -- table to be materialized, which allows reading the delete target.
        -- NULL emails are excluded so they are not treated as duplicates.
        SET @sql = CONCAT(
            'DELETE t FROM ', v_table_q, ' AS t ',
            'INNER JOIN (',
            'SELECT id, ROW_NUMBER() OVER (',
            'PARTITION BY email ORDER BY updated_at DESC, id DESC',
            ') AS rn FROM ', v_table_q,
            ' WHERE id > ', v_last_id,
            ' AND id <= ', v_batch_end,
            ' AND email IS NOT NULL',
            ') AS d ON d.id = t.id ',
            'WHERE d.rn > 1'
        );
        PREPARE stmt FROM @sql;
        EXECUTE stmt;
        SET v_step_rows = ROW_COUNT();
        DEALLOCATE PREPARE stmt;
        SET v_batch_rows = v_batch_rows + v_step_rows;

        -- Null out invalid emails / phones and normalize valid emails. The email is
        -- validated after TRIM so surrounding spaces do not invalidate it; '[.]' is
        -- a literal dot, and QUOTE() builds the regex literals for the dynamic SQL.
        IF NOT v_error_occurred THEN
            SET @sql = CONCAT(
                'UPDATE ', v_table_q, ' SET ',
                'email = CASE ',
                'WHEN TRIM(email) NOT REGEXP ',
                QUOTE('^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'),
                ' THEN NULL ',
                'ELSE LOWER(TRIM(email)) ',
                'END, ',
                'phone = CASE ',
                'WHEN phone NOT REGEXP ', QUOTE('^[0-9]{11}$'), ' THEN NULL ',
                'ELSE phone ',
                'END ',
                'WHERE id > ', v_last_id,
                ' AND id <= ', v_batch_end
            );
            PREPARE stmt FROM @sql;
            EXECUTE stmt;
            SET v_step_rows = ROW_COUNT();
            DEALLOCATE PREPARE stmt;
            SET v_batch_rows = v_batch_rows + v_step_rows;
        END IF;

        -- Record progress inside the same transaction as the batch.
        IF NOT v_error_occurred THEN
            UPDATE cleaning_tasks
            SET last_processed_id = v_batch_end,
                affected_rows = v_affected_rows + v_batch_rows
            WHERE id = v_task_id;
        END IF;

        IF v_error_occurred THEN
            -- Never commit a partially cleaned batch.
            ROLLBACK;
        ELSE
            COMMIT;
            SET v_affected_rows = v_affected_rows + v_batch_rows;
            SET v_last_id = v_batch_end;

            -- Short pause between batches to avoid monopolizing the server.
            DO SLEEP(0.1);
        END IF;
    END WHILE;

    -- Final status of the run.
    UPDATE cleaning_tasks
    SET status = IF(v_error_occurred, 'FAILED', 'COMPLETED'),
        end_time = NOW(),
        error_message = IF(v_error_occurred,
                           COALESCE(v_error_message, 'Error occurred during processing'),
                           NULL)
    WHERE id = v_task_id;
END $$

DELIMITER ;

-- 3. Daily incremental cleaning job.
-- A multi-statement BEGIN ... END body needs a temporary DELIMITER in the mysql
-- client; without it the client cuts the CREATE EVENT at the first ';'.
-- NOTE(review): the 'orders' call was dropped. clean_data_incrementally expects id,
-- email, phone and updated_at columns, while orders is keyed by order_id elsewhere
-- in this script and shows no email/phone columns, so that call would only ever
-- record a FAILED task. Re-add it if the orders schema does support it.
DELIMITER $$

CREATE EVENT incremental_cleaning_job
ON SCHEDULE EVERY 1 DAY
STARTS CURRENT_TIMESTAMP
DO
BEGIN
    -- One cleaning run per table that needs it.
    CALL clean_data_incrementally('users', 1000);
    CALL clean_data_incrementally('customers', 1000);
END $$

DELIMITER ;

2. 并行清理方案(代码语言:SQL)

-- 1. 创建分区清理任务表

-- Queue of key-range cleaning tasks; partition_value holds 'start,end'.
-- IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS partition_cleaning_tasks (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    table_name VARCHAR(100),
    partition_key VARCHAR(100),
    partition_value VARCHAR(100),
    status ENUM('PENDING', 'RUNNING', 'COMPLETED', 'FAILED'),
    worker_id VARCHAR(36),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    affected_rows INT,
    error_message TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Serves lookups by status and owning worker (e.g. resetting timed-out tasks).
    INDEX idx_partition_tasks_status_worker (status, worker_id)
);

-- 2. Registry of cleaning workers and their last heartbeat.
--    IF NOT EXISTS makes the script safe to re-run.
CREATE TABLE IF NOT EXISTS cleaning_workers (
    worker_id VARCHAR(36) PRIMARY KEY,
    host_name VARCHAR(100),
    last_heartbeat TIMESTAMP,
    status ENUM('ACTIVE', 'INACTIVE'),
    current_task_id BIGINT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 3. Procedures for parallel cleaning (the '$$' delimiter set here stays in effect
--    for the following procedure definitions).
DELIMITER $$

-- Splits the key range [MIN(key), MAX(key)] of p_table_name.p_partition_key into
-- up to p_num_partitions contiguous, non-overlapping ranges and queues one PENDING
-- partition_cleaning_tasks row per range, stored as 'start,end' (both inclusive).
-- Previously adjacent ranges shared their boundary value, so boundary rows were
-- cleaned twice, and an empty table queued tasks with NULL ranges.
-- NOTE(review): assumes p_partition_key is an integer column (e.g. an id).
-- Raises SQLSTATE 45000 for an unknown table/column or p_num_partitions < 1.
CREATE PROCEDURE create_parallel_cleaning_tasks(
    IN p_table_name VARCHAR(100),
    IN p_partition_key VARCHAR(100),
    IN p_num_partitions INT
)
proc: BEGIN
    DECLARE v_min_value BIGINT;
    DECLARE v_max_value BIGINT;
    DECLARE v_partition_size BIGINT;
    DECLARE v_start BIGINT;
    DECLARE v_end BIGINT;
    DECLARE v_table_q VARCHAR(210);
    DECLARE v_key_q VARCHAR(210);
    DECLARE i INT DEFAULT 0;

    -- Only a column that exists in the current schema is accepted; both names are
    -- backtick-quoted before being spliced into dynamic SQL.
    IF NOT EXISTS (
        SELECT 1
        FROM information_schema.COLUMNS AS c
        WHERE c.TABLE_SCHEMA = DATABASE()
          AND c.TABLE_NAME = p_table_name
          AND c.COLUMN_NAME = p_partition_key
    ) THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'create_parallel_cleaning_tasks: unknown table or column';
    END IF;

    IF p_num_partitions IS NULL OR p_num_partitions < 1 THEN
        SIGNAL SQLSTATE '45000'
            SET MESSAGE_TEXT = 'create_parallel_cleaning_tasks: p_num_partitions must be >= 1';
    END IF;

    SET v_table_q = CONCAT('`', REPLACE(p_table_name, '`', '``'), '`');
    SET v_key_q = CONCAT('`', REPLACE(p_partition_key, '`', '``'), '`');

    -- Key range of the table (reset first so stale session values are never used).
    SET @min_val = NULL, @max_val = NULL;
    SET @sql = CONCAT(
        'SELECT MIN(', v_key_q, '), MAX(', v_key_q, ') ',
        'INTO @min_val, @max_val FROM ', v_table_q
    );
    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;

    SET v_min_value = @min_val;
    SET v_max_value = @max_val;

    -- Empty table (or only NULL keys): nothing to partition.
    IF v_min_value IS NULL THEN
        LEAVE proc;
    END IF;

    -- Ceiling division over the inclusive range so the ranges cover it exactly.
    SET v_partition_size = CEIL((v_max_value - v_min_value + 1) / p_num_partitions);

    WHILE i < p_num_partitions DO
        SET v_start = v_min_value + i * v_partition_size;
        -- Each range ends just before the next one starts; the last ends at MAX.
        SET v_end = IF(i = p_num_partitions - 1,
                       v_max_value,
                       LEAST(v_start + v_partition_size - 1, v_max_value));

        -- Ranges starting past MAX (more partitions than key values) would be empty.
        IF v_start <= v_max_value THEN
            INSERT INTO partition_cleaning_tasks (
                table_name,
                partition_key,
                partition_value,
                status
            )
            VALUES (
                p_table_name,
                p_partition_key,
                CONCAT(v_start, ',', v_end),
                'PENDING'
            );
        END IF;

        SET i = i + 1;
    END WHILE;
END $$

-- 4. Worker heartbeat procedure (defined while the '$$' delimiter is active).
-- Records a heartbeat for p_worker_id, then hands every RUNNING task back to the
-- queue (PENDING, no worker) when its worker's last heartbeat is more than
-- 300 seconds old or the worker never sent one. Previously a worker with a NULL
-- last_heartbeat never timed out, so its tasks stayed RUNNING forever.
CREATE PROCEDURE update_worker_heartbeat(
    IN p_worker_id VARCHAR(36)
)
BEGIN
    UPDATE cleaning_workers
    SET last_heartbeat = NOW()
    WHERE worker_id = p_worker_id;

    -- Plain interval comparison on last_heartbeat (index-friendly) instead of
    -- TIME_TO_SEC(TIMEDIFF(...)).
    UPDATE partition_cleaning_tasks
    SET status = 'PENDING',
        worker_id = NULL
    WHERE status = 'RUNNING'
      AND worker_id IN (
          SELECT w.worker_id
          FROM cleaning_workers AS w
          WHERE w.last_heartbeat IS NULL
             OR w.last_heartbeat < NOW() - INTERVAL 300 SECOND
      );
END $$

-- 5. Task execution procedure (defined while the '$$' delimiter is active).
-- Claims partition task p_task_id for p_worker_id and cleans its key range:
-- emails failing the format check and phones that are not exactly 11 digits become
-- NULL; the remaining emails are trimmed and lowercased. The task then ends as
-- COMPLETED (with affected_rows) or FAILED (with the SQL error message).
-- The claim only succeeds while the task is PENDING, so two workers can never run
-- the same task and a finished task is not silently re-run; if the task is unknown
-- or not PENDING the procedure does nothing. Reset a FAILED task to PENDING to retry.
-- NOTE(review): assumes an integer partition key, as produced by the task creator.
CREATE PROCEDURE execute_cleaning_task(
    IN p_worker_id VARCHAR(36),
    IN p_task_id BIGINT
)
proc: BEGIN
    DECLARE v_table_name VARCHAR(100);
    DECLARE v_partition_key VARCHAR(100);
    DECLARE v_partition_value VARCHAR(100);
    DECLARE v_affected INT DEFAULT 0;
    DECLARE v_error_occurred BOOLEAN DEFAULT FALSE;
    DECLARE v_error_message TEXT;

    -- Remember the failure and its message; the final status update reports it.
    DECLARE CONTINUE HANDLER FOR SQLEXCEPTION
    BEGIN
        GET DIAGNOSTICS CONDITION 1 v_error_message = MESSAGE_TEXT;
        SET v_error_occurred = TRUE;
    END;

    -- Claim the task atomically: one conditional UPDATE, so concurrent callers
    -- cannot both observe it as PENDING.
    UPDATE partition_cleaning_tasks
    SET status = 'RUNNING',
        worker_id = p_worker_id,
        start_time = NOW()
    WHERE id = p_task_id
      AND status = 'PENDING';

    IF v_error_occurred OR ROW_COUNT() = 0 THEN
        LEAVE proc;
    END IF;

    SELECT table_name, partition_key, partition_value
    INTO v_table_name, v_partition_key, v_partition_value
    FROM partition_cleaning_tasks
    WHERE id = p_task_id;

    -- partition_value is 'start,end' (inclusive). The bounds are bound as statement
    -- parameters instead of being pasted into the SQL text.
    SET @partition_start = CAST(SUBSTRING_INDEX(v_partition_value, ',', 1) AS SIGNED);
    SET @partition_end = CAST(SUBSTRING_INDEX(v_partition_value, ',', -1) AS SIGNED);

    -- Identifiers are backtick-quoted; '[.]' is a literal dot, and QUOTE() builds
    -- the regex literals. The email is validated after TRIM so surrounding spaces
    -- do not invalidate it.
    SET @sql = CONCAT(
        'UPDATE `', REPLACE(v_table_name, '`', '``'), '` SET ',
        'email = CASE ',
        'WHEN TRIM(email) NOT REGEXP ',
        QUOTE('^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$'),
        ' THEN NULL ',
        'ELSE LOWER(TRIM(email)) ',
        'END, ',
        'phone = CASE ',
        'WHEN phone NOT REGEXP ', QUOTE('^[0-9]{11}$'), ' THEN NULL ',
        'ELSE phone ',
        'END ',
        'WHERE `', REPLACE(v_partition_key, '`', '``'), '` BETWEEN ? AND ?'
    );

    PREPARE stmt FROM @sql;
    EXECUTE stmt USING @partition_start, @partition_end;
    SET v_affected = ROW_COUNT();
    DEALLOCATE PREPARE stmt;

    UPDATE partition_cleaning_tasks
    SET status = IF(v_error_occurred, 'FAILED', 'COMPLETED'),
        end_time = NOW(),
        affected_rows = IF(v_error_occurred, 0, v_affected),
        error_message = IF(v_error_occurred,
                           COALESCE(v_error_message, 'Error during execution'),
                           NULL)
    WHERE id = p_task_id;
END $$

DELIMITER ;

相关推荐

含有重的词语(带重的成语有哪些)(40个)
蓝钻8级成长值多少

蓝钻8级成长值多少

📅 07-17 👁️ 6933
dns缓存一般多久

dns缓存一般多久

📅 12-01 👁️ 1550