-
Notifications
You must be signed in to change notification settings - Fork 106
Expand file tree
/
Copy pathroute_backend_documents.py
More file actions
957 lines (834 loc) · 43 KB
/
route_backend_documents.py
File metadata and controls
957 lines (834 loc) · 43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
# route_backend_documents.py
from config import *
from functions_authentication import *
from functions_documents import *
from functions_settings import *
from utils_cache import invalidate_personal_search_cache
from functions_debug import *
from functions_activity_logging import log_document_upload, log_document_metadata_update_transaction
import os
import requests
from flask import current_app
from swagger_wrapper import swagger_route, get_auth_security
from functions_debug import debug_print
def register_route_backend_documents(app):
@app.route('/api/get_file_content', methods=['POST'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def get_file_content():
    """
    Return the combined text content of a file attached to a conversation.

    Expects JSON body: {"conversation_id": ..., "file_id": ...}.
    Verifies the conversation exists, collects every message chunk stored
    for the file, concatenates the chunk text in chunk_index order, and
    returns it together with the filename and the is_table flag.

    Returns:
        200 with {'file_content', 'filename', 'is_table'} on success;
        400/401/404/500 JSON errors otherwise.
    """
    data = request.get_json()
    user_id = get_current_user_id()
    conversation_id = data.get('conversation_id')
    file_id = data.get('file_id')
    debug_print(f"[GET_FILE_CONTENT] Starting - user_id={user_id}, conversation_id={conversation_id}, file_id={file_id}")
    if not user_id:
        debug_print(f"[GET_FILE_CONTENT] ERROR: User not authenticated")
        return jsonify({'error': 'User not authenticated'}), 401
    if not conversation_id or not file_id:
        debug_print(f"[GET_FILE_CONTENT] ERROR: Missing conversation_id or file_id")
        return jsonify({'error': 'Missing conversation_id or id'}), 400
    # Confirm the conversation exists before searching for the file.
    try:
        _ = cosmos_conversations_container.read_item(
            item=conversation_id,
            partition_key=conversation_id
        )
    except CosmosResourceNotFoundError:
        return jsonify({'error': 'Conversation not found'}), 404
    except Exception as e:
        return jsonify({'error': f'Error reading conversation: {str(e)}'}), 500
    add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="Conversation exists, retrieving file content")
    try:
        query_str = """
            SELECT * FROM c
            WHERE c.conversation_id = @conversation_id
            AND c.id = @file_id
        """
        items = list(cosmos_messages_container.query_items(
            query=query_str,
            parameters=[
                {'name': '@conversation_id', 'value': conversation_id},
                {'name': '@file_id', 'value': file_id}
            ],
            partition_key=conversation_id
        ))
        if not items:
            add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="File not found in conversation")
            return jsonify({'error': 'File not found in conversation'}), 404
        debug_print(f"[GET_FILE_CONTENT] Found {len(items)} items for file_id={file_id}")
        debug_print(f"[GET_FILE_CONTENT] First item structure: {json.dumps(items[0], default=str, indent=2)}")
        # NOTE(review): this logs the full chunk payloads to the processing
        # log — potentially very large; confirm that is intended.
        add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="File found, processing content: " + str(items))
        items_sorted = sorted(items, key=lambda x: x.get('chunk_index', 0))
        filename = items_sorted[0].get('filename', 'Untitled')
        is_table = items_sorted[0].get('is_table', False)
        # BUGFIX: previously printed a literal placeholder instead of the
        # actual filename variable.
        debug_print(f"[GET_FILE_CONTENT] Filename: {filename}, is_table: {is_table}")
        add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="Combining file content from chunks, filename: " + filename + ", is_table: " + str(is_table))
        combined_parts = []
        for idx, it in enumerate(items_sorted):
            fc = it.get('file_content', '')
            debug_print(f"[GET_FILE_CONTENT] Chunk {idx}: file_content type={type(fc).__name__}, len={len(fc) if hasattr(fc, '__len__') else 'N/A'}")
            if isinstance(fc, list):
                debug_print(f"[GET_FILE_CONTENT] Processing list of {len(fc)} items")
                # file_content may be a list of dicts/strings; join their text.
                text_chunks = []
                for chunk_idx, chunk in enumerate(fc):
                    debug_print(f"[GET_FILE_CONTENT] List item {chunk_idx} type: {type(chunk).__name__}")
                    if isinstance(chunk, dict):
                        text_chunks.append(chunk.get('content', ''))
                    elif isinstance(chunk, str):
                        text_chunks.append(chunk)
                    else:
                        debug_print(f"[GET_FILE_CONTENT] Unexpected chunk type in list: {type(chunk).__name__}")
                combined_parts.append("\n".join(text_chunks))
            elif isinstance(fc, str):
                debug_print(f"[GET_FILE_CONTENT] Processing string content")
                combined_parts.append(fc)
            else:
                # Neither list nor string: log and skip rather than fail.
                debug_print(f"[GET_FILE_CONTENT] WARNING: Unexpected file_content type: {type(fc).__name__}, value: {fc}")
        combined_content = "\n".join(combined_parts)
        debug_print(f"[GET_FILE_CONTENT] Combined content length: {len(combined_content)}")
        if not combined_content:
            add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="Combined file content is empty")
            debug_print(f"[GET_FILE_CONTENT] ERROR: Combined content is empty")
            return jsonify({'error': 'File content not found'}), 404
        debug_print(f"[GET_FILE_CONTENT] Successfully returning file content")
        return jsonify({
            'file_content': combined_content,
            'filename': filename,
            'is_table': is_table
        }), 200
    except Exception as e:
        debug_print(f"[GET_FILE_CONTENT] EXCEPTION: {str(e)}")
        debug_print(f"[GET_FILE_CONTENT] Traceback: {traceback.format_exc()}")
        add_file_task_to_file_processing_log(document_id=file_id, user_id=user_id, content="Error retrieving file content: " + str(e))
        return jsonify({'error': f'Error retrieving file content: {str(e)}'}), 500
