Blog

Azure Databricks – Ví dụ cụ thể về cơ chế xử lý linh động

AzureDatabricks

Azure Databricks là gì? Cơ chế xử lý của Azure Databricks linh động như thế nào?

INDA Academy sẽ giải thích cho các bạn trong bài viết dưới đây. Cùng tìm hiểu qua qua cơ chế mẫu cho 2 feature notebook:

  • Format phone number (định dạng số điện thoại)
  • Remove sensitive column (xóa các cột thông tin nhạy cảm).

Tổng quan về Azure Databricks

Azure Databricks là dịch vụ triển khai Databricks trên nền tảng Azure, cung cấp khả năng autoscale, tương tác với các thành viên khác dễ dàng thông qua workspace.

Azure Databricks hỗ trợ nhiều ngôn ngữ như Java, Python, Scala… Trong các dự án, chúng tôi thường sử dụng ScalaPython là ngôn ngữ chính.

Để có thêm nhiều thông tin, các bạn có thể tìm hiểu thêm trong các đường dẫn sau:

Quy trình xử lý linh động trong Databricks

Một cách để tổ chức nền tảng Databricks là tạo một notebook riêng cho từng luồng dữ liệu (data flow). Tuy nhiên, phương pháp này có một số hạn chế đáng kể:

  • Thiếu tính mở rộng: Khi cần bổ sung data flow mới, đội ngũ phát triển phải tạo notebook từ đầu cho từng luồng dữ liệu, dẫn đến tốn nhiều thời gian và công sức.
  • Khó khăn trong cập nhật: Việc cập nhật data flow trở nên phức tạp, đặc biệt khi không phải luồng nào cũng đáp ứng đầy đủ yêu cầu hệ thống.
  • Thiếu nhất quán giữa các developer: Mặc dù các tính năng được xây dựng tương tự, mỗi developer có thể thực hiện theo cách riêng, gây khó khăn khi cập nhật và bảo trì.

Do đó, để xử lý nhiều luồng dữ liệu một cách hiệu quả, hệ thống Databricks cần tính linh hoạt cao và có khả năng cập nhật thông qua cấu hình. Điều này giúp hệ thống dễ dàng thay đổi luồng xử lý khi có nhu cầu mới.

Giải thích cơ chế xử lý dữ liệu trong Azure Databricks

  • Input: Tạo một tập tin cấu hình riêng biệt cho mỗi luồng xử lý. Khi hệ thống cần xử lý dữ liệu từ một luồng, các bước thực hiện cụ thể sẽ được lấy từ tập tin cấu hình này, đảm bảo xử lý theo đúng yêu cầu của từng luồng.
  • Coordinate Notebook: Notebook này đóng vai trò điều phối, bao gồm việc tải xuống input file, kiểm tra tập tin cấu hình và gọi các feature notebook tương ứng để thực hiện từng bước xử lý đã được chỉ định.
  • Feature Notebooks: Đây là các notebook chứa các chức năng xử lý dữ liệu đa dạng, có thể mở rộng thêm tùy theo nhu cầu. Những notebook này giúp thực hiện các tác vụ cụ thể, đảm bảo tính linh hoạt và đáp ứng yêu cầu trong tương lai.

Ví dụ về cơ chế Azure Databricks

Ví dụ minh họa cho cơ chế trên bao gồm việc thiết lập hai feature notebook: format phone number (định dạng số điện thoại)remove sensitive column (xóa các cột dữ liệu nhạy cảm). Các thao tác này sẽ được triển khai thông qua Azure Databricks và Azure Storage Blob.

Đầu tiên, tôi tạo các common notebook để tải và ghi tập tin, tương tác với Azure Storage Blob.

Download file notebook (scala)

