目录
  • 前言
  • 脚本介绍
    • 表结构
    • MySQL 元数据
  • DEMO 演示
    • SQL 查询冗余索引
      • 后记

        前言

        最近在搞标准化巡检平台,通过 MySQL 的元数据分析一些潜在的问题。冗余索引也是一个非常重要的巡检目,表中索引过多,会导致表空间占用较大,索引的数量与表的写入速度与索引数成线性关系(微秒级),如果发现有冗余索引,建议立即审核删除。

        PS:之前见过一个客户的数据库上面竟然创建 300 多个索引!?当时的想法是 “他们在玩排列组合呢” 表写入非常慢,严重影响性能和表维护的复杂度。

        脚本介绍

        表结构

        下方是演示的表结构:

        CREATE TABLE `index_test03` (
          `id` bigint(20) NOT NULL AUTO_INCREMENT,
          `name` varchar(20) NOT NULL,
          `create_time` varchar(20) NOT NULL,
          PRIMARY KEY (`id`),
          UNIQUE KEY `uqi_name` (`name`),
          KEY `idx_name` (`name`),
          KEY `idx_name_createtime`(name, create_time)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
        

        MySQL 元数据

        MySQL 可以通过 information_schema.STATISTICS 表查询索引信息:

        SELECT * from information_schema.STATISTICS  where TABLE_SCHEMA = 'test02' and TABLE_NAME = 'index_test03';
        TABLE_CATALOG TABLE_SCHEMA TABLE_NAME NON_UNIQUE INDEX_SCHEMA INDEX_NAME SEQ_IN_INDEX COLUMN_NAME COLLATION CARDINALITY SUB_PART PACKED NULLABLE INDEX_TYPE COMMENT INDEX_COMMENT
        def test02 index_test03 0 test02 PRIMARY 1 id A 0 NULL NULL   BTREE    
        def test02 index_test03 0 test02 uqi_name 1 name A 0 NULL NULL   BTREE    
        def test02 index_test03 1 test02 idx_name 1 name A 0 NULL NULL   BTREE    
        def test02 index_test03 1 test02 idx_name_createtime 1 name A 0 NULL NULL   BTREE    
        def test02 index_test03 1 test02 idx_name_createtime 2 create_time A 0 NULL NULL   BTREE    

        脚本通过获得 STATISTICS 表中的索引信息来分析表中是否存在冗余索引,分析粒度为表级别。

        DEMO 演示

        需要使用 pandas 模块。

        import pandas as pd
        
        df_table_level = pd.read_excel('/Users/cooh/Desktop/STATISTICS.xlsx')
        
        table_indexes = df_table_level['INDEX_NAME'].drop_duplicates().tolist()
        
        _indexes = list()
        for index_name in table_indexes:
            index_info = {'index_cols': df_table_level[df_table_level['INDEX_NAME'] == index_name]['COLUMN_NAME'].tolist(),
                          'non_unique': df_table_level[df_table_level['INDEX_NAME'] == index_name]['NON_UNIQUE'].tolist()[0],
                          'index_name': index_name
                          }
            _indexes.append(index_info)
        
        content = ''
        election_dict = {i['index_name']: 0 for i in _indexes}
        
        while len(_indexes) > 0:
            choice_index_1 = _indexes.pop(0)
        
            for choice_index_2 in _indexes:
                # 对比两个索引字段的个数,使用字段小的进行迭代
                min_len = min([len(choice_index_1['index_cols']), len(choice_index_2['index_cols'])])
        
                # 获得相似字段的个数据
                similarity_col = 0
                for i in range(min_len):
                    # print(i)
                    if choice_index_1['index_cols'][i] == choice_index_2['index_cols'][i]:
                        similarity_col += 1
        
                # 然后进行逻辑判断
                if similarity_col == 0:
                    # print('毫无冗余')
                    pass
                else:
                    # 两个索引的字段包含内容都相同,说明两个索引完全相同,接下来就需要从中选择一个删除
                    if len(choice_index_1['index_cols']) == similarity_col and len(
                            choice_index_2['index_cols']) == similarity_col:
                        # 等于 0 表示有唯一约束
                        if choice_index_1['non_unique'] == 1:
                            content += '索引 {0} 与索引 {1} 重复, '.format(choice_index_2['index_name'], choice_index_1['index_name'])
                            election_dict[choice_index_1['index_name']] += 1
                        elif choice_index_2['non_unique'] == 1:
                            content += '索引 {0} 与索引 {1} 重复, '.format(choice_index_1['index_name'], choice_index_2['index_name'])
                            election_dict[choice_index_2['index_name']] += 1
                        else:
                            content += '索引 {0} 与索引 {1} 重复, '.format(choice_index_2['index_name'], choice_index_1['index_name'])
                            election_dict[choice_index_1['index_name']] += 1
        
                    elif len(choice_index_1['index_cols']) == similarity_col and choice_index_1['non_unique'] != 0:
                        content += '索引 {0} 与索引 {1} 重复, '.format(choice_index_2['index_name'], choice_index_1['index_name'])
                        election_dict[choice_index_1['index_name']] += 1
        
                    elif len(choice_index_2['index_cols']) == similarity_col and choice_index_1['non_unique'] != 0:
                        content += '索引 {0} 与索引 {1} 重复, '.format(choice_index_1['index_name'], choice_index_2['index_name'])
                        election_dict[choice_index_2['index_name']] += 1
        
        redundancy_indexes = list()
        for _k_name, _vote in election_dict.items():
            if _vote > 0:
                redundancy_indexes.append(_k_name)
        
        content += '建议删除索引:{0}'.format(', '.join(redundancy_indexes))
        
        print(content)
        

        输出结果:

        索引 uqi_name 与索引 idx_name 重复, 索引 idx_name_createtime 与索引 idx_name 重复, 建议删除索引:idx_name

        SQL 查询冗余索引

        MySQL 5.7 是可以直接通过 sys 元数据库中的视图来查冗余索引的,但是云上 RDS 用户看不到 sys 库。所以才被迫写这个脚本,因为实例太多了,一个一个看不现实。如果你是自建的 MySQL,就不用费那么大劲了,直接使用下面 SQL 来统计。

        select * from sys.schema_redundant_indexes;

        后记

        删除索引属于高危操作,删除前需要多次 check 后再删除。上面是一个 demo 可以包装成函数,使用 pandas 以表为粒度传入数据,就可以嵌入到程序中。有问题欢迎评论沟通。

        声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。