@app.route('/api/documents/upload', methods=['POST'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@file_upload_required
@enabled_required("enable_user_workspace")
def api_user_upload_document():
    """
    Accept one or more uploaded files, save each to a temp file, create a
    Cosmos metadata record with status "Queued for processing", and hand
    the heavy processing off to a background executor task.

    Returns:
        200 when every file was queued, 207 on partial success,
        400 when all files failed or the request had no files.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    if 'file' not in request.files:
        return jsonify({'error': 'No file part in the request'}), 400
    files = request.files.getlist('file')  # request may carry multiple files
    if not files or all(not f.filename for f in files):
        return jsonify({'error': 'No file selected or files have no name'}), 400
    processed_docs = []
    upload_errors = []
    for file in files:
        if not file.filename:
            upload_errors.append(f"Skipped a file with no name.")
            continue
        # Keep the original filename for metadata/validation; secure_filename
        # is used ONLY to derive a safe suffix for the temp-file path.
        original_filename = file.filename
        safe_suffix_filename = secure_filename(original_filename)
        file_ext = os.path.splitext(safe_suffix_filename)[1].lower()
        if not allowed_file(original_filename):
            upload_errors.append(f"File type not allowed for: {original_filename}")
            continue
        if not os.path.splitext(original_filename)[1]:
            upload_errors.append(f"Could not determine file extension for: {original_filename}")
            continue
        # 1) Save the upload to a temporary file.
        parent_document_id = str(uuid.uuid4())
        temp_file_path = None
        try:
            # The app service can mount Azure storage at /sc-temp-files for
            # temp files; otherwise fall back to the system temp directory.
            # BUGFIX: use None (tempfile default) rather than "", which made
            # NamedTemporaryFile create files relative to the process CWD.
            sc_temp_files_dir = "/sc-temp-files" if os.path.exists("/sc-temp-files") else None
            with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext, dir=sc_temp_files_dir) as tmp_file:
                file.save(tmp_file.name)
                temp_file_path = tmp_file.name
        except Exception as e:
            upload_errors.append(f"Failed to save temporary file for {original_filename}: {e}")
            if temp_file_path and os.path.exists(temp_file_path):
                os.remove(temp_file_path)  # clean up a partially created file
            continue
        try:
            # 2) Create the Cosmos metadata record with a queued status.
            create_document(
                file_name=original_filename,
                user_id=user_id,
                document_id=parent_document_id,
                num_file_chunks=0,  # updated later during processing
                status="Queued for processing"
            )
            update_document(
                document_id=parent_document_id,
                user_id=user_id,
                percentage_complete=0
            )
            # 3) Queue the heavy lifting on the background executor; the
            # future is tracked by the executor under the document id.
            current_app.extensions['executor'].submit_stored(
                parent_document_id,
                process_document_upload_background,
                document_id=parent_document_id,
                user_id=user_id,
                temp_file_path=temp_file_path,
                original_filename=original_filename
            )
            processed_docs.append({'document_id': parent_document_id, 'filename': original_filename})
            # Log upload activity; never let logging break the upload flow.
            try:
                file_size = 0
                try:
                    file.seek(0, 2)  # seek to end to measure size
                    file_size = file.tell()
                    file.seek(0)  # reset for any later readers
                except Exception:
                    file_size = 0
                log_document_upload(
                    user_id=user_id,
                    container_type='personal',
                    document_id=parent_document_id,
                    file_size=file_size,
                    file_type=file_ext
                )
            except Exception as log_error:
                print(f"Activity logging error for document upload: {log_error}")
        except Exception as e:
            upload_errors.append(f"Failed to queue processing for {original_filename}: {e}")
            # Clean up the temp file if queuing failed after saving.
            if temp_file_path and os.path.exists(temp_file_path):
                os.remove(temp_file_path)
    # 4) Respond immediately; processing continues in the background.
    response_status = 200 if processed_docs and not upload_errors else 207
    if not processed_docs and upload_errors:
        response_status = 400
    # Newly added documents invalidate the user's cached search results.
    if processed_docs:
        invalidate_personal_search_cache(user_id)
    # NOTE: workspace uploads intentionally do NOT create conversations or
    # chat messages; files are stored for later search, not immediate chat.
    return jsonify({
        'message': f'Processed {len(processed_docs)} file(s). Check status periodically.',
        'document_ids': [doc['document_id'] for doc in processed_docs],
        'processed_filenames': [doc['filename'] for doc in processed_docs],
        'errors': upload_errors
    }), response_status
@app.route('/api/documents', methods=['GET'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_get_user_documents():
    """
    List the caller's documents with pagination and optional filters.

    Matches documents the user owns, or that are shared with them either
    as a bare user id or as an "<id>,<status>" entry in shared_user_ids.
    Query params: page, page_size, search, classification, author,
    keywords, abstract.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    # --- 1) Read pagination and filter parameters ---
    page = request.args.get('page', default=1, type=int)
    page_size = request.args.get('page_size', default=10, type=int)
    search_term = request.args.get('search', default=None, type=str)
    classification_filter = request.args.get('classification', default=None, type=str)
    author_filter = request.args.get('author', default=None, type=str)
    keywords_filter = request.args.get('keywords', default=None, type=str)
    abstract_filter = request.args.get('abstract', default=None, type=str)
    if page < 1:
        page = 1
    if page_size < 1:
        page_size = 10
    # --- 2) Build the dynamic WHERE clause and parameters ---
    user_id_prefix = f"{user_id},"
    query_conditions = [
        "(c.user_id = @user_id "
        "OR ARRAY_CONTAINS(c.shared_user_ids, @user_id) "
        "OR EXISTS(SELECT VALUE s FROM s IN c.shared_user_ids WHERE STARTSWITH(s, @user_id_prefix)))"
    ]
    query_params = [
        {"name": "@user_id", "value": user_id},
        {"name": "@user_id_prefix", "value": user_id_prefix},
    ]
    param_count = 0  # generates unique parameter names per filter
    # General search across file name and title (case-insensitive).
    if search_term:
        param_name = f"@search_term_{param_count}"
        query_conditions.append(f"(CONTAINS(LOWER(c.file_name ?? ''), LOWER({param_name})) OR CONTAINS(LOWER(c.title ?? ''), LOWER({param_name})))")
        query_params.append({"name": param_name, "value": search_term})
        param_count += 1
    # Classification filter; 'none' matches missing/null/empty values.
    if classification_filter:
        param_name = f"@classification_{param_count}"
        if classification_filter.lower() == 'none':
            query_conditions.append(f"(NOT IS_DEFINED(c.document_classification) OR c.document_classification = null OR c.document_classification = '')")
        else:
            query_conditions.append(f"c.document_classification = {param_name}")
            query_params.append({"name": param_name, "value": classification_filter})
            param_count += 1
    # Author filter: case-insensitive substring match against any author.
    if author_filter:
        param_name = f"@author_{param_count}"
        query_conditions.append(f"EXISTS(SELECT VALUE a FROM a IN c.authors WHERE CONTAINS(LOWER(a), LOWER({param_name})))")
        query_params.append({"name": param_name, "value": author_filter})
        param_count += 1
    # Keywords filter: case-insensitive substring match against any keyword.
    if keywords_filter:
        param_name = f"@keywords_{param_count}"
        query_conditions.append(f"EXISTS(SELECT VALUE k FROM k IN c.keywords WHERE CONTAINS(LOWER(k), LOWER({param_name})))")
        query_params.append({"name": param_name, "value": keywords_filter})
        param_count += 1
    # Abstract filter (case-insensitive substring).
    if abstract_filter:
        param_name = f"@abstract_{param_count}"
        query_conditions.append(f"CONTAINS(LOWER(c.abstract ?? ''), LOWER({param_name}))")
        query_params.append({"name": param_name, "value": abstract_filter})
        param_count += 1
    where_clause = " AND ".join(query_conditions)
    # --- 3) Count query: total matching documents ---
    try:
        count_query_str = f"SELECT VALUE COUNT(1) FROM c WHERE {where_clause}"
        count_items = list(cosmos_user_documents_container.query_items(
            query=count_query_str,
            parameters=query_params,
            enable_cross_partition_query=True
        ))
        total_count = count_items[0] if count_items else 0
    except Exception as e:
        print(f"Error executing count query: {e}")
        return jsonify({"error": f"Error counting documents: {str(e)}"}), 500
    # --- 4) Page query: newest first ---
    try:
        offset = (page - 1) * page_size
        data_query_str = f"""
            SELECT *
            FROM c
            WHERE {where_clause}
            ORDER BY c._ts DESC
            OFFSET {offset} LIMIT {page_size}
        """
        docs = list(cosmos_user_documents_container.query_items(
            query=data_query_str,
            parameters=query_params,
            enable_cross_partition_query=True
        ))
        # Annotate each doc with owner_id and the caller's approval status.
        for doc in docs:
            doc["owner_id"] = doc.get("user_id")
            if doc.get("user_id") == user_id:
                doc["shared_approval_status"] = "owner"
            else:
                status = None
                for entry in doc.get("shared_user_ids", []):
                    if entry.startswith(f"{user_id},"):
                        status = entry.split(",", 1)[1]
                        break
                doc["shared_approval_status"] = status or "none"
    except Exception as e:
        print(f"Error executing data query: {e}")
        return jsonify({"error": f"Error fetching documents: {str(e)}"}), 500
    # --- Legacy documents check (docs missing percentage_complete) ---
    # BUGFIX: default legacy_count so a failed legacy query no longer
    # raises NameError when building the response below.
    legacy_count = 0
    try:
        legacy_q = """
            SELECT VALUE COUNT(1)
            FROM c
            WHERE c.user_id = @user_id
            AND NOT IS_DEFINED(c.percentage_complete)
        """
        legacy_docs = list(
            cosmos_user_documents_container.query_items(
                query=legacy_q,
                parameters=[{"name": "@user_id", "value": user_id}],
                enable_cross_partition_query=True
            )
        )
        legacy_count = legacy_docs[0] if legacy_docs else 0
    except Exception as e:
        print(f"Error executing legacy query: {e}")
    # --- 5) Return results ---
    return jsonify({
        "documents": docs,
        "page": page,
        "page_size": page_size,
        "total_count": total_count,
        "needs_legacy_update_check": legacy_count > 0
    }), 200
@app.route('/api/documents/<document_id>', methods=['GET'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_get_user_document(document_id):
    """Fetch a single personal document's metadata for the signed-in user."""
    current_user = get_current_user_id()
    if current_user:
        return get_document(current_user, document_id)
    return jsonify({'error': 'User not authenticated'}), 401
@app.route('/api/documents/<document_id>', methods=['PATCH'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_patch_user_document(document_id):
    """
    Update editable metadata fields on a personal document.

    Accepted JSON fields: title, abstract, keywords (list or
    comma-separated string), publication_date, document_classification,
    authors (list or single string). Each supplied field is written via
    update_document, and the change set is recorded in the activity log.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    # BUGFIX: get_json() returns None for a missing/empty body, which made
    # every `'field' in data` test raise TypeError.
    data = request.get_json() or {}
    updated_fields = {}

    def _apply(field, value):
        # One update_document call per field, mirroring the original flow,
        # while also tracking the field for the audit log below.
        update_document(document_id=document_id, user_id=user_id, **{field: value})
        updated_fields[field] = value

    if 'title' in data:
        _apply('title', data['title'])
    if 'abstract' in data:
        _apply('abstract', data['abstract'])
    if 'keywords' in data:
        # Accept either a list or a comma-delimited string of keywords.
        if isinstance(data['keywords'], list):
            _apply('keywords', data['keywords'])
        else:
            _apply('keywords', [kw.strip() for kw in data['keywords'].split(',')])
    if 'publication_date' in data:
        _apply('publication_date', data['publication_date'])
    if 'document_classification' in data:
        _apply('document_classification', data['document_classification'])
    if 'authors' in data:
        # Accept either a list of authors or a single author string.
        if isinstance(data['authors'], list):
            _apply('authors', data['authors'])
        else:
            _apply('authors', [data['authors']])
    try:
        # Record the metadata-change transaction when anything was updated.
        if updated_fields:
            # get_document may return a Flask Response or a
            # (response, status_code) tuple; normalize to a dict.
            doc_response = get_document(user_id, document_id)
            doc = None
            if isinstance(doc_response, tuple):
                resp, status_code = doc_response
                doc = resp.get_json() if hasattr(resp, "get_json") else resp
            elif hasattr(doc_response, "get_json"):
                doc = doc_response.get_json()
            else:
                doc = doc_response
            if doc and isinstance(doc, dict):
                log_document_metadata_update_transaction(
                    user_id=user_id,
                    document_id=document_id,
                    workspace_type='personal',
                    file_name=doc.get('file_name', 'Unknown'),
                    updated_fields=updated_fields,
                    file_type=doc.get('file_type')
                )
        return jsonify({'message': 'Document metadata updated successfully'}), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/documents/<document_id>', methods=['DELETE'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_delete_user_document(document_id):
    """
    Delete a personal document and its search-index chunks, then drop the
    owner's cached search results so stale hits disappear.
    """
    current_user = get_current_user_id()
    if not current_user:
        return jsonify({'error': 'User not authenticated'}), 401
    try:
        delete_document(current_user, document_id)
        delete_document_chunks(document_id)
        # Cached search results may still reference the deleted document.
        invalidate_personal_search_cache(current_user)
    except Exception as e:
        return jsonify({'error': f'Error deleting document: {str(e)}'}), 500
    return jsonify({'message': 'Document deleted successfully'}), 200
@app.route('/api/documents/<document_id>/extract_metadata', methods=['POST'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_extract_user_metadata(document_id):
    """
    POST /api/documents/<document_id>/extract_metadata
    Queues a background job that calls extract_document_metadata()
    and updates the document in Cosmos DB with the new metadata.

    Requires the 'enable_extract_meta_data' setting to be on (403 if not).
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    settings = get_settings()
    if not settings.get('enable_extract_meta_data'):
        return jsonify({'error': 'Metadata extraction not enabled'}), 403
    # Queue the background task under a tracking key. The returned future
    # was previously bound to an unused local; callers poll the document
    # record for status instead, so the binding was dropped.
    current_app.extensions['executor'].submit_stored(
        f"{document_id}_metadata",
        process_metadata_extraction_background,
        document_id=document_id,
        user_id=user_id
    )
    # Respond immediately; extraction proceeds in the background.
    return jsonify({
        'message': 'Metadata extraction has been queued. Check document status periodically.',
        'document_id': document_id
    }), 200
@app.route("/api/get_citation", methods=["POST"])
@swagger_route(security=get_auth_security())
@login_required
@user_required
def get_citation():
    """
    Resolve a citation id to its chunk text, file name, and page number.

    Lookup order: personal search index (with an ownership / shared-access
    check), then the group index, then the public index. Group and public
    chunks are returned without a per-user access check — presumably index
    membership itself implies access; confirm against the indexing code.
    """
    data = request.get_json()
    user_id = get_current_user_id()
    citation_id = data.get("citation_id")
    if not user_id:
        return jsonify({"error": "User not authenticated"}), 401
    if not citation_id:
        return jsonify({"error": "Missing citation_id"}), 400
    # 1) Personal index: enforce owner-or-shared access.
    try:
        search_client_user = CLIENTS['search_client_user']
        chunk = search_client_user.get_document(key=citation_id)
        # Check if user owns the document or if document is shared with user.
        chunk_user_id = chunk.get("user_id")
        chunk_shared_user_ids = chunk.get("shared_user_ids", [])
        # Allow access if user is owner or in shared_user_ids; entries may be
        # a bare id or "<id>,<status>", hence the prefix match.
        is_shared = any(
            entry == user_id or entry.startswith(f"{user_id},")
            for entry in chunk_shared_user_ids
        )
        if chunk_user_id != user_id and not is_shared:
            return jsonify({"error": "Unauthorized access to citation"}), 403
        return jsonify({
            "cited_text": chunk.get("chunk_text", ""),
            "file_name": chunk.get("file_name", ""),
            "page_number": chunk.get("chunk_sequence", 0)
        }), 200
    except ResourceNotFoundError:
        # Not a personal chunk; fall through to the group index.
        pass
    # 2) Group index.
    try:
        search_client_group = CLIENTS['search_client_group']
        group_chunk = search_client_group.get_document(key=citation_id)
        return jsonify({
            "cited_text": group_chunk.get("chunk_text", ""),
            "file_name": group_chunk.get("file_name", ""),
            "page_number": group_chunk.get("chunk_sequence", 0)
        }), 200
    except ResourceNotFoundError:
        # Not a group chunk either; fall through to the public index.
        pass
    # 3) Public index: the last resort, so a miss here is a final 404.
    try:
        search_client_public = CLIENTS['search_client_public']
        public_chunk = search_client_public.get_document(key=citation_id)
        return jsonify({
            "cited_text": public_chunk.get("chunk_text", ""),
            "file_name": public_chunk.get("file_name", ""),
            "page_number": public_chunk.get("chunk_sequence", 0)
        }), 200
    except ResourceNotFoundError:
        return jsonify({"error": "Citation not found in user, group, or public docs"}), 404
    except Exception as e:
        # NOTE(review): only this third lookup catches generic exceptions;
        # unexpected errors in the first two propagate to Flask. Confirm
        # whether that asymmetry is intentional.
        return jsonify({"error": f"Unexpected error: {str(e)}"}), 500
@app.route('/api/documents/upgrade_legacy', methods=['POST'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_upgrade_legacy_user_documents():
    """
    Upgrade all of the caller's legacy documents to the new format.

    Returns a message with how many documents were upgraded.
    """
    user_id = get_current_user_id()
    if not user_id:
        # Consistency fix: every sibling route rejects a missing user id;
        # this one previously passed a falsy user_id straight through.
        return jsonify({'error': 'User not authenticated'}), 401
    # upgrade_legacy_documents returns how many docs were updated.
    count = upgrade_legacy_documents(user_id)
    return jsonify({
        "message": f"Upgraded {count} document(s) to the new format."
    }), 200
# Document Sharing API Endpoints
@app.route('/api/documents/<document_id>/share', methods=['POST'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_share_document(document_id):
    """Share a document with a user.

    Expects JSON body {"user_id": <target user id>}. Verifies the caller
    can access the document before sharing, then invalidates both users'
    cached search results.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    data = request.get_json()
    target_user_id = data.get('user_id')
    if not target_user_id:
        return jsonify({'error': 'user_id is required'}), 400
    try:
        # BUGFIX: get_document returns a Flask Response or a
        # (response, status) tuple — both always truthy — so the old
        # `if not doc` access check could never fire. Normalize the
        # result and inspect payload/status instead.
        doc_response = get_document(user_id, document_id)
        doc = None
        status_code = None
        if isinstance(doc_response, tuple):
            resp, status_code = doc_response
            doc = resp.get_json() if hasattr(resp, "get_json") else resp
        elif hasattr(doc_response, "get_json"):
            status_code = getattr(doc_response, "status_code", None)
            doc = doc_response.get_json()
        else:
            doc = doc_response
        if (status_code is not None and status_code != 200) or not doc:
            return jsonify({'error': 'Document not found or access denied'}), 404
        # Share the document.
        success = share_document_with_user(document_id, user_id, target_user_id)
        if success:
            # Invalidate cache for both owner and target user.
            invalidate_personal_search_cache(user_id)
            invalidate_personal_search_cache(target_user_id)
            return jsonify({'message': 'Document shared successfully'}), 200
        return jsonify({'error': 'Failed to share document'}), 500
    except Exception as e:
        return jsonify({'error': f'Error sharing document: {str(e)}'}), 500
@app.route('/api/documents/<document_id>/unshare', methods=['DELETE'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_unshare_document(document_id):
    """Remove sharing of a document from a user.

    Expects JSON body {"user_id": <target user id>}. Verifies the caller
    can access the document before unsharing, then invalidates both
    users' cached search results.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    data = request.get_json()
    target_user_id = data.get('user_id')
    if not target_user_id:
        return jsonify({'error': 'user_id is required'}), 400
    try:
        # BUGFIX: get_document returns a Flask Response or a
        # (response, status) tuple — both always truthy — so the old
        # `if not doc` access check could never fire. Normalize the
        # result and inspect payload/status instead.
        doc_response = get_document(user_id, document_id)
        doc = None
        status_code = None
        if isinstance(doc_response, tuple):
            resp, status_code = doc_response
            doc = resp.get_json() if hasattr(resp, "get_json") else resp
        elif hasattr(doc_response, "get_json"):
            status_code = getattr(doc_response, "status_code", None)
            doc = doc_response.get_json()
        else:
            doc = doc_response
        if (status_code is not None and status_code != 200) or not doc:
            return jsonify({'error': 'Document not found or access denied'}), 404
        # Unshare the document.
        success = unshare_document_from_user(document_id, user_id, target_user_id)
        if success:
            # Invalidate cache for both owner and target user.
            invalidate_personal_search_cache(user_id)
            invalidate_personal_search_cache(target_user_id)
            return jsonify({'message': 'Document unshared successfully'}), 200
        return jsonify({'error': 'Failed to unshare document'}), 500
    except Exception as e:
        return jsonify({'error': f'Error unsharing document: {str(e)}'}), 500
@app.route('/api/documents/<document_id>/shared-users', methods=['GET'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_get_shared_users(document_id):
    """Get list of users a document is shared with, including approval status.

    Enriches each shared-user entry with displayName and email from
    Microsoft Graph when a token is available; falls back to
    'Unknown User' entries when a lookup fails.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    try:
        # BUGFIX: get_document returns a Flask Response or a
        # (response, status) tuple — both always truthy — so the old
        # `if not doc` access check could never fire. Normalize the
        # result and inspect payload/status instead.
        doc_response = get_document(user_id, document_id)
        doc = None
        status_code = None
        if isinstance(doc_response, tuple):
            resp, status_code = doc_response
            doc = resp.get_json() if hasattr(resp, "get_json") else resp
        elif hasattr(doc_response, "get_json"):
            status_code = getattr(doc_response, "status_code", None)
            doc = doc_response.get_json()
        else:
            doc = doc_response
        if (status_code is not None and status_code != 200) or not doc:
            return jsonify({'error': 'Document not found or access denied'}), 404
        # Returns [{'id': oid, 'approval_status': status}, ...]
        shared_user_objs = get_shared_users_for_document(document_id, user_id)
        shared_users = []
        if shared_user_objs:
            access_token = get_valid_access_token()
            if access_token:
                headers = {
                    'Authorization': f'Bearer {access_token}',
                    'Content-Type': 'application/json'
                }
                for entry in shared_user_objs:
                    oid = entry['id']
                    approval_status = entry.get('approval_status', 'unknown')
                    # Default to a placeholder; overwrite on a successful
                    # Graph lookup so every entry is still returned.
                    display_name = 'Unknown User'
                    email = ''
                    try:
                        graph_url = f"https://graph.microsoft.com/v1.0/users/{oid}"
                        # ROBUSTNESS: timeout added so a hung Graph call
                        # cannot stall this request indefinitely.
                        response = requests.get(graph_url, headers=headers, timeout=15)
                        if response.status_code == 200:
                            user_data = response.json()
                            display_name = user_data.get('displayName', 'Unknown User')
                            email = user_data.get('mail') or user_data.get('userPrincipalName', '')
                    except Exception as e:
                        print(f"Error fetching user details for {oid}: {e}")
                    shared_users.append({
                        'id': oid,
                        'approval_status': approval_status,
                        'displayName': display_name,
                        'email': email
                    })
        return jsonify({'shared_users': shared_users}), 200
    except Exception as e:
        return jsonify({'error': f'Error getting shared users: {str(e)}'}), 500
@app.route('/api/documents/<document_id>/remove-self', methods=['DELETE'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_remove_self_from_document(document_id):
    """Remove current user from shared document"""
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    try:
        # get_document may hand back a plain dict, a Flask response object,
        # or a (response, status) tuple -- normalize all three shapes into
        # a document dict plus an optional HTTP status.
        raw = get_document(user_id, document_id)
        status_code = None
        if isinstance(raw, tuple):
            payload, status_code = raw
            doc = payload.get_json() if hasattr(payload, "get_json") else payload
        elif hasattr(raw, "status_code") and hasattr(raw, "get_json"):
            status_code = raw.status_code
            doc = raw.get_json()
        else:
            doc = raw

        # Treat a non-200 status or a missing/non-dict payload as not found.
        bad_status = status_code is not None and status_code != 200
        if bad_status or not doc or not isinstance(doc, dict):
            return jsonify({'error': 'Document not found or access denied'}), 404

        # Owners may not remove themselves from their own documents.
        if doc.get('user_id') == user_id:
            return jsonify({'error': 'Document owners cannot remove themselves from their own documents'}), 400

        # Self-removal: the caller is both requester and target.
        if unshare_document_from_user(document_id, user_id, user_id):
            # The caller's personal search results no longer include this doc.
            invalidate_personal_search_cache(user_id)
            return jsonify({'message': 'Successfully removed from shared document'}), 200
        return jsonify({'error': 'Failed to remove from shared document'}), 500
    except Exception as e:
        print(f"[ERROR] /api/documents/{document_id}/remove-self: {e}", flush=True)
        return jsonify({'error': f'Error removing from shared document: {str(e)}'}), 500
@app.route('/api/documents/<document_id>/approve-share', methods=['POST'])
@swagger_route(security=get_auth_security())
@login_required
@user_required
@enabled_required("enable_user_workspace")
def api_approve_shared_document(document_id):
    """Approve a document that was shared with the current user.

    shared_user_ids entries are "<oid>,<status>" strings; this flips the
    caller's entry to "<oid>,approved", persists the document, best-effort
    propagates the new share list to every search chunk, and invalidates
    the caller's personal search cache. Idempotent: re-approving an already
    approved share returns 'Already approved' without writing anything.

    Returns:
        200 {'message': 'Share approved' | 'Already approved'}
        401 when unauthenticated, 500 on error.
    """
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': 'User not authenticated'}), 401
    try:
        # Documents are partitioned by their own id in this container.
        document_item = cosmos_user_documents_container.read_item(
            item=document_id,
            partition_key=document_id
        )
        shared_user_ids = document_item.get('shared_user_ids', [])
        approved_entry = f"{user_id},approved"
        updated = False
        new_shared_user_ids = []
        for entry in shared_user_ids:
            # Rewrite only the caller's own entry, and only if not yet approved;
            # every other entry (including an already-approved one) is kept as-is.
            if entry.startswith(f"{user_id},") and entry != approved_entry:
                new_shared_user_ids.append(approved_entry)
                updated = True
            else:
                new_shared_user_ids.append(entry)
        # Fix: the original duplicated the `if updated:` guard (persist block
        # and cache invalidation); consolidated here with statement order kept.
        if updated:
            document_item['shared_user_ids'] = new_shared_user_ids
            document_item['last_updated'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
            cosmos_user_documents_container.upsert_item(document_item)
            # Best effort: mirror the new share list onto every search chunk so
            # search-side visibility matches; failures are logged, never fatal.
            try:
                chunks = get_all_chunks(document_id, document_item.get('user_id'))
                for chunk in chunks:
                    chunk_id = chunk.get('id')
                    if chunk_id:
                        try:
                            update_chunk_metadata(
                                chunk_id=chunk_id,
                                user_id=document_item.get('user_id'),
                                group_id=None,
                                public_workspace_id=None,
                                document_id=document_id,
                                shared_user_ids=new_shared_user_ids
                            )
                        except Exception as chunk_e:
                            print(f"Warning: Failed to update chunk {chunk_id}: {chunk_e}")
            except Exception as e:
                print(f"Warning: Failed to update chunks for document {document_id}: {e}")
            # The approver's search results now include this document.
            invalidate_personal_search_cache(user_id)
        return jsonify({'message': 'Share approved' if updated else 'Already approved'}), 200
    except Exception as e:
        return jsonify({'error': f'Error approving shared document: {str(e)}'}), 500