Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

<groupId>io.cdap.plugin</groupId>
<artifactId>snowflake-plugins</artifactId>
<version>1.1.5-SNAPSHOT</version>
<version>1.1.5</version>
<packaging>jar</packaging>
<name>Snowflake plugins</name>

Expand All @@ -47,7 +47,7 @@
<powermock.version>1.7.4</powermock.version>
<guice.version>4.0</guice.version>
<opencsv.version>2.4</opencsv.version>
<snowflake-jdbc.version>3.14.4</snowflake-jdbc.version>
<snowflake-jdbc.version>4.0.2</snowflake-jdbc.version>
</properties>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public boolean canConnect() {
&& !containsMacro(PROPERTY_PASSWORD) && !containsMacro(PROPERTY_WAREHOUSE)
&& !containsMacro(PROPERTY_ROLE) && !containsMacro(PROPERTY_CLIENT_ID)
&& !containsMacro(PROPERTY_CLIENT_SECRET) && !containsMacro(PROPERTY_REFRESH_TOKEN)
&& !containsMacro(PROPERTY_PRIVATE_KEY));
&& !containsMacro(PROPERTY_PRIVATE_KEY) && !containsMacro(PROPERTY_PASSPHRASE));
}

protected void validateConnection(FailureCollector collector) {
Expand All @@ -299,8 +299,8 @@ protected void validateConnection(FailureCollector collector) {
.withConfigProperty(PROPERTY_USERNAME);

// TODO: for oauth2
if (keyPairEnabled) {
failure.withConfigProperty(PROPERTY_PRIVATE_KEY);
if (Boolean.TRUE.equals(keyPairEnabled)) {
failure.withConfigProperty(PROPERTY_PRIVATE_KEY).withConfigProperty(PROPERTY_PASSPHRASE);
Comment thread
vikasrathee-cs marked this conversation as resolved.
} else {
failure.withConfigProperty(PROPERTY_PASSWORD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.cdap.plugin.snowflake.common.OAuthUtil;
import io.cdap.plugin.snowflake.common.exception.ConnectionTimeoutException;
import io.cdap.plugin.snowflake.common.util.QueryUtil;
import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource;
Comment thread
vikasrathee-cs marked this conversation as resolved.
import org.apache.http.impl.client.HttpClients;

import java.io.BufferedWriter;
Expand Down Expand Up @@ -135,10 +135,10 @@ private void initDataSource(SnowflakeBasicDataSource dataSource, BaseSnowflakeCo

if (config.getOauth2Enabled()) {
String accessToken = OAuthUtil.getAccessTokenByRefreshToken(HttpClients.createDefault(), config);
dataSource.setOauthToken(accessToken);
// The recommend way to pass token is in the password when you use the driver with connection pool.
// This is also a mandatory field, so adding the same.
// Refer https://github.com/snowflakedb/snowflake-jdbc/issues/1175
// In JDBC 4.x, setOauthToken() was removed. The recommended approach is to explicitly
// set the authenticator to "oauth" and pass the access token as the password.
// Migration guide: https://docs.snowflake.com/en/developer-guide/jdbc/jdbc-migration
dataSource.setAuthenticator("oauth");
dataSource.setPassword(accessToken);
} else if (config.getKeyPairEnabled()) {
dataSource.setUser(config.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package io.cdap.plugin.snowflake.sink.batch;

import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
import net.snowflake.client.jdbc.SnowflakeConnection;
import net.snowflake.client.api.connection.SnowflakeConnection;
import net.snowflake.client.api.connection.UploadStreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
Expand Down Expand Up @@ -51,9 +52,10 @@ public void uploadStream(InputStream inputStream, String stageDir) throws IOExce
LOG.info("Uploading file '{}' to table stage", filename);

try (Connection connection = dataSource.getConnection()) {
UploadStreamConfig uploadConfig = UploadStreamConfig.builder().setDestPrefix(null).setCompressData(true).build();
connection.unwrap(SnowflakeConnection.class).uploadStream(stageDir,
null,
inputStream, filename, true);
filename,
inputStream, uploadConfig);
} catch (SQLException e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessor;
import io.cdap.plugin.snowflake.common.util.QueryUtil;
import io.cdap.plugin.snowflake.common.util.SchemaHelper;
import net.snowflake.client.jdbc.SnowflakeConnection;
import net.snowflake.client.api.connection.DownloadStreamConfig;
import net.snowflake.client.api.connection.SnowflakeConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
Expand Down Expand Up @@ -131,8 +132,9 @@ public void removeStageFile(String stageSplit) throws IOException {
*/
public CSVReader buildCsvReader(String stageSplit) throws IOException {
try (Connection connection = dataSource.getConnection()) {
DownloadStreamConfig downloadStreamConfig = DownloadStreamConfig.builder().setDecompress(true).build();
InputStream downloadStream = connection.unwrap(SnowflakeConnection.class)
.downloadStream("@~", stageSplit, true);
.downloadStream("@~", stageSplit, downloadStreamConfig);
InputStreamReader inputStreamReader = new InputStreamReader(downloadStream);
return new CSVReader(inputStreamReader, ',', '"', escapeChar);
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.cdap.plugin.snowflake.common.client.SnowflakeAccessorTest;
import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfig;
import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfigBuilder;
import net.snowflake.client.jdbc.SnowflakeBasicDataSource;
import net.snowflake.client.internal.api.implementation.datasource.SnowflakeBasicDataSource;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down
Loading