1.  val storageAccountName = "<your_storage_account_name>"  
2.  val storageAccountKey = "<your_storage_account_key>"  
3.  // Get information from parameters
4.  val containerName = dbutils.widgets.get("containerName")  
5.  val fileName = dbutils.widgets.get("fileName")  
6.  val outputFile = dbutils.widgets.get("outputFile")  
7.  val fileType = dbutils.widgets.get("fileType")  
8.    
9.    
10. // Configure connection string
11. // In reality, you should use Key Vault instead of using key directly
12. spark.conf.set("fs.azure.account.key." + storageAccountName + ".blob.core.windows.net", storageAccountKey)  
13.   
14. // Remove the file if it exists  
15. dbutils.fs.rm(outputFile, true)  
16. 
17. // Download csv file
18. if (fileType == "csv") {  
19.     val dataDf = spark.read  
20.     .option("header","true")  
21.     .option("inferSchema", "true")  
22.     .csv("wasbs://" + containerName + "@"+ storageAccountName+ ".blob.core.windows.net/" + fileName)        
23.     dataDf.write.parquet(outputFile)  
24. } else if (fileType == "json") {  
25. // Download json file
26.     val dataDf = spark.read.json("wasbs://" + containerName + "@"+ storageAccountName+ ".blob.core.windows.net/" + fileName)  
27.     dataDf.write.json(outputFile)  
28. } else {  
29. // Download text file
30.     val dataDf = spark.read.textFile("wasbs://" + containerName + "@"+ storageAccountName+ ".blob.core.windows.net/" + fileName)  
31.     
32.     val result = dataDf.collect().mkString("\n")  
33.     dbutils.notebook.exit(result)  
34. }

Write file notebook (python)

1.  val storageAccountName = "<your_storage_account_name>"  
2.  val storageAccountKey = "<your_storage_account_key>"  
3.  // Get information from parameters
4.  dataPath = dbutils.widgets.get("dataPath")  
5.  output_container_name = dbutils.widgets.get("output_container_name")  
6.  outputPath = dbutils.widgets.get("outputPath")  
7.    
8.  # Configure blob storage account access key globally  
9.  # In reality, you should use Key Vault instead of using key directly
10. spark.conf.set("fs.azure.account.key.%s.blob.core.windows.net" % storage_name, storage_key)  
11.   
12. df = spark.read.parquet(dataPath)  
13.   
14. output_container_path = "wasbs://%s@%s.blob.core.windows.net/" % (output_container_name, storage_name)  
15. output_blob_folder = "%stmpFolder" % output_container_path  
16.   
17. # write the dataframe as a single file to blob storage  
18. (df  
19.  .coalesce(1)  
20.  .write  
21.  .mode("overwrite")  
22.  .option("header", "true")  
23.  .format("com.databricks.spark.csv")  
24.  .save(output_blob_folder))  
25.   
26. files = dbutils.fs.ls(output_blob_folder)  
27. output_file = [x for x in files if x.name.startswith("part-")]  
28.   
29. dbutils.fs.mv(output_file[0].path, "%s" % (output_container_path + outputPath))  

Định dạng dữ liệu

Tiếp theo, tôi tạo notebook để chuyển đổi định dạng số điện thoại thành định dạng chuẩn, và xóa những số điện thoại không thể định dạng được.

1.  import scala.util.matching.Regex  
2.  import org.apache.spark.sql.functions._  
3.    
4.  // define function check  
5.  def isPhoneNumberFormat() = udf((data: String) => {  
6.    var pattern: Regex = "^[+]*[(]84[)]\\d{8,10}$".r  
7.    if (pattern.findFirstMatchIn(data).mkString.length() > 0) {  
8.      1 // correct pattern  
9.    } else {  
10.     val data2 = data.replaceAll("[+()-]|\\s", "") // remove  special character  
11.     if (data2.length() > 11 | data2.length() < 10) {  
12.       0 // not a format of phone number  
13.     } else {  
14.       100 // can be corrected  
15.     }  
16.   }  
17. })  
18. // define corrected function  
19. def correctPhoneNumber() = udf((data: String, check: Int) => {  
20.   if (check == 1 | check == 0) {  
21.     data.toString // no need to correct  
22.   } else {  
23.     if (check == 100) {  
24.       // replace first digit to +(84)  
25.       data.replaceAll("[+()-]|\\s", "").replaceFirst("[0-9]", "+(84)").toString  
26.     } else {  
27.       data  
28.     }  
29.   }  
30. })  
31. // read paramter  
32. val dataPath = dbutils.widgets.get("dataPath")  
33. val colName = dbutils.widgets.get("phoneCol")  
34. val keyCol = dbutils.widgets.get("keyCol")  
35. val tmpColumn = dbutils.widgets.get("tmpColumn")  
36.   
37. var df = spark.read.parquet(dataPath)  
38. 
39. // execute check, return data frame with 3 column: key Column, corrected column, check column  
40. df = df.withColumn(tmpColumn, isPhoneNumberFormat()(df(colName)))  
41. df = df.withColumn(colName, correctPhoneNumber()(df(colName), df(tmpColumn)))  
42. .select(keyCol,colName, tmpColumn)  
43. df.show()  
44. 
45. // write to temp file and result  
46. val outputPath = dataPath + System.currentTimeMillis().toString  
47. df.write.parquet(outputPath)  
48. dbutils.notebook.exit(outputPath)  

