Pythonでsqlite3を使用する

import sqlite3
import pandas as pd


class SQLiteTable:
    def __init__(self, db_file):
        self.db_file = db_file
        self.conn = sqlite3.connect(db_file)
        self.cursor = self.conn.cursor()

    def create_table(self, table_name, columns):
        create_table_query = f"CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns)})"
        self.cursor.execute(create_table_query)
        self.conn.commit()

    def insert_data(self, table_name, data):
        insert_query = f"INSERT INTO {table_name} VALUES ({', '.join(['?'] * len(data))})"
        self.cursor.execute(insert_query, data)
        self.conn.commit()

    def delete_data(self, table_name, condition):
        delete_query = f"DELETE FROM {table_name} WHERE {condition}"
        self.cursor.execute(delete_query)
        self.conn.commit()

    def truncate_table(self, table_name):
        truncate_query = f"DELETE FROM {table_name}"
        self.cursor.execute(truncate_query)
        self.conn.commit()

    def select_data(self, table_name, condition=None):
        select_query = f"SELECT * FROM {table_name}"
        if condition:
            select_query += f" WHERE {condition}"
        self.cursor.execute(select_query)
        return self.cursor.fetchall()

    def begin_transaction(self):
        self.conn.execute('BEGIN')

    def commit_transaction(self):
        self.conn.execute('COMMIT')

    def close(self):
        self.conn.close()

    def count_data(self, table_name):
        count_query = f"SELECT COUNT(*) FROM {table_name}"
        self.cursor.execute(count_query)
        return self.cursor.fetchone()[0]

    def export_data(self, table_name, export_file):
        export_query = f"SELECT * FROM {table_name}"
        df = pd.read_sql_query(export_query, self.conn)
        df.to_csv(export_file, index=False)

    def import_data(self, table_name, import_file):
        df = pd.read_csv(import_file)
        df.to_sql(table_name, self.conn, if_exists='replace', index=False)


# 使用例
if __name__ == "__main__":
    db_file = 'mydatabase.db'
    my_table = SQLiteTable(db_file)

    table_name = 'mytable'
    columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']

    my_table.truncate_table(table_name)  

    try:
        #my_table.begin_transaction()  # トランザクションを開始

        # テーブルの作成
        my_table.create_table(table_name, columns)

        # データの挿入
        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        data3 = (3, 'Charlie', 22)
        my_table.insert_data(table_name, data1)
        my_table.insert_data(table_name, data2)
        my_table.insert_data(table_name, data3)

        # データのカウント
        count = my_table.count_data(table_name)
        print(f"Total rows in {table_name}: {count}")

        # データのエクスポート
        export_file = 'exported_data.csv'
        my_table.export_data(table_name, export_file)
        print(f"Data exported to {export_file}")

        # データの削除
        my_table.delete_data(table_name, "id=1")  # idが1のデータを削除

        # データのインポート
        import_file = 'exported_data.csv'
        my_table.import_data(table_name, import_file)
        print(f"Data imported from {import_file}")

        #my_table.commit_transaction()  # トランザクションをコミット

        # データの選択
        print("\nSelected Data:")
        result = my_table.select_data(table_name)
        for row in result:
            print(row)

    except Exception as e:
        print("Error:", e)
        my_table.conn.rollback()  # エラーが発生した場合、トランザクションをロールバック

    finally:
        my_table.close()

テスト

import unittest
import sqlite3
import os
import pandas as pd
from tempfile import mkstemp
from sqlite_table_manager import SQLiteTable

