feat: implement internal task tracker with Python and DuckDB
This commit is contained in:
48
tasks/db.py
Normal file
48
tasks/db.py
Normal file
@@ -0,0 +1,48 @@
|
||||
import duckdb
|
||||
import os
|
||||
|
||||
DB_PATH = "data/tasks.duckdb"
|
||||
|
||||
def init_db():
|
||||
if not os.path.exists("data"):
|
||||
os.makedirs("data")
|
||||
|
||||
con = duckdb.connect(DB_PATH)
|
||||
con.execute("""
|
||||
CREATE TABLE IF NOT EXISTS tasks (
|
||||
id INTEGER PRIMARY KEY,
|
||||
title VARCHAR,
|
||||
status VARCHAR DEFAULT 'todo',
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
con.execute("CREATE SEQUENCE IF NOT EXISTS seq_task_id START 1")
|
||||
con.close()
|
||||
|
||||
def get_connection():
|
||||
return duckdb.connect(DB_PATH)
|
||||
|
||||
def add_task(title):
|
||||
con = get_connection()
|
||||
con.execute("INSERT INTO tasks (id, title) VALUES (nextval('seq_task_id'), ?)", [title])
|
||||
con.close()
|
||||
|
||||
def list_tasks(status=None):
|
||||
con = get_connection()
|
||||
if status:
|
||||
res = con.execute("SELECT * FROM tasks WHERE status = ? ORDER BY id", [status]).fetchall()
|
||||
else:
|
||||
res = con.execute("SELECT * FROM tasks ORDER BY id").fetchall()
|
||||
con.close()
|
||||
return res
|
||||
|
||||
def update_task(task_id, status):
|
||||
con = get_connection()
|
||||
con.execute("UPDATE tasks SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", [status, task_id])
|
||||
con.close()
|
||||
|
||||
def delete_task(task_id):
|
||||
con = get_connection()
|
||||
con.execute("DELETE FROM tasks WHERE id = ?", [task_id])
|
||||
con.close()
|
||||
Reference in New Issue
Block a user