-
Notifications
You must be signed in to change notification settings - Fork 449
Expand file tree
/
Copy pathmodels.py
More file actions
8395 lines (7575 loc) · 391 KB
/
models.py
File metadata and controls
8395 lines (7575 loc) · 391 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import itertools
import json
import pathlib
import re
import shutil
import tempfile
import requests
from typing import (
Any,
Dict,
Iterator,
List,
NamedTuple,
Optional,
Sequence,
Tuple,
TYPE_CHECKING,
Union,
)
from google.api_core import operation
from google.api_core import exceptions as api_exceptions
from google.auth import credentials as auth_credentials
from google.auth.transport import requests as google_auth_requests
from google.protobuf import duration_pb2
import proto
from google.cloud import aiplatform
from google.cloud.aiplatform import base
from google.cloud.aiplatform import constants
from google.cloud.aiplatform import explain
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import jobs
from google.cloud.aiplatform import models
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import gcs_utils
from google.cloud.aiplatform.utils import _explanation_utils
from google.cloud.aiplatform.utils import _ipython_utils
from google.cloud.aiplatform import model_evaluation
from google.cloud.aiplatform.compat.services import endpoint_service_client
from google.cloud.aiplatform.compat.services import (
deployment_resource_pool_service_client,
)
from google.cloud.aiplatform.compat.types import (
deployment_resource_pool as gca_deployment_resource_pool_compat,
deployed_model_ref as gca_deployed_model_ref_compat,
encryption_spec as gca_encryption_spec,
endpoint_v1beta1 as gca_endpoint_v1beta1_compat,
endpoint as gca_endpoint_compat,
explanation as gca_explanation_compat,
io as gca_io_compat,
machine_resources_v1beta1 as gca_machine_resources_v1beta1_compat,
machine_resources as gca_machine_resources_compat,
model as gca_model_compat,
model_service as gca_model_service_compat,
env_var as gca_env_var_compat,
service_networking as gca_service_networking,
)
from google.cloud.aiplatform.constants import (
prediction as prediction_constants,
)
from google.cloud.aiplatform_v1.types import model as model_v1
from google.protobuf import field_mask_pb2, timestamp_pb2
from google.protobuf import json_format
if TYPE_CHECKING:
from google.cloud.aiplatform.prediction import LocalModel
_DEFAULT_MACHINE_TYPE = "n1-standard-2"
_DEPLOYING_MODEL_TRAFFIC_SPLIT_KEY = "0"
_SUCCESSFUL_HTTP_RESPONSE = 300
_RAW_PREDICT_DEPLOYED_MODEL_ID_KEY = "X-Vertex-AI-Deployed-Model-Id"
_RAW_PREDICT_MODEL_RESOURCE_KEY = "X-Vertex-AI-Model"
_RAW_PREDICT_MODEL_VERSION_ID_KEY = "X-Vertex-AI-Model-Version-Id"
_LOGGER = base.Logger(__name__)
_SUPPORTED_MODEL_FILE_NAMES = [
"model.pkl",
"model.joblib",
"model.bst",
"model.mar",
"saved_model.pb",
"saved_model.pbtxt",
]
_SUPPORTED_EVAL_PREDICTION_TYPES = [
"classification",
"regression",
]
class VersionInfo(NamedTuple):
"""VersionInfo class envelopes returned Model version information.
Attributes:
version_id:
The version ID of the model.
create_time:
Timestamp when this Model version was uploaded into Vertex AI.
update_time:
Timestamp when this Model version was most recently updated.
model_display_name:
The user-defined name of the model this version belongs to.
model_resource_name:
The fully-qualified model resource name.
e.g. projects/{project}/locations/{location}/models/{model_display_name}
version_aliases:
User provided version aliases so that a model version can be referenced via
alias (i.e. projects/{project}/locations/{location}/models/{model_display_name}@{version_alias}).
Default is None.
version_description:
The description of this version.
Default is None.
"""
version_id: str
version_create_time: timestamp_pb2.Timestamp
version_update_time: timestamp_pb2.Timestamp
model_display_name: str
model_resource_name: str
version_aliases: Optional[Sequence[str]] = None
version_description: Optional[str] = None
class Prediction(NamedTuple):
"""Prediction class envelopes returned Model predictions and the Model id.
Attributes:
predictions:
The predictions that are the output of the predictions
call. The schema of any single prediction may be specified via
Endpoint's DeployedModels' [Model's][google.cloud.aiplatform.v1beta1.DeployedModel.model]
[PredictSchemata's][google.cloud.aiplatform.v1beta1.Model.predict_schemata]
deployed_model_id:
ID of the Endpoint's DeployedModel that served this prediction.
metadata:
The metadata that is the output of the predictions call.
model_version_id:
ID of the DeployedModel's version that served this prediction.
model_resource_name:
The fully-qualified resource name of the model that served this prediction.
explanations:
The explanations of the Model's predictions. It has the same number
of elements as instances to be explained. Default is None.
"""
predictions: List[Any]
deployed_model_id: str
metadata: Optional[Any] = None
model_version_id: Optional[str] = None
model_resource_name: Optional[str] = None
explanations: Optional[Sequence[gca_explanation_compat.Explanation]] = None
class DeploymentResourcePool(base.VertexAiResourceNounWithFutureManager):
client_class = utils.DeploymentResourcePoolClientWithOverride
_resource_noun = "deploymentResourcePools"
_getter_method = "get_deployment_resource_pool"
_list_method = "list_deployment_resource_pools"
_delete_method = "delete_deployment_resource_pool"
_parse_resource_name_method = "parse_deployment_resource_pool_path"
_format_resource_name_method = "deployment_resource_pool_path"
def __init__(
self,
deployment_resource_pool_name: str,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
):
"""Retrieves a DeploymentResourcePool.
Args:
deployment_resource_pool_name (str):
Required. The fully-qualified resource name or ID of the
deployment resource pool. Example:
"projects/123/locations/us-central1/deploymentResourcePools/456"
or "456" when project and location are initialized or passed.
project (str):
Optional. Project containing the deployment resource pool to
retrieve. If not set, the project given to `aiplatform.init`
will be used.
location (str):
Optional. Location containing the deployment resource pool to
retrieve. If not set, the location given to `aiplatform.init`
will be used.
credentials: Optional[auth_credentials.Credentials]=None,
Custom credentials to use to retrieve the deployment resource
pool. If not set, the credentials given to `aiplatform.init`
will be used.
"""
super().__init__(
project=project,
location=location,
credentials=credentials,
resource_name=deployment_resource_pool_name,
)
deployment_resource_pool_name = utils.full_resource_name(
resource_name=deployment_resource_pool_name,
resource_noun=self._resource_noun,
parse_resource_name_method=self._parse_resource_name,
format_resource_name_method=self._format_resource_name,
project=project,
location=location,
)
self._gca_resource = self._get_gca_resource(
resource_name=deployment_resource_pool_name
)
@classmethod
def create(
cls,
deployment_resource_pool_id: str,
project: Optional[str] = None,
location: Optional[str] = None,
metadata: Sequence[Tuple[str, str]] = (),
credentials: Optional[auth_credentials.Credentials] = None,
machine_type: Optional[str] = None,
min_replica_count: int = 1,
max_replica_count: int = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
sync=True,
create_request_timeout: Optional[float] = None,
reservation_affinity_type: Optional[str] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
spot: bool = False,
required_replica_count: Optional[int] = 0,
) -> "DeploymentResourcePool":
"""Creates a new DeploymentResourcePool.
Args:
deployment_resource_pool_id (str):
Required. User-specified name for the new deployment resource
pool.
project (str):
Optional. Project containing the deployment resource pool to
retrieve. If not set, the project given to `aiplatform.init`
will be used.
location (str):
Optional. Location containing the deployment resource pool to
retrieve. If not set, the location given to `aiplatform.init`
will be used.
metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as
metadata.
credentials: Optional[auth_credentials.Credentials]=None,
Optional. Custom credentials to use to retrieve the deployment
resource pool. If not set, the credentials given to
`aiplatform.init` will be used.
machine_type (str):
Optional. Machine type to use for the deployment resource pool.
If not set, the default machine type of `n1-standard-2` is
used.
min_replica_count (int):
Optional. The minimum replica count of the new deployment
resource pool. Each replica serves a copy of each model deployed
on the deployment resource pool. If this value is less than
`max_replica_count`, then autoscaling is enabled, and the actual
number of replicas will be adjusted to bring resource usage in
line with the autoscaling targets.
max_replica_count (int):
Optional. The maximum replica count of the new deployment
resource pool.
accelerator_type (str):
Optional. Hardware accelerator type. Must also set accelerator_
count if used. One of NVIDIA_TESLA_K80, NVIDIA_TESLA_P100,
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, or
NVIDIA_TESLA_A100.
accelerator_count (int):
Optional. The number of accelerators attached to each replica.
autoscaling_target_cpu_utilization (int):
Optional. Target CPU utilization value for autoscaling. A
default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target accelerator duty cycle percentage to use for
autoscaling. Must also set accelerator_type and accelerator
count if specified. A default value of 60 will be used if
accelerators are requested and this is not specified.
sync (bool):
Optional. Whether to execute this method synchronously. If
False, this method will be executed in a concurrent Future and
any downstream object will be immediately returned and synced
when the Future has completed.
create_request_timeout (float):
Optional. The create request timeout in seconds.
reservation_affinity_type (str):
Optional. The type of reservation affinity.
One of NO_RESERVATION, ANY_RESERVATION, SPECIFIC_RESERVATION,
SPECIFIC_THEN_ANY_RESERVATION, SPECIFIC_THEN_NO_RESERVATION
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use `compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
spot (bool):
Optional. Whether to schedule the deployment workload on spot VMs.
required_replica_count (int):
Optional. Number of required available replicas for the
deployment to succeed. This field is only needed when partial
model deployment/mutation is desired, with a value greater than
or equal to 1 and fewer than or equal to min_replica_count. If
set, the model deploy/mutate operation will succeed once
available_replica_count reaches required_replica_count, and the
rest of the replicas will be retried.
Returns:
DeploymentResourcePool
"""
api_client = cls._instantiate_client(location=location, credentials=credentials)
project = project or initializer.global_config.project
location = location or initializer.global_config.location
return cls._create(
api_client=api_client,
deployment_resource_pool_id=deployment_resource_pool_id,
project=project,
location=location,
metadata=metadata,
credentials=credentials,
machine_type=machine_type,
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
accelerator_type=accelerator_type,
accelerator_count=accelerator_count,
reservation_affinity_type=reservation_affinity_type,
reservation_affinity_key=reservation_affinity_key,
reservation_affinity_values=reservation_affinity_values,
autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
spot=spot,
sync=sync,
create_request_timeout=create_request_timeout,
required_replica_count=required_replica_count,
)
@classmethod
@base.optional_sync()
def _create(
cls,
api_client: deployment_resource_pool_service_client.DeploymentResourcePoolServiceClient,
deployment_resource_pool_id: str,
project: Optional[str] = None,
location: Optional[str] = None,
metadata: Sequence[Tuple[str, str]] = (),
credentials: Optional[auth_credentials.Credentials] = None,
machine_type: Optional[str] = None,
min_replica_count: int = 1,
max_replica_count: int = 1,
accelerator_type: Optional[str] = None,
accelerator_count: Optional[int] = None,
reservation_affinity_type: Optional[str] = None,
reservation_affinity_key: Optional[str] = None,
reservation_affinity_values: Optional[List[str]] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
spot: bool = False,
sync=True,
create_request_timeout: Optional[float] = None,
required_replica_count: Optional[int] = 0,
) -> "DeploymentResourcePool":
"""Creates a new DeploymentResourcePool.
Args:
api_client (DeploymentResourcePoolServiceClient):
Required. DeploymentResourcePoolServiceClient used to make the
underlying CreateDeploymentResourcePool API call.
deployment_resource_pool_id (str):
Required. User-specified name for the new deployment resource
pool.
project (str):
Optional. Project containing the deployment resource pool to
retrieve. If not set, the project given to `aiplatform.init`
will be used.
location (str):
Optional. Location containing the deployment resource pool to
retrieve. If not set, the location given to `aiplatform.init`
will be used.
metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as
metadata.
credentials: Optional[auth_credentials.Credentials]=None,
Optional. Custom credentials to use to retrieve the deployment
resource pool. If not set, the credentials given to
`aiplatform.init` will be used.
machine_type (str):
Optional. Machine type to use for the deployment resource pool.
If not set, the default machine type of `n1-standard-2` is
used.
min_replica_count (int):
Optional. The minimum replica count of the new deployment
resource pool. Each replica serves a copy of each model deployed
on the deployment resource pool. If this value is less than
`max_replica_count`, then autoscaling is enabled, and the actual
number of replicas will be adjusted to bring resource usage in
line with the autoscaling targets.
max_replica_count (int):
Optional. The maximum replica count of the new deployment
resource pool.
accelerator_type (str):
Optional. Hardware accelerator type. Must also set accelerator_
count if used. One of NVIDIA_TESLA_K80, NVIDIA_TESLA_P100,
NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4, or
NVIDIA_TESLA_A100.
accelerator_count (int):
Optional. The number of accelerators attached to each replica.
reservation_affinity_type (str):
Optional. The type of reservation affinity.
One of NO_RESERVATION, ANY_RESERVATION, SPECIFIC_RESERVATION,
SPECIFIC_THEN_ANY_RESERVATION, SPECIFIC_THEN_NO_RESERVATION
reservation_affinity_key (str):
Optional. Corresponds to the label key of a reservation resource.
To target a SPECIFIC_RESERVATION by name, use `compute.googleapis.com/reservation-name` as the key
and specify the name of your reservation as its value.
reservation_affinity_values (List[str]):
Optional. Corresponds to the label values of a reservation resource.
This must be the full resource name of the reservation.
Format: 'projects/{project_id_or_number}/zones/{zone}/reservations/{reservation_name}'
autoscaling_target_cpu_utilization (int):
Optional. Target CPU utilization value for autoscaling. A
default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target accelerator duty cycle percentage to use for
autoscaling. Must also set accelerator_type and accelerator
count if specified. A default value of 60 will be used if
accelerators are requested and this is not specified.
spot (bool):
Optional. Whether to schedule the deployment workload on spot VMs.
sync (bool):
Optional. Whether to execute this method synchronously. If
False, this method will be executed in a concurrent Future and
any downstream object will be immediately returned and synced
when the Future has completed.
create_request_timeout (float):
Optional. The create request timeout in seconds.
required_replica_count (int):
Optional. Number of required available replicas for the
deployment to succeed. This field is only needed when partial
model deployment/mutation is desired, with a value greater than
or equal to 1 and fewer than or equal to min_replica_count. If
set, the model deploy/mutate operation will succeed once
available_replica_count reaches required_replica_count, and the
rest of the replicas will be retried.
Returns:
DeploymentResourcePool
"""
parent = initializer.global_config.common_location_path(
project=project, location=location
)
dedicated_resources = gca_machine_resources_compat.DedicatedResources(
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
spot=spot,
required_replica_count=required_replica_count,
)
machine_spec = gca_machine_resources_compat.MachineSpec(
machine_type=machine_type
)
if autoscaling_target_cpu_utilization:
autoscaling_metric_spec = (
gca_machine_resources_compat.AutoscalingMetricSpec(
metric_name=(
"aiplatform.googleapis.com/prediction/online/cpu/utilization"
),
target=autoscaling_target_cpu_utilization,
)
)
dedicated_resources.autoscaling_metric_specs.extend(
[autoscaling_metric_spec]
)
if accelerator_type and accelerator_count:
utils.validate_accelerator_type(accelerator_type)
machine_spec.accelerator_type = accelerator_type
machine_spec.accelerator_count = accelerator_count
if autoscaling_target_accelerator_duty_cycle:
autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle",
target=autoscaling_target_accelerator_duty_cycle,
)
dedicated_resources.autoscaling_metric_specs.extend(
[autoscaling_metric_spec]
)
if reservation_affinity_type:
machine_spec.reservation_affinity = utils.get_reservation_affinity(
reservation_affinity_type,
reservation_affinity_key,
reservation_affinity_values,
)
dedicated_resources.machine_spec = machine_spec
gapic_drp = gca_deployment_resource_pool_compat.DeploymentResourcePool(
dedicated_resources=dedicated_resources
)
operation_future = api_client.create_deployment_resource_pool(
parent=parent,
deployment_resource_pool=gapic_drp,
deployment_resource_pool_id=deployment_resource_pool_id,
metadata=metadata,
timeout=create_request_timeout,
)
_LOGGER.log_create_with_lro(cls, operation_future)
created_drp = operation_future.result()
_LOGGER.log_create_complete(cls, created_drp, "deployment resource pool")
return cls._construct_sdk_resource_from_gapic(
gapic_resource=created_drp,
project=project,
location=location,
credentials=credentials,
)
def query_deployed_models(
self,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
) -> List[gca_deployed_model_ref_compat.DeployedModelRef]:
"""Lists the deployed models using this resource pool.
Args:
project (str):
Optional. Project to retrieve list from. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to retrieve list from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to retrieve list. Overrides
credentials set in aiplatform.init.
Returns:
List of DeployedModelRef objects containing the endpoint ID and
deployed model ID of the deployed models using this resource pool.
"""
location = location or initializer.global_config.location
api_client = DeploymentResourcePool._instantiate_client(
location=location, credentials=credentials
)
response = api_client.query_deployed_models(
deployment_resource_pool=self.resource_name
)
return list(
itertools.chain(page.deployed_model_refs for page in response.pages)
)
@classmethod
def list(
cls,
filter: Optional[str] = None,
order_by: Optional[str] = None,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
) -> List["models.DeploymentResourcePool"]:
"""Lists the deployment resource pools.
filter (str):
Optional. An expression for filtering the results of the request.
For field names both snake_case and camelCase are supported.
order_by (str):
Optional. A comma-separated list of fields to order by, sorted in
ascending order. Use "desc" after a field name for descending.
Supported fields: `display_name`, `create_time`, `update_time`
project (str):
Optional. Project to retrieve list from. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to retrieve list from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to retrieve list. Overrides
credentials set in aiplatform.init.
Returns:
List of deployment resource pools.
"""
return cls._list(
filter=filter,
order_by=order_by,
project=project,
location=location,
credentials=credentials,
)
class Endpoint(base.VertexAiResourceNounWithFutureManager, base.PreviewMixin):
client_class = utils.EndpointClientWithOverride
_resource_noun = "endpoints"
_getter_method = "get_endpoint"
_list_method = "list_endpoints"
_delete_method = "delete_endpoint"
_parse_resource_name_method = "parse_endpoint_path"
_format_resource_name_method = "endpoint_path"
_preview_class = "google.cloud.aiplatform.aiplatform.preview.models.Endpoint"
@property
def preview(self):
"""Return an Endpoint instance with preview features enabled."""
from google.cloud.aiplatform.preview import models as preview_models
if not hasattr(self, "_preview_instance"):
self._preview_instance = preview_models.Endpoint(
self.resource_name, credentials=self.credentials
)
return self._preview_instance
def __init__(
self,
endpoint_name: str,
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
):
"""Retrieves an endpoint resource.
Args:
endpoint_name (str):
Required. A fully-qualified endpoint resource name or endpoint ID.
Example: "projects/123/locations/us-central1/endpoints/456" or
"456" when project and location are initialized or passed.
project (str):
Optional. Project to retrieve endpoint from. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to retrieve endpoint from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to upload this model. Overrides
credentials set in aiplatform.init.
"""
super().__init__(
project=project,
location=location,
credentials=credentials,
resource_name=endpoint_name,
)
endpoint_name = utils.full_resource_name(
resource_name=endpoint_name,
resource_noun="endpoints",
parse_resource_name_method=self._parse_resource_name,
format_resource_name_method=self._format_resource_name,
project=project,
location=location,
)
# Lazy load the Endpoint gca_resource until needed
self._gca_resource = gca_endpoint_compat.Endpoint(name=endpoint_name)
self.authorized_session = None
@property
def _prediction_client(self) -> utils.PredictionClientWithOverride:
# The attribute might not exist due to issues in
# `VertexAiResourceNounWithFutureManager._sync_object_with_future_result`
# We should switch to @functools.cached_property once its available.
if not getattr(self, "_prediction_client_value", None):
self._prediction_client_value = initializer.global_config.create_client(
client_class=utils.PredictionClientWithOverride,
credentials=self.credentials,
location_override=self.location,
prediction_client=True,
)
return self._prediction_client_value
@property
def _prediction_async_client(self) -> utils.PredictionAsyncClientWithOverride:
# The attribute might not exist due to issues in
# `VertexAiResourceNounWithFutureManager._sync_object_with_future_result`
# We should switch to @functools.cached_property once its available.
if not getattr(self, "_prediction_async_client_value", None):
self._prediction_async_client_value = (
initializer.global_config.create_client(
client_class=utils.PredictionAsyncClientWithOverride,
credentials=self.credentials,
location_override=self.location,
prediction_client=True,
)
)
return self._prediction_async_client_value
def _skipped_getter_call(self) -> bool:
"""Check if GAPIC resource was populated by call to get/list API methods
Returns False if `_gca_resource` is None or fully populated. Returns True
if `_gca_resource` is partially populated
"""
return self._gca_resource and not self._gca_resource.create_time
def _sync_gca_resource_if_skipped(self) -> None:
"""Sync GAPIC service representation of Endpoint class resource only if
get_endpoint() was never called."""
if self._skipped_getter_call():
self._gca_resource = self._get_gca_resource(
resource_name=self._gca_resource.name
)
def _assert_gca_resource_is_available(self) -> None:
"""Ensures Endpoint getter was called at least once before
asserting on gca_resource's availability."""
super()._assert_gca_resource_is_available()
self._sync_gca_resource_if_skipped()
@property
def traffic_split(self) -> Dict[str, int]:
"""A map from a DeployedModel's ID to the percentage of this Endpoint's
traffic that should be forwarded to that DeployedModel.
If a DeployedModel's ID is not listed in this map, then it receives no traffic.
The traffic percentage values must add up to 100, or map must be empty if
the Endpoint is to not accept any traffic at a moment.
"""
self._sync_gca_resource()
return dict(self._gca_resource.traffic_split)
@property
def network(self) -> Optional[str]:
"""The full name of the Google Compute Engine
[network](https://cloud.google.com/vpc/docs/vpc#networks) to which this
Endpoint should be peered.
Takes the format `projects/{project}/global/networks/{network}`. Where
{project} is a project number, as in `12345`, and {network} is a network name.
Private services access must already be configured for the network. If left
unspecified, the Endpoint is not peered with any network.
"""
self._assert_gca_resource_is_available()
return getattr(self._gca_resource, "network", None)
@property
def private_service_connect_config(
self,
) -> Optional[gca_service_networking.PrivateServiceConnectConfig]:
"""The Private Service Connect configuration for this Endpoint."""
self._assert_gca_resource_is_available()
return getattr(self._gca_resource, "private_service_connect_config", None)
@property
def dedicated_endpoint_dns(self) -> Optional[str]:
"""The dedicated endpoint dns for this Endpoint.
This property is only available if dedicated endpoint is enabled.
If dedicated endpoint is not enabled, this property returns None.
"""
if re.match(r"^projects/.*/endpoints/.*$", self._gca_resource.name):
dedicated_endpoint_dns = getattr(
self._gca_resource, "dedicated_endpoint_dns", None
)
if self.dedicated_endpoint_enabled and not dedicated_endpoint_dns:
self._sync_gca_resource()
dedicated_endpoint_dns = getattr(
self._gca_resource, "dedicated_endpoint_dns", None
)
return dedicated_endpoint_dns
return None
@property
def dedicated_endpoint_enabled(self) -> bool:
"""The dedicated endpoint is enabled for this Endpoint.
This property will be true if dedicated endpoint is enabled.
"""
if re.match(r"^projects/.*/endpoints/.*$", self._gca_resource.name):
self._assert_gca_resource_is_available()
return getattr(self._gca_resource, "dedicated_endpoint_enabled", False)
return False
@classmethod
def create(
cls,
display_name: Optional[str] = None,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
project: Optional[str] = None,
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
encryption_spec_key_name: Optional[str] = None,
sync=True,
create_request_timeout: Optional[float] = None,
endpoint_id: Optional[str] = None,
enable_request_response_logging=False,
request_response_logging_sampling_rate: Optional[float] = None,
request_response_logging_bq_destination_table: Optional[str] = None,
dedicated_endpoint_enabled=False,
inference_timeout: Optional[int] = None,
) -> "Endpoint":
"""Creates a new endpoint.
Args:
display_name (str):
Optional. The user-defined name of the Endpoint.
The name can be up to 128 characters long and can be consist
of any UTF-8 characters.
description (str):
Optional. The description of the Endpoint.
labels (Dict[str, str]):
Optional. The labels with user-defined metadata to
organize your Endpoints.
Label keys and values can be no longer than 64
characters (Unicode codepoints), can only
contain lowercase letters, numeric characters,
underscores and dashes. International characters
are allowed.
See https://goo.gl/xmQnxf for more information
and examples of labels.
metadata (Sequence[Tuple[str, str]]):
Optional. Strings which should be sent along with the request as
metadata.
project (str):
Optional. Project to retrieve endpoint from. If not set, project
set in aiplatform.init will be used.
location (str):
Optional. Location to retrieve endpoint from. If not set, location
set in aiplatform.init will be used.
credentials (auth_credentials.Credentials):
Optional. Custom credentials to use to upload this model. Overrides
credentials set in aiplatform.init.
encryption_spec_key_name (str):
Optional. The Cloud KMS resource identifier of the customer
managed encryption key used to protect the model. Has the
form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
The key needs to be in the same region as where the compute
resource is created.
If set, this Endpoint and all sub-resources of this Endpoint will be secured by this key.
Overrides encryption_spec_key_name set in aiplatform.init.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
create_request_timeout (float):
Optional. The timeout for the create request in seconds.
endpoint_id (str):
Optional. The ID to use for endpoint, which will become
the final component of the endpoint resource name. If
not provided, Vertex AI will generate a value for this
ID.
This value should be 1-10 characters, and valid
characters are /[0-9]/. When using HTTP/JSON, this field
is populated based on a query string argument, such as
``?endpoint_id=12345``. This is the fallback for fields
that are not included in either the URI or the body.
enable_request_response_logging (bool):
Optional. Whether to enable request & response logging for this endpoint.
request_response_logging_sampling_rate (float):
Optional. The request response logging sampling rate. If not set, default is 0.0.
request_response_logging_bq_destination_table (str):
Optional. The request response logging bigquery destination. If not set, will create a table with name:
``bq://{project_id}.logging_{endpoint_display_name}_{endpoint_id}.request_response_logging``.
dedicated_endpoint_enabled (bool):
Optional. If enabled, a dedicated dns will be created and your
traffic will be fully isolated from other customers' traffic and
latency will be reduced.
inference_timeout (int):
Optional. It defines the prediction timeout, in seconds, for online predictions using cloud-based endpoints. This applies to either PSC endpoints, when private_service_connect_config is set, or dedicated endpoints, when dedicated_endpoint_enabled is true.
Returns:
endpoint (aiplatform.Endpoint):
Created endpoint.
"""
api_client = cls._instantiate_client(location=location, credentials=credentials)
if not display_name:
display_name = cls._generate_display_name()
utils.validate_display_name(display_name)
if labels:
utils.validate_labels(labels)
project = project or initializer.global_config.project
location = location or initializer.global_config.location
predict_request_response_logging_config = None
if enable_request_response_logging:
predict_request_response_logging_config = (
gca_endpoint_compat.PredictRequestResponseLoggingConfig(
enabled=True,
sampling_rate=request_response_logging_sampling_rate,
bigquery_destination=gca_io_compat.BigQueryDestination(
output_uri=request_response_logging_bq_destination_table
),
)
)
client_connection_config = None
if (
inference_timeout is not None
and inference_timeout > 0
and dedicated_endpoint_enabled
):
client_connection_config = gca_endpoint_compat.ClientConnectionConfig(
inference_timeout=duration_pb2.Duration(seconds=inference_timeout)
)
return cls._create(
api_client=api_client,
display_name=display_name,
project=project,
location=location,
description=description,
labels=labels,
metadata=metadata,
credentials=credentials,
encryption_spec=initializer.global_config.get_encryption_spec(
encryption_spec_key_name=encryption_spec_key_name
),
sync=sync,
create_request_timeout=create_request_timeout,
endpoint_id=endpoint_id,
predict_request_response_logging_config=predict_request_response_logging_config,
dedicated_endpoint_enabled=dedicated_endpoint_enabled,
client_connection_config=client_connection_config,
)
@classmethod
@base.optional_sync()
def _create(
cls,
api_client: endpoint_service_client.EndpointServiceClient,
display_name: str,
project: str,
location: str,
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
credentials: Optional[auth_credentials.Credentials] = None,
encryption_spec: Optional[gca_encryption_spec.EncryptionSpec] = None,
network: Optional[str] = None,
sync=True,
create_request_timeout: Optional[float] = None,
endpoint_id: Optional[str] = None,
predict_request_response_logging_config: Optional[
gca_endpoint_compat.PredictRequestResponseLoggingConfig
] = None,
private_service_connect_config: Optional[
gca_service_networking.PrivateServiceConnectConfig
] = None,
dedicated_endpoint_enabled=False,
client_connection_config: Optional[
gca_endpoint_compat.ClientConnectionConfig
] = None,
) -> "Endpoint":
"""Creates a new endpoint by calling the API client.
Args:
api_client (EndpointServiceClient):