Save Dataframes as table using pytest

  • Thread starter: Ananya (Guest)
I am new to pytest and I am trying to test PySpark code which reads a parquet file, converts double-typed columns to decimal, and appends the data to a table. My test case is failing when I try to execute 'df.write.mode("append").saveAsTable(table_name)'. Could someone please tell me what I am missing here?

pytest.py

Code:
import pytest
from pyspark.sql.types import (
    StructType, StructField, IntegerType, DoubleType, StringType
)

@pytest.fixture(scope="module")
def sample_df(spark):
    data = [(1, 2.0, 12, 'Test')]
    schema = StructType(
        [
            StructField("foo1", IntegerType(), True),
            StructField("foo2", DoubleType(), True),
            StructField("foo3", IntegerType(), True),
            StructField("Name", StringType(), True)
        ]
    )
    sample_df = spark.createDataFrame(data, schema)
    sample_df.createOrReplaceTempView("dummy_table")
    return sample_df

# run_params and the spark / spark_mock / dbutils fixtures are assumed to be defined in conftest.py
@pytest.mark.parametrize('dbutils', [run_params], indirect=['dbutils'])
def test_loadDataIntoForecastTable(spark_mock, dbutils, sample_df):
    spark_mock.read.parquet.return_value = sample_df
    loadDataIntoForecastTable(spark_mock,"test","test","test")
    spark_mock.sql.assert_called()
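One thing worth checking in the test above: `spark_mock.read.parquet.return_value = sample_df` hands the function a *real* DataFrame, so `df.write.mode("append")...saveAsTable(...)` runs against the real Spark session (and its metastore) rather than against the mock. A minimal, self-contained sketch of the alternative, where the whole write chain is mocked and asserted step by step (names here are illustrative, not from the original code):

```python
from unittest.mock import MagicMock

# Fully mocked DataFrame: the write chain never touches a real metastore.
df = MagicMock()

# The code under test would run something like:
df.write.mode("append").option("mergeSchema", "true").saveAsTable("some_table")

# Each chained call can then be asserted:
df.write.mode.assert_called_once_with("append")
writer = df.write.mode.return_value.option.return_value
writer.saveAsTable.assert_called_once_with("some_table")
```

With `MagicMock`, every attribute access and call returns another mock, so the chain records each step and `saveAsTable` succeeds without any JVM involved.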

main module:

Code:
def loadDataIntoForecastTable(spark, bar1, bar2, s3_file_name):
    df = spark.read.parquet(s3_file_name)

    # cast every double column to decimal(38,9)
    for name, dtype in df.dtypes:
        if dtype == 'double':
            df = df.withColumn(name, df[name].cast("decimal(38,9)"))


    #delete existing data from the table
    df.select("foo1").distinct().createOrReplaceTempView("tempTableDeleteExisting")

    delete_query = """DELETE 
                      FROM {} fha
                      WHERE ID in (SELECT foo1 from tempTableDeleteExisting)"""


    delete_sql =  delete_query.format(bar2)
    print("the delete sql is : ",delete_sql)
    spark.sql(delete_sql)

    #update table
    # the table name is bar2, the same name formatted into the delete query above;
    # fact_horizon_finance_analytics is not defined anywhere in this function
    df.write.mode("append").option("mergeSchema", "true").saveAsTable(bar2)
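The double-to-decimal pass above depends only on the `(name, type)` pairs that `df.dtypes` returns, so the column-selection logic can be checked in isolation with plain tuples (a hypothetical standalone check, no Spark needed):

```python
# df.dtypes in PySpark returns a list of (column_name, type_string) tuples;
# the loop above rewrites exactly the columns whose type string is 'double'.
dtypes = [("foo1", "int"), ("foo2", "double"), ("foo3", "int"), ("Name", "string")]

double_columns = [name for name, dtype in dtypes if dtype == "double"]
print(double_columns)  # ['foo2']
```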

This is the stacktrace from my pytest file:

test_loadDataIntoForecastTable[dbutils0] - py4j.protocol.Py4JJavaError: An error occurred while calling o159.saveAsTable.

Could someone please help?
