
Setting KustoSourceOptions.KUSTO_READ_MODE to ForceSingleMode throws "IOException in post request:Incorrect inner plaintext" #410

@vikasparihar

Describe the bug

Setting KustoSourceOptions.KUSTO_READ_MODE to ForceSingleMode throws "IOException in post request:Incorrect inner plaintext".

Setting KustoSourceOptions.KUSTO_READ_MODE to ForceDistributedMode works as expected.

To Reproduce

Add option(KustoSourceOptions.KUSTO_READ_MODE, ReadMode.ForceSingleMode.toString()) when reading the data:

val resultDf = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSourceOptions.KUSTO_QUERY, "some query")
  .option(KustoSourceOptions.KUSTO_DATABASE, "databaseName")
  .option(KustoSourceOptions.KUSTO_CLUSTER, "clusterName")
  .option(KustoSourceOptions.KUSTO_ACCESS_TOKEN, "") // token elided
  .option(KustoSourceOptions.KUSTO_READ_MODE, ReadMode.ForceSingleMode.toString())
  .load()
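
For comparison, the equivalent read in distributed mode completes without error. A minimal sketch, differing from the snippet above only in the read-mode option (same placeholder query/database/cluster values):

// Same read, but forcing distributed mode -- this path works as expected.
val distributedDf = spark.read
  .format("com.microsoft.kusto.spark.datasource")
  .option(KustoSourceOptions.KUSTO_QUERY, "some query")
  .option(KustoSourceOptions.KUSTO_DATABASE, "databaseName")
  .option(KustoSourceOptions.KUSTO_CLUSTER, "clusterName")
  .option(KustoSourceOptions.KUSTO_ACCESS_TOKEN, "") // token elided
  .option(KustoSourceOptions.KUSTO_READ_MODE, ReadMode.ForceDistributedMode.toString())
  .load()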

Expected behavior

I expect the connector to honor KUSTO_READ_MODE and execute the query in single mode.

Stack trace
[Driver]: User class threw exception: com.microsoft.azure.kusto.data.exceptions.DataServiceException: IOException in post request:Incorrect inner plaintext: no content type
com.microsoft.azure.kusto.data.exceptions.DataServiceException: IOException in post request:Incorrect inner plaintext: no content type
at com.microsoft.azure.kusto.data.http.HttpPostUtils.post(HttpPostUtils.java:77) ~[app.jar:?]
at com.microsoft.azure.kusto.data.ClientImpl.lambda$executeToJsonResult$1(ClientImpl.java:224) ~[app.jar:?]
at com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity.invoke(MonitoredActivity.java:33) ~[app.jar:?]
at com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity.invoke(MonitoredActivity.java:26) ~[app.jar:?]
at com.microsoft.azure.kusto.data.ClientImpl.executeToJsonResult(ClientImpl.java:223) ~[app.jar:?]
at com.microsoft.azure.kusto.data.ClientImpl.executeImpl(ClientImpl.java:173) ~[app.jar:?]
at com.microsoft.azure.kusto.data.ClientImpl.lambda$execute$0(ClientImpl.java:122) ~[app.jar:?]
at com.microsoft.azure.kusto.data.instrumentation.MonitoredActivity.invoke(MonitoredActivity.java:33) ~[app.jar:?]
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:121) ~[app.jar:?]
at com.microsoft.azure.kusto.data.ClientImpl.execute(ClientImpl.java:116) ~[app.jar:?]
at com.microsoft.kusto.spark.utils.ExtendedKustoClient.$anonfun$executeEngine$1(ExtendedKustoClient.scala:550) ~[app.jar:?]
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$$anon$2.apply(KustoDataSourceUtils.scala:509) ~[app.jar:?]
at kusto_connector_shaded.io.github.resilience4j.retry.Retry.lambda$decorateCheckedSupplier$3f69f149$1(Retry.java:137) ~[app.jar:?]
at kusto_connector_shaded.io.github.resilience4j.retry.Retry.executeCheckedSupplier(Retry.java:419) ~[app.jar:?]
at com.microsoft.kusto.spark.utils.KustoDataSourceUtils$.retryApplyFunction(KustoDataSourceUtils.scala:512) ~[app.jar:?]
at com.microsoft.kusto.spark.utils.ExtendedKustoClient.executeEngine(ExtendedKustoClient.scala:552) ~[app.jar:?]
at com.microsoft.kusto.spark.datasource.KustoReader$.singleBuildScan(KustoReader.scala:88) ~[app.jar:?]
at com.microsoft.kusto.spark.datasource.KustoRelation.$anonfun$buildScanImpl$1(KustoRelation.scala:129) ~[app.jar:?]
at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
at com.microsoft.kusto.spark.datasource.KustoRelation.buildScanImpl(KustoRelation.scala:118) ~[app.jar:?]
at com.microsoft.kusto.spark.datasource.KustoRelation.buildScan(KustoRelation.scala:100) ~[app.jar:?]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:328) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:362) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:418) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:361) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:328) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator.foreach(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator.foreach(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) ~[scala-library-2.12.15.jar:?]
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) ~[scala-library-2.12.15.jar:?]
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:70) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:511) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$4(QueryExecution.scala:174) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:120) ~[spark-catalyst_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:562) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:780) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:174) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:158) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.assertSparkPlanned(QueryExecution.scala:178) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:185) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:182) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:238) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:294) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:263) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:105) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:780) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3869) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at org.apache.spark.sql.Dataset.count(Dataset.scala:3173) ~[spark-sql_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]
at com.kustoquerypoc.Workflow$.main(Workflow.scala:74) ~[app.jar:?]
at com.kustoquerypoc.Workflow.main(Workflow.scala) ~[app.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:739) ~[spark-yarn_2.12-3.3.1.5.1.5.6.jar:3.3.1.5.1.5.6]


Additional context

I am running a Spark batch job on HDI 5.1 with Spark 3.3. I am using version 5.2.2 of the kusto-spark_3.0_2.12 artifact.
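
For reference, a dependency sketch for those coordinates (the com.microsoft.azure.kusto group ID is my assumption; verify against your build's resolved dependency):

// build.sbt -- artifact and version taken from this report; group ID assumed, not confirmed here.
libraryDependencies += "com.microsoft.azure.kusto" % "kusto-spark_3.0_2.12" % "5.2.2"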
