|
27 | 27 | import org.apache.amoro.table.TableIdentifier; |
28 | 28 | import org.apache.amoro.table.TableMetaStore; |
29 | 29 | import org.apache.commons.lang3.StringUtils; |
| 30 | +import org.apache.hadoop.conf.Configuration; |
30 | 31 | import org.slf4j.Logger; |
31 | 32 | import org.slf4j.LoggerFactory; |
32 | 33 |
|
@@ -58,7 +59,8 @@ public static Set<TableFormat> tableFormats( |
58 | 59 | .collect(Collectors.toSet()); |
59 | 60 | } else { |
60 | 61 | // Generate table format from catalog type for compatibility with older versions |
61 | | - switch (metastoreType) { |
| 62 | + String normalizedType = normalizeCatalogType(metastoreType); |
| 63 | + switch (normalizedType) { |
62 | 64 | case CatalogMetaProperties.CATALOG_TYPE_AMS: |
63 | 65 | return Sets.newHashSet(TableFormat.MIXED_ICEBERG); |
64 | 66 | case CatalogMetaProperties.CATALOG_TYPE_CUSTOM: |
@@ -111,83 +113,91 @@ public static Map<String, String> mergeCatalogPropertiesToTable( |
111 | 113 | public static TableMetaStore buildMetaStore(CatalogMeta catalogMeta) { |
112 | 114 | // load storage configs |
113 | 115 | TableMetaStore.Builder builder = TableMetaStore.builder(); |
| 116 | + boolean isLocalStorage = false; |
114 | 117 | if (catalogMeta.getStorageConfigs() != null) { |
115 | 118 | Map<String, String> storageConfigs = catalogMeta.getStorageConfigs(); |
116 | | - if (CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP.equalsIgnoreCase( |
117 | | - CatalogUtil.getCompatibleStorageType(storageConfigs))) { |
| 119 | + String storageType = CatalogUtil.getCompatibleStorageType(storageConfigs); |
| 120 | + isLocalStorage = |
| 121 | + CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_LOCAL.equalsIgnoreCase(storageType); |
| 122 | + if (CatalogMetaProperties.STORAGE_CONFIGS_VALUE_TYPE_HADOOP.equalsIgnoreCase(storageType)) { |
118 | 123 | String coreSite = storageConfigs.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_CORE_SITE); |
119 | 124 | String hdfsSite = storageConfigs.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_HDFS_SITE); |
120 | 125 | String hiveSite = storageConfigs.get(CatalogMetaProperties.STORAGE_CONFIGS_KEY_HIVE_SITE); |
121 | 126 | builder |
122 | 127 | .withBase64CoreSite(coreSite) |
123 | 128 | .withBase64MetaStoreSite(hiveSite) |
124 | 129 | .withBase64HdfsSite(hdfsSite); |
| 130 | + } else if (isLocalStorage) { |
| 131 | + builder.withConfiguration(new Configuration()); |
125 | 132 | } |
126 | 133 | } |
127 | 134 |
|
128 | | - boolean loadAuthFromAMS = |
129 | | - propertyAsBoolean( |
130 | | - catalogMeta.getCatalogProperties(), |
131 | | - CatalogMetaProperties.LOAD_AUTH_FROM_AMS, |
132 | | - CatalogMetaProperties.LOAD_AUTH_FROM_AMS_DEFAULT); |
133 | | - // load auth configs from ams |
134 | | - if (loadAuthFromAMS) { |
135 | | - if (catalogMeta.getAuthConfigs() != null) { |
136 | | - Map<String, String> authConfigs = catalogMeta.getAuthConfigs(); |
137 | | - String authType = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE); |
138 | | - LOG.info("TableMetaStore use auth config in catalog meta, authType is {}", authType); |
| 135 | + if (!isLocalStorage) { |
| 136 | + boolean loadAuthFromAMS = |
| 137 | + propertyAsBoolean( |
| 138 | + catalogMeta.getCatalogProperties(), |
| 139 | + CatalogMetaProperties.LOAD_AUTH_FROM_AMS, |
| 140 | + CatalogMetaProperties.LOAD_AUTH_FROM_AMS_DEFAULT); |
| 141 | + // load auth configs from ams |
| 142 | + if (loadAuthFromAMS) { |
| 143 | + if (catalogMeta.getAuthConfigs() != null) { |
| 144 | + Map<String, String> authConfigs = catalogMeta.getAuthConfigs(); |
| 145 | + String authType = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE); |
| 146 | + LOG.info("TableMetaStore use auth config in catalog meta, authType is {}", authType); |
| 147 | + if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_SIMPLE.equalsIgnoreCase(authType)) { |
| 148 | + String hadoopUsername = |
| 149 | + authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME); |
| 150 | + builder.withSimpleAuth(hadoopUsername); |
| 151 | + } else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_KERBEROS.equalsIgnoreCase( |
| 152 | + authType)) { |
| 153 | + String krb5 = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5); |
| 154 | + String keytab = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB); |
| 155 | + String principal = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL); |
| 156 | + builder.withBase64KrbAuth(keytab, krb5, principal); |
| 157 | + } else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase( |
| 158 | + authType)) { |
| 159 | + String accessKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY); |
| 160 | + String secretKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY); |
| 161 | + builder.withAkSkAuth(accessKey, secretKey); |
| 162 | + } |
| 163 | + } |
| 164 | + } |
| 165 | + |
| 166 | + // cover auth configs from ams with auth configs in properties |
| 167 | + String authType = |
| 168 | + catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE); |
| 169 | + if (StringUtils.isNotEmpty(authType)) { |
| 170 | + LOG.info("TableMetaStore use auth config in properties, authType is {}", authType); |
139 | 171 | if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_SIMPLE.equalsIgnoreCase(authType)) { |
140 | 172 | String hadoopUsername = |
141 | | - authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME); |
| 173 | + catalogMeta |
| 174 | + .getCatalogProperties() |
| 175 | + .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME); |
142 | 176 | builder.withSimpleAuth(hadoopUsername); |
143 | 177 | } else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_KERBEROS.equalsIgnoreCase( |
144 | 178 | authType)) { |
145 | | - String krb5 = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5); |
146 | | - String keytab = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB); |
147 | | - String principal = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL); |
| 179 | + String krb5 = |
| 180 | + catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5); |
| 181 | + String keytab = |
| 182 | + catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB); |
| 183 | + String principal = |
| 184 | + catalogMeta |
| 185 | + .getCatalogProperties() |
| 186 | + .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL); |
148 | 187 | builder.withBase64KrbAuth(keytab, krb5, principal); |
149 | 188 | } else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authType)) { |
150 | | - String accessKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY); |
151 | | - String secretKey = authConfigs.get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY); |
| 189 | + String accessKey = |
| 190 | + catalogMeta |
| 191 | + .getCatalogProperties() |
| 192 | + .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY); |
| 193 | + String secretKey = |
| 194 | + catalogMeta |
| 195 | + .getCatalogProperties() |
| 196 | + .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY); |
152 | 197 | builder.withAkSkAuth(accessKey, secretKey); |
153 | 198 | } |
154 | 199 | } |
155 | 200 | } |
156 | | - |
157 | | - // cover auth configs from ams with auth configs in properties |
158 | | - String authType = |
159 | | - catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE); |
160 | | - if (StringUtils.isNotEmpty(authType)) { |
161 | | - LOG.info("TableMetaStore use auth config in properties, authType is {}", authType); |
162 | | - if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_SIMPLE.equalsIgnoreCase(authType)) { |
163 | | - String hadoopUsername = |
164 | | - catalogMeta |
165 | | - .getCatalogProperties() |
166 | | - .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME); |
167 | | - builder.withSimpleAuth(hadoopUsername); |
168 | | - } else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_KERBEROS.equalsIgnoreCase( |
169 | | - authType)) { |
170 | | - String krb5 = |
171 | | - catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5); |
172 | | - String keytab = |
173 | | - catalogMeta.getCatalogProperties().get(CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB); |
174 | | - String principal = |
175 | | - catalogMeta |
176 | | - .getCatalogProperties() |
177 | | - .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL); |
178 | | - builder.withBase64KrbAuth(keytab, krb5, principal); |
179 | | - } else if (CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_AK_SK.equalsIgnoreCase(authType)) { |
180 | | - String accessKey = |
181 | | - catalogMeta |
182 | | - .getCatalogProperties() |
183 | | - .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_ACCESS_KEY); |
184 | | - String secretKey = |
185 | | - catalogMeta |
186 | | - .getCatalogProperties() |
187 | | - .get(CatalogMetaProperties.AUTH_CONFIGS_KEY_SECRET_KEY); |
188 | | - builder.withAkSkAuth(accessKey, secretKey); |
189 | | - } |
190 | | - } |
191 | 201 | return builder.build(); |
192 | 202 | } |
193 | 203 |
|
@@ -246,4 +256,14 @@ private static boolean propertyAsBoolean( |
246 | 256 | } |
247 | 257 | return defaultValue; |
248 | 258 | } |
| 259 | + |
| 260 | + private static String normalizeCatalogType(String metastoreType) { |
| 261 | + if (metastoreType == null) { |
| 262 | + return null; |
| 263 | + } |
| 264 | + if ("filesystem".equalsIgnoreCase(metastoreType)) { |
| 265 | + return CatalogMetaProperties.CATALOG_TYPE_HADOOP; |
| 266 | + } |
| 267 | + return metastoreType; |
| 268 | + } |
249 | 269 | } |
0 commit comments