diff --git a/pom.xml b/pom.xml index f030394..db93eb0 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ io.cdap.plugin snowflake-plugins - 1.1.5-SNAPSHOT + 1.1.5 jar Snowflake plugins @@ -47,7 +47,7 @@ 1.7.4 4.0 2.4 - 3.14.4 + 4.0.2 diff --git a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java index 718a707..2ccb2c9 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/BaseSnowflakeConfig.java @@ -278,7 +278,7 @@ public boolean canConnect() { && !containsMacro(PROPERTY_PASSWORD) && !containsMacro(PROPERTY_WAREHOUSE) && !containsMacro(PROPERTY_ROLE) && !containsMacro(PROPERTY_CLIENT_ID) && !containsMacro(PROPERTY_CLIENT_SECRET) && !containsMacro(PROPERTY_REFRESH_TOKEN) - && !containsMacro(PROPERTY_PRIVATE_KEY)); + && !containsMacro(PROPERTY_PRIVATE_KEY) && !containsMacro(PROPERTY_PASSPHRASE)); } protected void validateConnection(FailureCollector collector) { @@ -299,8 +299,8 @@ protected void validateConnection(FailureCollector collector) { .withConfigProperty(PROPERTY_USERNAME); // TODO: for oauth2 - if (keyPairEnabled) { - failure.withConfigProperty(PROPERTY_PRIVATE_KEY); + if (Boolean.TRUE.equals(keyPairEnabled)) { + failure.withConfigProperty(PROPERTY_PRIVATE_KEY).withConfigProperty(PROPERTY_PASSPHRASE); } else { failure.withConfigProperty(PROPERTY_PASSWORD); } diff --git a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java index a16207f..38aa738 100644 --- a/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessor.java @@ -23,7 +23,7 @@ import io.cdap.plugin.snowflake.common.OAuthUtil; import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException; import io.cdap.plugin.snowflake.common.util.QueryUtil; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource; import org.apache.http.impl.client.HttpClients; import java.io.BufferedWriter; @@ -135,10 +135,10 @@ private void initDataSource(SnowflakeBasicDataSource dataSource, BaseSnowflakeCo if (config.getOauth2Enabled()) { String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config); - dataSource.setOauthToken(accessToken); - // The recommend way to pass token is in the password when you use the driver with connection pool. - // This is also a mandatory field, so adding the same. - // Refer https://github.com/snowflakedb/snowflake-jdbc/issues/1175 + // In JDBC 4.x, setOauthToken() was removed. The recommended approach is to explicitly + // set the authenticator to "oauth" and pass the access token as the password. + // Migration guide: https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-migration + dataSource.setAuthenticator("oauth"); dataSource.setPassword(accessToken); } else if (config.getKeyPairEnabled()) { dataSource.setUser(config.getUsername()); diff --git a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java index 0eba82d..7e18837 100644 --- a/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/sink/batch/SnowflakeSinkAccessor.java @@ -17,7 +17,8 @@ package io.cdap.plugin.snowflake.sink.batch; import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; -import net.snowflake.client.jdbc.SnowflakeConnection; +import net.snowflake.client.api.connection.SnowflakeConnection; +import net.snowflake.client.api.connection.UploadStreamConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -51,9 +52,10 @@ public void uploadStream(InputStream inputStream, String stageDir) throws IOExce LOG.info("Uploading file '{}' to table stage", filename); try (Connection connection = dataSource.getConnection()) { + UploadStreamConfig uploadConfig = UploadStreamConfig.builder().setDestPrefix(null).setCompressData(true).build(); connection.unwrap(SnowflakeConnection.class).uploadStream(stageDir, - null, - inputStream, filename, true); + filename, + inputStream, uploadConfig); } catch (SQLException e) { throw new IOException(e); } diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java index 9ddf3d6..41fdcf1 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java @@ -22,7 +22,8 @@ import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor; import io.cdap.plugin.snowflake.common.util.QueryUtil; import io.cdap.plugin.snowflake.common.util.SchemaHelper; -import net.snowflake.client.jdbc.SnowflakeConnection; +import net.snowflake.client.api.connection.DownloadStreamConfig; +import net.snowflake.client.api.connection.SnowflakeConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -131,8 +132,9 @@ public void removeStageFile(String stageSplit) throws IOException { */ public CSVReader buildCsvReader(String stageSplit) throws IOException { try (Connection connection = dataSource.getConnection()) { + DownloadStreamConfig downloadStreamConfig = DownloadStreamConfig.builder().setDecompress(true).build(); InputStream downloadStream = connection.unwrap(SnowflakeConnection.class) - .downloadStream("@~", stageSplit, true); + .downloadStream("@~", stageSplit, downloadStreamConfig); InputStreamReader inputStreamReader = new InputStreamReader(downloadStream); return new CSVReader(inputStreamReader, ',', '"', escapeChar); } catch (SQLException e) { diff --git a/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java b/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java index 24b6588..b021cc8 100644 --- a/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java +++ b/src/test/java/io/cdap/plugin/snowflake/common/BaseSnowflakeTest.java @@ -22,7 +22,7 @@ import io.cdap.plugin.snowflake.common.client.SnowflakeAccessorTest; import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfig; import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfigBuilder; -import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource; import org.junit.Assume; import org.junit.BeforeClass; import org.junit.ClassRule;