目录
  • 查看两个数据库的同名表的字段名差异
    • 问题描述
    • 解决方案
    • mysql-utilities
  • Python数据库之间差异对比 

    查看两个数据库的同名表的字段名差异

    问题描述

    开发过程中存在多个测试环境:测试环境 A 加了字段,而测试环境 B 忘了加,两边的字段名对不上,同一个项目在环境 B 上运行就会报错。

    Python如何查看两个数据库的同名表的字段名差异

    -- Two sample databases standing in for two test environments.
    CREATE DATABASE `a` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    CREATE DATABASE `b` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
    
    -- Database `a`: the older shape of `student` (no gender/physics/chemistry/biology
    -- columns and no index on class_id).
    USE `a`;
    CREATE TABLE `student` (
      `id` int(11) AUTO_INCREMENT,
      `class_id` int(11) DEFAULT NULL,
      `name` varchar(255),
      `birthday` date,
      `chinese` int(11),
      `math` int(11),
      `english` int(11),
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- Database `b`: the same table with four extra columns (gender, physics,
    -- chemistry, biology) and an additional index idx_class_id.
    USE `b`;
    CREATE TABLE `student` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `class_id` int(11) DEFAULT NULL,
      `name` varchar(255) DEFAULT NULL,
      `birthday` date DEFAULT NULL,
      `gender` tinyint(4) DEFAULT NULL,
      `chinese` int(11) DEFAULT NULL,
      `math` int(11) DEFAULT NULL,
      `english` int(11) DEFAULT NULL,
      `physics` int(11) DEFAULT NULL,
      `chemistry` int(11) DEFAULT NULL,
      `biology` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`),
      KEY `idx_class_id` (`class_id`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

    解决方案

    安装

    pip install SQLAlchemy
    pip install pymysql

    代码

    from collections import defaultdict
    
    import sqlalchemy
    from sqlalchemy.engine.reflection import Inspector
    
    
    def get_table_column_map(inspector: Inspector):
        """Map every table reported by *inspector* to the set of its column names.

        Args:
            inspector: SQLAlchemy inspector bound to the database to examine.

        Returns:
            A ``defaultdict(set)`` keyed by table name. Only tables for which
            the inspector returned at least one column receive an entry.
        """
        columns_by_table = defaultdict(set)
        for name in inspector.get_table_names():
            column_names = {info['name'] for info in inspector.get_columns(name)}
            if column_names:
                # Merge instead of assigning so a key is created only when
                # there is at least one column to record.
                columns_by_table[name] |= column_names
        return columns_by_table
    
    
    def compare_table_column_difference(a, b):
        """Print the column-name differences between same-named tables.

        Args:
            a: Mapping of table name -> set of column names for the first database.
            b: The same kind of mapping for the second database.

        For every table present in both mappings whose column sets differ, one
        line ``<table name> <set of columns found on only one side>`` is printed.
        Tables that exist in only one mapping are ignored.
        """
        # Visit tables in sorted order: iterating the raw set would make the
        # report order vary between runs because string hashing is randomized.
        for table_name in sorted(set(a) & set(b)):
            difference = a[table_name] ^ b[table_name]
            if difference:
                print(table_name, difference)
    
    
    # One engine per database under comparison: schemas `a` and `b` on the same host.
    # NOTE(review): the credentials are hard-coded in the URLs; replace them for real use.
    engine = sqlalchemy.create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/a')
    engine1 = sqlalchemy.create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/b')
    # An inspector per engine provides the table and column metadata.
    inspector = sqlalchemy.inspect(engine)
    inspector1 = sqlalchemy.inspect(engine1)
    # Table name -> set of column names, for each database.
    table_column_map = get_table_column_map(inspector)
    table_column_map1 = get_table_column_map(inspector1)
    # Print, for each table present in both databases, the columns found on only one side.
    compare_table_column_difference(table_column_map, table_column_map1)
    # Expected output for the sample schemas:
    # student {'gender', 'chemistry', 'physics', 'biology'}
    

    mysql-utilities

    也可以使用专门的工具——mysql-utilities 中的 mysqldiff

    文档

    mysqldiff --help

    命令

    mysqldiff --server1=user:pass@host:port --server2=user:pass@host:port db1.object1:db2.object1
    

    例子

    mysqldiff --server1=root:123456@127.0.0.1:3306 --server2=root:123456@127.0.0.1:3306 a.student:b.student
    

    效果

    Python如何查看两个数据库的同名表的字段名差异

    --- a.student
    +++ b.student
    @@ -3,8 +3,13 @@
       `class_id` int(11) DEFAULT NULL,
       `name` varchar(255) DEFAULT NULL,
       `birthday` date DEFAULT NULL,
    +  `gender` tinyint(4) DEFAULT NULL,
       `chinese` int(11) DEFAULT NULL,
       `math` int(11) DEFAULT NULL,
       `english` int(11) DEFAULT NULL,
    -  PRIMARY KEY (`id`)
    +  `physics` int(11) DEFAULT NULL,
    +  `chemistry` int(11) DEFAULT NULL,
    +  `biology` int(11) DEFAULT NULL,
    +  PRIMARY KEY (`id`),
    +  KEY `idx_class_id` (`class_id`) USING BTREE
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8
    

    优点:比较内容更详尽

    缺点:无法批量,要自己指定比较对象(稍微改写即可克服)

    Python数据库之间差异对比
     

    此脚本用于对比两个 Oracle 数据库之间的表、列(含数据类型和长度)以及索引的差异。

    cat oracle_diff.py
    #!/home/dba/.pyenv/versions/3.5.2/bin/python
    #coding=utf-8
    import cx_Oracle
    import time
    import difflib
    import os
    # os.popen returns a file-like pipe object, not the hostname text itself;
    # v_host is not referenced anywhere else in the code shown here.
    v_host=os.popen('echo $HOSTNAME')
    class Oracle_Status_Output():
        """Small wrapper around one Oracle connection used for schema comparison.

        Every query method is best-effort: on failure it prints a label and the
        error instead of raising. The list-returning methods then return an
        empty list, so callers always receive a list.
        """

        def __init__(self, username, password, tns):
            """Open a connection and a cursor.

            Args:
                username: Database user name.
                password: Password for ``username``.
                tns: Connect string passed to ``cx_Oracle.connect``.

            A connection failure is printed, not raised; ``db`` and ``cursor``
            then stay ``None``.
            """
            # Define both attributes up front so close() and the query methods
            # behave predictably even when connecting fails.
            self.db = None
            self.cursor = None
            try:
                self.db = cx_Oracle.connect(username, password, tns)
                self.cursor = self.db.cursor()
            except Exception as e:
                print('Wrong')
                print(e)

        def _fetch_all(self, sql, label, **binds):
            """Run ``sql`` with the given bind values and return all rows.

            On any error, print ``label`` and the error, then return ``[]``.
            """
            try:
                self.cursor.execute(sql, **binds)
                return self.cursor.fetchall()
            except Exception as e:
                print(label)
                print(e)
                return []

        def schemas_tables_count(self, sql, db):
            """Print the sum of the first column of every row returned by ``sql``.

            Args:
                sql: Query whose first result column is a per-group count.
                db: Label printed in front of the total.
            """
            try:
                self.cursor.execute(sql)
                count = sum(int(row[0]) for row in self.cursor.fetchall())
                print(db, 'Count Tables', '--', count)
            except Exception as e:
                print('Wrong--schemas_tables_count()')
                print(e)

        def schemas_tables_list(self, sql):
            """Return all rows of ``sql`` (no bind values); ``[]`` on failure."""
            return self._fetch_all(sql, 'Wrong--schemas_tables_list()')

        def schemas_tables_columns_list(self, sql, data):
            """Return all rows of ``sql`` with ``data`` bound to ``:A``; ``[]`` on failure."""
            return self._fetch_all(sql, 'schemas_tables_columns_list', A=data)

        def schemas_tables_indexes_list(self, sql, data):
            """Return all rows of ``sql`` with ``data`` bound to ``:A``; ``[]`` on failure."""
            return self._fetch_all(sql, 'schemas_tables_indexes_list', A=data)

        def close(self):
            """Close the cursor and the connection; safe to call repeatedly or after a failed connect."""
            if self.cursor is not None:
                self.cursor.close()
                self.cursor = None
            if self.db is not None:
                self.db.close()
                self.db = None
    # Per-owner table counts from DBA_TABLES for the six schemas being compared.
    schemas_tables_count_sql = "SELECT COUNT(1),S.OWNER FROM DBA_TABLES S WHERE S.OWNER in ('PAY','BOSS','SETTLE','ISMP','TEMP_DSF','ACCOUNT') GROUP BY S.OWNER"
    # One 'OWNER.TABLE_NAME' string per table in those schemas.
    schemas_tables_list_sql  = "SELECT S.OWNER||'.'||S.TABLE_NAME FROM DBA_TABLES S WHERE S.OWNER in ('PAY','BOSS','SETTLE','ISMP','TEMP_DSF','ACCOUNT')"
    # One 'OWNER.TABLE.COLUMN.DATA_TYPE.DATA_LENGTH' string per column of the table bound to :A.
    schemas_tables_columns_sql = "select a.OWNER||'.'||a.TABLE_NAME||'.'||a.COLUMN_NAME||'.'||a.DATA_TYPE||'.'||a.DATA_LENGTH from dba_tab_columns a where a.OWNER||'.'||a.TABLE_NAME = :A"
    # One 'OWNER.TABLE.INDEX_NAME' string per index of the table bound to :A.
    schemas_tables_indexes_sql = "SELECT T.table_owner||'.'||T.table_name||'.'||T.index_name FROM DBA_INDEXES T WHERE T.table_owner||'.'||T.table_name = :A"
    # Connections to the two databases being compared (labelled JX and PRO in the output).
    # NOTE(review): host, user and password are hard-coded here; move them out of the
    # source before sharing or deploying this script.
    jx_db  = Oracle_Status_Output('dbadmin','QazWsx12','106.15.109.134:1522/paydb')
    pro_db = Oracle_Status_Output('dbadmin','QazWsx12','localhost:1521/paydb')
    # Print the total table count on each side.
    jx_db.schemas_tables_count(schemas_tables_count_sql,'JX ')
    pro_db.schemas_tables_count(schemas_tables_count_sql,'PRO')
    # Fetch the table lists that the comparison below works on.
    jx_schemas_tables  = jx_db.schemas_tables_list(schemas_tables_list_sql)
    pro_schemas_tables = pro_db.schemas_tables_list(schemas_tables_list_sql)
    #print(jx_schemas_tables)
    #print(pro_schemas_tables)
    def diff_jx_pro(listA,listB,listClass):
        """Report the items that differ between two result lists.

        Args:
            listA: Rows from the JX database (hashable items); ``None`` is
                treated as empty.
            listB: Rows from the PRO database; ``None`` is treated as empty.
            listClass: Label for the kind of object compared (e.g. 'Tables').

        Returns:
            The sorted list of items present on both sides; always a list, and
            empty when nothing is shared.

        Prints one 'Intersection>>' line when the sides share nothing but at
        least one is non-empty, and one 'Difference  >>' line when they overlap
        but are not identical. Nothing is printed when both sides are empty or
        identical.
        """
        items_a = set(listA or ())
        items_b = set(listB or ())
        if not items_a and not items_b:
            # Nothing on either side: nothing to compare or report.
            return []
        common = sorted(items_a & items_b)
        only_a = sorted(items_a - items_b)
        only_b = sorted(items_b - items_a)
        if not common:
            # Also reached when exactly one side is empty, so objects that
            # exist in only one database are reported instead of skipped.
            print('Intersection>>','JX ',listClass,':',only_a,'--->','PRO',listClass,':',only_b)
        elif only_a or only_b:
            print('Difference  >>','JX ',listClass,':',only_a,'--->','PRO',listClass,':',only_b)
        return common
           
    if __name__ == '__main__':
        # Entry point: compare the table lists first, then the columns and the
        # indexes of every table that exists in both databases.
        try:
            # `or []` covers a None result from diff_jx_pro so the loops below
            # simply do nothing instead of failing.
            tables_lists = diff_jx_pro(jx_schemas_tables,pro_schemas_tables,'Tables') or []
            # Each row is a tuple of strings; join it into the single
            # 'OWNER.TABLE_NAME' value bound to :A in the per-table queries.
            table_names = ["".join(row) for row in tables_lists]
            # All column comparisons are printed before any index comparison.
            for table_name in table_names:
                jx_schemas_tables_columns = jx_db.schemas_tables_columns_list(schemas_tables_columns_sql,table_name)
                pro_schemas_tables_columns = pro_db.schemas_tables_columns_list(schemas_tables_columns_sql,table_name)
                diff_jx_pro(jx_schemas_tables_columns,pro_schemas_tables_columns,'Columns')
            for table_name in table_names:
                jx_schemas_tables_indexes = jx_db.schemas_tables_indexes_list(schemas_tables_indexes_sql,table_name)
                pro_schemas_tables_indexes = pro_db.schemas_tables_indexes_list(schemas_tables_indexes_sql,table_name)
                diff_jx_pro(jx_schemas_tables_indexes,pro_schemas_tables_indexes,'Indexes')
        finally:
            # Release both connections even when a comparison raised, and still
            # close the second one if closing the first fails.
            try:
                jx_db.close()
            finally:
                pro_db.close()
        print('------------Table, column and index check completed--------------')

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持。

    声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。