Python script for backing up data blaze

A Python script, made with the help of ChatGPT, to back up tables in Data Blaze.

Downloads tables as JSON and converts/flattens them into CSV files for easy importing.

Update the values marked CHANGE_ME and set the number of days to keep backups (currently set to 30 days).

Note: for security reasons, consider storing the API token in an environment variable instead of hard-coding it into the script. I left mine in as there is no sensitive data I'm concerned about.

It also has a healthcheck ping, but I have not tested this yet (currently commented out).

import requests
import csv
import os
import shutil
from datetime import datetime, timedelta

# API token (found in the API tab). It is read from the DATA_BLAZE_API_TOKEN
# environment variable when that is set, so the secret does not have to live
# in the script; otherwise the placeholder below is used and must be replaced.
API_TOKEN = os.environ.get("DATA_BLAZE_API_TOKEN", "CHANGE_ME")
BASE_URL = "https://data-api.blaze.today/api/database/"  # Leave as is
DATABASE_ID = "CHANGE_ME"  # Corresponding database ID found in API documentation tab

# List of tables to back up (manually add table IDs and names).
# Each entry needs:
#   "id"   - CHANGE_ME: the table ID (found in API documentation tab)
#   "name" - CHANGE_ME: the actual name of the table (case sensitive); it is
#            also used as the CSV file name
TABLES = [
    {"id": "TABLE_ID", "name": "TABLE_NAME"},
]

# Directory to store backups; each run creates a timestamped sub-folder in it
BACKUP_DIR = "CHANGE_ME"  # Name of folder to backup tables
RETENTION_DAYS = 30  # Delete any backups older than this many days

# Healthcheck URL (optional, replace with your URL)
#HEALTHCHECK_URL = "https://hc-ping.com/YOUR_UNIQUE_HEALTHCHECK_ID"

# Function to delete old backups
def delete_old_backups():
    """Remove timestamped backup folders older than ``RETENTION_DAYS`` days.

    Only sub-directories of ``BACKUP_DIR`` whose names parse as
    ``%Y-%m-%d_%H-%M-%S`` are considered; every other entry is left alone.
    A folder that cannot be removed is reported and skipped, so a cleanup
    problem does not stop the script before the new backup is created.
    """
    if not os.path.exists(BACKUP_DIR):
        print(f"Backup directory '{BACKUP_DIR}' does not exist. Skipping cleanup.")
        return  # Nothing to clean up yet

    cutoff_time = datetime.now() - timedelta(days=RETENTION_DAYS)

    for folder in os.listdir(BACKUP_DIR):
        folder_path = os.path.join(BACKUP_DIR, folder)

        if not os.path.isdir(folder_path):
            continue  # Plain files are never touched

        try:
            folder_time = datetime.strptime(folder, "%Y-%m-%d_%H-%M-%S")
        except ValueError:
            continue  # Ignore folders that don't match the timestamp format

        if folder_time >= cutoff_time:
            continue  # Still within the retention window

        try:
            shutil.rmtree(folder_path)
        except OSError as err:
            # Best effort: a locked or permission-protected folder must not
            # abort the run.
            print(f"Could not delete old backup {folder}: {err}")
        else:
            print(f"Deleted old backup: {folder}")

# **Recursive Function to Flatten and Clean Data**
def simplify_data(data):
    """Recursively strip bookkeeping keys and flatten ``value`` wrappers.

    Rules applied to every dictionary, at any nesting depth:

    * the keys ``id``, ``order`` and ``color`` are dropped;
    * a nested dictionary that is left with only a ``value`` key is replaced
      by that value (so ``{"id": 1, "value": "Open", "color": "blue"}``
      becomes ``"Open"`` instead of ending up as a dict in the CSV);
    * a list whose items are all dictionaries containing ``value`` is
      replaced by the list of those values.

    Dictionaries that are direct items of a list (such as the rows
    themselves) are never collapsed, so a row whose only remaining column is
    called ``value`` stays a dictionary.

    Args:
        data: Any JSON-like structure (dict, list or scalar).

    Returns:
        A new structure of the same outer type with the rules above applied;
        scalars are returned unchanged.
    """
    dropped_keys = ("id", "order", "color")

    if isinstance(data, dict):
        simplified_dict = {}
        for key, value in data.items():
            if key in dropped_keys:
                continue

            processed_value = simplify_data(value)  # Recursively clean data

            if isinstance(processed_value, dict) and set(processed_value) == {"value"}:
                # Single-option wrapper: keep only the wrapped value
                simplified_dict[key] = processed_value["value"]
            elif isinstance(processed_value, list) and all(
                isinstance(item, dict) and "value" in item for item in processed_value
            ):
                # Convert list of dicts into a list of their "value" entries
                simplified_dict[key] = [item["value"] for item in processed_value]
            else:
                simplified_dict[key] = processed_value
        return simplified_dict

    if isinstance(data, list):
        return [simplify_data(item) for item in data]

    return data  # Scalars are returned unchanged

