OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

Issue with aws glue job On prem oracle table to cloud

  • Thread starter Thread starter daturm girl
  • Start date Start date
D

daturm girl

Guest
I am facing a write issue to an Iceberg table using an AWS Glue job. My use case is to read an Oracle table from on-prem and write to an Iceberg table using AWS Glue. Reading from the on-prem table works, but the write raises the error below.

I have added the job parameter --datalake-formats with the value iceberg. The table already exists in the database, with sufficient permissions.

Code:
2024-06-27 15:59:56,539 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
  File "/tmp/test_iceberg.py", line 66, in <module>
    glueContext.write_data_frame.from_catalog(
  File "/opt/amazon/lib/python3.10/site-packages/awsglue/dataframewriter.py", line 20, in from_catalog
    return self._glue_context.write_data_frame_from_catalog(frame, db, table_name, redshift_tmp_dir,
  File "/opt/amazon/lib/python3.10/site-packages/awsglue/context.py", line 413, in write_data_frame_from_catalog
    return DataSink(j_sink, self).writeDataFrame(frame, self)
  File "/opt/amazon/lib/python3.10/site-packages/awsglue/data_sink.py", line 35, in writeDataFrame
    return DataFrame(self._jsink.pyWriteDataFrame(data_frame._jdf, glue_context._glue_scala_context, callsite(), info), self._sql_ctx)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o148.pyWriteDataFrame. Trace:
py4j.Py4JException: Method pyWriteDataFrame([class com.amazonaws.services.glue.DynamicFrame, class com.amazonaws.services.glue.GlueContext, class java.lang.String, class java.lang.String]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)

code

Code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext ,DynamicFrame
from awsglue.job import Job
import os
import boto3
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
from pyspark import SparkConf

## @params: [JOB_NAME]
# AWS Glue job: read an on-prem Oracle table via a Glue connection and write
# it to an existing Iceberg table registered in the Glue Data Catalog.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

catalog_name = "glue_catalog"
warehouse_path = "s3://s3-bucket/d_extract/"  # Iceberg warehouse root

# Iceberg + Glue Catalog Spark configuration. Must be set BEFORE the
# SparkContext is created, otherwise the catalog extensions are not loaded.
conf = (
    SparkConf()
    .set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")  # test
    .set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")  # test
    .set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")  # test
    .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "AES256")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.glue_catalog.warehouse", warehouse_path)
    .set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
    .set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)
)
# FIX: getOrCreate is a classmethod. The original
# `SparkContext(conf=conf).getOrCreate()` first constructs a context and only
# then calls getOrCreate on the instance — construct-or-reuse in one step instead.
sc = SparkContext.getOrCreate(conf=conf)

glueContext = GlueContext(sc)
#spark = glueContext.spark_session
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args['JOB_NAME'], args)


# JDBC read options: use the catalog connection "db_conn" for credentials,
# and parallelize the read into 7 partitions hashed on OPEN_DT_ID.
connection_options = {
    "useConnectionProperties": "true",
    "dbtable": "db.table_nm",
    "connectionName": "db_conn",
    "hashfield": "OPEN_DT_ID",
    "hashpartitions": 7,
}

glue_df = glueContext.create_dynamic_frame.from_options(
    connection_type="oracle",
    connection_options=connection_options,
    transformation_ctx="glue_df",
)

glue_df.show(5)


# FIX for the Py4JError ("Method pyWriteDataFrame([... DynamicFrame ...]) does
# not exist"): write_data_frame.from_catalog expects a Spark DataFrame, not a
# DynamicFrame. Convert with toDF() before writing. Also give the write its
# own transformation_ctx — reusing the read's "glue_df" breaks job bookmarks.
glueContext.write_data_frame.from_catalog(
    frame=glue_df.toDF(),
    database="d_extract",
    table_name="aws_table_name",
    transformation_ctx="write_iceberg",
)
job.commit()
<p>I am facing a writer issue to an iceberg table using the aws glue job . My use case is to read an oracle table from onprem and write to iceberg table using aws glue. read from onprem table is good, but write is causing the error</p>
<p>I have added the job parameter as data lake formats as iceberg . The table already exists in the database with sufficient permission</p>
<pre><code>2024-06-27 15:59:56,539 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
File "/tmp/test_iceberg.py", line 66, in <module>
glueContext.write_data_frame.from_catalog(
File "/opt/amazon/lib/python3.10/site-packages/awsglue/dataframewriter.py", line 20, in from_catalog
return self._glue_context.write_data_frame_from_catalog(frame, db, table_name, redshift_tmp_dir,
File "/opt/amazon/lib/python3.10/site-packages/awsglue/context.py", line 413, in write_data_frame_from_catalog
return DataSink(j_sink, self).writeDataFrame(frame, self)
File "/opt/amazon/lib/python3.10/site-packages/awsglue/data_sink.py", line 35, in writeDataFrame
return DataFrame(self._jsink.pyWriteDataFrame(data_frame._jdf, glue_context._glue_scala_context, callsite(), info), self._sql_ctx)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 330, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o148.pyWriteDataFrame. Trace:
py4j.Py4JException: Method pyWriteDataFrame([class com.amazonaws.services.glue.DynamicFrame, class com.amazonaws.services.glue.GlueContext, class java.lang.String, class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)


</code></pre>
<p>code</p>
<pre><code>import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext ,DynamicFrame
from awsglue.job import Job
import os
import boto3
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import *
from pyspark import SparkConf

## @params: [JOB_NAME]
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

catalog_name = "glue_catalog"
warehouse_path = f"s3://s3-bucket/d_extract/"


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

conf = (
SparkConf()
.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") # test
.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY") # test
.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY") # test
.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
.set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "AES256")
.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.glue_catalog.warehouse", warehouse_path)
.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)
)
sc = SparkContext(conf=conf).getOrCreate()

glueContext = GlueContext(sc)
#spark = glueContext.spark_session
job = Job(glueContext)
logger = glueContext.get_logger()
job.init(args['JOB_NAME'], args)


connection_options = {
"useConnectionProperties": "true",
"dbtable": "db.table_nm",
"connectionName": "db_conn",
"hashfield": "OPEN_DT_ID" ,
"hashpartitions": 7 ,
}

glue_df = glueContext.create_dynamic_frame.from_options(
connection_type="oracle",
connection_options=connection_options,
transformation_ctx="glue_df",
)

glue_df.show(5)



glueContext.write_data_frame.from_catalog(
frame=glue_df,
database="d_extract",
table_name="aws_table_name",
transformation_ctx = "glue_df"
)
job.commit()
</code></pre>
Continue reading...
 

Latest posts

I
Replies
0
Views
1
impact christian
I
Top