Storage Access allows your Data App to read data from and write data back to Keboola Storage tables in real time. Your app connects to Keboola Storage through the Query Service using SQL, enabling:
This feature is available for both Streamlit and Python/JS Data Apps. Code examples on this page use Python; the same concepts apply when calling the Query Service API from JavaScript.
Snowflake only. Storage Access currently works only on projects using the Snowflake storage backend. BigQuery support is coming soon.
Use Storage Access when you need to:
Stick with Input Mapping when:
When you enable Storage Access, Keboola creates a dedicated workspace for your Data App. This workspace contains a database user with specific permissions (SELECT, INSERT, UPDATE, DELETE, TRUNCATE) on the tables you’ve selected.
Your Data App
     │
     ▼
Query Service ────► Workspace User ────► Storage Tables
     │                                        │
     └── Handles authentication,              └── Your selected tables
         billing, metadata refresh                with granted permissions
Your app communicates with Storage through the Query Service API, not directly with Snowflake. This provides:
The recommended Python client library is keboola-query-service (also available for JavaScript/TypeScript as @keboola/query-service).
The workspace is ephemeral - a fresh workspace is created each time your app starts (including wake-up from sleep):
| Event | Workspace Action |
|---|---|
| App deploys | New workspace created |
| App wakes from sleep | New workspace created (old one deleted) |
| App redeployed | New workspace created (old one deleted) |
| App deleted | Workspace deleted |
This design ensures:
Notes:
If you manage Data App configurations through the Storage API (or via automation/agents) rather than the UI, the same writable-table selection is expressed in the configuration JSON under storage.output.tables. Each entry is a table the app gets read/write permissions on:
{
  "storage": {
    "output": {
      "tables": [
        {
          "destination": "out.c-data-app.mvc-crashes",
          "unload_strategy": "direct-grant"
        }
      ]
    }
  }
}
- `destination`: the full Storage table ID (`<stage>.<bucket>.<table>`) the app should be able to read and write. The table must exist before the app is deployed.
- `unload_strategy: "direct-grant"`: a required marker that tells the platform to grant the app's workspace direct SELECT/INSERT/UPDATE/DELETE/TRUNCATE on this table. Tables without this strategy in `storage.output.tables` are not exposed via Storage Access.

To add or remove writable tables programmatically, update the Data App's configuration via the Storage API (Component Configurations endpoint) and redeploy the app for the new permissions to take effect.
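Editing the configuration JSON in code usually means merging a new entry into the existing `storage.output.tables` list. A minimal sketch of that merge step; the helper name and its idempotent behavior are our own convention, not part of any Keboola SDK:

```python
import copy

def add_writable_table(config: dict, table_id: str) -> dict:
    """Return a copy of the app configuration with table_id added to
    storage.output.tables using the direct-grant strategy. Idempotent:
    adding the same table twice keeps a single entry."""
    cfg = copy.deepcopy(config)
    tables = (
        cfg.setdefault("storage", {})
           .setdefault("output", {})
           .setdefault("tables", [])
    )
    if not any(t.get("destination") == table_id for t in tables):
        tables.append({"destination": table_id, "unload_strategy": "direct-grant"})
    return cfg

# Starting from an empty configuration, this produces the JSON shown above.
updated = add_writable_table({}, "out.c-data-app.mvc-crashes")
```

After building the updated JSON, push it back through the Storage API's component-configurations endpoint and redeploy the app; the HTTP call itself is stack-specific and not shown here.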
Click Deploy (or Redeploy for existing apps). During deployment:
Install the Keboola Query Service client:
In pyproject.toml (Python):
dependencies = [
    "keboola-query-service>=0.2.0",
]
In your Python code:
import json
import os
from keboola_query_service import Client
# Storage Access config is set by the platform when the feature is enabled.
# workspace_id is read from the manifest file (recommended); the other values
# are plain env vars.
try:
    branch_id = os.environ["BRANCH_ID"]
    query_service_url = os.environ["QUERY_SERVICE_URL"]
    with open(os.environ["KBC_WORKSPACE_MANIFEST_PATH"]) as f:
        workspace_id = json.load(f)["workspaceId"]
except (KeyError, FileNotFoundError) as e:
    raise RuntimeError(
        "Storage Access is not enabled for this app. "
        "Enable it in Advanced Settings and redeploy."
    ) from e
# Initialize the Query Service client
client = Client(
    base_url=query_service_url,
    token=os.environ["KBC_TOKEN"],
)
To read a table you’ve selected in the UI:
import pandas as pd
# Query a table: quote the bucket ID and table name separately
results = client.execute_query(
    branch_id=branch_id,
    workspace_id=workspace_id,
    statements=['SELECT * FROM "in.c-main"."customers" LIMIT 1000'],
)
# One QueryResult per statement — we sent one statement, so take the first.
result = results[0]
# Convert to DataFrame
df = pd.DataFrame(result.data, columns=[c.name for c in result.columns])
print(df.head())
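The statement above quotes the bucket ID and table name as two separate identifiers. Building that quoted form by hand is easy to get wrong, so here is a small helper; it is ours, not part of the SDK:

```python
def quote_table_id(table_id: str) -> str:
    """Turn a full Storage table ID like 'in.c-sales.orders' into the
    quoted two-part form '"in.c-sales"."orders"' used in SQL statements.

    The bucket ID is everything up to the last dot; the table name is
    the remainder. Embedded double quotes are doubled, per SQL
    identifier-quoting rules."""
    bucket, _, table = table_id.rpartition(".")
    if not bucket:
        raise ValueError(f"Expected '<stage>.<bucket>.<table>', got: {table_id}")

    def _esc(s: str) -> str:
        return s.replace('"', '""')

    return f'"{_esc(bucket)}"."{_esc(table)}"'

# quote_table_id("in.c-sales.orders") → '"in.c-sales"."orders"'
```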
Table naming convention:
- `"bucket_stage.bucket_name"."table_name"`, e.g. `"in.c-sales"."orders"` for a table `orders` in bucket `in.c-sales`

You can run any SELECT query against your permitted tables:
# Join multiple tables
query = """
SELECT
    c.customer_name,
    SUM(o.amount) AS total_spent
FROM "in.c-main"."customers" c
JOIN "in.c-main"."orders" o ON c.id = o.customer_id
GROUP BY c.customer_name
ORDER BY total_spent DESC
LIMIT 10
"""
results = client.execute_query(
    branch_id=branch_id,
    workspace_id=workspace_id,
    statements=[query],
)
result = results[0]
Storage Access allows your app to modify data in Storage tables using standard SQL statements via the Query Service. This is useful for:
The Query Service accepts raw SQL and does not support parameterized queries or server-side bind variables. Your application is responsible for validating every untrusted value before interpolating it into a statement. See the Validate and sanitize user input guidance for patterns.
You can use standard SQL INSERT and UPDATE statements directly via the Query Service. Pass statements as a list — the SDK will execute them (transactionally by default) and return one result per statement:
# INSERT new records
client.execute_query(
    branch_id=branch_id,
    workspace_id=workspace_id,
    statements=['''
        INSERT INTO "in.c-main"."approvals" ("id", "name", "status", "updated_at")
        VALUES (1, 'New Record', 'pending', CURRENT_TIMESTAMP)
    '''],
)
# UPDATE existing records
client.execute_query(
branch_id=branch_id,
workspace_id=workspace_id,
statements=['''
UPDATE "in.c-main"."approvals"
SET status = 'approved', updated_at = CURRENT_TIMESTAMP
WHERE id = 123
'''],
)
# DELETE records
client.execute_query(
    branch_id=branch_id,
    workspace_id=workspace_id,
    statements=['''
        DELETE FROM "in.c-main"."approvals"
        WHERE status = 'cancelled'
    '''],
)
The Query Service automatically handles metadata refresh in Storage after write operations, so row counts and table statistics stay current without any additional calls.
To remove all data from a table:
client.execute_query(
    branch_id=branch_id,
    workspace_id=workspace_id,
    statements=['TRUNCATE TABLE "in.c-main"."temp_data"'],
)
Truncation removes every row in the target table immediately and cannot be undone through the Query Service. Use with caution.
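Because truncation is irreversible, it is worth gating it behind an explicit confirmation in your application logic. A minimal sketch; the type-the-table-ID convention is our own, not a platform feature:

```python
def confirm_truncate(table_id: str, typed_confirmation: str) -> bool:
    """Allow truncation only when the user has typed the exact table ID.
    Returns True when the (whitespace-trimmed) confirmation matches."""
    return typed_confirmation.strip() == table_id

# In the app: only issue the TRUNCATE statement when this returns True, e.g.
#   if confirm_truncate("in.c-main.temp_data", form_value):
#       client.execute_query(..., statements=['TRUNCATE TABLE ...'])
```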
Metadata refresh: After any write operation, Keboola automatically refreshes the table metadata. This ensures:
Concurrency: Multiple users of your app may write simultaneously. If you need to prevent conflicts, you must handle this in your application logic:
# Example: Optimistic locking with version column
query = """
UPDATE "in.c-main"."records"
SET status = 'approved', version = version + 1
WHERE id = 123 AND version = 5
"""
results = client.execute_query(
    branch_id=branch_id,
    workspace_id=workspace_id,
    statements=[query],
)
if results[0].rows_affected == 0:
    raise Exception("Record was modified by another user. Please refresh and try again.")
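The compare-and-set pattern above is usually wrapped in a small retry loop: re-read the current version, attempt the conditional update, and give up after a few conflicts. A sketch with the data access abstracted behind callables; the function names are ours, not SDK API:

```python
def update_with_retry(read_version, try_update, max_attempts=3):
    """Optimistic-locking loop.

    read_version() -> int: fetch the row's current version.
    try_update(version) -> int: run a conditional UPDATE (WHERE
    version = <version>) and return the number of rows affected.

    Returns True on success, False if every attempt hit a conflict."""
    for _ in range(max_attempts):
        version = read_version()
        if try_update(version) > 0:
            return True
    return False
```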
When Storage Access is enabled, the platform sets these environment variables in your Data App container:
| Variable | Description |
|---|---|
| `KBC_WORKSPACE_MANIFEST_PATH` | Path to the workspace manifest JSON file. The file contains `workspaceId` (and other workspace metadata). Recommended source for the workspace ID. |
| `WORKSPACE_ID` | ID of the provisioned workspace for this app. Also available in the manifest file (above); prefer reading the manifest in new code. |
| `BRANCH_ID` | Storage API branch ID of the project. |
| `QUERY_SERVICE_URL` | URL of the Query Service API (stack-specific). |
| `KBC_TOKEN` | Keboola Storage API token. |
If Storage Access is not enabled, KBC_WORKSPACE_MANIFEST_PATH / WORKSPACE_ID / BRANCH_ID / QUERY_SERVICE_URL are not set. Read them defensively and show users a clear error message:
import json
import os
try:
    branch_id = os.environ["BRANCH_ID"]
    query_service_url = os.environ["QUERY_SERVICE_URL"]
    with open(os.environ["KBC_WORKSPACE_MANIFEST_PATH"]) as f:
        workspace_id = json.load(f)["workspaceId"]
except (KeyError, FileNotFoundError) as e:
    raise RuntimeError(
        "Storage Access is not enabled. Enable it in Advanced Settings and redeploy."
    ) from e
For the full list of environment variables exposed to Data Apps, see the data-app-python-js runtime README.
| Aspect | Input Mapping | Direct Storage Access |
|---|---|---|
| Data freshness | Snapshot at deploy time | Real-time, always current |
| Data loading | CSV files loaded to `/data/in/tables/` | Query on demand via API |
| Write capability | None (read-only) | INSERT, UPDATE, DELETE, TRUNCATE |
| Dataset size | Limited by container memory | Virtually unlimited (pagination) |
| Configuration | Select tables in UI | Select tables + enable toggle |
| Use case | Static dashboards, reports | Interactive apps, data entry |
You can use both together: Input Mapping for reference data that rarely changes, Storage Access for data you need to read/write in real-time.
This example shows a simple Flask app that reads records from Storage and allows users to update their status.
app.py:
from flask import Flask, request, render_template_string
import json
import os
from keboola_query_service import Client
app = Flask(__name__)
# Read Storage Access config once at startup.
# workspace_id is read from the manifest (recommended); other values are env vars.
BRANCH_ID = os.environ["BRANCH_ID"]
with open(os.environ["KBC_WORKSPACE_MANIFEST_PATH"]) as f:
    WORKSPACE_ID = json.load(f)["workspaceId"]
qs_client = Client(
    base_url=os.environ["QUERY_SERVICE_URL"],
    token=os.environ["KBC_TOKEN"],
)
ALLOWED_STATUSES = {"pending", "approved", "rejected"}
@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "POST":
        # Validate and sanitize user input BEFORE it reaches SQL.
        # int() guarantees record_id is a number; the allowlist guarantees
        # new_status is one of three exact strings. This is the only reason
        # the f-string below is safe; do not add other form fields to the
        # query without analogous validation.
        record_id = int(request.form["record_id"])
        new_status = request.form["status"]
        if new_status not in ALLOWED_STATUSES:
            return "Invalid status", 400
        qs_client.execute_query(
            branch_id=BRANCH_ID,
            workspace_id=WORKSPACE_ID,
            statements=[f'''
                UPDATE "in.c-main"."approvals"
                SET status = '{new_status}', updated_at = CURRENT_TIMESTAMP
                WHERE id = {record_id}
            '''],
        )

    # Load current records
    results = qs_client.execute_query(
        branch_id=BRANCH_ID,
        workspace_id=WORKSPACE_ID,
        statements=['SELECT id, name, status, updated_at FROM "in.c-main"."approvals" ORDER BY id'],
    )
    result = results[0]
    column_names = [c.name for c in result.columns]
    records = [dict(zip(column_names, row)) for row in result.data]
    return render_template_string(TEMPLATE, records=records)
TEMPLATE = """
<!DOCTYPE html>
<html>
<head><title>Approval Manager</title></head>
<body>
<h1>Pending Approvals</h1>
<table border="1">
    <tr><th>ID</th><th>Name</th><th>Status</th><th>Action</th></tr>
    {% for r in records %}
    <tr>
        <td>{{ r.id }}</td><td>{{ r.name }}</td><td>{{ r.status }}</td>
        <td>
            <form method="post">
                <input type="hidden" name="record_id" value="{{ r.id }}">
                <select name="status">
                    <option value="pending">pending</option>
                    <option value="approved">approved</option>
                    <option value="rejected">rejected</option>
                </select>
                <button type="submit">Update</button>
            </form>
        </td>
    </tr>
    {% endfor %}
</table>
</body>
</html>
"""
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)
pyproject.toml:
[project]
name = "approval-app"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"flask>=3.0.0",
"keboola-query-service>=0.2.0",
]
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
1. Handle missing workspace gracefully
import json
import os
try:
    branch_id = os.environ["BRANCH_ID"]
    query_service_url = os.environ["QUERY_SERVICE_URL"]
    with open(os.environ["KBC_WORKSPACE_MANIFEST_PATH"]) as f:
        workspace_id = json.load(f)["workspaceId"]
except (KeyError, FileNotFoundError):
    # Storage Access is not enabled: show a user-friendly error
    import streamlit as st  # or use your framework's error handling
    st.error("Storage Access is not enabled for this app. Enable it in Advanced Settings and redeploy.")
    st.stop()
2. Validate and sanitize user input to prevent SQL injection
Since the Query Service accepts raw SQL strings, you must validate all user input before including it in queries:
# ❌ DANGEROUS - never do this
query = f"SELECT * FROM table WHERE id = {user_input}"
# ✅ SAFE - validate types and use allowlists
safe_id = int(user_input) # Ensure it's actually a number
query = f"SELECT * FROM table WHERE id = {safe_id}"
# ✅ For string values, use an allowlist of permitted values
ALLOWED_STATUSES = {"pending", "approved", "rejected"}
if status not in ALLOWED_STATUSES:
    raise ValueError(f"Invalid status: {status}")
query = f"UPDATE table SET status = '{status}' WHERE id = {safe_id}"
ⓘ Tip: Safer interpolation helpers are coming
First-class SQL.literal() / SQL.ident() / sql.format() helpers (with dialect-aware escaping and a SafeSql trust marker) are in development in the Keboola Query Service Python SDK and JavaScript SDK and will replace the allowlist/type-coercion patterns above once a release ships. Until then, the validation approach shown here is the recommended interim solution — especially for arbitrary string input, which is genuinely hard to sanitize by hand.
3. Implement keyset pagination for large datasets
Use keyset (cursor-based) pagination instead of OFFSET, which can produce duplicates or gaps on live data:
page_size = 1000
last_id = 0 # Start from the beginning
while True:
    results = client.execute_query(
        branch_id=branch_id,
        workspace_id=workspace_id,
        statements=[f'''
            SELECT * FROM "in.c-main"."my_table"
            WHERE id > {last_id}
            ORDER BY id ASC
            LIMIT {page_size}
        '''],
    )
    rows = results[0].data
    if not rows:
        break
    process_batch(rows)
    last_id = rows[-1][0]  # Update cursor to last row's id
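The loop generalizes to a small generator that hides the cursor bookkeeping; `fetch_page` stands for any callable that runs the keyset query above for a given cursor (the names are ours, not SDK API):

```python
def paginate(fetch_page, first_cursor=0):
    """Yield batches from a keyset-paginated source until exhausted.

    fetch_page(cursor) must return rows sorted ascending by the cursor
    column, with the cursor value in position 0 of each row, and an
    empty list when there is nothing past the cursor."""
    cursor = first_cursor
    while True:
        rows = fetch_page(cursor)
        if not rows:
            return
        yield rows
        cursor = rows[-1][0]

# Usage (illustrative):
#   for batch in paginate(lambda c: run_keyset_query(c)):
#       process_batch(batch)
```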
4. Cache frequently-used data
For Streamlit apps, use st.cache_data:
import pandas as pd
import streamlit as st

@st.cache_data(ttl=300)  # Cache for 5 minutes
def load_reference_data():
    results = client.execute_query(
        branch_id=branch_id,
        workspace_id=workspace_id,
        statements=['SELECT * FROM "in.c-main"."reference_data"'],
    )
    result = results[0]
    return pd.DataFrame(result.data, columns=[c.name for c in result.columns])
For Python/JS (non-Streamlit) apps, use a simple in-memory cache:
import time
_cache = {}
_cache_ttl = 300 # seconds
def get_cached_data(key, query_fn):
    now = time.time()
    if key in _cache and now - _cache[key]["ts"] < _cache_ttl:
        return _cache[key]["data"]
    data = query_fn()
    _cache[key] = {"data": data, "ts": now}
    return data
5. Track write operations
Write operations are automatically tracked by the Query Service for billing purposes. For additional application-level auditing, log to stdout (visible in Data App container logs):
import logging
logging.basicConfig(level=logging.INFO)
logging.info(f"User {current_user} updated record {record_id} to status {new_status}")
# Output goes to stdout → visible in the Terminal Log tab of your Data App