# Copyright (c) 2017 Shotgun Software Inc.
#
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.
import sys
import os
import re
import sqlite3
import json
import tempfile
import contextlib
import datetime
import copy
import base64
import glob
import threading
import hashlib
import html
import traceback
import time
import fnmatch
import sgtk
import sgtk.util
from sgtk import TankFileDoesNotExistError
from sgtk.commands.clone_configuration import clone_pipeline_configuration_html
from sgtk.authentication import serialize_user
from . import constants
from .. import command
logger = sgtk.platform.get_logger(__name__)
###########################################################################
# Classes
class ShotgunAPI(object):
"""
Public API
Callable methods from client. Every one of these methods can be called from the client.
"""
PUBLIC_API_METHODS = [
"get_actions",
"execute_action",
"list_supported_commands",
"open",
"pick_file_or_directory",
"pick_files_or_directories",
]
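    # Illustrative only: the surrounding RPC layer is expected to dispatch
    # incoming messages by method name, roughly along the lines of
    #
    #     if method_name in ShotgunAPI.PUBLIC_API_METHODS:
    #         getattr(api, method_name)(payload)
    #
    # The actual dispatch code lives outside this module.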
# Stores cache keys that have been validated and at
# what time that occurred.
CACHE_VALIDATED = dict()
CACHE_VALIDATION_INTERVAL = 2.0 # Seconds
# Stores data persistently per wss connection.
WSS_KEY_CACHE = dict()
DATABASE_FORMAT_VERSION = 1
    # When the layout of the data in a cache entry changes, bump this version
# so we invalidate all cached entries.
CACHE_ENTRY_SCHEMA_VERSION = 1
SOFTWARE_FIELDS = ["id", "code", "updated_at", "type", "engine", "projects"]
TOOLKIT_MANAGER = None
# Keys for the in-memory cache.
TASK_PARENT_TYPES = "task_parent_types"
PIPELINE_CONFIGS = "pipeline_configurations"
SITE_STATE_DATA = "site_state_data"
CONFIG_DATA = "config_data"
SOFTWARE_ENTITIES = "software_entities"
ENTITY_TYPE_WHITELIST = "entity_type_whitelist"
LEGACY_PROJECT_ACTIONS = "legacy_project_actions"
YML_FILE_DATA = "yml_file_data"
ENTITY_PARENT_PROJECTS = "entity_parent_projects"
SHOTGUN_YML_FILES = "shotgun_yml_files"
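    # These constants are presumably used as keys into the per-connection
    # in-memory cache, e.g. self._cache[self.SOFTWARE_ENTITIES].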
# We need to protect against concurrent bootstraps happening.
# This is a reentrant lock because get_actions is recursive
    # when caching occurs, so it might need to lock multiple times
# within the same thread.
_LOCK = threading.RLock()
def __init__(self, host, process_manager, wss_key):
"""
API Constructor.
        Keep initialization fast, as an instance is created for every message.
:param host: Host interface to communicate with. Abstracts the client.
:param process_manager: Process Manager to use to interact with os processes.
:param str wss_key: The WSS connection's unique key.
"""
self._host = host
self._engine = sgtk.platform.current_engine()
self._bundle = sgtk.platform.current_bundle()
self._wss_key = wss_key
self._logger = sgtk.platform.get_logger("api-v2")
self._process_manager = process_manager
self._global_debug = sgtk.LogManager().global_debug
if constants.ENABLE_LEGACY_WORKAROUND in os.environ:
logger.debug("Legacy tank command pathway allowed for classic configs.")
self._allow_legacy_workaround = True
else:
logger.debug("Legacy tank command pathway disabled.")
self._allow_legacy_workaround = False
if self._wss_key not in self.WSS_KEY_CACHE:
self.WSS_KEY_CACHE[self._wss_key] = dict()
self._cache = self.WSS_KEY_CACHE[self._wss_key]
# Cache path on disk.
self._cache_path = os.path.join(
self._engine.cache_location,
"shotgun_engine_commands_v%s.sqlite" % self.DATABASE_FORMAT_VERSION,
)
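        # With DATABASE_FORMAT_VERSION = 1, this resolves to something like
        # <engine cache location>/shotgun_engine_commands_v1.sqlite.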
###########################################################################
# Properties
@property
def host(self):
"""
        The host associated with the current RPC transaction.
"""
return self._host
@property
def process_manager(self):
"""
The process manager object used to interact with os processes.
"""
return self._process_manager
###########################################################################
# Public methods
@sgtk.LogManager.log_timing
def execute_action(self, data):
"""
Executes the engine command associated with the action triggered
in the client.
:param dict data: The payload from the client.
"""
# When the v2 protocol is being used, the Shotgun web app makes use of
# Javascript Promises to handle the reply from the server, whether there
        # is success or failure. When executing an action, we want to make sure that
# if there was some kind of unhandled exception that there is a proper
# reply to the client so that the Promise can be kept or broken, as is
# appropriate.
try:
self._execute_action(data)
except Exception:
self.host.reply(
dict(
err=self._get_exception_message(),
retcode=constants.COMMAND_FAILED,
out="",
),
)
logger.exception(traceback.format_exc())
def _execute_action(self, data):
"""
Executes the engine command associated with the action triggered
in the client.
:param dict data: The payload from the client.
"""
if data["name"] == "__clone_pc":
logger.debug("Clone configuration command received.")
self._clone_configuration(data)
logger.debug("Clone configuration successful.")
self.host.reply(
dict(
retcode=constants.COMMAND_SUCCEEDED,
err="",
out="",
),
)
return
project_entity, entities = self._get_entities_from_payload(data)
config_entity = data["pc"]
command_name = data["name"]
# We have to do a couple of things here. The first is that we ALWAYS
# stick to the non-legacy code path for the __core_info and __upgrade_check
# commands. Neither of these require a bootstrap to occur, and are therefore
# fast when run through the modern code path. In addition, the implementation
# of these "special" project-level commands already outputs markdown syntax,
# which is preferable to the HTML we would get from the tank command.
#
# The second bit is that we can identify commands that need to be run in
# legacy mode by whether the config entity dictionary we got from the client
# contains a "_legacy_config_root" key that was shoved into it when the
# get_actions method ran through its own legacy code path. In that case, we
# shell out to the tank command by way of our process_manager object instead
# of using the modern code path.
if (
command_name not in constants.LEGACY_EXEMPT_ACTIONS
and self._allow_legacy_workaround
):
if constants.LEGACY_CONFIG_ROOT in config_entity:
# The arguments list is the name of the command, then the entity
# type, and then a comma-separated list of entity ids.
entity_ids = [str(e["id"]) for e in entities]
(out, err, retcode) = self.process_manager.execute_toolkit_command(
config_entity[constants.LEGACY_CONFIG_ROOT],
"shotgun_run_action",
[data["name"], entities[0]["type"], ",".join(entity_ids)],
)
# Sanitize the output. By going the legacy route here, we're going
# to end up with HTML in the output from the tank command. We need
# to filter that and sanitize anything we keep, because the client
# is going to believe that we're sending v2-style output, which is
# taken and displayed as is, and is assumed to be markdown and not
# HTML.
self.host.reply(
dict(
retcode=retcode,
err=self._legacy_sanitize_output(err),
out=self._legacy_sanitize_output(out),
)
)
return
args_file = self._get_arguments_file(
dict(
config=config_entity,
name=data["name"],
entities=entities,
project=project_entity,
sys_path=self._compute_sys_path(),
base_configuration=constants.BASE_CONFIG_URI,
engine_name=constants.ENGINE_NAME,
logging_prefix=constants.LOGGING_PREFIX,
bundle_cache_fallback_paths=self._engine.sgtk.bundle_cache_fallback_paths,
user=serialize_user(sgtk.get_authenticated_user()),
),
)
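        # The arguments file is a temp file holding the serialized payload
        # above; the execute_command.py script is expected to read it back
        # and bootstrap from those values.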
script = os.path.join(
os.path.dirname(__file__), "scripts", "execute_command.py"
)
# We'll need the Python executable when we shell out. We need to make
# sure it's what's in the config's interpreter config file. To do that,
# we can get the config's descriptor object from the manager and ask
# it for the path. By this point, all of the config data has been cached,
# because it will have been looked up as part of the get_actions method
# when the menu asked for actions, so getting the manager and all of the
# config data will be essentially free.
manager = self._get_toolkit_manager()
with self._LOCK:
manager.bundle_cache_fallback_paths = (
self._engine.sgtk.bundle_cache_fallback_paths
)
all_pc_data = self._get_pipeline_configuration_data(
manager,
project_entity,
data,
)
python_exe = self._get_python_interpreter(
all_pc_data[config_entity["id"]]["descriptor"]
)
logger.debug("Python executable: %s", python_exe)
args = [python_exe, script, args_file]
logger.debug("Subprocess arguments: %s", args)
# Ensure the credentials are still valid before launching the command in
        # a separate process. We need to do this in advance because the process
# that will be launched might not have PySide and as such won't be able
# to prompt the user to re-authenticate.
# If you are running in Shotgun Desktop 1.0.2 there is no authenticated
# user, only a script user, so skip this.
if sgtk.get_authenticated_user():
sgtk.get_authenticated_user().refresh_credentials()
retcode, stdout, stderr = command.Command.call_cmd(args)
# We need to filter stdout before we send it to the client.
# We look for lines that we know came from the custom log
# handler that the execute_command script builds, and we
# remove that header from those lines and keep them so that
# they're passed up to the client.
tag = constants.LOGGING_PREFIX
tag_length = len(tag)
filtered_output = []
# We check both stdout and stderr. We identify lines that start with
# our tag, and if we find one, we remove the tag, and then base64
# decode the rest of the message. The message is encoded this way
# because it collapses multi-line log messages into a single line
# of text. This is important, because there's only one tag per log
# message, and we would otherwise filter out everything in the log
# message that wasn't on the first line of text before any newlines.
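        # As an illustration (the tag value here is hypothetical): if the tag
        # were "SGTK:", the line "SGTK:aGVsbG8gd29ybGQ=" would be kept and
        # decoded to "hello world", while untagged subprocess output is
        # dropped.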
for line in stdout.split("\n") + stderr.split("\n"):
if line.startswith(tag):
decoded = base64.b64decode(line[tag_length:]).decode("utf-8")
filtered_output.append(decoded)
filtered_output_string = "\n".join(filtered_output)
if retcode != 0:
logger.error("Command failed: %s", args)
logger.error("Failed command output: %s", filtered_output_string)
self.host.reply(
dict(
retcode=constants.COMMAND_FAILED,
out="",
err=filtered_output_string,
)
)
return
logger.debug("Command execution complete.")
self.host.reply(
dict(
retcode=constants.COMMAND_SUCCEEDED,
out=filtered_output_string,
err="",
),
)
@sgtk.LogManager.log_timing
def get_actions(self, data):
"""
RPC method that sends back a dictionary containing engine commands
for each pipeline configuration associated with the project.
:param dict data: The data passed down by the client. At a minimum,
the dict should contain the following keys: project_id, entity_id,
entity_type, pipeline_configs, and user.
"""
# When the v2 protocol is being used, the Shotgun web app makes use of
# Javascript Promises to handle the reply from the server, whether there
# is success or failure. When getting actions, we want to make sure that
# if there was some kind of unhandled exception that there is a proper
# reply to the client so that the Promise can be kept or broken, as is
# appropriate.
try:
self._get_actions(data)
except Exception:
self.host.reply(
dict(
err=self._get_exception_message(),
retcode=constants.CACHING_ERROR,
out="",
),
)
logger.exception(traceback.format_exc())
def _get_actions(self, data):
"""
RPC method that sends back a dictionary containing engine commands
for each pipeline configuration associated with the project.
:param dict data: The data passed down by the client. At a minimum,
the dict should contain the following keys: project_id, entity_id,
entity_type, pipeline_configs, and user.
"""
# If we weren't sent a usable entity id, we can just query the first one
# from the project. This isn't a big deal for us, because we're only
# concerned with picking the correct environment when we bootstrap
# during a caching operation. This is going to be the situation when
# a page is pre-loading entity-level commands on page load, as opposed to
# passing down a specific entity that's been selected in an already-loaded
# page.
if "entity_id" in data and data["entity_id"] == -1:
if not data.get("entity_type"):
# There's likely some gap in the pre-caching logic that's run on
# toolkit action menu init that's causing either an empty string
# or an undefined value for the entity type. We can't really do
# anything here in that case, so it's best to reply with an error
# that explains as best we can what's happened.
self.host.reply(
dict(
err="Toolkit received no entity type for this menu -- no actions can be returned!",
retcode=constants.CACHING_ERROR,
out="",
),
)
return
elif data["entity_type"] == "Project":
data["entity_id"] = data["project_id"]
else:
temp_entity = self._engine.shotgun.find_one(
data["entity_type"],
[["project", "is", dict(type="Project", id=data["project_id"])]],
)
if temp_entity:
data["entity_id"] = temp_entity["id"]
else:
# This is the case if we're on an entity page, but there
# are no entities that exist. The page's pre-caching call
# here can't proceed, but we can let it know what the situation
# is via the retcode, and the tank_action_menu will be sure to
# re-query the actions if an entity is created and the menu
# shown.
self.host.reply(
dict(
err="No entity existed when actions were requested. Please refresh the page.",
retcode=constants.CACHING_NOT_COMPLETED,
out="Please refresh the page to get Toolkit actions.",
),
)
return
# It's possible that we got multiple entities from the client, which
# would be the result of multiple entities being selected in the web
# interface. Regardless of that, as far as getting a list of actions
# is concerned, we only need one. As such, we just take the first one
# off the list.
project_entity, entities = self._get_entities_from_payload(data)
entity = entities[0]
manager = self._get_toolkit_manager()
with self._LOCK:
manager.bundle_cache_fallback_paths = (
self._engine.sgtk.bundle_cache_fallback_paths
)
all_pc_data = self._get_pipeline_configuration_data(
manager,
project_entity,
data,
)
# The first thing we do is check to see if we're dealing with a
# classic PTR setup. In that case, we're going to short-circuit
# the get_actions call and go into a legacy setup that makes use
# of the "tank" command by way of this api's process_manager.
did_legacy_lookup = False
all_actions = dict()
config_names = []
legacy_config_ids = []
if self._allow_legacy_workaround:
legacy_config_data = dict()
for config_id, config_data in all_pc_data.items():
config = config_data["entity"]
if config["descriptor"].required_storages:
# We're using os.path.dirname to chop the last directory off
# the end of the config descriptor path. This is because that
                    # path points to <root>/config, while the v1 API just
                    # wants the root path where the tank command lives.
legacy_config_data[config["name"]] = (
config["descriptor"].get_path(),
config,
)
legacy_config_ids.append(config_id)
# We're going to remove this config from the data structure
# housing all of the project's pipeline configuration information.
# With this, we can allow the legacy pathway to handle the classic
# configs, while allowing descriptor-driven configs to run through
# the v2 flow.
for config_id in legacy_config_ids:
del all_pc_data[config_id]
# If there are any classic configs, then we use the legacy code
# path.
if legacy_config_data:
logger.debug(
"Classic PTR config(s) found, proceeding with legacy code path."
)
self._legacy_process_configs(
legacy_config_data,
entity["type"],
project_entity["id"],
all_actions,
config_names,
)
did_legacy_lookup = True
        # We will process the configs in 3 passes. The first pass decides
        # whether a config should be processed or not. We keep a set of the
        # ids that the later passes will need to skip.
config_ids_to_skip = set()
# Pass 1: Calculate and store lookup hash on all pipeline configurations
for pc_id, pc_data in all_pc_data.items():
pipeline_config = pc_data["entity"]
# The hash that acts as the key we'll use to look up our cached
# data will be based on the entity type and the pipeline config's
# descriptor uri. We can get the descriptor from the toolkit
# manager and pass that through along with the entity type from PTR
# to the core hook that computes the hash.
pc_descriptor = pipeline_config["descriptor"]
# We'll rebuild this pipeline_config dict to only include the keys
# that we know we want to pass back to the client. In the event that
# the interface to getting these config dicts changes in the future,
            # it will help us keep from passing back unneeded data or, even worse,
# data that can't be serialized, which would cause an exception.
pipeline_config = dict(
id=pipeline_config["id"],
type=pipeline_config.get("type", "PipelineConfiguration"),
name=pipeline_config.get("name", "Primary"),
)
pc_data["entity"] = pipeline_config
            # We start with an empty action set for this config. If we end up
            # finding stuff for this entity type in this config, then the empty
# entry here will be replaced prior to replying to the client.
all_actions[pipeline_config["name"]] = dict(
actions=[],
config=pipeline_config,
)
# Let's see if this is even an entity type that we need to worry
# about. If it isn't, we can just move on to the next config.
#
# Note: The entity type whitelist contains entity type names that
# have been lower cased.
supported_entity_type = data[
"entity_type"
].lower() in self._get_entity_type_whitelist(
data.get("project_id"),
pc_descriptor,
)
if not supported_entity_type and not did_legacy_lookup:
logger.debug(
"Entity type %s is not supported by %r, no actions will be returned.",
data["entity_type"],
pc_descriptor,
)
config_ids_to_skip.add(pc_id)
continue
# In all cases except for Task entities, we'll already have a
# lookup hash computed. If we're dealing with a Task, though,
# it'll be a None and we'll need to compute it live. This is
# because the lookup hash depends on what the specific Task
# entity we're dealing with is linked to.
try:
lookup_hash = pc_data.get("lookup_hash") or self._get_lookup_hash(
pc_descriptor.get_uri(),
project_entity,
entity["type"],
entity["id"],
)
except TankTaskNotLinkedError:
# If we're dealing with a Task entity, it needs to be linked
# to something. If it's not, then we have nothing to pass
# back to the client, so we should inform the user as to
# how to proceed.
logger.debug("Task entity %s is not linked to an entity.", entity)
self.host.reply(
dict(
err="Link this Task to an entity and refresh to get Toolkit actions!",
retcode=constants.CACHING_ERROR,
out="",
),
)
return
pc_data["lookup_hash"] = lookup_hash
pc_data["descriptor"] = pc_descriptor
# Pass 2: Read the cached values from the database and store them in memory
with self._db_connect() as (connection, cursor):
for pc_id, pc_data in all_pc_data.items():
# If the config doesn't support the current entity_type we don't need to cache it
if pc_id in config_ids_to_skip:
continue
lookup_hash = pc_data.get("lookup_hash")
logger.debug("Querying: %s", lookup_hash)
pc_data["cached_data"] = []
try:
cursor.execute(
"SELECT commands, contents_hash FROM engine_commands WHERE lookup_hash=?",
(lookup_hash,),
)
pc_data["cached_data"] = list(cursor.fetchone() or [])
except sqlite3.OperationalError:
                    # This means the sqlite database hasn't been set up at all.
                    # In that case, we just continue on with cached_data set
                    # to an empty list, which will cause the caching subprocess
                    # to be spawned, which will set up the database and populate
                    # it with the data we need. The behavior as a result of this
                    # is the same as if we ended up with a cache miss or an
                    # invalidated result due to a contents_hash mismatch.
                    logger.debug(
                        "Cache query failed for pipeline configuration id %s. "
                        "Likely due to a missing table. "
                        "Will trigger the caching subprocess..." % (pc_id,)
                    )
# Pass 3: Decode cached data or trigger the caching process
for pc_id, pc_data in all_pc_data.items():
# If the config doesn't support the current entity_type we don't need to cache it
if pc_id in config_ids_to_skip:
continue
try:
cached_data = pc_data["cached_data"]
lookup_hash = pc_data.get("lookup_hash")
pipeline_config = pc_data["entity"]
decoded_data = None
if cached_data:
                    # The value will be bytes; decode it manually, since
                    # ensure_str doesn't accept a buffer as input.
string_data = cached_data[0]
if isinstance(string_data, bytes):
string_data = string_data.decode("utf-8")
try:
decoded_data = sgtk.util.json.loads(string_data)
except Exception:
# Couldn't decode the data. This happens when loading an old pickled cache.
                        # We've switched to JSON for the Python 3 port.
pass
if decoded_data is not None:
# Cache hit.
cached_contents_hash = cached_data[1]
# We check the validity of the cache asynchronously in this
# situation. We want to go ahead and return the list of actions
# that we have cached, but in the background check to see whether
# the cache should be updated. This gives us the situation where
                    # this one invocation of get_actions returns old data, but all
# future requests will be correct until the next time the cache
# must be invalidated.
self._async_check_and_cache_actions(
data,
pc_data,
cached_contents_hash,
)
logger.debug("Cached contents hash is %s", cached_contents_hash)
logger.debug("Cache key was %s", lookup_hash)
actions = self._process_commands(
commands=decoded_data,
project=project_entity,
entities=entities,
)
logger.debug("Actions found in cache: %s", actions)
all_actions[pipeline_config["name"]] = dict(
actions=self._filter_by_project(
actions,
self._get_software_entities(),
project_entity,
),
config=pipeline_config,
)
logger.debug("Actions after project filtering: %s", actions)
else:
# Cache miss.
logger.debug("Commands not found in cache, caching now...")
# Caching is performed synchronously in this situation. We don't
# have anything to give to the client until it's done, so we do
# it in the main thread and wait for it to complete.
self._cache_actions(data, pc_data)
self._get_actions(data)
return
except TankCachingSubprocessFailed as exc:
logger.error(str(exc))
raise
except TankCachingUnresolvedEnvError as exc:
logger.warning(exc)
continue
except TankCachingEngineBootstrapError as exc:
logger.error(
"The Flow Production Tracking engine failed to initialize in the caching "
"subprocess. This most likely corresponds to a configuration "
"problem in the config %r as it relates to entity type %s."
                    % (pc_data["descriptor"], entity["type"])
)
logger.debug(exc)
continue
# Combine the config names processed by the v2 flow with those handled
# by the legacy pathway.
config_names = config_names + [
p["entity"]["name"] for p in all_pc_data.values()
]
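        # Illustrative reply shape, assuming a single config named "Primary":
        #
        #     {
        #         "err": "",
        #         "retcode": constants.SUCCESSFUL_LOOKUP,
        #         "actions": {"Primary": {"actions": [...], "config": {...}}},
        #         "pcs": ["Primary"],
        #     }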
self.host.reply(
dict(
err="",
retcode=constants.SUCCESSFUL_LOOKUP,
actions=all_actions,
pcs=config_names,
),
)
def list_supported_commands(self, data):
"""
        Get a list of all the commands this API supports.
:param data: Message data {} (no data expected)
"""
self.host.reply(self.PUBLIC_API_METHODS)
def open(self, data):
"""
Open a file on localhost.
:param dict data: Message payload.
"""
try:
# Retrieve filepath. We should always get something in the payload, but if
# we didn't for some reason, passing on an empty string will ensure that
# a sane exception is raised later on when the existence of the path is
# checked.
filepath = data.get("filepath", "")
local_storages = data.get("local_storages")
# Logging here is for debugging purposes. We have a situation where some
# clients are reporting errors when opening files from Shotgun, and the
# error implies that we're getting a null value for the file path passed
# down from the web app. There are reasonable situations where this might
# happen, but in the cases we're attempting to debug it shouldn't be the
# case.
if filepath is None:
logger.warning(
"Flow Production Tracking requested a file open via local file linking, "
"but the provided file path is None."
)
else:
logger.debug(
"Flow Production Tracking requested a file open via local file linking. "
"The file path is: %s",
filepath,
)
if local_storages is None:
logger.debug(
"Local storages were not provided by Flow Production Tracking for the current file open request."
)
else:
logger.debug(
"Local storages were reported by Flow Production Tracking: %s",
local_storages,
)
result = self.process_manager.open(filepath)
# Send back information regarding the success of the operation.
reply = {}
reply["result"] = result
self.host.reply(reply)
except Exception as e:
logger.exception(e)
if hasattr(e, "message"):
self.host.report_error(e.message)
elif e.args:
self.host.report_error(str(e.args[0]))
else:
self.host.report_error("unknown error")
def pick_file_or_directory(self, data):
"""
        Pick a single file or directory.
:param dict data: Message payload. (no data expected)
"""
files = self.process_manager.pick_file_or_directory(False)
self.host.reply(files)
def pick_files_or_directories(self, data):
"""
Pick multiple files or directories.
:param dict data: Message payload. (no data expected)
"""
files = self.process_manager.pick_file_or_directory(True)
self.host.reply(files)
###########################################################################
# Context managers
@contextlib.contextmanager
def _db_connect(self):
"""
Context manager that initializes a DB connection on enter and
disconnects on exit.
"""
connection = sqlite3.connect(self._cache_path)
# This is to handle unicode properly - make sure that sqlite returns
# str objects for TEXT fields rather than unicode. Note that any unicode
# objects that are passed into the database will be automatically
# converted to UTF-8 strs, so this text_factory guarantees that any character
# representation will work for any language, as long as data is either input
# as UTF-8 (byte string) or unicode. And in the latter case, the returned data
# will always be unicode.
connection.text_factory = str
with connection:
with contextlib.closing(connection.cursor()) as cursor:
yield (connection, cursor)
def _compute_sys_path(self):
"""
:returns: Path to the current core.
"""
# While core swapping, the Python path is not updated with the new core's
# Python path, so make sure the current core is at the front of the Python
# path for our subprocesses.
python_folder = sgtk.bootstrap.ToolkitManager.get_core_python_path()
logger.debug("Adding %s to sys.path for subprocesses.", python_folder)
return python_folder
###########################################################################
# Internal methods
def _async_check_and_cache_actions(
self, data, config_data, cached_contents_hash=None
):
"""
Checks the validity of existing cached data and recaches if necessary.
        .. note:: This method runs asynchronously! To run the caching process
            in a synchronous manner, see the _cache_actions method instead.
:param dict data: The data passed down from the wss client.
:param dict config_data: A dictionary that contains, at a minimum,
"lookup_hash", "contents_hash", "descriptor", and "entity" keys.
:param str cached_contents_hash: If given, represents the currently
cached contents hash for the configuration. If this is the same as
a newly-generated contents hash built prior to caching, then the
caching process will stop and exit without doing any additional
work. This represents the situation where we've been asked to
re-cache actions, but we then prove that the existing cached data
is still valid.
"""
lookup_hash = config_data["lookup_hash"]
now = time.time()
if lookup_hash in self.CACHE_VALIDATED:
time_since = now - self.CACHE_VALIDATED[lookup_hash]
if time_since < self.CACHE_VALIDATION_INTERVAL:
logger.debug(
"Recaching of data for %s has already been initiated. "
"This thread will exit without triggering a recache of "
"actions for this entry.",
lookup_hash,
)
return
self.CACHE_VALIDATED[lookup_hash] = now
logger.debug("Cache actions executing asynchronously...")
thread = threading.Thread(
target=self._cache_actions,
args=(
data,
config_data,
cached_contents_hash,
),
)
thread.start()
logger.debug("Cache actions thread started.")
@sgtk.LogManager.log_timing
def _cache_actions(self, data, config_data, cached_contents_hash=None):
"""
Triggers the caching or recaching of engine commands.
:param dict data: The data passed down from the wss client.
:param dict config_data: A dictionary that contains, at a minimum,
"lookup_hash", "contents_hash", "descriptor", and "entity" keys.
:param str cached_contents_hash: If given, represents the currently
cached contents hash for the configuration. If this is the same as
a newly-generated contents hash built prior to caching, then the
caching process will stop and exit without doing any additional
work. This represents the situation where we've been asked to
re-cache actions, but we then prove that the existing cached data
is still valid.
"""
logger.debug("Caching engine commands...")
descriptor = config_data["descriptor"]
script = os.path.join(os.path.dirname(__file__), "scripts", "get_commands.py")
contents_hash = self._get_contents_hash(
descriptor,
self._get_site_state_data(),
)
logger.debug("The new contents hash is %s", contents_hash)
# If we were given a cached contents hash, it means we need to
# check to see if it matches the contents hash that we generated
# above. If they match, then it means that the data already cached
# is valid. In that case, we just log and return.
if cached_contents_hash and str(contents_hash) == str(cached_contents_hash):
logger.debug(
"The data already cached has been validated and is not out of date. "
"New data will not be cached as a result."
)
return
else:
logger.debug("The cached data is out of date. Recaching...")
logger.debug("Executing script: %s", script)
# We'll need the Python executable when we shell out. We want to make sure
# we use the Python defined in the config's interpreter config file.
python_exe = self._get_python_interpreter(descriptor)
logger.debug("Python executable: %s", python_exe)
arg_config_data = dict(
lookup_hash=config_data["lookup_hash"],
contents_hash=contents_hash,
entity=config_data["entity"],
)
# Create a temp file for the script to write the command data into
actions_file = tempfile.mktemp()
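        # Note that tempfile.mktemp() only returns a fresh name; the file
        # itself is written by the caching subprocess and read back below.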
args_file = self._get_arguments_file(
dict(
cache_file=self._cache_path,
output_file=actions_file,
data=data,
sys_path=self._compute_sys_path(),
base_configuration=constants.BASE_CONFIG_URI,
engine_name=constants.ENGINE_NAME,
config_data=arg_config_data,
config_is_mutable=(descriptor.is_immutable() is False),
bundle_cache_fallback_paths=self._engine.sgtk.bundle_cache_fallback_paths,
user=serialize_user(sgtk.get_authenticated_user()),
)
)
args = [python_exe, script, args_file]
logger.debug("Command arguments: %s", args)
# We lock here because we cannot allow concurrent bootstraps to
# occur. We potentially have other threads wanting to cache, so
# we protect ourselves from spawning concurrent caching subprocesses
# that might end up stepping on each other.
with self._LOCK:
retcode, stdout, stderr = command.Command.call_cmd(args)
if retcode == 0:
logger.debug("Command stdout: %s", stdout)
logger.debug("Command stderr: %s", stderr)
            elif retcode == constants.UNRESOLVED_ENV_ERROR_EXIT_CODE:
logger.debug("Caching process was not able to resolve an environment.")
msg = (
"Could not resolve an environment for '{entity_type}' entity in pipeline configuration "
"'{config_name}' with id {config_id}.".format(
entity_type=data["entity_type"],
config_name=config_data["entity"]["name"],
config_id=config_data["entity"]["id"],
)
)
raise TankCachingUnresolvedEnvError(msg)
elif retcode == constants.ENGINE_INIT_ERROR_EXIT_CODE:
logger.debug("Caching subprocess reported a problem during bootstrap.")
raise TankCachingEngineBootstrapError("%s\n\n%s" % (stdout, stderr))
else:
logger.error("Command failed: %s", args)
logger.error("Failed command stdout: %s", stdout)
logger.error("Failed command stderr: %s", stderr)
logger.error("Failed command retcode: %s", retcode)
raise TankCachingSubprocessFailed("%s\n\n%s" % (stdout, stderr))
with open(actions_file, "rt") as f:
commands = json.load(f)
# Clean up the temp file
os.remove(actions_file)
self._write_commands_to_db(commands, config_data, contents_hash)
logger.debug("Caching complete.")
def _write_commands_to_db(self, commands, config_data, contents_hash):
"""