# Function to fetch only the 'results' array from a table
def get_table_data(table_id, table_name):
    """Fetch all rows of a table and return them in simplified form.

    The first request is sent as ``.../rows/table/<id>/?user_field_names=true``.
    While the response carries a non-empty ``next`` field, further pages are
    requested with an increasing ``page`` parameter so that tables larger
    than one page are backed up completely.

    Args:
        table_id: ID of the table to download.
        table_name: Table name, used only in error messages.

    Returns:
        The list of rows after ``simplify_data`` (possibly empty), or
        ``None`` if any request failed. A partial download is never returned.
    """
    url = f"{BASE_URL}rows/table/{table_id}/"
    headers = {"Authorization": f"Token {API_TOKEN}"}

    rows = []
    page = 1

    while True:
        params = {"user_field_names": "true"}
        if page > 1:
            params["page"] = page

        try:
            # A timeout keeps an unattended backup from hanging forever.
            response = requests.get(url, headers=headers, params=params, timeout=30)
            if response.status_code != 200:
                print(f"Error fetching rows for {table_name}: {response.status_code}, {response.text}")
                return None
            data = response.json()
        except (requests.RequestException, ValueError) as err:
            # Connection problems, timeouts and invalid JSON bodies
            print(f"Error fetching rows for {table_name}: {err}")
            return None

        page_rows = data.get("results", [])
        rows.extend(page_rows)

        # Stop when there is no further page; an empty page also ends the
        # loop so a misbehaving "next" field cannot cause endless requests.
        if not data.get("next") or not page_rows:
            break
        page += 1

    return simplify_data(rows)  # Transform data before saving

# Function to write data to a CSV file
def save_to_csv(data, file_path):
    """Write a list of row dictionaries to ``file_path`` as UTF-8 CSV.

    The header is the alphabetically sorted union of all keys found in the
    rows. List values are joined into one ", "-separated cell; every other
    value is handed to the CSV writer unchanged. Nothing is written when
    ``data`` is empty.
    """
    if not data:
        return  # No rows, no file

    # Union of the keys of every row, in a stable (sorted) order
    column_names = sorted({name for record in data for name in record.keys()})

    def _as_cell(cell):
        # Lists become a single comma-separated string
        if isinstance(cell, list):
            return ", ".join(str(part) for part in cell)
        return cell

    with open(file_path, "w", newline="", encoding="utf-8") as handle:
        out = csv.DictWriter(handle, fieldnames=column_names)
        out.writeheader()
        out.writerows(
            {name: _as_cell(cell) for name, cell in record.items()}
            for record in data
        )

# Function to back up all tables
def backup_tables():
    """Download every table in ``TABLES`` into a new timestamped folder.

    A folder named after the current time (``%Y-%m-%d_%H-%M-%S``) is created
    inside ``BACKUP_DIR`` and each table is written to ``<name>.csv`` in it.
    A table that exists but has no rows is not an error: no file is written
    for it and the run still counts as successful.

    Returns:
        ``True`` if every table was fetched successfully, ``False`` if at
        least one fetch failed.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    backup_folder = os.path.join(BACKUP_DIR, timestamp)
    os.makedirs(backup_folder, exist_ok=True)

    success = True  # Track success/failure

    for table in TABLES:
        table_id = table["id"]
        table_name = table["name"]
        print(f"Backing up table: {table_name} (ID: {table_id})")

        data = get_table_data(table_id, table_name)
        if data is None:
            success = False  # Fetch failed (already reported by get_table_data)
            continue

        if not data:
            # Empty table: nothing to write, but not a failure
            print(f"Table {table_name} has no rows; nothing to save.")
            continue

        backup_file = os.path.join(backup_folder, f"{table_name}.csv")
        save_to_csv(data, backup_file)  # Save data as CSV
        print(f"Saved: {backup_file}")

    # Send healthcheck notification
#    if HEALTHCHECK_URL:
#        requests.get(HEALTHCHECK_URL if success else f"{HEALTHCHECK_URL}/fail")

    return success

# Script entry point: runs only when the file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    delete_old_backups()  # Step 1: Remove old backups
    backup_tables()  # Step 2: Create a new backup

4 Likes