Tạo notebook để xóa các cột dữ liệu nhạy cảm

1.  import org.apache.spark.sql.functions._  
2.  
3.  // read paramter  
4.  val dataPath = dbutils.widgets.get("dataPath")  
5.  val keyCol = dbutils.widgets.get("keyCol")  
6.  val colNameStr = dbutils.widgets.get("column")  
7.    
8.  var df = spark.read.parquet(dataPath)  
9.  val colNameLst = colNameStr.split("//")  
10.   
11. df= df.select(keyCol)  
12. for( w <- 0 to colNameLst.length - 1)   
13.   {   
14.     df= df.withColumn(colNameLst(w), lit(1))  
15.   }  
16. df= df.drop(keyCol)  
17. 
18. // write to temp file and result  
19. val outputPath = dataPath + (current_timestamp()).expr.eval().toString  
20. df.write.parquet(outputPath)  
21. dbutils.notebook.exit(outputPath)

Tạo coordinate notebook

1.  import org.apache.spark.sql.functions._  
2.  import org.apache.spark.sql.types.{  
3.      StructType, StructField, StringType, IntegerType}  
4.  import org.apache.spark.sql.Row  
5.  import org.apache.spark.sql.Column  
6.  
7.  
8.  // Configure common feature notebook  
9.  val downloadFileNotebook = "<path_to_download_file_notebook>"  
10. val writeFileNotebook = "<path_to_write_file_notebook>"  
11. val contraintFeaturesFolder = "<path_to_feature_notebook_folder>"  
12. 
13. val tmpFolder = "/tmp/"  
14. 
15. // These values are hard coded here, but you can update to get it from parameters  
16. val dataFile = "user.csv"  
17. val dataFileContainer = "input"    
18. val validationFile = "user-configuration.json"  
19. val validationFileContainer = "configuration"  
20. val keyCol = "User_ID"  
21.   
22.   
23. val hashString = (current_timestamp()).expr.eval().toString  
24. // Download data file  
25. val outputFileData = tmpFolder + "dataTemp" + hashString  
26. dbutils.notebook.run(downloadFileNotebook, 60, Map("containerName" -> dataFileContainer, "fileName" -> dataFile, "outputFile" -> outputFileData, "fileType" -> "csv"))  
27.   
28. //download validation file  
29. val outputFileValidation = tmpFolder + "validation" + hashString  
30. dbutils.notebook.run(downloadFileNotebook, 60, Map("containerName" -> validationFileContainer, "fileName" -> validationFile, "outputFile" -> outputFileValidation, "fileType" -> "json"))  
31.   
32. // Read validation file  
33. val constraintDF = spark.read.json(outputFileValidation)  
34.   
35. // Read data file  
36. var dataDf = spark.read.parquet(outputFileData)  
37. val dataOrginDf = spark.read.parquet(outputFileData)  
38. 
39. //Read constraint check  
40. val conDf = constraintDF.select($"action", $"param.*")  
41.   
42. val tmpColumn = "tmpCheck"  
43.   
44. //create empty failzone dataframe  
45. val schema = StructType(  
46.     StructField(keyCol, StringType, false) :: Nil)  
47.   
48. // var failDF = spark.createDataFrame(sc.emptyRDD[Row], schema)  
49. var constOutputLst = Array[String]()  
50. // execute constraint check  
51. conDf.collect().foreach(row => {  
52.   var paramMap = scala.collection.mutable.Map[String, String]()  
53.   paramMap("dataPath") = outputFileData  
54.   paramMap("keyCol") = keyCol  
55.   paramMap("tmpColumn") = tmpColumn  
56.   // create param map  
57.   for( w <- 1 to conDf.columns.size - 1)   
58.     {   
59.       if (row(w) != null) {  
60.         paramMap(conDf.columns(w)) = row(w).toString  
61.       }  
62.     }  
63.   // execute Check  
64.   val outputPath = dbutils.notebook.run(contraintFeaturesFolder + row(0).toString, 60, paramMap)   
65.   // add output file path to list for process later   
66.   constOutputLst :+= outputPath  
67. })  
68. 
69. var failDF = spark.createDataFrame(sc.emptyRDD[Row], schema)  
70.   
71. //Process constraint output file  
72. constOutputLst.foreach (file => {  
73.   val outDF = spark.read.parquet(file)  
74.   // check to remove column, if returned dataframe not have key Column => remove column  
75.   if (outDF.columns.filter(_ == keyCol).length > 0) {  
76.     // get all column but keycol and tempCheck column  
77.     outDF.columns.filter(_ != keyCol).filter(_ != tmpColumn).foreach(colName => {  
78.       // update or add new column from output file  
79.       val tmp = outDF.select(keyCol, colName)  
80.       dataDf = dataDf.drop(colName).join(tmp, dataDf(keyCol) === tmp(keyCol), "left").drop(tmp(keyCol))  
81.     })  
82.     // add fail data to fail df  
83.     failDF = failDF.union(outDF.filter(col(tmpColumn) === 0).select(keyCol))  
84.   } else {  
85.     // remove column  
86.     outDF.columns.filter(_ != tmpColumn).foreach(colName => {  
87.       dataDf = dataDf.drop(colName)  
88.     })  
89.   }  
90.     
91. })  
92. 
93. //save to correctZone  
94. val correctZoneDF = dataDf.join(failDF, dataDf(keyCol) === failDF(keyCol), "left_anti")  
95. if (correctZoneDF.collect().size > 0) {  
96.   val dataPath = outputFileData + "output" + System.currentTimeMillis().toString  
97.   val output_container_name = "output"  
98.   val outputPath = dataFile  
99.   correctZoneDF.write.parquet(dataPath)  
100.      dbutils.notebook.run(writeFileNotebook, 60, Map("dataPath" -> dataPath, "output_container_name" -> output_container_name, "outputPath" -> outputPath))  
101.    }  
102.    
103.    // Remove temp file if it exists  
104.    dbutils.fs.rm(outputFileValidation, true)  
105.    dbutils.fs.rm(outputFileData, true)  
106.    
107.    // return result check  
108.    dbutils.notebook.exit("1") 

