import sqlite3
import pandas as pd
class SQLiteTable:
    """Small convenience wrapper around one ``sqlite3`` connection.

    Every mutating method commits immediately unless an explicit transaction
    has been opened with :meth:`begin_transaction`; in that case the work is
    held back until :meth:`commit_transaction` or :meth:`rollback_transaction`.

    Table names, column definitions and ``condition`` strings are interpolated
    into the SQL text as-is, so they must come from trusted code. Values
    should be passed through ``data`` / ``params`` so that they are bound as
    ``?`` parameters instead.
    """

    def __init__(self, db_file):
        """Open (or create) the database at *db_file* and prepare a cursor."""
        self.db_file = db_file
        self.conn = sqlite3.connect(db_file)
        self.cursor = self.conn.cursor()
        # True between begin_transaction() and commit/rollback_transaction().
        self._explicit_transaction = False

    def _autocommit(self):
        """Commit now unless the caller is managing a transaction explicitly."""
        if not self._explicit_transaction:
            self.conn.commit()

    def create_table(self, table_name, columns):
        """Create *table_name* if it does not exist yet.

        *columns* is an iterable of column definitions such as
        ``'id INTEGER PRIMARY KEY'``.
        """
        column_sql = ', '.join(columns)
        create_table_query = (
            f"CREATE TABLE IF NOT EXISTS {table_name} ({column_sql})"
        )
        self.cursor.execute(create_table_query)
        self._autocommit()

    def insert_data(self, table_name, data):
        """Insert one row; *data* holds one value per table column, in order."""
        placeholders = ', '.join(['?'] * len(data))
        insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
        self.cursor.execute(insert_query, data)
        self._autocommit()

    def delete_data(self, table_name, condition, params=()):
        """Delete the rows of *table_name* matching the SQL *condition*.

        *params* optionally supplies values for ``?`` placeholders used in
        *condition*, e.g. ``delete_data('t', 'id = ?', (1,))``.
        """
        delete_query = f"DELETE FROM {table_name} WHERE {condition}"
        self.cursor.execute(delete_query, params)
        self._autocommit()

    def truncate_table(self, table_name):
        """Remove every row from *table_name* while keeping the table itself."""
        # SQLite has no TRUNCATE statement; an unqualified DELETE empties it.
        truncate_query = f"DELETE FROM {table_name}"
        self.cursor.execute(truncate_query)
        self._autocommit()

    def select_data(self, table_name, condition=None, params=()):
        """Return all rows of *table_name* as a list of tuples.

        When *condition* is given it is used as the ``WHERE`` clause; *params*
        optionally supplies values for ``?`` placeholders inside it.
        """
        select_query = f"SELECT * FROM {table_name}"
        if condition:
            select_query += f" WHERE {condition}"
        self.cursor.execute(select_query, params)
        return self.cursor.fetchall()

    def begin_transaction(self):
        """Open an explicit transaction and suspend per-call commits.

        Raises ``sqlite3.OperationalError`` if a transaction is already open.
        """
        self.conn.execute('BEGIN')
        # Only flip the flag once BEGIN has succeeded.
        self._explicit_transaction = True

    def commit_transaction(self):
        """Commit the explicit transaction and resume per-call commits."""
        # conn.commit() is used instead of a raw COMMIT statement because it
        # does not fail when no transaction happens to be open.
        self.conn.commit()
        self._explicit_transaction = False

    def rollback_transaction(self):
        """Discard the explicit transaction and resume per-call commits."""
        self.conn.rollback()
        self._explicit_transaction = False

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def count_data(self, table_name):
        """Return the number of rows currently stored in *table_name*."""
        count_query = f"SELECT COUNT(*) FROM {table_name}"
        self.cursor.execute(count_query)
        return self.cursor.fetchone()[0]

    def export_data(self, table_name, export_file):
        """Write every row of *table_name* to the CSV file *export_file*."""
        export_query = f"SELECT * FROM {table_name}"
        df = pd.read_sql_query(export_query, self.conn)
        df.to_csv(export_file, index=False)

    def save_dataframe_to_database(self, df, table_name, if_exists='replace'):
        """Store the DataFrame *df* as *table_name* (index column omitted).

        *if_exists* is forwarded to ``DataFrame.to_sql``. With the default
        ``'replace'`` an existing table is dropped and recreated from the
        frame, so constraints declared via :meth:`create_table` are lost.

        NOTE(review): pandas appears to commit on the connection itself after
        writing, which would end a transaction opened with
        :meth:`begin_transaction` -- confirm for the pandas version in use.
        """
        df.to_sql(table_name, self.conn, if_exists=if_exists, index=False)

    def import_data(self, table_name, import_file):
        """Replace *table_name* with the contents of the CSV *import_file*."""
        df = pd.read_csv(import_file)
        self.save_dataframe_to_database(df, table_name)
