
There are many ways to orchestrate a data flow in the cloud. One option is to have an independent process pull data from source systems and land each batch in an Azure Data Lake as a single file. The processing layer that follows can also be designed in several ways. The most independent approach is to have the processing layer fetch the latest file from the Data Lake on its own. That way it does not depend on an upstream tool or service passing the file path to it, which increases fault tolerance.
In Databricks, there is no built-in function to get the latest file from a Data Lake. Third-party libraries can provide such functions, but it is advisable to stick to standard libraries and code as far as possible.
Below are two functions that work together to scan a directory in an Azure Data Lake and return the full file path of the last modified file. That file can then be processed as normal in Databricks.
# os provides operating system interfaces, used here for file statistics
# datetime is used to manipulate datetime values
import os
from datetime import datetime

def get_dir_content(pPath):
    """
    get_dir_content:
    For a folder in the data lake, get the list of files it contains, including all subfolders.
    Returns the full file name as well as the last modified datetime as a generator object.
    The output requires conversion into a list for consumption.
    """
    # This loop checks all directories and files inside the provided path.
    # For each file it finds, it yields a two-element list with the file path and its last modified datetime.
    # The consuming code needs to convert the generator object this returns into a list to consume it.
    # yield is used so that the entire directory contents are scanned; a return would stop after the first object encountered.
    for dir_path in dbutils.fs.ls(pPath):
        if dir_path.isFile():
            # os.stat gets statistics on a path; st_mtime is the most recent content modification time
            yield [dir_path.path, datetime.fromtimestamp(os.stat('/' + dir_path.path.replace(':', '')).st_mtime)]
        elif dir_path.isDir() and pPath != dir_path.path:
            # If the path is a directory, call the function on it again to check its contents
            yield from get_dir_content(dir_path.path)
def get_latest_modified_file_from_directory(pDirectory):
    """
    get_latest_modified_file_from_directory:
    For a given path to a directory in the data lake, return the file that was last modified.
    Uses the get_dir_content function as well.
    Input path format expectation: '/mnt/datalake_rawdata'
    You can add subdirectories as well, as long as you use a registered mount point.
    Performance: with 588 files, it returns in less than 10 seconds on the lowest cluster size.
    """
    # Call get_dir_content to get a list of all files in this directory and the last modified datetime of each
    vDirectoryContentsList = list(get_dir_content(pDirectory))
    # Convert the list returned from get_dir_content into a DataFrame with column headings so the data is easy to manipulate.
    # You can alternatively sort the list by LastModifiedDateTime and take the top record.
    df = spark.createDataFrame(vDirectoryContentsList, ['FullFilePath', 'LastModifiedDateTime'])
    # Get the latest modified datetime as a scalar value
    maxLatestModifiedDateTime = df.agg({"LastModifiedDateTime": "max"}).collect()[0][0]
    # Filter the DataFrame to the record with the latest modified datetime value retrieved
    df_filtered = df.filter(df.LastModifiedDateTime == maxLatestModifiedDateTime)
    # Return the file path that was last modified in the given directory
    return df_filtered.first()['FullFilePath']
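To put the functions to use, the processing layer can call get_latest_modified_file_from_directory and pass the result straight to a Spark reader. Below is a minimal usage sketch: the mount point matches the docstring above, while the 'sales' subfolder name and the assumption that the latest file is a CSV with a header row are purely illustrative.
# Minimal usage sketch. The 'sales' subfolder and the CSV format are illustrative assumptions.
latest_file_path = get_latest_modified_file_from_directory('/mnt/datalake_rawdata/sales')

# Read the latest file like any other file in Databricks
df_latest = spark.read.option('header', 'true').csv(latest_file_path)
display(df_latest)

# Alternative without a DataFrame, as noted in the comments above:
# take the entry with the greatest LastModifiedDateTime directly from the generator output.
# latest_file_path = max(get_dir_content('/mnt/datalake_rawdata/sales'), key=lambda entry: entry[1])[0]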