Kiểm thử luồng xử lí

Cuối cùng sẽ là bước kiểm thử luồng xử lí bằng cách upload data fileconfiguration file vào Azure Blob Storage.

Data file sample:

User_ID,Phone_No,User_Name,Password

24306,303-555-0011,achigeol,8[gxXvQDt9sTQX

65824,225-556-1923,hermathe,Q7#CDYrr?hdxnth6

14506,219-557-3874,stashero,Vq8#upVE7qj9_M+n

71463,215-558-9821,inghthlo,WXzshf8rU^ts8CUN

36808,262-559212-212,adeldona,8@6RvrbJzNg%Dws5

69170,319-660-9832,wdyalbow,r3^T8++f9MhVJe5h

17255,229-661-2134,introsgo,eXH8ENa8J!cd^P4

56940,216-662-8732,burienti,BSZC_vxPgTm^q4J%

52720,210-663-8724,itereart,A$F3Rtnc4b%Rtk

Configuration file sample:

[{“action”:”remove-column”,”param”:{“column”:”Password”}},{“action”:”format-phone-number”,”param”:{“phoneCol”:”Phone_No”}}]

Quá trình kiểm thử như sau:

  • Upload data file với tên “user.csv” vào “input” container
  • Upload configuration file với tên “user-configuration.json” vào “configuration” file
  • Tạo “output” container

Những tên này được hard code trong coordinate notebook để phục vụ cho việc demo khi sử dụng Azure Databricks. Trong thực tế, ta sẽ lấy những giá trị này từ parameter hoặc configuration file.

Sau khi đã chuẩn bị các tập tin, ta sẽ chạy coordinate notebook. Output file với tên “user.csv” sẽ được xuất ra trong “output” container. Nội dung kì vọng:

User_ID,User_Name,Phone_No

65824,hermathe,+(84)255561923

24306,achigeol,+(84)035550011

56940,burienti,+(84)166628732

71463,inghthlo,+(84)155589821

17255,introsgo,+(84)296612134

14506,stashero,+(84)195573874

69170,wdyalbow,+(84)196609832

52720,itereart,+(84)106638724

Cột Phone number được chuyển thành định dạng chuẩn, và cột Password được xóa khỏi dữ liệu.

Những mẹo sử dụng Azure Databricks

Cập nhật và tái sử dụng (Configurable and Reusable)

Cơ chế cấu hình trên cho phép các feature có thể được tạo mới hoặc cập nhật một cách độc lập. Điều này mang lại lợi ích lớn vì từng feature notebook có thể phát triển riêng biệt mà không ảnh hưởng đến toàn hệ thống, đồng thời có thể tái sử dụng cho nhiều luồng xử lý khác nhau, tối ưu hóa hiệu suất và tiết kiệm thời gian.

Lợi ích chính của cơ chế này:

  • Mỗi luồng xử lý mới chỉ cần một tệp cấu hình duy nhất. Doanh nghiệp có thể dễ dàng tạo mới hoặc cập nhật các flow này thông qua giao diện thân thiện (UI/UX).
  • Hệ thống linh hoạt, có thể thêm tính năng xử lý mới mà không ảnh hưởng đến các thành phần khác, tiết kiệm chi phí so với việc xây dựng một flow xử lý hoàn toàn mới.
  • Các tính năng có thể được tái sử dụng trong nhiều processing flow, giúp tiết kiệm thời gian phát triển và tăng tính linh hoạt.

Trigger Databricks bằng Azure Logic App

Tích hợp Azure Databricks với Azure Logic App

Để tích hợp Azure Databricks với các hệ thống khác, bạn có thể sử dụng Azure Logic App để kích hoạt Databricks khi có dữ liệu mới tải lên blob container. Các bước thực hiện như sau:

  1. Tạo Databricks Job: Để publish coordinate notebook, bạn có thể trigger job này để run notebook bằng Azure Databricks REST API.
  2. Tạo Logic App: Cấu hình Logic App để gọi API của Azure Databricks, trigger notebook khi có tập tin mới được upload vào blob container.
  3. Cấu hình blob container: Thiết lập blob container để gọi Logic App khi tập tin được tải lên, tự động hóa quy trình xử lý dữ liệu.

Trong trường hợp cần tích hợp với các hệ thống khác, bạn có thể dùng Azure Logic App để trigger Databricks khi một tập tin dữ liệu được upload vào blob container. Các bước như bên dưới:

  • Tạo Databricks Job để publish coordinate notebook, bạn có thể trigger job này để run notebook bằng Azure Databricks REST API
  • Tạo Logic App để gọi Azure Databricks REST API nhằm trigger notebook. Logic App này có thể được trigger khi tập tin dữ liệu upload vào blob container
  • Cấu hình blob container để gọi Logic App khi tập tin dữ liệu được upload

Cải tiến hiệu suất trong Azure Databricks

Theo kinh nghiệm của tôi khi sử dụng Azure Databricks, việc chạy một notebook từ một notebook khác có thể gây chậm trễ do thời gian khởi động của các notebook. Để cải thiện performance, bạn có thể tham khảo các mẹo sau:

  • Tối ưu hóa theo nghiệp vụ: Thay vì dùng notebook riêng rẽ theo feature, bạn có thể tách notebook theo nghiệp vụ (gồm nhiều feature), hoặc mỗi luồng xử lí có một notebook riêng. Tất cả các feature cần thiết đều được hiện thực trong cùng notebook. Cơ chế này có thể làm giảm bớt tính tái sử dụng, nhưng có thể cải thiện performance.
  • Xây dựng thư viện feature: Build feature thành các thư viện và import vào coordinate notebook để gọi mà không cần phải thông qua các notebook khác. Mặc dù mỗi lần cập nhật tính năng sẽ cần phải cập nhật lại các thư viện, nhưng nếu nghiệp vụ không yêu cầu update thường xuyên thì cơ chế này phù hợp để cải thiện performance.

Kết lại, dù việc ứng dụng vào thực tiễn có có rất nhiều hạn chế và khó khăn, nhưng tôi hy vọng bạn sẽ tìm thấy vài điểm hữu ích giúp bạn hiện thực được luồng xử lí dữ liệu bằng Azure Databricks.

Nguồn: Internet

>>Tìm hiểu thêm các khóa học tại đây!

Leave a Reply

Your email address will not be published. Required fields are marked *