# Usage example
if __name__ == "__main__":
    db_file = 'mydatabase.db'
    my_table = SQLiteTable(db_file)
    table_name = 'mytable'
    columns = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']
    try:
        # NOTE: no explicit transaction is opened here; the SQLiteTable
        # mutators used below commit their own work.

        # Create the table first: truncating a table that does not exist yet
        # (e.g. on the very first run) raises "no such table".
        my_table.create_table(table_name, columns)
        # Start from an empty table so re-running the example does not hit
        # duplicate primary keys.
        my_table.truncate_table(table_name)

        # Insert rows
        data1 = (1, 'Alice', 25)
        data2 = (2, 'Bob', 30)
        data3 = (3, 'Charlie', 22)
        my_table.insert_data(table_name, data1)
        my_table.insert_data(table_name, data2)
        my_table.insert_data(table_name, data3)

        # Count rows
        count = my_table.count_data(table_name)
        print(f"Total rows in {table_name}: {count}")

        # Export to CSV
        export_file = 'exported_data.csv'
        my_table.export_data(table_name, export_file)
        print(f"Data exported to {export_file}")

        # Delete the row whose id is 1
        my_table.delete_data(table_name, "id=1")

        # Import the CSV back into the table
        import_file = 'exported_data.csv'
        my_table.import_data(table_name, import_file)
        print(f"Data imported from {import_file}")

        # Select and print all rows
        print("\nSelected Data:")
        result = my_table.select_data(table_name)
        for row in result:
            print(row)
    except Exception as e:
        print("Error:", e)
        # Undo any uncommitted work if something failed.
        my_table.conn.rollback()
    finally:
        my_table.close()
# Tests
import unittest
import sqlite3
import os
import pandas as pd
from tempfile import mkstemp
from sqlite_table_manager import SQLiteTable
class TestSQLiteTable(unittest.TestCase):
    """Unit tests for ``SQLiteTable``, each run against a fresh temp database."""

    TABLE_NAME = 'mytable'
    COLUMNS = ['id INTEGER PRIMARY KEY', 'name TEXT', 'age INTEGER']

    def setUp(self):
        self.db_fd, self.db_file = mkstemp()
        # Per-test CSV path derived from the unique temp database name, so
        # the tests never write into the current working directory.
        self.csv_file = self.db_file + '.csv'
        self.my_table = SQLiteTable(self.db_file)

    def tearDown(self):
        self.my_table.close()
        os.close(self.db_fd)
        os.remove(self.db_file)
        if os.path.exists(self.csv_file):
            os.remove(self.csv_file)

    def _create_table(self, rows=()):
        """Create the standard test table, insert *rows*, return its name."""
        self.my_table.create_table(self.TABLE_NAME, self.COLUMNS)
        for row in rows:
            self.my_table.insert_data(self.TABLE_NAME, row)
        return self.TABLE_NAME

    def test_create_table(self):
        self._create_table()
        # Inspect the schema through an independent connection.
        connection = sqlite3.connect(self.db_file)
        try:
            cursor = connection.cursor()
            cursor.execute("PRAGMA table_info(mytable)")
            column_names = [column[1] for column in cursor.fetchall()]
        finally:
            connection.close()
        self.assertEqual(column_names, ['id', 'name', 'age'])

    def test_insert_data(self):
        table_name = self._create_table()
        self.my_table.insert_data(table_name, (1, 'Alice', 25))
        self.assertEqual(self.my_table.count_data(table_name), 1)

    def test_delete_data(self):
        table_name = self._create_table([(1, 'Alice', 25), (2, 'Bob', 30)])
        self.my_table.delete_data(table_name, "id=1")
        self.assertEqual(self.my_table.count_data(table_name), 1)

    def test_truncate_table(self):
        table_name = self._create_table([(1, 'Alice', 25), (2, 'Bob', 30)])
        self.my_table.truncate_table(table_name)
        self.assertEqual(self.my_table.count_data(table_name), 0)

    def test_select_data(self):
        table_name = self._create_table([(1, 'Alice', 25), (2, 'Bob', 30)])
        result = self.my_table.select_data(table_name)
        self.assertEqual(len(result), 2)

    def test_count_data(self):
        table_name = self._create_table([(1, 'Alice', 25), (2, 'Bob', 30)])
        self.assertEqual(self.my_table.count_data(table_name), 2)

    def test_export_data(self):
        table_name = self._create_table([(1, 'Alice', 25), (2, 'Bob', 30)])
        self.my_table.export_data(table_name, self.csv_file)
        self.assertTrue(os.path.isfile(self.csv_file))
        exported_df = pd.read_csv(self.csv_file)
        self.assertEqual(len(exported_df), 2)
        self.assertIn('id', exported_df.columns)

    def test_import_data(self):
        table_name = self._create_table()
        data_to_export = pd.DataFrame(
            {'id': [1, 2], 'name': ['Alice', 'Bob'], 'age': [25, 30]}
        )
        data_to_export.to_csv(self.csv_file, index=False)
        self.my_table.import_data(table_name, self.csv_file)
        self.assertEqual(self.my_table.count_data(table_name), 2)
        result = self.my_table.select_data(table_name)
        self.assertEqual(len(result), 2)
        self.assertEqual(result, [(1, 'Alice', 25), (2, 'Bob', 30)])

    def test_save_dataframe_to_database(self):
        # Requires SQLiteTable to provide save_dataframe_to_database(df, name).
        data = {
            'id': [1, 2, 3],
            'name': ['Alice', 'Bob', 'Charlie'],
            'age': [25, 30, 22],
        }
        df = pd.DataFrame(data)
        table_name = 'test_table'
        # Reuse the fixture's connection: the test case has no cursor of its
        # own, and a second SQLiteTable would leave a connection open on the
        # temp file that tearDown() deletes.
        self.my_table.save_dataframe_to_database(df, table_name)
        result = self.my_table.select_data(table_name)
        expected_result = [(1, 'Alice', 25), (2, 'Bob', 30), (3, 'Charlie', 22)]
        self.assertEqual(result, expected_result)
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()