有些场景(比如drop/truncate table)可能需要扫描磁盘才能恢复数据, undrop-for-innodb就很好用, 但我的ibd2sql还不支持啊, 于是就准备给它加这么个功能. 当然得先验证下是否可行以及速度怎么样, 速度不行的话.
表的数据是一页页的放在磁盘(文件系统)上的. 只要磁盘上的数据没有删除,即使逻辑上删除了文件也是能恢复的, 如果时间短的话, 可以从文件系统级别根据inode恢复; 时间长了, 文件就不再完整了, 只能全盘扫的方式恢复了.
☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013296958.jpg)
那么扫描磁盘的时候我们怎么知道哪部分数据是我们要的数据呢? 这就得先看看数据文件的结构了.
innodb的数据是放在索引上的, 即INDEX_PAGE, 我们只需要扫描到我们需要的INDEX_PAGE即可. 怎么判断是否是我们需要的PAGE呢? 请看:
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013292605.jpg)
有个2字节的PAGE_LEVEL表示这是叶子节点,即方数据的; 还有个8字节的INDEX_ID表示这个页是对应的某个索引的. 而我们根据表可以找到其对应的索引, 并获取到对应的INDEXID; 既然要恢复, 那么我们肯定就知道要恢复的表了哦. 不知道也没关系, ibdata1/mysql.ibd里面是有记录哪些表是被删除的, 并且有相关的indexid.
立即学习“Python免费学习笔记(深入)”;
那么我们的恢复思路就是: 扫描ibdata1/mysql.ibd获取要恢复表的index; 扫描磁盘寻找对应的PAGE; 然后使用ibd2sql等工具将PAGE中的数据提取出来.
由于linux上一切皆文件, 磁盘也是文件, 所以我们就把磁盘当作普通文件读取即可. 然后将读取的结果进行校验.这种工作一个进程肯定是不够的, 所以支持并发是必须的. 本来还应该校验page是否完整的, 但算了.
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013250599.jpg)
理论已经有了,就可以试试效果了. 为了方便查看进度, 我做了个动态的进度条(每个进程指定自己在屏幕上的位置,并输出进度).
<code class="sql">-- 准备测试表和数据create table db1.t20251128_for_drop(id int primary key auto_increment, name varchar(200));insert into db1.t20251128_for_drop(name) values('ddcw');insert into db1.t20251128_for_drop(name) select name from db1.t20251128_for_drop;insert into db1.t20251128_for_drop(name) select name from db1.t20251128_for_drop;-- ....-- 然后干掉它(你就可以跑路了)drop table db1.t20251128_for_drop;</code>![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013262430.jpg)
<code class="shell">python3 main.py /data/mysql_3306/mysqldata/mysql.ibd --delete --set table=tables | grep t20251128_for_droppython3 main.py /data/mysql_3306/mysqldata/mysql.ibd --delete --set table=indexes | grep 463</code>
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013328003.jpg)
我们这里扫描出2条是因为第一次建测试表的时候忘记加主键了, 其实不影响的, 但我还是删除了重建. 经过上面的步骤我们得到indexid为254
然后我们就可以根据上面拿到的indexid去扫盘了.
<code class="shell">python3 scan_drop_table_demo.py --device /dev/vda1 --indexid 254 --parallel 8</code>
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013350392.jpg)
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013340665.jpg)
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013358468.jpg)
看起来还是比较绚的(艹,忘记加点色了).
第一列是 进程逻辑ID,
第二列是 进度条
第三列是 进度百分比
第四列是 速度
第五列是 这个进程扫描磁盘的起止位置
第六列是 这个进程扫描到多少个匹配的page了.
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013394015.jpg)
花了151秒扫描了40GB的磁盘, 速度大概是271MB/s, 还行, 反正支持并发,上限还是很高的.
最后我们就可以解析扫描出来的结果了, 我这里忘记显示输出文件了. 没事, 反正是个demo
<code class="shell">python3 main.py 0000000000000254.page.ibd --sdi /data/mysql_3306/mysqldata/db1/t20251128_for_drop_new.ibd --sql --limit 10 --set leafno=0 --set rootno=0</code>
![[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试](https://img.php.cn/upload/article/001/503/042/176460013341922.jpg)
看起来没得问题, 但数据应该不全, 毕竟我这个测试环境比较闲,那文件系统肯定老早就给我回收一部分了,可恶!
源码见文末
所以,python扫描磁盘效果还是不错的, 速度也不错.
在能扫描磁盘后, 我们能恢复mysql的范围就更广了, 基本上数据物理上存在我们就能恢复, 感觉自己棒棒哒!
这个脚本起始很早就写好了的, 但之前测试的时候始终未成功, 后来发现我测试的那个环境的innodb-page-size是4K, 而我这个demo的pagesize是写死了的16K..... 就TM离谱!
由于只是测试demo脚本, 不建议用于生产, 可等我后续给它丫集成到ibd2sql后再考虑生产
附源码:
<code class="python">#!/usr/bin/env python3# write by ddcw @https://github.com/ddcw# 测试扫描磁获取相关Indexid的page的测试例子,验证可行性和效率import osimport sysimport statimport timeimport structimport shutilimport argparsefrom multiprocessing import ProcessPAGE_SIZE = 16384def print_error_and_exit(msg,exit_code=1):msg += ""sys.stdout.write(msg)sys.exit(exit_code)def print_info(msg):msg += ""sys.stdout.write(msg)def format_size(n):if n < 1024:return f'{n} B'elif n < 1024*1024:return f'{round(n/1024,2)} KB'elif n < 1024*1024*1024:return f'{round(n/1024/1024,2)} MB'elif n < 1024*1024*1024*1024:return f'{round(n/1024/1024/1024,2)} GB'elif n < 1024*1024*1024*1024*1024:return f'{round(n/1024/1024/1024/1024,2)} TB'else:return f'{round(n/1024/1024/1024/1024/1024,2)} PB'def _argparse():parser = argparse.ArgumentParser(add_help=True,description="测试扫描磁获取相关Indexid的page的测试例子,验证可行性和效率")parser.add_argument('--device',dest="DEVICE_NAME",required=True,help='磁盘设备/文件')parser.add_argument('--start',dest="OFFSET_START",type=int,default=0,help='要扫描的磁盘设备/文件的起始地址,默认0')parser.add_argument('--end',dest="OFFSET_END",type=int,default=-1,help='要磁盘设备/文件的结束地址,默认全部')parser.add_argument('--step',dest="OFFSET_STEP",type=int,default=512,help='扫描步长,默认512字节')parser.add_argument('--buffering',dest="BUFFERING",type=int,default=16*1024*1024,help='缓存大小(非open缓存),默认16MB')parser.add_argument('--indexid',dest="INDEXID",type=int,required=True,help='要扫描的表的Indexid')#parser.add_argument('--tablespaceid',dest="TABLESPACE_ID",help='要扫描的表的tablespace id')parser.add_argument('--parallel',dest="PARALLEL",type=int,default=1,help='并发度')parser.add_argument('--output',dest="OUTPUT_FILENAME",type=int,default=1,help='输出文件名,默认为indexid.page')parser = parser.parse_args()return parser# 初始化屏幕def init_screen():x = os.system('clear')columns,lines = shutil.get_terminal_size()return columns# 获取磁盘设备大小(可能是lv,可能是磁盘,也可能是文件)def get_size_from_dev(filename):f_stat = os.stat(filename)file_size = 0status = Trueif stat.S_ISREG(f_stat.st_mode): # filefile_size = f_stat.st_sizeelif stat.S_ISBLK(f_stat.st_mode):real_dev = ''try:real_dev = os.readlink(filename).split('/')[-1] # lvexcept:real_dev = os.path.basename(filename) # devwith open(f'/sys/class/block/{real_dev}/size') as f:sectors = int(f.read().strip())try:with open(f'/sys/class/block/{real_dev}/queue/hw_sector_size') as f:sector_size = int(f.read().strip())except:sector_size = 512file_size = sectors * sector_sizeelse:status = Falsereturn status,file_size# 监控进程,只展示效果,不干活的. (算逑, 不要它了,直接worker输出)def monitor(q):pass# worker进程,打工仔. (我将给你一个展示的机会!)def worker(p,filename,start,end,step,indexid,buffering,output_filename,screen_size=0):import timef = open(filename,'rb')fo = open(output_filename,'wb')f.seek(start,0)buff = b''readed_size = 0indexid = b'\x00\x00'+struct.pack('>Q',indexid)hc = 0while end > readed_size:start_time = time.time()readsize = buffering-len(buff)buff += f.read(readsize)if len(buff) < 16384:breakreaded_size += readsizeoffset = 0while True:data = buff[offset:offset+16384]if len(data) < 16384:breakif data[:4] == data[-8:-4] and data[24:26] == b'E\xbf' and data[64:74] == indexid:offset += 16384hc += 1fo.write(data)else:offset += stepbuff = buff[offset:]end_time = time.time()progress = min(100,round(readed_size/end*100,2))rate = format_size(readsize/(end_time-start_time)) + '/s'progress_bar = "#"*(int(progress)//2)progress_bar_wsp = " "*(50-len(progress_bar))content = f"[P{str(p+1).zfill(2)}] [{progress_bar}{progress_bar_wsp}] {progress}% {rate} {start}:{start+readed_size} {hc} {' '*5}"sys.stdout.write(f"\033[{p+2};0H{content}")sys.stdout.flush()fo.close()#import random#for i in range(20):#line_num = p+2#column = 1#content = ''.join([ '#' for _ in range(i) ])#sys.stdout.write(f"\033[{line_num};{column}HP[{p}]{content}")#sys.stdout.flush()#time.sleep(random.random())def main():starttime = time.time()parser = _argparse()filename = parser.DEVICE_NAMEparallel = parser.PARALLELif not os.path.exists(filename):print_error_and_exit(f'{filename} is not exists')screen_size = init_screen()status,file_size = get_size_from_dev(filename)if not status:print_error_and_exit(f'{filename} only support dev/file')msg = f"SCAN DEVICE {filename}({format_size(file_size)})"msg = " "*((screen_size-len(msg))//2) + msgpd = {}sys.stdout.write(f'{msg}')sys.stdout.flush()step = parser.OFFSET_STEPstart = parser.OFFSET_STARTend = parser.OFFSET_END if parser.OFFSET_END > start else file_sizeper_size = (end-start)//parallel//step*step+stepindexid = parser.INDEXIDoutput_filename_pre = '/tmp/' + str(indexid).zfill(16)+'.page'for x in range(parallel):output_filename = f"{output_filename_pre}{'.'+str(x) if parallel > 1 else ''}"pd[x] = Process(target=worker,args=(x,filename,start+x*per_size,per_size,step,indexid,parser.BUFFERING,output_filename,screen_size))for x in range(parallel):pd[x].start()for x in range(parallel):pd[x].join()stoptime = time.time()sys.stdout.write(f"\033[{parallel + 3};{0}HFinish! cost:{round(stoptime-starttime,2)} sec.")sys.stdout.flush()if __name__ == '__main__':main()</code>以上就是[MYSQL] python扫描磁盘恢复数据的可行性验证与速度测试的详细内容,更多请关注php中文网其它相关文章!
python怎么学习?python怎么入门?python在哪学?python怎么学才快?不用担心,这里为大家提供了python速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号