A Python script, written with the help of ChatGPT, to back up tables in Data Blaze.
It downloads each table as JSON and converts/flattens it into CSV for easy importing.
Update the areas marked CHANGE_ME and set how long you want to keep backups (currently 30 days).
Note: for security reasons, consider making the API token an environment variable instead of hard-coding it into the script. I left mine in because there is no sensitive data I'm concerned about.
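For example, a minimal sketch (DATABLAZE_API_TOKEN is just a name I made up - use whatever variable you export):

    import os
    API_TOKEN = os.environ.get("DATABLAZE_API_TOKEN")  # None if the variable isn't set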
There is also an optional healthcheck ping, but I have not tested it yet (currently commented out).
import requests
import csv
import os
import shutil
from datetime import datetime, timedelta
# Replace with your actual API token
API_TOKEN = "CHANGE_ME" # Your API token found in API tab
BASE_URL = "https://data-api.blaze.today/api/database/" # Leave as is
DATABASE_ID = "CHANGE_ME" # Corresponding database ID found in API documentation tab
# List of tables to back up (manually add table IDs and names)
TABLES = [
    {"id": "TABLE_ID", "name": "TABLE_NAME"},  # CHANGE_ME - TABLE_ID (found in API documentation tab) and TABLE_NAME (actual name of table - case sensitive)
]
# Directory to store backups
BACKUP_DIR = "CHANGE_ME" # Name of folder to backup tables
RETENTION_DAYS = 30 # Delete backups older than this many days
# Healthcheck URL (optional, replace with your URL)
#HEALTHCHECK_URL = "https://hc-ping.com/YOUR_UNIQUE_HEALTHCHECK_ID"
# Function to delete old backups
def delete_old_backups():
    if not os.path.exists(BACKUP_DIR):
        print(f"Backup directory '{BACKUP_DIR}' does not exist. Skipping cleanup.")
        return  # Exit function early if the directory doesn't exist
    cutoff_time = datetime.now() - timedelta(days=RETENTION_DAYS)
    for folder in os.listdir(BACKUP_DIR):
        folder_path = os.path.join(BACKUP_DIR, folder)
        if os.path.isdir(folder_path):  # Ensure it's a directory
            try:
                folder_time = datetime.strptime(folder, "%Y-%m-%d_%H-%M-%S")
                if folder_time < cutoff_time:
                    shutil.rmtree(folder_path)  # Delete folder
                    print(f"Deleted old backup: {folder}")
            except ValueError:
                pass  # Ignore folders that don't match the timestamp format
# Recursive function to flatten and clean data
def simplify_data(data):
    if isinstance(data, dict):  # If data is a dictionary
        simplified_dict = {}
        for key, value in data.items():
            processed_value = simplify_data(value)  # Recursively clean nested data
            if isinstance(processed_value, list) and all(isinstance(item, dict) and "value" in item for item in processed_value):
                # Convert a list of dicts into a list of their "value" strings
                simplified_dict[key] = [item["value"] for item in processed_value]
            else:
                simplified_dict[key] = processed_value
        # Drop metadata fields (this applies at every nesting level, not just the top)
        return {k: v for k, v in simplified_dict.items() if k not in ["id", "order", "color"]}
    elif isinstance(data, list):  # If data is a list
        return [simplify_data(item) for item in data]
    else:
        return data  # Return scalar values unchanged
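# Example of what the flattening does (the row shape here is illustrative,
# inferred from how this script handles the response, not taken from the docs):
#   simplify_data([{"id": 3, "order": "3.0", "Name": "Foo",
#                   "Tags": [{"id": 7, "value": "red", "color": "dark-red"}]}])
#   returns [{"Name": "Foo", "Tags": ["red"]}]
# save_to_csv() below then writes the Tags cell as "red" (lists get comma-joined).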
# Function to fetch only the 'results' array from a table
def get_table_data(table_id, table_name):
    url = f"{BASE_URL}rows/table/{table_id}/?user_field_names=true"
    headers = {"Authorization": f"Token {API_TOKEN}"}
    response = requests.get(url, headers=headers, timeout=30)  # Timeout so a hung request can't stall the backup
    if response.status_code == 200:
        data = response.json()
        # Note: if the API paginates results, only the first page is fetched here
        rows = data.get("results", [])
        return simplify_data(rows)  # Transform data before saving
    else:
        print(f"Error fetching rows for {table_name}: {response.status_code}, {response.text}")
        return None
# Function to write data to a CSV file
def save_to_csv(data, file_path):
    if not data:
        return  # Skip empty data
    # Get column headers (unique keys from all rows)
    headers = set()
    for row in data:
        headers.update(row.keys())
    headers = sorted(headers)  # Sort headers for consistency
    with open(file_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=headers)
        writer.writeheader()  # Write column names
        for row in data:
            cleaned_row = {}
            for key, value in row.items():
                if isinstance(value, list):
                    cleaned_row[key] = ", ".join(map(str, value))  # Convert lists to comma-separated values
                else:
                    cleaned_row[key] = value  # Keep normal values unchanged
            writer.writerow(cleaned_row)  # Write formatted row
# Function to back up all tables
def backup_tables():
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    backup_folder = os.path.join(BACKUP_DIR, timestamp)
    os.makedirs(backup_folder, exist_ok=True)
    success = True  # Track success/failure for the healthcheck ping
    for table in TABLES:
        table_id = table["id"]
        table_name = table["name"]
        print(f"Backing up table: {table_name} (ID: {table_id})")
        data = get_table_data(table_id, table_name)
        if data is not None:  # None means the fetch failed; an empty table still counts as success
            backup_file = os.path.join(backup_folder, f"{table_name}.csv")
            save_to_csv(data, backup_file)  # Save data as CSV
            print(f"Saved: {backup_file}")
        else:
            success = False  # Mark failure
    # Send healthcheck notification (untested - uncomment to enable)
    # if HEALTHCHECK_URL:
    #     requests.get(HEALTHCHECK_URL if success else f"{HEALTHCHECK_URL}/fail")
if __name__ == "__main__":
    delete_old_backups()  # Step 1: Remove old backups
    backup_tables()       # Step 2: Create a new backup
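To test it, run the script manually before putting it on a schedule (the filename here is just an example - use whatever you saved it as):

    python backup_datablaze.py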