OiO.lk Community platform!


Why is my PySpark row_number column messed up when applying a schema?

  • Thread starter: stats_guy (Guest)
I want to apply a schema to specific non-technical columns of a Spark DataFrame. Beforehand, I add an artificial ID using Window and row_number so that I can later join the remaining technical columns from the initial DataFrame back onto the new DataFrame. However, after applying the schema, the generated IDs no longer match the original rows: joining on the ID pairs up rows with different id values, and this happens in some runs but not in others. Below is a code sample. Can someone explain why this happens and how to resolve it?

Code:
from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, lit, col, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Reuse the active session (notebooks usually provide `spark` already)
spark = SparkSession.builder.getOrCreate()

# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])

# Schema to apply to the non-technical columns
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
])

# Create the surrogate ID column (the window is ordered by a constant)
w = Window.orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))

# Extend the schema with the surrogate key; row_number() yields integers
surrogate_key_field = StructField("_special_surrogate_id", IntegerType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])

# Repeat the check, because the mismatch only shows up in some runs
for i in range(11):
    # Re-create the DataFrame from its RDD to enforce the schema
    df_filtered = df.select("id", "name", "_special_surrogate_id")
    df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)

    # Join back to the original rows on the surrogate ID
    combined_df = df.withColumnRenamed("id", "id1").join(
        df_filtered.withColumnRenamed("id", "id2"), on="_special_surrogate_id"
    )

    # Count joined rows whose original and re-schemed IDs disagree
    diffs = (
        combined_df
        .withColumn("diff", (col("id1") != col("id2")).cast("integer"))
        .agg(spark_sum("diff"))
        .collect()[0][0]
    )
    print(f"Diffs in iteration {i}: {diffs}")