
AWS Glue to Redshift: Duplicate Data?

Here are some bullet points on how I have things set up: I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema. I have a Glue job set up th…

Solution 1:

Currently Glue doesn't support bookmarking for JDBC sources.

You can implement an upsert/merge into Redshift in a Glue job using the postactions option (code in Scala):

val fields = sourceDf.columns.mkString(",")

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database"->"conndb",
    "dbtable"->"staging_schema.staging_table",
    "postactions"-> 
        s"""
           DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
           INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
           DROP TABLE IF EXISTS staging_schema.staging_table
        """
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
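
The same approach can be expressed in PySpark. A minimal sketch, assuming an existing sourceDf DataFrame, tempDir and the same connection and table names as above:

# PySpark sketch of the same staging-table upsert via postactions.
# sourceDf, tempDir and the "RedshiftConnectionTest" connection are assumed to exist already.
from awsglue.dynamicframe import DynamicFrame

fields = ", ".join(sourceDf.columns)

post_actions = """
    DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
    INSERT INTO dst_schema.dst_table ({cols}) SELECT {cols} FROM staging_schema.staging_table;
    DROP TABLE IF EXISTS staging_schema.staging_table;
""".format(cols=fields)

glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=DynamicFrame.fromDF(sourceDf, glueContext, "source"),
    catalog_connection="RedshiftConnectionTest",
    connection_options={
        "database": "conndb",
        "dbtable": "staging_schema.staging_table",
        "postactions": post_actions,
    },
    redshift_tmp_dir=tempDir,
    transformation_ctx="redshift-output",
)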

If you just want to clear out the existing table first, you can use the preactions parameter instead:

glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database"->"conndb",
    "dbtable"->"dst_schema.dst_table",
    "preactions"->"DELETE FROM dst_schema.dst_table"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))

Solution 2:

As long as you have a unique key on your tables, ideally an integer primary key, the way that I tackle this is as follows (a rough end-to-end sketch follows the steps):

  1. Implement a scheduling tool to allow running of jobs in order. I recommend Airflow.
  2. Initiate the Glue job to read from the source and write to a staging table (the staging table will only contain the output from that Glue run, not necessarily all rows).
  3. Wait for that Glue job to finish (using scheduling tool)
  4. Initiate a SQL job running on Redshift that:

a) deletes the matching rows from the target table

delete from target
where id in (select id from staging);

b) Inserts the data from staging to target table

insert into target select * from staging;

c) truncates the staging table

d) vacuum and analyze both tables

vacuum target to 100 percent;
analyze target;
vacuum staging;
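
A rough Python sketch of steps 2-4 (in practice the scheduler drives the ordering); the job name, Redshift connection details and staging/target table names are placeholders:

# Rough sketch: start the Glue job, wait for it, then merge staging into target.
# Job name and Redshift connection details below are placeholders.
import time

import boto3
import pg8000

glue = boto3.client("glue")
run_id = glue.start_job_run(JobName="load_staging_table")["JobRunId"]

# Wait for the Glue run to finish (Airflow or another scheduler would normally do this).
while True:
    state = glue.get_job_run(JobName="load_staging_table", RunId=run_id)["JobRun"]["JobRunState"]
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(30)

if state != "SUCCEEDED":
    raise RuntimeError("Glue job finished in state %s" % state)

conn = pg8000.connect(host="my-cluster.example.redshift.amazonaws.com", port=5439,
                      database="mydb", user="etl_user", password="...")
cur = conn.cursor()

# a) delete matching rows, b) insert from staging, c) truncate staging
cur.execute("delete from target where id in (select id from staging)")
cur.execute("insert into target select * from staging")
cur.execute("truncate table staging")
conn.commit()

# d) vacuum and analyze both tables (vacuum must run outside a transaction, hence autocommit)
conn.autocommit = True
cur.execute("vacuum target to 100 percent")
cur.execute("analyze target")
cur.execute("vacuum staging")
conn.close()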

Solution 3:

You could use the Python module pg8000 to connect to Redshift and execute SQL to delete (drop/truncate) the data from your Glue script. pg8000 is pure Python, so it works with Glue.
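
A minimal sketch of that idea, with placeholder connection details:

# Minimal sketch: clear the destination table from a Glue Python script using pg8000.
# Host, credentials and table name are placeholders.
import pg8000

conn = pg8000.connect(host="my-cluster.example.redshift.amazonaws.com", port=5439,
                      database="mydb", user="etl_user", password="...")
cur = conn.cursor()
cur.execute("truncate table dst_schema.dst_table")
conn.commit()
conn.close()

# ...then write the DynamicFrame to Redshift as usual.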

Check out this link: AWS Glue - Truncate destination postgres table prior to insert

I have tried it and it works fine. Hope this helps you out.

Solution 4:

If you are looking to do a full load, you can use the Spark/PySpark Databricks Redshift library to overwrite the table:

df.write\
  .format("com.databricks.spark.redshift")\
  .option("url", redshift_url)\
  .option("dbtable", redshift_table)\
  .option("user", user)\
  .option("password", readshift_password)\
  .option("aws_iam_role", redshift_copy_role)\
  .option("tempdir", args["TempDir"])\
  .mode("overwrite")\
  .save()

Per Databricks/Spark documentation:

Overwriting an existing table: By default, this library uses transactions to perform overwrites, which are implemented by deleting the destination table, creating a new empty table and appending rows to it.

You can take a look at the Databricks documentation here.

Solution 5:

Glue jobs do support bookmarking with JDBC sources. It all depends on the presence of a key column whose values are strictly increasing or decreasing.

https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html
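
For example, when reading a JDBC source from the Data Catalog you can tell the bookmark which column to track. A sketch, assuming bookmarks are enabled on the job and the source table has a monotonically increasing id column (database and table names are placeholders):

# Sketch: enable bookmarking on a JDBC catalog source by naming the key column to track.
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="source_db",
    table_name="source_table",
    additional_options={
        "jobBookmarkKeys": ["id"],
        "jobBookmarkKeysSortOrder": "asc",
    },
    transformation_ctx="datasource",
)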
