How to Optimize Data Insertion Into Synapse

Written by anatolii | Published 2023/06/05
Tech Story Tags: synapse | sql | python | azure | azure-blob | pyodbc | programming | coding

TL;DR: Learn how to optimize data insertion into Synapse with the help of Azure Blob Service and PyODBC. Discover efficient techniques and best practices to seamlessly load and integrate large volumes of data into Synapse for faster analytics.

This one will be helpful for anyone who works with Azure Synapse Analytics.

Synapse combines data warehousing, big data processing, data integration, and advanced analytics into a single, integrated solution. It offers scalable storage and compute resources, allowing you to handle large volumes of data and perform complex analytics tasks efficiently. Most importantly for my case, it offers SQL-based querying capabilities. And here the story begins.

Before I can work with the data, I need to add some. To do that automatically, I of course need a programming language; this example uses Python. Looking through the Synapse docs for a DB connector/driver I could use, I found the “ODBC Driver 17 for SQL Server”, which works with the help of pyodbc.

...
import pyodbc
...

def insert() -> None:
  # 'host', 'port', 'database', 'username' and 'password' are placeholders for real values
  conn = pyodbc.connect(f"Driver={{{'ODBC Driver 17 for SQL Server'}}};Server=tcp:{'host'},{'port'};Database={'database'};Uid={'username'};Pwd={'password'};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;")
  ...
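
As a minimal sketch, the same connection string can be assembled from named parts, which keeps the placeholders readable. The helper name `build_connection_string` and the example host are assumptions for illustration, not part of the original script, and the sketch does not escape special characters (such as `;`) inside values.

```python
def build_connection_string(host: str, port: int, database: str,
                            username: str, password: str,
                            driver: str = "ODBC Driver 17 for SQL Server") -> str:
    """Assemble an ODBC connection string from its parts (hypothetical helper)."""
    parts = {
        "Driver": "{" + driver + "}",       # the driver name is wrapped in braces
        "Server": f"tcp:{host},{port}",
        "Database": database,
        "Uid": username,
        "Pwd": password,
        "Encrypt": "yes",
        "TrustServerCertificate": "no",
        "Connection Timeout": "30",
    }
    # Join as key=value pairs separated by semicolons, with a trailing semicolon
    return ";".join(f"{key}={value}" for key, value in parts.items()) + ";"
```

The result would then be passed to the driver as `pyodbc.connect(build_connection_string(...))`.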

The classic SQL INSERT INTO statement looked like the obvious way to add data to whatever table I wanted, and it does insert the data.

# Adjacent f-strings inside parentheses are joined into one statement
sql_statement = (
    f"INSERT INTO [{schema_name}].[{table_name}] "
    f"({', '.join([f'[{column_name}]' for column_name in column_names])}) "
    f"VALUES ({', '.join(['?'] * len(column_names))});"
)
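
To make the shape of the generated statement concrete, here is a small sketch that wraps the same string building in a function. The function name and the sample schema, table, and column names are made up for illustration.

```python
from typing import Sequence


def build_insert_statement(schema_name: str, table_name: str,
                           column_names: Sequence[str]) -> str:
    """Build a parameterized single-row INSERT (hypothetical helper)."""
    columns = ", ".join(f"[{name}]" for name in column_names)
    # One "?" placeholder per column; pyodbc binds the actual values
    placeholders = ", ".join("?" for _ in column_names)
    return (f"INSERT INTO [{schema_name}].[{table_name}] "
            f"({columns}) VALUES ({placeholders});")


print(build_insert_statement("dbo", "events", ["id", "name"]))
# prints: INSERT INTO [dbo].[events] ([id], [name]) VALUES (?, ?);
```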

Yeah, it is a basic INSERT and it just inserts data, but with a lot of rows the script becomes very slow. First, I found the autocommit property: when it is set to True, every statement is committed automatically, so the transaction does not have to be managed by hand.

conn.autocommit = True

With autocommit on, or with the script split into manually committed transactions, there was no significant difference. The change had no effect on the slow insertion. Realizing the need for a more effective approach, I began exploring alternatives and eventually discovered pyodbc's executemany function. It runs one SQL statement for a whole sequence of parameter sets in a single call, which, I thought, opened up a world of possibilities.

cursor.executemany(sql_statement, params)

Although the executemany function in pyodbc provided some improvement, it fell short of my expectations in terms of speed. Even though it accepts all the rows in a single call, the overall performance remained slower than desired. This prompted me to keep exploring other optimizations, starting with the cursor's fast_executemany flag.

cursor.fast_executemany = True

According to some websites, fast_executemany should boost the script to the cosmos. But not in this case: it did nothing to make the INSERT faster.
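
For reference, here is a rough sketch of how the two cursor-level settings tried so far fit together. The helper names `chunked` and `insert_in_batches` and the batch size are assumptions for illustration. The cursor is expected to be a pyodbc cursor, and, as noted above, none of this made the load fast on Synapse.

```python
from typing import Iterable, Iterator, List, Sequence


def chunked(rows: Iterable[Sequence], batch_size: int) -> Iterator[List[Sequence]]:
    """Yield the rows in lists of at most batch_size items."""
    batch: List[Sequence] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # the last, possibly shorter, batch
        yield batch


def insert_in_batches(cursor, sql_statement: str, rows: Iterable[Sequence],
                      batch_size: int = 1000) -> int:
    """Run executemany batch by batch and return the number of rows sent."""
    cursor.fast_executemany = True  # pyodbc flag discussed above
    total = 0
    for batch in chunked(rows, batch_size):
        cursor.executemany(sql_statement, batch)
        total += len(batch)
    return total
```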

Eventually, after carefully reading the documentation about Azure SQL Server, I noticed a special note about Synapse Analytics. It said that Synapse does not support multi-row INSERT ... VALUES statements at all.

That means an INSERT ... VALUES statement can carry only one row. E.g.:

-- NOT SUPPORTED
INSERT INTO table_name (column_list) 
  VALUES (value_list_1), (value_list_2), ... (value_list_n);

But a single insert is executable:

-- SUPPORTED
INSERT INTO table_name (column_list) VALUES (value_1);

It was a surprise to me, because I had used executemany and it worked. The reason is pyodbc and its driver, which translate the call into statements the server can execute. In other words, one INSERT is translated into MANY INSERTs:

INSERT INTO table_name (column_list) VALUES (value_1);
INSERT INTO table_name (column_list) VALUES (value_2);
...
INSERT INTO table_name (column_list) VALUES (value_n);

Okay, then I found that the COPY statement can be used for fast insertion. For this approach, I have to use a file as a proxy for the data.

import csv
import os
import uuid

# Make sure the local data directory exists
os.makedirs(local_path, exist_ok=True)

# Create a uniquely named file in the local data directory; it is uploaded in the next step
local_file_name = str(uuid.uuid4()) + ".csv"
upload_file_path = os.path.join(local_path, local_file_name)

# Write the rows to the file (newline='' lets the csv module control line endings)
with open(upload_file_path, 'w', newline='') as out:
  csv_out = csv.writer(out)
  csv_out.writerows(values)

It’s not enough to have the file on the machine where the code executes. The final statement is executed on the Synapse SQL server, so the file must be readable from there. Here Azure Blob storage acts as another proxy that temporarily and privately stores the data files for the Synapse statement.

from azure.storage.blob import BlobServiceClient, ContentSettings

try:
  # Create the BlobServiceClient object
  blob_service_client = BlobServiceClient(account_url, credential=storage_secret)
  logger.info(f"Uploading file to blob: {local_file_name} storage in schema: {destination_schema_name} table: {table_name}")
  logger.info(f"Account URL: {account_url}, Container: {storage_container}, File: {local_file_name}")
  # Using Container Client to upload blob
  container_client = blob_service_client.get_container_client(storage_container)
  with open(file=upload_file_path, mode="rb") as csv_data:
    container_client.upload_blob(local_file_name, csv_data, content_settings=ContentSettings(content_type='text/csv'))
except Exception as ex:
  logger.error(f"Error: {ex} in schema: {destination_schema_name} table: {table_name}")
  raise
finally:
  if os.path.isfile(upload_file_path):
    os.remove(upload_file_path)

Finally, write the COPY INTO statement, execute it, and commit the transaction.

file_url = f"https://{storage_identity}.blob.core.windows.net/{storage_container}/{local_file_name}"

logger.info(f"Inserting data for schema: {destination_schema_name} table: {table_name}, rows count: {len(values)}")
sql = (f"COPY INTO [{destination_schema_name}].[{table_name}] ({', '.join([f'[{c}]' for c in columns])}) "
  f"FROM '{file_url}' "
  f"WITH (CREDENTIAL=(IDENTITY='Storage Account Key', SECRET='{storage_secret}'))")
cursor.execute(sql)
conn.commit()
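
The statement can also be built by a small function that escapes identifiers and string literals before they are interpolated. This is only a sketch: the helper names and the sample values are assumptions, and `FILE_TYPE = 'CSV'` is spelled out explicitly although CSV is already the default for COPY.

```python
from typing import Sequence


def quote_identifier(name: str) -> str:
    """Wrap a name in brackets, doubling any closing bracket inside it."""
    return "[" + name.replace("]", "]]") + "]"


def quote_literal(value: str) -> str:
    """Wrap a value in single quotes, doubling any single quote inside it."""
    return "'" + value.replace("'", "''") + "'"


def build_copy_statement(schema_name: str, table_name: str,
                         columns: Sequence[str], file_url: str,
                         storage_secret: str) -> str:
    """Build a COPY INTO statement for a CSV blob (hypothetical helper)."""
    column_list = ", ".join(quote_identifier(c) for c in columns)
    return (
        f"COPY INTO {quote_identifier(schema_name)}.{quote_identifier(table_name)} "
        f"({column_list}) "
        f"FROM {quote_literal(file_url)} "
        "WITH (FILE_TYPE = 'CSV', "
        "CREDENTIAL = (IDENTITY = 'Storage Account Key', "
        f"SECRET = {quote_literal(storage_secret)}))"
    )
```

Because the statement contains the storage key, it is worth keeping it out of log messages.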

Voila, fast insertion works. It executes much, much faster: as fast as I had assumed the classic INSERT INTO table_name (column_list) VALUES (value_list_1), (value_list_2), ... (value_list_n); would be.

The gem is finally found. After working with files and blob storage, I have to remember to clean everything up.

try:
  # Create the BlobServiceClient object
  blob_service_client = BlobServiceClient(account_url, credential=storage_secret)
  logger.info(f"Removing blob: {local_file_name} from storage in schema: {destination_schema_name} table: {table_name}")
  # Using Container Client to delete blob
  container_client = blob_service_client.get_container_client(storage_container)
  container_client.delete_blobs(local_file_name)
  logger.info(f"Blob: {local_file_name} removed from storage in schema: {destination_schema_name} table: {table_name}")
except Exception as ex:
  logger.error(f"Error: {ex} in schema: {destination_schema_name} table: {table_name}")
  raise
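
One way to make sure the cleanup always happens, even when the COPY step fails, is to tie the upload and the delete together in a context manager. The sketch below assumes a `ContainerClient` from azure-storage-blob (it only uses its `upload_blob` and `delete_blob` methods); the name `staged_blob` is hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def staged_blob(container_client, blob_name: str, data):
    """Upload a blob, yield its name, and delete it on exit (hypothetical helper)."""
    container_client.upload_blob(blob_name, data)
    try:
        yield blob_name
    finally:
        # Runs on success and on failure, so no staging file is left behind
        container_client.delete_blob(blob_name)
```

The COPY INTO statement would then be executed inside `with staged_blob(container_client, local_file_name, csv_data):`, and the blob is removed as soon as the block exits.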

Everything works, enjoy!


Written by anatolii | Developing and enjoying life. Life is one, implement yours. All the best in your endeavors.
Published by HackerNoon on 2023/06/05