class TestSQLiteTable(unittest.TestCase):
    def setUp(self):
        # テスト用の一時的なデータベースファイルを作成
        self.db_fd, self.db_file = mkstemp()
        self.my_table = SQLiteTable(self.db_file)

    def tearDown(self):
        # テスト後に一時的なデータベースファイルを削除
        self.my_table.close()
        os.close(self.db_fd)
        os.remove(self.db_file)

    def test_create_table(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']

        self.my_table.create_table(table_name, columns)
        # テーブルが正しく作成されたかを確認
        connection = sqlite3.connect(self.db_file)
        cursor = connection.cursor()
        cursor.execute("PRAGMA table_info(mytable)")
        column_names = [column[1] for column in cursor.fetchall()]
        connection.close()
        self.assertEqual(column_names, ['id', 'name', 'age'])

    def test_insert_data(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        data = (1, 'Alice', 25)
        self.my_table.insert_data(table_name, data)  # データを挿入

        count = self.my_table.count_data(table_name)  # データのカウント
        self.assertEqual(count, 1)  # カウントが1であることを確認

    def test_delete_data(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        self.my_table.insert_data(table_name, data1)  # データを挿入
        self.my_table.insert_data(table_name, data2)

        self.my_table.delete_data(table_name, "id=1")  # idが1のデータを削除

        count = self.my_table.count_data(table_name)  # データのカウント
        self.assertEqual(count, 1)  # カウントが1であることを確認

    def test_truncate_table(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        self.my_table.insert_data(table_name, data1)  # データを挿入
        self.my_table.insert_data(table_name, data2)

        self.my_table.truncate_table(table_name)  # テーブルを切り捨て

        count = self.my_table.count_data(table_name)  # データのカウント
        self.assertEqual(count, 0)  # カウントが0であることを確認

    def test_select_data(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        self.my_table.insert_data(table_name, data1)  # データを挿入
        self.my_table.insert_data(table_name, data2)

        result = self.my_table.select_data(table_name)  # データを選択
        self.assertEqual(len(result), 2)  # 選択された行の数を確認

    def test_count_data(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        self.my_table.insert_data(table_name, data1)  # データを挿入
        self.my_table.insert_data(table_name, data2)

        count = self.my_table.count_data(table_name)  # データのカウント
        self.assertEqual(count, 2)  # カウントが2であることを確認

    def test_export_data(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        self.my_table.insert_data(table_name, data1)  # データを挿入
        self.my_table.insert_data(table_name, data2)

        export_file = 'exported_data.csv'
        self.my_table.export_data(table_name, export_file)  # データをエクスポート

        # エクスポートされたファイルが存在し、データが正しくエクスポートされたかを確認
        self.assertTrue(os.path.isfile(export_file))

        # エクスポートされたデータを読み込んで検証
        exported_df = pd.read_csv(export_file)
        self.assertEqual(len(exported_df), 2)  # エクスポートされた行の数を確認
        self.assertTrue('id' in exported_df.columns)  # カラムが存在することを確認

    def test_import_data(self):
        table_name = 'mytable'
        columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
        self.my_table.create_table(table_name, columns)  # テーブルを作成

        export_file = 'exported_data.csv'
        data_to_export = pd.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob'], 'age': [25, 30]})
        data_to_export.to_csv(export_file, index=False)  # エクスポート用ファイルを作成

        self.my_table.import_data(table_name, export_file)  # データをインポート

        # データが正しくインポートされたかを確認
        count = self.my_table.count_data(table_name)
        self.assertEqual(count, 2)  # インポートされたデータの数を確認

        # インポートされたデータを選択して確認
        result = self.my_table.select_data(table_name)
        self.assertEqual(len(result), 2)  # 選択された行の数を確認
        self.assertEqual(result, [(1, 'Alice', 25), (2, 'Bob', 30)])  # データの一致を確認

    def test_save_dataframe_to_database(self):
        # テスト用のデータフレームを作成
        data = {'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie'], 'age': [25, 30, 22]}
        df = pd.DataFrame(data)

        # テスト用のテーブル名を指定
        table_name = 'test_table'

        # SQLiteTableクラスのインスタンスを作成
        my_table = SQLiteTable(self.db_file)

        # データフレームをデータベースに保存
        my_table.save_dataframe_to_database(df, table_name)

        # データベースからデータを取得して検証
        query = f"SELECT * FROM {table_name}"
        self.cursor.execute(query)
        result = self.cursor.fetchall()

        # データベースから取得した結果とデータフレームを比較
        expected_result = [(1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 22)]
        self.assertEqual(result, expected_result)
    # 他のテストメソッドも同様に作成することができます

if __name__ == '__main__':
    unittest.main()