This document specifies data manipulation operations in DataJoint Python: insert, update, and delete. These operations maintain referential integrity across the pipeline while supporting the workflow normalization paradigm.
DataJoint pipelines are designed around insert and delete as the primary data manipulation operations:
- Insert: Add complete entities (rows) to tables
- Delete: Remove entities and all dependent data (cascading)
This design maintains referential integrity at the entity level—each row represents a complete, self-consistent unit of data.
Updates are intentionally limited to the update1() method, which modifies a single row at a time. This is by design:
- Updates bypass the normal workflow
- They can create inconsistencies with derived data
- They should be used sparingly for corrective operations
Appropriate uses of update1():
- Fixing data entry errors
- Correcting metadata after the fact
- Administrative annotations
Inappropriate uses:
- Regular workflow operations
- Batch modifications
- Anything that should trigger recomputation
When source data changes, the correct pattern is:
```python
# 1. Delete the incorrect data (cascades to all derived tables)
(SourceTable & {"key": value}).delete()

# 2. Insert the corrected data
SourceTable.insert1(corrected_row)

# 3. Recompute derived tables
DerivedTable.populate()
```

This ensures all derived data remains consistent with its sources.
Signature:
```python
def insert(
    self,
    rows,
    replace=False,
    skip_duplicates=False,
    ignore_extra_fields=False,
    allow_direct_insert=None,
    chunk_size=None,
)
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `rows` | iterable | — | Data to insert |
| `replace` | bool | `False` | Replace existing rows with matching primary key |
| `skip_duplicates` | bool | `False` | Silently skip duplicate keys |
| `ignore_extra_fields` | bool | `False` | Ignore fields not in the table |
| `allow_direct_insert` | bool | `None` | Allow insert into auto-populated tables |
| `chunk_size` | int | `None` | Insert in batches of this size |
| Format | Example |
|---|---|
| List of dicts | `[{"id": 1, "name": "Alice"}, ...]` |
| pandas DataFrame | `pd.DataFrame({"id": [1, 2], "name": ["A", "B"]})` |
| polars DataFrame | `pl.DataFrame({"id": [1, 2], "name": ["A", "B"]})` |
| numpy structured array | `np.array([(1, "A")], dtype=[("id", int), ("name", "U10")])` |
| QueryExpression | `OtherTable.proj(...)` (INSERT...SELECT) |
| Path to CSV | `Path("data.csv")` |
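A brief sketch of the two formats not shown in the examples below (a numpy structured array and a CSV file path), reusing the Subject schema from those examples; the assumption that the CSV's header row names the table's attributes is mine, not stated in this reference:

```python
import numpy as np
from pathlib import Path

# numpy structured array: field names must match the table's attributes
arr = np.array(
    [(1, "Mouse001", "2024-01-15"), (2, "Mouse002", "2024-01-16")],
    dtype=[("subject_id", int), ("name", "U10"), ("dob", "U10")],
)
Subject.insert(arr)

# path to a CSV file (assumed: header row names the table's attributes)
Subject.insert(Path("subjects.csv"))
```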
```python
# Single row
Subject.insert1({"subject_id": 1, "name": "Mouse001", "dob": "2024-01-15"})

# Multiple rows
Subject.insert([
    {"subject_id": 1, "name": "Mouse001", "dob": "2024-01-15"},
    {"subject_id": 2, "name": "Mouse002", "dob": "2024-01-16"},
])

# From DataFrame
df = pd.DataFrame({"subject_id": [1, 2], "name": ["M1", "M2"], "dob": ["2024-01-15", "2024-01-16"]})
Subject.insert(df)

# From query (INSERT...SELECT)
ActiveSubjects.insert(Subject & "status = 'active'")
```

```python
# Error on duplicate (default)
Subject.insert1({"subject_id": 1, ...})  # Raises DuplicateError if exists

# Skip duplicates silently
Subject.insert(rows, skip_duplicates=True)

# Replace existing rows
Subject.insert(rows, replace=True)
```

Difference between skip and replace:

- `skip_duplicates`: Keeps the existing row unchanged
- `replace`: Overwrites the existing row with the new values
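A small sketch of this difference, using the Subject table from the examples above:

```python
Subject.insert1({"subject_id": 1, "name": "Mouse001", "dob": "2024-01-15"})

# skip_duplicates=True: the existing row is kept, so name stays "Mouse001"
Subject.insert(
    [{"subject_id": 1, "name": "Renamed", "dob": "2024-01-15"}],
    skip_duplicates=True,
)

# replace=True: the existing row is overwritten, so name becomes "Renamed"
Subject.insert(
    [{"subject_id": 1, "name": "Renamed", "dob": "2024-01-15"}],
    replace=True,
)
```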
```python
# Error on extra fields (default)
Subject.insert1({"subject_id": 1, "unknown_field": "x"})  # Raises error

# Ignore extra fields
Subject.insert1({"subject_id": 1, "unknown_field": "x"}, ignore_extra_fields=True)
```

Computed and Imported tables normally only accept inserts from their make() method:
```python
# Raises DataJointError by default
ComputedTable.insert1({"key": 1, "result": 42})

# Explicit override
ComputedTable.insert1({"key": 1, "result": 42}, allow_direct_insert=True)
```

For large datasets, insert in batches:

```python
# Insert 10,000 rows at a time
Subject.insert(large_dataset, chunk_size=10000)
```

Each chunk is a separate transaction. If interrupted, completed chunks persist.
Convenience wrapper for single-row inserts:
```python
def insert1(self, row, **kwargs)
```

Equivalent to `insert((row,), **kwargs)`.
For large objects (Zarr arrays, HDF5 files), use staged insert to write directly to object storage:
```python
with table.staged_insert1 as staged:
    # Set primary key and metadata
    staged.rec["session_id"] = 123
    staged.rec["timestamp"] = datetime.now()

    # Write large data directly to storage
    zarr_path = staged.store("raw_data", ".zarr")
    z = zarr.open(zarr_path, mode="w", shape=large_array.shape, dtype=large_array.dtype)
    z[:] = large_array
    staged.rec["raw_data"] = z

# Row automatically inserted on successful exit
# Storage cleaned up if an exception occurs
```

Signature:
```python
def update1(self, row: dict) -> None
```

Parameters:

- `row`: Dictionary containing all primary key values plus the attributes to update
```python
# Update a single attribute
Subject.update1({"subject_id": 1, "name": "NewName"})

# Update multiple attributes
Subject.update1({
    "subject_id": 1,
    "name": "NewName",
    "notes": "Updated on 2024-01-15",
})
```

update1() enforces the following requirements:

- Complete primary key: All PK attributes must be provided
- Exactly one match: Must match exactly one existing row
- No restrictions: Cannot call on restricted table
```python
# Error: incomplete primary key
Subject.update1({"name": "NewName"})

# Error: row doesn't exist
Subject.update1({"subject_id": 999, "name": "Ghost"})

# Error: cannot update a restricted table
(Subject & "subject_id > 10").update1({...})
```

Setting an attribute to None resets it to its default value:
```python
# Reset 'notes' to its default (NULL if nullable)
Subject.update1({"subject_id": 1, "notes": None})
```

Appropriate:
```python
# Fix a typo in metadata
Subject.update1({"subject_id": 1, "name": "Mouse001"})  # Was "Mous001"

# Add a note to an existing record
Session.update1({"session_id": 5, "notes": "Excluded from analysis"})
```

Inappropriate (use delete + insert + populate instead):
```python
# DON'T: Update source data that affects computed results
Trial.update1({"trial_id": 1, "stimulus": "new_stim"})  # Computed tables now stale!

# DO: Delete and recompute
(Trial & {"trial_id": 1}).delete()  # Cascades to computed tables
Trial.insert1({"trial_id": 1, "stimulus": "new_stim"})
ComputedResults.populate()
```

DataJoint intentionally does not provide update() for multiple rows:
- Consistency: Bulk updates easily create inconsistencies with derived data
- Auditability: Single-row updates are explicit and traceable
- Workflow: The insert/delete pattern maintains referential integrity
If you need to update many rows, iterate explicitly:
```python
for key in (Subject & condition).keys():
    Subject.update1({**key, "status": "archived"})
```

Signature:
```python
def delete(
    self,
    transaction: bool = True,
    prompt: bool | None = None,
    part_integrity: str = "enforce",
) -> int
```

Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `transaction` | bool | `True` | Wrap in an atomic transaction |
| `prompt` | bool | `None` | Prompt for confirmation (default: config setting) |
| `part_integrity` | str | `"enforce"` | Master-part integrity policy (see below) |
part_integrity values:
| Value | Behavior |
|---|---|
| `"enforce"` | Error if parts would be deleted without their masters |
| `"ignore"` | Allow deleting parts without masters (breaks integrity) |
| `"cascade"` | Also delete masters when parts are deleted |
Returns: Number of deleted rows from the primary table.
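The return value can be captured when a script needs to record how many entities were removed; a small sketch using the parameters described above:

```python
# delete() returns the number of rows removed from the table it was called on;
# cascaded deletions in dependent tables are not included in this count
n_deleted = (Session & "session_date < '2024-01-01'").delete(prompt=False)
print(f"Removed {n_deleted} sessions (plus their dependents)")
```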
Delete automatically cascades to all dependent tables:
```python
# Deleting a subject deletes all their sessions, trials, and computed results
(Subject & {"subject_id": 1}).delete()
```

Cascade order:

1. Identify all tables with foreign keys referencing the target
2. Recursively delete matching rows in child tables
3. Delete rows in the target table
!!! version-added "New in 2.2"
    Table.delete() now uses graph-driven cascade internally via dj.Diagram. User-facing behavior is unchanged: the same parameters and return values apply. For direct control over the cascade (preview, multi-schema operations), use the Diagram operational methods.
```python
# Delete specific rows
(Subject & {"subject_id": 1}).delete()

# Delete rows matching a condition
(Session & "session_date < '2024-01-01'").delete()

# Delete all rows (use with caution!)
Subject.delete()
```

When prompt=True (default from config):
```text
About to delete:
Subject: 1 rows
Session: 5 rows
Trial: 150 rows
ProcessedData: 150 rows
Commit deletes? [yes, No]:
```
Disable for automated scripts:
```python
Subject.delete(prompt=False)
```

```python
# Atomic delete (default) - all or nothing
(Subject & condition).delete(transaction=True)

# Non-transactional (use when the delete runs inside an outer transaction)
(Subject & condition).delete(transaction=False)
```

Cannot delete from part tables without deleting from the master (by default):
```python
# Error: cannot delete part without master
Session.Recording.delete()

# Allow breaking master-part integrity
Session.Recording.delete(part_integrity="ignore")

# Delete parts AND cascade up to delete the master
Session.Recording.delete(part_integrity="cascade")
```

part_integrity parameter:
| Value | Behavior |
|---|---|
| `"enforce"` (default) | Error if parts would be deleted without their masters |
| `"ignore"` | Allow deleting parts without masters (breaks integrity) |
| `"cascade"` | Also delete masters when parts are deleted (maintains integrity) |
Fast delete without cascade or confirmation:
```python
def delete_quick(self, get_count: bool = False) -> int | None
```

Use cases:
- Internal cleanup
- Tables with no dependents
- When you've already handled dependencies
Behavior:
- No cascade to child tables
- No user confirmation
- Fails on FK constraint violation
```python
# Quick delete (fails if the table has dependents)
(TempTable & condition).delete_quick()

# Get the count of deleted rows
n = (TempTable & condition).delete_quick(get_count=True)
```

Pre-validate rows before insertion:
```python
def validate(self, rows, *, ignore_extra_fields=False) -> ValidationResult
```

Returns: ValidationResult with:

- `is_valid`: Boolean indicating all rows passed
- `errors`: List of (row_idx, field_name, error_message)
- `rows_checked`: Number of rows validated
```python
result = Subject.validate(rows)
if result:
    Subject.insert(rows)
else:
    print(result.summary())
    # Row 3, field 'dob': Invalid date format
    # Row 7, field 'subject_id': Missing required field
```

| Check | Description |
|---|---|
| Field existence | All fields must exist in table |
| NULL constraints | Required fields must have values |
| Primary key completeness | All PK fields must be present |
| UUID format | Valid UUID string or object |
| JSON serializability | JSON fields must be serializable |
| Codec validation | Custom type validation via codecs |
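When failures need to be handled programmatically rather than printed via summary(), the ValidationResult fields listed above can be inspected directly; a minimal sketch:

```python
result = Subject.validate(rows, ignore_extra_fields=True)
if not result.is_valid:
    # each entry is (row_idx, field_name, error_message)
    for row_idx, field_name, message in result.errors:
        print(f"row {row_idx}, field {field_name!r}: {message}")
```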
These constraints are checked only at the database level (see the sketch after this list):
- Foreign key references
- Unique constraints (beyond PK)
- Custom CHECK constraints
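The practical consequence is that a row can pass validate() yet still be rejected on insert. A hedged sketch, assuming a Session table with a foreign key to Subject:

```python
row = {"subject_id": 999, "session_id": 1, "date": "2024-01-15"}
result = Session.validate([row])          # field-level checks may all pass
if result:
    try:
        Session.insert1(row)              # fails if subject 999 does not exist
    except dj.errors.IntegrityError:
        print("Foreign key violated: insert the parent Subject row first")
```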
Part tables are inserted via their master:
```python
@schema
class Session(dj.Manual):
    definition = """
    session_id : int
    ---
    date : date
    """

    class Recording(dj.Part):
        definition = """
        -> master
        recording_id : int
        ---
        duration : float
        """

# Insert master with parts
Session.insert1({"session_id": 1, "date": "2024-01-15"})
Session.Recording.insert([
    {"session_id": 1, "recording_id": 1, "duration": 60.0},
    {"session_id": 1, "recording_id": 2, "duration": 45.5},
])
```

Deleting the master cascades to its parts:
```python
# Deletes the session AND all its recordings
(Session & {"session_id": 1}).delete()
```

Cannot delete parts independently (by default):
```python
# Error
Session.Recording.delete()

# Allow breaking master-part integrity
Session.Recording.delete(part_integrity="ignore")

# Or cascade up to also delete the master
Session.Recording.delete(part_integrity="cascade")
```

Single operations are atomic:
```python
Subject.insert1(row)   # Atomic
Subject.update1(row)   # Atomic
Subject.delete()       # Atomic (by default)
```

For multi-table operations:
```python
with dj.conn().transaction:
    Parent.insert1(parent_row)
    Child.insert(child_rows)
# Commits on successful exit
# Rolls back on exception
```

With chunk_size, each chunk is a separate transaction:
```python
# Each chunk of 1000 rows commits independently
Subject.insert(large_dataset, chunk_size=1000)
```

If interrupted, completed chunks persist.
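If a chunked load is interrupted, one simple recovery pattern is to rerun the same insert with skip_duplicates=True so rows from chunks that already committed are skipped; a sketch, assuming the source data has not changed in the meantime:

```python
try:
    Subject.insert(large_dataset, chunk_size=1000)
except Exception:
    # chunks that committed before the failure are already in the table;
    # rerunning with skip_duplicates=True inserts only the remaining rows
    Subject.insert(large_dataset, chunk_size=1000, skip_duplicates=True)
```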
| Error | Cause | Resolution |
|---|---|---|
| `DuplicateError` | Primary key already exists | Use `skip_duplicates=True` or `replace=True` |
| `IntegrityError` | Foreign key constraint violated | Insert parent rows first |
| `MissingAttributeError` | Required field not provided | Include all required fields |
| `UnknownAttributeError` | Field not in table | Use `ignore_extra_fields=True` or fix the field name |
| `DataJointError` | Various validation failures | Check the error message for details |
```python
try:
    Subject.insert(rows)
except dj.errors.DuplicateError as e:
    # Handle the specific duplicate
    print(f"Duplicate: {e}")
except dj.errors.IntegrityError as e:
    # Missing parent reference
    print(f"Missing parent: {e}")
except dj.DataJointError as e:
    # Other DataJoint errors
    print(f"Error: {e}")
```

```python
# Good: Delete and reinsert
(Trial & key).delete()
Trial.insert1(corrected_trial)
DerivedTable.populate()

# Avoid: Update that creates stale derived data
Trial.update1({**key, "value": new_value})  # Derived tables now inconsistent!
```

```python
# Validate before a bulk insert
result = Subject.validate(rows)
if not result:
    raise ValueError(result.summary())
Subject.insert(rows)
```

```python
# Insert master and parts together in one transaction
with dj.conn().transaction:
    session_key = Session.insert1(session_data, skip_duplicates=True)
    Session.Recording.insert(recordings)
    Session.Stimulus.insert(stimuli)
```

```python
# Good: Single insert call
Subject.insert(all_rows)

# Avoid: Loop of insert1 calls
for row in all_rows:
    Subject.insert1(row)  # Slow!
```

```python
# Always use prompt in interactive sessions
(Subject & condition).delete(prompt=True)

# Disable only in tested automated scripts
(Subject & condition).delete(prompt=False)
```

| Operation | Method | Cascades | Transaction | Typical Use |
|---|---|---|---|---|
| Insert one | `insert1()` | — | Implicit | Adding a single entity |
| Insert many | `insert()` | — | Per-chunk | Bulk data loading |
| Insert large object | `staged_insert1` | — | On exit | Zarr, HDF5 files |
| Update one | `update1()` | — | Implicit | Surgical corrections |
| Delete | `delete()` | Yes | Optional | Removing entities |
| Delete quick | `delete_quick()` | No | No | Internal cleanup |
| Validate | `validate()` | — | — | Pre-insert check |