Discover how strategic non-fiction reading can transform your discipline, habits, and mindset. Learn how to leverage the Serenity app to track reading goals and implement life-changing insights from books.
Kickstart Your Mind: How Non-Fiction Reading Transforms Discipline and Self-Growth
Have you ever finished a powerful non-fiction book and felt that distinctive mental shift, like your brain has been rewired with new possibilities? That's not just a feeling; it's the transformative power of strategic reading at work.
In our journey toward self-improvement, few tools are as consistently powerful as non-fiction reading. But not all reading approaches are created equal. Today, I'll share how to turn reading from a passive activity into a catalyst for genuine life transformation, and how the Serenity app can be your companion through this journey.
The Science Behind How Reading Reshapes Your Brain
When you engage with non-fiction consistently, you're not just acquiring information; you're physically changing your brain. Neuroplasticity research suggests that deep reading builds and strengthens neural pathways that support critical thinking and emotional intelligence.
But the key difference between casual readers and those who experience transformation lies in implementation.
"Knowledge isn't power until it's applied." – Dale Carnegie
This is where most people struggle. They read inspiring books but fail to integrate the insights into their daily routines. The Serenity app addresses this gap by helping you turn reading insights into actionable steps that compound over time.
Creating Your Personal Development Reading Strategy
Rather than reading randomly, consider these core areas that create a foundation for transformation:
1. Physiological Understanding
Why it matters: Your body is the vehicle for all achievement. Understanding its mechanics gives you leverage for everything else.
Essential reads:
- "Why We Sleep" by Matthew Walker
- "Breath" by James Nestor
- "The Hungry Brain" by Stephan Guyenet
How Serenity helps: After reading these foundational books, use Serenity's AI interface to create personalized sleep optimization protocols or breathing practice schedules. Tell the app, "I want to implement the sleep hygiene practices from 'Why We Sleep,'" and it will generate a customized checklist and pin it to your dashboard.
2. Willpower & Motivation Engineering
Why it matters: Understanding the psychological mechanics of motivation transforms how you approach challenges.
Essential reads:
- "Atomic Habits" by James Clear
- "Drive" by Daniel Pink
- "The Willpower Instinct" by Kelly McGonigal
Serenity implementation: Share your key takeaways from these books with Serenity, and the AI will help you develop a progressive willpower-building program tailored to your specific goals. The app tracks your consistency and provides accountability through regular check-ins.
3. Systematic Habit Building
Why it matters: Systems outlast motivation and create sustainable change.
Essential reads:
- "The Power of Habit" by Charles Duhigg
- "Tiny Habits" by BJ Fogg
- "Four Thousand Weeks" by Oliver Burkeman
Serenity integration: After reading about habit systems, use Serenity to design habit stacks based on your unique context. The app will help you troubleshoot when you face implementation challenges, offering adjustments based on principles from the books you've read.
E-Readers: The Underrated Tool for Transformational Reading
While physical books have their charm, e-readers offer distinct advantages for those serious about transformational reading:
- Frictionless highlighting and note-taking: capture insights without interrupting your reading flow
- Searchable annotations: quickly revisit key concepts when implementation questions arise
- Ambient reading opportunities: leverage small time blocks throughout your day
- Reduced decision fatigue: carry your entire library without choosing which book to bring
- Adjustable fonts: optimize for reading comfort and reduced eye strain
Serenity tip: Import your Kindle or Kobo highlights directly into the Serenity app with our integration feature. Then ask the AI to organize these highlights into actionable categories and create implementation plans from your most significant insights.
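Prefer to tinker before importing? Here's a rough, illustrative sketch (not Serenity's actual import code) of how you might pull highlights out of a Kindle "My Clippings.txt" export and group them by book. The "==========" separator and the title-on-the-first-line layout are standard on most Kindles; treat everything else as a starting point.

```python
# Illustrative only: group Kindle highlights by book title from "My Clippings.txt".
# Kindle separates entries with a line of ten "=" characters; the first line of each
# entry is the book title, the second is metadata (page/location/date), and the rest
# is the highlighted text. This is a personal-script sketch, not the Serenity importer.
from collections import defaultdict

def parse_clippings(path="My Clippings.txt"):
    highlights = defaultdict(list)
    with open(path, encoding="utf-8-sig") as f:  # utf-8-sig strips the BOM Kindle adds
        entries = f.read().split("==========")
    for entry in entries:
        lines = [line.strip() for line in entry.strip().splitlines() if line.strip()]
        if len(lines) < 3:
            continue  # skip bookmarks and empty entries
        title, _meta, *text = lines
        highlights[title].append(" ".join(text))
    return highlights

if __name__ == "__main__":
    for book, notes in parse_clippings().items():
        print(f"{book}: {len(notes)} highlights")
```

From there, you could paste the grouped highlights into Serenity and ask the AI to sort them into implementation categories, just as you would with the built-in import.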
The Implementation Gap: From Knowledge to Action
The difference between information collectors and true learners is implementation. Here's how to bridge that gap:
1. The 24-Hour Rule
Apply one small insight from each book within 24 hours of reading it. Acting while the idea is still fresh cements the link between knowledge and action and makes the insight far more likely to stick.
2. Concept Triangulation
When you encounter an important concept, look for it in at least three different books before considering it validated. This keeps you from building your routine around one-off theories that might not stand up to scrutiny.
diegopereiraeng/junit-test-results
package io.harness.junit;
import io.harness.junit.controller.JUnitController;
import io.harness.junit.controller.JUnitResultController;
import io.harness.junit.model.JUnitDTO;
import io.harness.junit.model.JUnitResult;
import io.harness.junit.service.JUnitResultService;
import io.harness.junit.utils.JUnitServiceUtils;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Map;
import java.util.logging.Logger;
public class JUnitConverter {
private static JUnitResultService jUnitResultService = new JUnitResultService();
private static final Logger logger = Logger.getLogger(JUnitConverter.class.getName());
public static void main(String[] args) throws IOException {
if (args.length < 3) {
System.out.println("usage: java -jar junit-converter.jar <folderPath> <project> <environment>");
System.exit(1);
}
String folderPath = args[0];
String project = args[1];
String environment = args[2];
String harness = "false";
if (args.length == 5) {
harness = args[4];
}
run(folderPath, project, environment, null, harness);
System.exit(0);
}
private static void run(String folderPath, String project, String environment, Map<String, String> commonProperties, String isHarness)
throws IOException {
Path folder = Path.of(folderPath);
File[] files = folder.toFile().listFiles();
if (files == null) {
throw new IllegalArgumentException("Invalid folder path: " + folderPath);
}
boolean isHarnessBoolean = Boolean.parseBoolean(isHarness);
// Process all XML files
for (File file : files) {
if (file.getName().toLowerCase().endsWith(".xml")) {
try {
processXmlFile(file, project, environment, commonProperties, isHarnessBoolean);
} catch (Exception e) {
System.err.println("Error processing file " + file.getName() + ": " + e.getMessage());
e.printStackTrace();
throw e;
}
}
}
}
static void processXmlFile(File file, String project, String environment, Map<String, String> commonProperties,
boolean isHarness) throws IOException {
byte[] content = Files.readAllBytes(file.toPath());
ByteArrayInputStream inputStream = new ByteArrayInputStream(content);
try {
JUnitDTO junitDTO = JUnitServiceUtils.convertToJUnitDTO(inputStream);
if (junitDTO == null) {
logger.warning("Failed to convert file to JUnitDTO: " + file.getName());
return;
}
// Process and transform each testsuite
JUnitResultController junitResult = new JUnitResultController(junitDTO);
junitResult.applyFilter();
junitResult.applyMapping();
// We will convert the JunitDTO to JUnitResult here
JUnitResult result = junitResult.getJUnitResult(project, environment);
if (result == null) {
logger.warning("Failed to convert JUnitDTO to JUnitResult: " + file.getName());
return;
}
// If there are common properties to add
if (commonProperties != null && !commonProperties.isEmpty()) {
result.getProperties().putAll(commonProperties);
}
String executionId = "executionId";
boolean isExecutionPresent = false;
if (isHarness) {
// Check if Harness execution values are present in system properties
String[] harnessProperties = {
"harness.0.executionId",
"harness.0.pipelineId",
"harness.0.pipelineName",
"harness.0.triggerType",
"harness.0.buildNo",
"harness.0.stageName",
"harness.0.stepName",
"harness.0.workflowId"
};
for (String property : harnessProperties) {
String value = System.getProperty(property);
if (value != null && !value.isEmpty()) {
result.getProperties().put(property, value);
if (property.equals("harness.0.executionId")) {
executionId = value;
isExecutionPresent = true;
}
}
}
}
// Save the result
jUnitResultService.saveResults(result, isExecutionPresent, executionId);
// Print all properties for debugging
System.out.println("Processed file " + file.getName() + " with properties: " + result.getProperties());
} catch (Exception e) {
logger.severe("Error processing XML file " + file.getName() + ": " + e.getMessage());
throw new IOException("Failed to process XML file: " + file.getName(), e);
} finally {
inputStream.close();
}
}
}
End Filepackage io.harness.junit;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
public class JUnitUploader {
private static final Logger logger = Logger.getLogger(JUnitUploader.class.getName());
private static class Args {
@Parameter(names = "--folder", description = "Folder path containing JUnit XML reports", required = true)
String folderPath;
@Parameter(names = "--project", description = "Project name", required = true)
String project;
@Parameter(names = "--environment", description = "Environment name", required = true)
String environment;
@Parameter(names = "--file-pattern", description = "Regex pattern to filter files")
String filePattern = ".*.xml";
@Parameter(names = "--tags", description = "Common tags to add (format: key1=value1,key2=value2)")
String tags;
@Parameter(names = "--harness", description = "Whether to extract Harness variables", arity = 1)
boolean harness = false;
@Parameter(names = "--help", help = true)
boolean help;
}
public static void main(String[] args) {
Args commandArgs = new Args();
JCommander commander = JCommander.newBuilder()
.addObject(commandArgs)
.build();
commander.setProgramName("JUnit Uploader");
try {
commander.parse(args);
if (commandArgs.help) {
commander.usage();
return;
}
// Execute the processor with provided arguments
process(commandArgs);
} catch (Exception e) {
logger.log(Level.SEVERE, "Error processing command: " + e.getMessage(), e);
commander.usage();
System.exit(1);
}
}
private static void process(Args args) throws IOException {
Path folder = Paths.get(args.folderPath);
if (!Files.exists(folder) || !Files.isDirectory(folder)) {
throw new IllegalArgumentException("Invalid folder path: " + args.folderPath);
}
// Parse tags if provided
Map<String, String> commonProperties = parseTagsString(args.tags);
// Compile file pattern
Pattern pattern = Pattern.compile(args.filePattern);
// Get files and process them
List<Path> files = Files.walk(folder)
.filter(Files::isRegularFile)
.filter(path -> pattern.matcher(path.getFileName().toString()).matches())
.toList();
if (files.isEmpty()) {
logger.warning("No matching files found in the provided directory");
return;
}
int totalFiles = files.size();
int processedFiles = 0;
int errorFiles = 0;
for (Path filePath : files) {
try {
JUnitConverter.processXmlFile(
filePath.toFile(),
args.project,
args.environment,
commonProperties,
args.harness
);
processedFiles++;
logger.info(String.format("Processed file %d/%d: %s", processedFiles, totalFiles, filePath.getFileName()));
} catch (Exception e) {
errorFiles++;
logger.log(Level.SEVERE, "Error processing file: " + filePath.getFileName(), e);
}
}
logger.info(String.format("Processing complete. Total: %d, Processed: %d, Failed: %d",
totalFiles, processedFiles, errorFiles));
if (errorFiles > 0) {
throw new IOException("Failed to process " + errorFiles + " files");
}
}
private static Map<String, String> parseTagsString(String tagsString) {
if (tagsString == null || tagsString.trim().isEmpty()) {
return Collections.emptyMap();
}
Map<String, String> tags = new HashMap<>();
String[] pairs = tagsString.split(",");
for (String pair : pairs) {
String[] keyValue = pair.split("=", 2);
if (keyValue.length == 2) {
tags.put(keyValue[0].trim(), keyValue[1].trim());
} else {
logger.warning("Invalid tag format (skipping): " + pair);
}
}
return tags;
}
}
End File# diegopereiraeng/junit-test-results
src/main/java/io/harness/junit/controller/JUnitResultController.java
package io.harness.junit.controller;
import io.harness.junit.model.JUnitDTO;
import io.harness.junit.model.JUnitResult;
import io.harness.junit.model.PropertyDTO;
import io.harness.junit.model.TestCaseDTO;
import io.harness.junit.model.TestCaseResult;
import io.harness.junit.model.TestScenario;
import io.harness.junit.model.TestSuiteDTO;
import io.harness.junit.utils.DateTimeUtils;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
public class JUnitResultController {
private JUnitDTO junitDTO;
// Constructor that initializes with junitDTO
public JUnitResultController(JUnitDTO junitDTO) {
this.junitDTO = junitDTO;
}
public void applyFilter() {
// No filtering for now, but it can be implemented here
}
public void applyMapping() {
// No mapping for now, just ensures the object is valid
}
public JUnitResult getJUnitResult(String project, String environment) {
if (junitDTO == null || junitDTO.getTest_suites() == null || junitDTO.getTest_suites().isEmpty()) {
return null;
}
// Create a new JUnitResult to store our results
JUnitResult output = new JUnitResult(project, environment);
// Populate meta-level properties
setResultMetadata(output);
setTimestamps(output);
transferProperties(output);
// Process each test suite
List<TestCaseResult> allTestCases = processTestSuites(output);
output.setTestCases(allTestCases);
return output;
}
private void setResultMetadata(JUnitResult output) {
// Set default type if needed
if (output.getType() == null) {
output.setType("junit");
}
// Set timestamp if not already set
if (output.getTimestamp() == 0) {
output.setTimestamp(Instant.now().toEpochMilli());
}
}
private void setTimestamps(JUnitResult output) {
// Set global timestamps from the first test suite if available
if (!junitDTO.getTest_suites().isEmpty()) {
TestSuiteDTO firstSuite = junitDTO.getTest_suites().get(0);
if (firstSuite.getTimestamp() != null && !firstSuite.getTimestamp().isEmpty()) {
long timestamp = DateTimeUtils.parseIsoDateTimeToEpochMillis(firstSuite.getTimestamp());
if (timestamp > 0) {
output.setTimestamp(timestamp);
}
}
}
}
private void transferProperties(JUnitResult output) {
Map<String, String> properties = new HashMap<>();
// Add properties from all test suites
for (TestSuiteDTO testsuite : junitDTO.getTest_suites()) {
if (testsuite.getProperties() != null) {
for (PropertyDTO property : testsuite.getProperties()) {
if (property.getName() != null && property.getValue() != null) {
properties.put(property.getName(), property.getValue());
}
}
}
}
// Transfer to output
output.setProperties(properties);
}
private List<TestCaseResult> processTestSuites(JUnitResult output) {
List<TestCaseResult> allTestCases = new ArrayList<>();
List<TestScenario> testScenarios = new ArrayList<>();
// Process each test suite
for (TestSuiteDTO testsuite : junitDTO.getTest_suites()) {
// Process test cases within this suite
if (testsuite.getTestcases() != null) {
for (TestCaseDTO testcase : testsuite.getTestcases()) {
TestCaseResult result = convertTestCaseToResult(testsuite, testcase);
if (result != null) {
allTestCases.add(result);
}
}
}
// Create a test scenario for this suite
TestScenario scenario = createScenarioFromSuite(testsuite);
if (scenario != null) {
testScenarios.add(scenario);
}
}
// Set the test scenarios
output.setTestSuites(testScenarios);
return allTestCases;
}
private TestCaseResult convertTestCaseToResult(TestSuiteDTO testsuite, TestCaseDTO testcase) {
if (testcase == null) {
return null;
}
TestCaseResult result = new TestCaseResult();
// Set basic properties
result.setName(testcase.getName());
result.setClassname(testcase.getClassname());
result.setStatus(getStatus(testcase));
// Set test suite info
if (testsuite != null) {
result.setTestSuite(testsuite.getName());
result.setPackage(testsuite.getPackage());
}
// Set duration (convert seconds to milliseconds)
if (testcase.getTime() != null) {
try {
double timeInSeconds = Double.parseDouble(testcase.getTime());
result.setDuration((long) (timeInSeconds * 1000));
} catch (NumberFormatException e) {
result.setDuration(0L);
}
}
// Set error details
setErrorDetails(result, testcase);
return result;
}
private String getStatus(TestCaseDTO testcase) {
if (testcase.getSkipped() != null) {
return "SKIPPED";
} else if (testcase.getFailure() != null) {
return "FAILED";
} else if (testcase.getError() != null) {
return "ERROR";
} else {
return "PASSED";
}
}
private void setErrorDetails(TestCaseResult result, TestCaseDTO testcase) {
// Handle failure
if (testcase.getFailure() != null) {
result.setFailureMessage(testcase.getFailure().getMessage());
result.setFailureType(testcase.getFailure().getType());
result.setFailureContent(testcase.getFailure().getContent());
}
// Handle error
if (testcase.getError() != null) {
result.setErrorMessage(testcase.getError().getMessage());
result.setErrorType(testcase.getError().getType());
result.setErrorContent(testcase.getError().getContent());
}
}
private TestScenario createScenarioFromSuite(TestSuiteDTO testsuite) {
if (testsuite == null) {
return null;
}
TestScenario scenario = new TestScenario();
// Set basic properties
scenario.setName(testsuite.getName());
scenario.setPackageName(testsuite.getPackage());
// Set counts
scenario.setTests(parseIntSafely(testsuite.getTests()));
scenario.setFailures(parseIntSafely(testsuite.getFailures()));
scenario.setErrors(parseIntSafely(testsuite.getErrors()));
scenario.setSkipped(parseIntSafely(testsuite.getSkipped()));
// Calculate statistics
int total = scenario.getTests();
if (total > 0) {
scenario.setPassRate((double) (total - scenario.getFailures() - scenario.getErrors() - scenario.getSkipped()) / total);
} else {
scenario.setPassRate(0);
}
// Set durations
if (testsuite.getTime() != null) {
try {
double timeInSeconds = Double.parseDouble(testsuite.getTime());
scenario.setDuration((long) (timeInSeconds * 1000)); // Convert to milliseconds
} catch (NumberFormatException e) {
scenario.setDuration(0L);
}
}
return scenario;
}
private int parseIntSafely(String value) {
if (value == null || value.isEmpty()) {
return 0;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return 0;
}
}
}
End File# diegopereiraeng/junit-test-results
package io.harness.junit.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.harness.junit.model.JUnitResult;
import io.harness.junit.utils.MapperUtils;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.logging.Logger;
public class JUnitResultService {
private static final Logger logger = Logger.getLogger(JUnitResultService.class.getName());
private static final String REMOTE_URL_ENV_VAR = "JUNIT_TEST_RESULTS_URL";
/**
- Saves the results either to a remote endpoint or local file based on configuration.
- @param result The JUnitResult to save
- @param isExecutionPresent Whether the execution ID is present
- @param executionId The execution ID if present
*/
public void saveResults(JUnitResult result, boolean isExecutionPresent, String executionId) {
// First try to send to remote endpoint
try {
sendToRemoteEndpoint(result);
logger.info("Results successfully sent to remote endpoint");
return;
} catch (Exception e) {
logger.warning("Failed to send results to remote endpoint: " + e.getMessage());
// Fall back to local file if remote fails
}
// If remote failed, save to local file
try {
saveToLocalFile(result, isExecutionPresent, executionId);
logger.info("Results successfully saved to local file");
} catch (Exception e) {
logger.severe("Failed to save results to local file: " + e.getMessage());
throw new RuntimeException("Failed to save test results", e);
}
}
private void sendToRemoteEndpoint(JUnitResult result) throws IOException {
String remoteUrl = System.getenv(REMOTE_URL_ENV_VAR);
if (remoteUrl == null || remoteUrl.trim().isEmpty()) {
throw new IOException("Remote URL not configured. Set JUNIT_TEST_RESULTS_URL environment variable.");
}
URL url = new URL(remoteUrl);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Content-Type", "application/json");
connection.setDoOutput(true);
String jsonPayload = MapperUtils.getObjectMapper().writeValueAsString(result);
try (OutputStream os = connection.getOutputStream()) {
byte[] input = jsonPayload.getBytes(StandardCharsets.UTF_8);
os.write(input, 0, input.length);
}
int responseCode = connection.getResponseCode();
if (responseCode < 200 || responseCode >= 300) {
throw new IOException("HTTP error code: " + responseCode);
}
}
private void saveToLocalFile(JUnitResult result, boolean isExecutionPresent, String executionId) throws IOException {
// Create directory if it doesn't exist
String baseDir = "./junit-results";
String project = result.getProject() != null ? result.getProject() : "unknown-project";
String environment = result.getEnvironment() != null ? result.getEnvironment() : "unknown-environment";
Path dirPath = Paths.get(baseDir, project, environment);
Files.createDirectories(dirPath);
// Generate filename based on current time or execution ID
String filename;
if (isExecutionPresent) {
filename = "result-" + executionId + ".json";
} else {
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"));
filename = "result-" + timestamp + ".json";
}
Path filePath = dirPath.resolve(filename);
// Convert result to JSON and write to file
String jsonContent = MapperUtils.getObjectMapper().writeValueAsString(result);
Files.write(filePath, jsonContent.getBytes(StandardCharsets.UTF_8));
}
}
author = 'Javier Ordóñez'
import os
import sys
import csv
import glob
import json
import string
import difflib
import pandas as pd
import numpy as np
from tqdm import tqdm
from unidecode import unidecode
from helper_functions import is_valid_string
from helper_functions import get_directory_path
from helper_functions import get_path_for_output_file
from helper_functions import get_directory_name_from_path
tqdm.pandas()
pd.options.mode.chained_assignment = None
def preprocess_name(name):
name = name.lower().strip()
for char in string.punctuation:
name = name.replace(char, '')
return unidecode(name)
def simplify_string_for_matching(input_string):
if not is_valid_string(input_string):
return ''
return preprocess_name(input_string)
def get_data_matching_score(source_string1, source_string2, preprocess=True):
if not is_valid_string(source_string1) or not is_valid_string(source_string2):
return 0.0
string1 = preprocess_name(source_string1) if preprocess else source_string1
string2 = preprocess_name(source_string2) if preprocess else source_string2
similarity = difflib.SequenceMatcher(None, string1, string2).ratio()
return similarity
def parse_master_data(file_path_master_data, limit, print_log=False):
df_master = None
if print_log:
print('Reading master data file {} ...'.format(file_path_master_data))
if file_path_master_data.lower().endswith('.json'):
with open(file_path_master_data, 'r') as f:
json_data = json.load(f)
df_master = pd.DataFrame(json_data)
else:
if limit is None:
df_master = pd.read_csv(file_path_master_data, encoding='utf-8')
else:
df_master = pd.read_csv(file_path_master_data, encoding='utf-8', nrows=limit)
df_master.columns = [x.lower() for x in df_master.columns]
# Sanity check of expected column names
column_name_brand = None
column_name_model = None
for cn in df_master.columns.tolist():
if 'brand' in cn:
column_name_brand = cn
if 'model' in cn:
column_name_model = cn
# Get brand and model data
df_master['brand'] = df_master[column_name_brand] if column_name_brand in df_master.columns else None
df_master['model'] = df_master[column_name_model] if column_name_model in df_master.columns else None
# Apply preprocessing of strings (lower case, no unicode, no punctuation)
df_master['brand_pp'] = df_master['brand'].progress_apply(lambda x: simplify_string_for_matching(x))
df_master['model_pp'] = df_master['model'].progress_apply(lambda x: simplify_string_for_matching(x))
if print_log:
print('Parse done. {} records read. Keeping values of {} and {} for references.'.format(
len(df_master), column_name_brand, column_name_model))
return df_master
def get_preprocessed_source_data(x):
fields = ['brand', 'model']
result = x['product name'].lower()
for p in fields:
result = result.replace(x[p].lower(), '') if is_valid_string(x[p]) else result
for char in string.punctuation:
result = result.replace(char, '')
result = unidecode(result)
result = ' '.join(result.split())
return result
def get_preprocessed_master_data(x):
fields = ['brand', 'model']
result = ''
for p in fields:
if is_valid_string(x[p]):
for char in string.punctuation:
result += ' ' + x[p].lower().replace(char, '')
result = unidecode(result)
result = ' '.join(result.split())
return result
def get_best_match_from_master_data(df_source, df_master, idx):
brand = df_source.iloc[idx]['brand_pp']
model = df_source.iloc[idx]['model_pp']
product_data_pp = df_source.iloc[idx]['product name pp']
minimum_score = 0.75 # Minimum score to consider a match
matches = []
for i, item in df_master.iterrows():
if len(item['brand_pp']) > 2 and brand == item['brand_pp']:
# Matching criteria: Same brand and similarity over threshold
score = get_data_matching_score(product_data_pp, item['master_data_pp'], preprocess=False)
if score > minimum_score:
matches.append({
'master_index': i,
'brand': item['brand'],
'model': item['model'],
'score': score})
if len(matches) == 0:
# Failed to find a match
return None
# Return the one having highest score
best_match = max(matches, key=lambda m: m['score'])
return best_match
def add_matches_to_source_product(df_source, df_master, print_log=False):
# Add columns to df_source for the normalized brand and model if match with master data
df_source['master_match_brand'] = None
df_source['master_match_model'] = None
df_source['master_match_score'] = None
if print_log:
print('Finding matches from master data for source products ...')
for i, row in tqdm(df_source.iterrows(), total=len(df_source)):
best_match = get_best_match_from_master_data(df_source, df_master, i)
if best_match is not None:
df_source.at[i, 'master_match_brand'] = best_match['brand']
df_source.at[i, 'master_match_model'] = best_match['model']
df_source.at[i, 'master_match_score'] = best_match['score']
return df_source
def match(source_data_filename, master_data_filename, output_file, limit=None, print_log=True):
source_directory = get_directory_path(source_data_filename)
csv_files = glob.glob(os.path.join(source_directory, '*-source-data.csv'))
for source_file in csv_files:
# Get directory name for source file
source_dir_name = get_directory_name_from_path(source_file)
output_file = os.path.join(source_directory,
source_dir_name + '-products-with-matches.csv')
# Read and parse master data
df_master = parse_master_data(master_data_filename, limit, print_log)
# Add additional fields
df_master['master_data_pp'] = df_master.progress_apply(
lambda x: get_preprocessed_master_data(x), axis=1)
# Read source data
if print_log:
print('Reading source data file {} ...'.format(source_file))
df_source = pd.read_csv(source_file, encoding='utf-8')
# Sanity check of expected column names from source data
for cn in ['product name', 'brand', 'model']:
if cn not in df_source.columns.tolist():
if print_log:
print('Warning: {} field not found in source data'.format(cn))
# Apply preprocessing of strings (lower case, no unicode, no punctuation)
df_source['brand_pp'] = df_source['brand'].progress_apply(lambda x: simplify_string_for_matching(x))
df_source['model_pp'] = df_source['model'].progress_apply(lambda x: simplify_string_for_matching(x))
# Add a field for preprocessed product name (removing brand and model reference if present)
df_source['product name pp'] = df_source.progress_apply(
lambda x: get_preprocessed_source_data(x), axis=1)
# Perform the match
df_source = add_matches_to_source_product(df_source, df_master, print_log)
# Write output file
if print_log:
print('Writing matches to {} ...'.format(output_file))
df_source.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)
if print_log:
print('Done!')
if name == "main":
if len(sys.argv) < 3:
print('Error: missing arguments. Two arguments needed: [source data csv file] [master data csv file]')
print('Optional third argument: products limit (for debug purposes)')
else:
file_path_source_data = sys.argv[1]
file_path_master_data = sys.argv[2]
products_limit = None if len(sys.argv) < 4 else sys.argv[3]
if os.path.exists(file_path_source_data) and os.path.exists(file_path_master_data):
print('Matching source data from {} with master data from {} ...'.format(file_path_source_data, file_path_master_data))
output_file = None
match(file_path_source_data, file_path_master_data, output_file, products_limit)
else:
print('Error: could not find specified files. {} {}'.format(file_path_source_data, file_path_master_data))
End File# adidas/lakehouse-engine
author = 'Javier Ordóñez'
import os
import sys
import re
import csv
import time
import glob
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from bs4 import BeautifulSoup
from helper_functions import get_matching_json_files
from helper_functions import is_valid_string, is_valid_data
from helper_functions import get_directory_path, get_directory_name_from_path
tqdm.pandas()
pd.options.mode.chained_assignment = None
def get_brand_name_from_source_file():
if sys.argv[1].lower().find('bestbuy') != -1:
return 'bestbuy'
if sys.argv[1].lower().find('walmart') != -1:
return 'walmart'
return ''
def extract_brand_model_bestbuy(soup):
brand = ''
model = ''
try:
# Parse brand name
blocks = soup.select('div.sku-title')
if len(blocks) == 1 and len(blocks[0]) == 2: # div.sku-title>a+h1
brand = blocks[0].contents[0].text.strip()
# Parse model name
features_block = soup.select('div[class^="model product-data"]')
if len(features_block) == 1:
features_items = features_block[0].select('div.row-value')
if len(features_items) == 1:
model = features_items[0].text.strip()
# Parse model with different layout
if len(model) < 2:
features_block2 = soup.select('div[class^="model-number"]')
if len(features_block2) == 1:
model = features_block2[0].select('span')[-1].text.strip()
except:
pass
return brand, model
def extract_brand_model_walmart(soup):
brand = ''
model = ''
product_specs = {}
try:
# Parse brand name from link
brand_anchors = soup.select('a[link-identifier="brandName"]')
if not brand_anchors:
brand_anchors = soup.select('div[class="sans-serif mid-gray mb2"] > div > a')
if len(brand_anchors) == 1:
brand = brand_anchors[0].text.strip()
# Extract from key specs
spec_containers = soup.select('div[data-testid="product-specs-container"]')
if spec_containers:
spec_items = spec_containers[0].select('div[data-testid="product-spec-item"]')
for item in spec_items:
labels = item.select('div[data-testid="product-spec-key"]')
values = item.select('div[data-testid="product-spec-value"]')
if len(labels) == 1 and len(values) == 1:
label = labels[0].text.strip().lower()
value = values[0].text.strip()
product_specs[label] = value
if label == 'brand':
brand = value
if label == 'model number' or label == 'tv model name':
model = value
# Fallbacks for brand
if brand == '':
# Try to find text with "Sold by" prefix
sold_by_containers = soup.select('div[class^="mr3 ml3"] div[class="f6 gray"] a')
if sold_by_containers and len(sold_by_containers) > 0:
brand = sold_by_containers[0].text.strip()
except:
pass
return brand, model
def add_product_data_to_csv(product_id, product_name, brand, model, source_data):
source_data.append({
'product id': product_id,
'product name': product_name,
'brand': brand,
'model': model
})
def get_product_details_from_html(source_brand, json_object, soup):
product_id = json_object['id']
product_name = json_object['name']
brand = model = ''
# Extract brand and model data from HTML
try:
if 'bestbuy' in source_brand.lower():
brand, model = extract_brand_model_bestbuy(soup)
elif 'walmart' in source_brand.lower():
brand, model = extract_brand_model_walmart(soup)
except:
pass
# Brand and model (if not found previously, use data from JSON object)
if not is_valid_string(brand) and 'brand_name' in json_object:
brand = json_object['brand_name']
if not is_valid_string(model) and 'model_number' in json_object:
model = json_object['model_number']
# Fallback for product_name if needed
if not is_valid_string(product_name) and 'title' in json_object:
product_name = json_object['title']
# Fallback for product_id if needed
if not is_valid_string(product_id) and 'ID' in json_object:
product_id = json_object['ID']
# Return the extracted information
return product_id, product_name, brand, model
def extract_product_details(input_file, print_log=True):
brand_name = get_brand_name_from_source_file()
if print_log:
print('Extracting product information ...')
json_files = get_matching_json_files(input_file)
if len(json_files) == 0:
print('Error: no matching JSON files found related to {}'.format(input_file))
return
source_data = []
total_files = len(json_files)
t1 = time.time()
file_counter = 0
for json_file in json_files:
file_counter += 1
if print_log and file_counter % 100 == 0:
progress = 100.0 * file_counter / float(total_files)
t2 = time.time()
eta = ((t2 - t1) / file_counter) * (total_files - file_counter)
print(f'Processed {file_counter} / {total_files} ({progress:.2f}%) - ETA: {eta:.0f}s', end='r')
# Read and parse the JSON file
try:
with open(json_file, 'r') as f:
json_data = json.load(f)
except:
continue
# Extract product data from HTML content
try:
html_content = json_data.get('html', '')
if html_content:
soup = BeautifulSoup(html_content, 'html.parser')
product_id, product_name, brand, model = get_product_details_from_html(brand_name, json_data, soup)
if product_id and product_name:
add_product_data_to_csv(product_id, product_name, brand, model, source_data)
except Exception as e:
if print_log:
print(f"Error processing file {json_file}: {e}")
# Create the output file
if len(source_data) > 0:
output_file = get_directory_path(input_file) + '/' + get_directory_name_from_path(input_file) + '-source-data.csv'
df = pd.DataFrame(source_data)
df.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)
if print_log:
print(f"nExtraction completed. {len(source_data)} products extracted and saved to {output_file}")
else:
if print_log:
print("nNo product data extracted from the provided files.")
if name == "main":
if len(sys.argv) < 2:
print('Error: missing arguments. One argument needed: [directory_path]')
else:
extract_product_details(sys.argv[1])
End File# adidas/lakehouse-engine
import abc
import os
import shutil
from logging import getLogger
from typing import Any, Dict, List, Optional
import pytest
from lakehouse_engine.core.definitions import (
INPUT_SPECS,
OUTPUT_SPECS,
TABLE_NAME,
TRANSFORMATIONS,
)
logger = getLogger(name)
class PySparkTest(abc.ABC):
"""A base class for general spark setup.
Attributes:
spark_session: a spark session that lives during the entire test class
"""
spark_session: Any = None
spark_context: Any = None
@classmethod
def setup_class(cls):
"""Setup before class initialization."""
from pyspark.sql import SparkSession # pylint: disable=import-outside-toplevel
os.environ["PYTHONHASHSEED"] = "42"
# This will be overridden by test_config if test_config exists,
# but also allows the tests to run locally.
spark_builder = SparkSession.builder.appName("test")
spark_builder.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark_builder.config("hive.exec.dynamic.partition", "true")
spark_builder.config("hive.exec.dynamic.partition.mode", "nonstrict")
spark_builder.config("spark.sql.session.timeZone", "Etc/UTC")
spark_builder.config("spark.ui.showConsoleProgress", "false")
spark_builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
spark_builder.config("spark.eventLog.enabled", "false")
spark_builder.config("spark.sql.broadcastTimeout", "36000")
spark_session = (
spark_builder
# .enableHiveSupport()
.master("local[*]")
.getOrCreate()
)
cls.spark_session = spark_session
cls.spark_context = cls.spark_session.sparkContext
@classmethod
def teardown_class(cls):
"""Teardown after class initialization."""
# Close the spark session when the tests are complete
if cls.spark_session:
cls.spark_session.stop()
# Try to remove temporary files created by Hadoop when the spark_context is created
if cls.spark_context:
# Remove _temporary_xxx directories created by spark
if os.path.exists("/tmp/spark-*"):
shutil.rmtree("/tmp/spark-*")
# Remove __pycache__ directory created by pytest
if os.path.exists("tests/__pycache__"):
shutil.rmtree("tests/__pycache__")
def cleanup_files(self, paths: List[str]) -> None:
"""Clean up local files.
Args:
paths: list of paths to clean up.
"""
paths_to_remove = []
for path in paths:
# Add to paths to remove the path itself and the _SUCCESS file
success_path = os.path.join(path, "_SUCCESS")
if os.path.exists(success_path):
paths_to_remove.append(success_path)
if os.path.exists(path):
paths_to_remove.append(path)
for p in paths_to_remove:
try:
if os.path.isfile(p):
os.remove(p)
elif os.path.isdir(p):
shutil.rmtree(p)
except Exception as e:
logger.warning(f"Failed to clean up path {p}: {e}")
class PySparkTestConfig(PySparkTest):
"""A base class for general spark setup for test configs.
Attributes:
spark_session: a spark session that lives during the entire test class
"""
@pytest.fixture
def cleanup_dir(self) -> None:
"""Clean up directories after test."""
yield
self._clean_up_local_dirs()
def _clean_up_local_dirs(self) -> None:
"""Clean up local directories."""
pass
class PySparkTestBase(PySparkTest):
"""A base class for pyspark tests.
Attributes:
spark_session: a spark session that lives during the entire test class
"""
def before_test(self) -> None:
"""Actions to perform before test implementation starts."""
pass
def after_test(self) -> None:
"""Actions to perform after test implementation is concluded."""
pass
def test(self) -> None:
"""Test implementation - runs before and after methods as well."""
try:
self.before_test()
self.run_test()
finally:
self.after_test()
@abc.abstractmethod
def run_test(self) -> None:
"""Method to implement by test case classes, with the actual test logic."""
raise NotImplementedError
class EngineBaseTestCase(PySparkTestBase):
"""A base class for pyspark tests.
Attributes:
spark_session: a spark session that lives during the entire test class
"""
config_template: Optional[Dict[str, Any]] = None
# Optionally define a list of files and/or directories to clean up after each test
cleanup_paths: List[str] = []
def before_test(self) -> None:
"""Method to override to setup environment before test execution.
Example: populate delta tables, sample files, etc.
"""
super().before_test()
def after_test(self) -> None:
"""Method to override to cleanup after test execution.
Example: delete sample files, reset an external service, etc.
"""
self.cleanup_files(self.cleanup_paths)
super().after_test()
def create_empty_config(self) -> Dict[str, Any]:
"""Create an empty engine config with all section fields.
Returns:
An empty engine configuration dictionary.
"""
return {
INPUT_SPECS: [],
TRANSFORMATIONS: {},
OUTPUT_SPECS: [],
}
def create_table_config(self, table_name: str) -> Dict[str, Any]:
"""Create a basic table config.
Args:
table_name: the base name for a table.
Returns:
A basic table config.
"""
return {TABLE_NAME: table_name}
End File# adidas/lakehouse-engine
import json
import os
import shutil
from typing import Dict, List, Set, Union
from lakehouse_engine.algorithms.exceptions import SchemaExpectationsException
from lakehouse_engine.io.readers_writers import readers, writers_utils
def read_schema_expectations_json_paths(
json_schema_paths: Union[str, List[str]],
) -> List[Dict]:
"""Read schema expectations from provided json file paths.
Args:
json_schema_paths: the json file path, or a list of paths.
Returns:
The combined list of schema field dictionaries from all files.
"""
schema_fields = []
# Convert the input to a list if it's a single string
if isinstance(json_schema_paths, str):
json_schema_paths = [json_schema_paths]
# Read all schema files and combine them
for path in json_schema_paths:
with open(path, "r", encoding="utf-8") as schema_file:
schema = json.load(schema_file)
if isinstance(schema, list):
schema_fields.extend(schema)
elif "fields" in schema:
schema_fields.extend(schema["fields"])
return schema_fields
def get_schema_field_names_map(
schema_fields: List[Dict],
) -> Dict[str, Dict]:
"""Create a map of field names to their schema dictionaries.
Args:
schema_fields: List of schema field dictionaries.
Returns:
A dictionary mapping field names to their complete schema definitions.
"""
return {field["name"]: field for field in schema_fields}
def ensure_columns_exist(
dataframe,
expected_schema_fields_map: Dict[str, Dict],
) -> None:
"""Check if all expected columns exist in the DataFrame.
Args:
dataframe: The DataFrame to check.
expected_schema_fields_map: Dictionary of expected field names to their schema definitions.
Raises:
SchemaExpectationsException: If any expected column is missing.
"""
# Get the set of column names in the DataFrame
df_columns = {col.lower() for col in dataframe.columns}
# Get the set of expected column names (case-insensitive)
expected_columns = {field_name.lower() for field_name in expected_schema_fields_map}
# Find expected columns that are missing from the DataFrame
missing_columns = expected_columns - df_columns
if missing_columns:
missing_cols_str = ", ".join(missing_columns)
raise SchemaExpectationsException(
f"Schema validation failed. The following columns are missing: {missing_cols_str}"
)
def check_column_type_compatibility(
dataframe, expected_schema_fields_map: Dict[str, Dict]
) -> None:
"""Check if DataFrame column types are compatible with expected types.
Args:
dataframe: The DataFrame to check.
expected_schema_fields_map: Dictionary of expected field names to their schema definitions.
Raises:
SchemaExpectationsException: If any column has an incompatible type.
"""
# Get current DataFrame schema as a dictionary
df_schema = {
field.name.lower(): field.dataType.simpleString() for field in dataframe.schema
}
type_errors = []
for field_name, field_def in expected_schema_fields_map.items():
df_field_name = field_name.lower()
if df_field_name in df_schema:
expected_type = field_def.get("type", "unknown")
# Handle nullable types if they are specified as a list [null, type]
if isinstance(expected_type, list):
# Extract the non-null type
expected_type = next(
(t for t in expected_type if t != "null"), expected_type[0]
)
# Normalize and simplify expected type
expected_type = simplify_type(expected_type)
# Get actual type
actual_type = df_schema[df_field_name]
# Check if types are compatible
if not are_types_compatible(expected_type, actual_type):
type_errors.append(
f"Column '{field_name}' has incompatible type. Expected: "
f"'{expected_type}', Actual: '{actual_type}'"
)
if type_errors:
error_message = "Schema type validation failed:n" + "n".join(type_errors)
raise SchemaExpectationsException(error_message)
def simplify_type(type_str: Union[str, dict]) -> str:
"""Simplify a schema type for comparison.
Args:
type_str: The type string or dict to simplify.
Returns:
A simplified type string.
"""
# Handle complex types in dictionary format
if isinstance(type_str, dict):
if "type" in type_str:
base_type = type_str["type"]
if base_type == "array" and "items" in type_str:
items_type = simplify_type(type_str["items"])
return f"array<{items_type}>"
elif base_type == "record" and "fields" in type_str:
# For struct types, just return "struct" for simplicity
return "struct"
elif base_type in ["string", "boolean", "int", "long", "float", "double"]:
return base_type
return "complex" # Default for unrecognized complex types
# Handle primitive types as strings
type_mapping = {
"string": "string",
"boolean": "boolean",
"int": "integer",
"long": "long",
"integer": "integer",
"float": "float",
"double": "double",
"null": "null",
}
return type_mapping.get(type_str.lower(), type_str.lower())
def are_types_compatible(expected_type: str, actual_type: str) -> bool:
"""Check if expected and actual types are compatible.
Implements a flexible type compatibility mapping for schema validation.
Args:
expected_type: The expected type.
actual_type: The actual type in the DataFrame.
Returns:
True if types are compatible, False otherwise.
"""
# Normalize both types to lowercase for comparison
expected = expected_type.lower()
actual = actual_type.lower()
# Exact match
if expected == actual:
return True
# Type compatibility mappings
compatibility_map = {
# String types
"string": {"varchar", "char", "text"},
# Integer types
"integer": {"int", "smallint", "tinyint", "integer"},
"int": {"integer", "smallint", "tinyint"},
# Floating point types
"float": {"double", "decimal", "float"},
"double": {"float", "decimal"},
# Decimal types
"decimal": {"float", "double"},
# Boolean types
"boolean": {"bit", "boolean"},
# Date/Time types
"date": {"timestamp", "date"},
"timestamp": {"date"},
# Complex types
"struct": {"struct"},
"array": {"array"},
"map": {"map"},
}
# Check if the actual type is in the set of compatible types for the expected type
if expected in compatibility_map:
if any(compat_type in actual for compat_type in compatibility_map[expected]):
return True
# Check if expected type is at start of complex actual type
# e.g., "array" is compatible with "array<string>"
if expected in ["array", "map", "struct"] and actual.startswith(expected):
return True
return False
def check_schema_expectations(
dataframe,
json_schema_paths: Union[str, List[str]],
check_extra_columns: bool = False,
) -> None:
"""Check if the DataFrame schema meets the expectations defined in the schema files.
Args:
dataframe: The DataFrame to check.
json_schema_paths: Path or list of paths to JSON schema files.
check_extra_columns: If True, raises an exception if the DataFrame has extra columns.
Raises:
SchemaExpectationsException: If schema expectations are not met.
"""
# Read schema expectations from JSON files
expected_schema_fields = read_schema_expectations_json_paths(json_schema_paths)
expected_schema_fields_map = get_schema_field_names_map(expected_schema_fields)
# Check if all expected columns exist
ensure_columns_exist(dataframe, expected_schema_fields_map)
# Check if column types are compatible
check_column_type_compatibility(dataframe, expected_schema_fields_map)
# Optionally check for extra columns
if check_extra_columns:
df_columns_lower = {col.lower() for col in dataframe.columns}
expected_columns_lower = {
field_name.lower() for field_name in expected_schema_fields_map
}
extra_columns = df_columns_lower - expected_columns_lower
if extra_columns:
extra_cols_str = ", ".join(extra_columns)
raise SchemaExpectationsException(
f"Schema validation failed. The DataFrame contains extra columns: {extra_cols_str}"
)
def get_expected_columns(
json_schema_paths: Union[str, List[str]],
excluded_columns: Set[str] = None,
) -> Set[str]:
"""Get the set of expected column names from schema files.
Args:
json_schema_paths: Path or list of paths to JSON schema files.
excluded_columns: Optional set of column names to exclude.
Returns:
A set of expected column names.
"""
if excluded_columns is None:
excluded_columns = set()
# Read schema expectations from JSON files
expected_schema_fields = read_schema_expectations_json_paths(json_schema_paths)
expected_columns = {
field["name"] for field in expected_schema_fields if field["name"] not in excluded_columns
}
return expected_columns
def check_required_schema_files(
schema_dir: str, required_fields_file: str, actual_schema_file: str = None
) -> bool:
"""Check if required schema files exist.
Args:
schema_dir: Directory containing schema files.
required_fields_file: Filename of the required fields schema file.
actual_schema_file: Optional filename of the actual schema file.
Returns:
True if all required files exist, False otherwise.
"""
required_path = os.path.join(schema_dir, required_fields_file)
if not os.path.exists(required_path):
return False
if actual_schema_file:
actual_path = os.path.join(schema_dir, actual_schema_file)
if not os.path.exists(actual_path):
return False
return True
def generate_schema_from_dataframe(
dataframe, output_path: str, format_type: str = "json"
) -> None:
"""Generate a schema file from a DataFrame.
Args:
dataframe: The DataFrame to generate schema from.
output_path: Path to save the schema file.
format_type: The format type of the schema ('json' or 'avro').
"""
# Create the directory if it doesn't exist
os.makedirs(os.path.dirname(output_path), exist_ok=True)
# Generate the schema
if format_type.lower() == "json":
schema_json = dataframe.schema.json()
with open(output_path, "w", encoding="utf-8") as f:
f.write(schema_json)
elif format_type.lower() == "avro":
# For Avro schema, we need to convert Spark schema to Avro schema
# This is a simplified approach - for complex schemas, you might need
# a more sophisticated conversion
avro_schema = writers_utils.spark_schema_to_dict(dataframe.schema)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(avro_schema, f, indent=2)
else:
raise ValueError(f"Unsupported schema format: {format_type}")
def copy_schema_file(src_path: str, dest_path: str) -> None:
"""Copy a schema file from source to destination.
Args:
src_path: Source path of the schema file.
dest_path: Destination path for the schema file.
"""
# Create the directory if it doesn't exist
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
shutil.copy2(src_path, dest_path)
def load_sample_data_to_validate_schema(
spark,
source_path: str,
format_type: str = "csv",
header: bool = True,
infer_schema: bool = True,
schema_path: str = None,
options: Dict = None,
) -> bool:
"""Load sample data from source to validate schema.
Args:
spark: Spark session.
source_path: Path to the source data.
format_type: Format of the source data.
header: Whether the source data has a header row (for CSV).
infer_schema: Whether to infer schema from the data.
schema_path: Optional path to schema file.
options: Additional options for reading the data.
Returns:
True if the data was loaded successfully, False otherwise.
"""
try:
reader_options = options or {}
if format_type.lower() == "csv":
reader_options.update({"header": str(header).lower(), "inferSchema": str(infer_schema).lower()})
# If schema_path is provided, use it
if schema_path and os.path.exists(schema_path):
with open(schema_path, "r", encoding="utf-8") as f:
schema_str = f.read()
reader_options["schema"] = schema_str
# Try to load the data
df = readers.DeltaLakeReader(spark).read(source_path, **reader_options)
return True
except Exception as e:
print(f"Failed to load sample data: {e}")
return False
End File"""Writer module."""
from typing import Dict, List, Optional, Tuple
from pyspark.sql import DataFrame
from lakehouse_engine.algorithms.exceptions import UnknownWriterException
from lakehouse_engine.core.definitions import OUTPUT_TO
from lakehouse_engine.core.definitions import TableType as TableTypeDef
from lakehouse_engine.io.readers_writers.delta_lake_writer import DeltaLakeWriter
from lakehouse_engine.io.readers_writers.file_writer import FileWriter
from lakehouse_engine.io.readers_writers.jdbc_writer import JDBCWriter
from lakehouse_engine.io.readers_writers.snowflake_writer import SnowflakeWriter
from lakehouse_engine.utils.logging_handler import LoggingHandler
class Writer(LoggingHandler):
"""Writer class."""
def __init__(self):
"""Construct Writer."""
super().__init__(__name__)
self.writers: Dict = {
TableTypeDef.BRONZE: self._get_bronze_writer,
TableTypeDef.SILVER: self._get_silver_writer,
TableTypeDef.GOLD: self._get_gold_writer,
}
def write(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = None,
table_type: str = None,
) -> List[Dict]:
"""Write data according to the writer configuration.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
table_type: explicit table type for when we want to customize the writer independently
from what's on the spec, i.e. for tables outside delta lake
Returns:
metadata dictionaries from the write operations
"""
if not table_type:
output_to = writer_config.get(OUTPUT_TO, "").lower()
table_type = output_to
extra_args_unpacked = () if extra_args is None else extra_args
func = self.writers.get(table_type)
if func is not None:
return func(df=df, writer_config=writer_config, extra_args=extra_args_unpacked)
raise UnknownWriterException(f"Writer not found for table type {table_type}")
def _get_bronze_writer(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = (),
) -> List[Dict]:
"""Get writer for bronze layer.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
Returns:
metadata dictionary from the write operation
"""
return [DeltaLakeWriter().write(df, writer_config, "bronze", extra_args)]
def _get_silver_writer(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = (),
) -> List[Dict]:
"""Get writer for silver layer.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
Returns:
metadata dictionary from the write operation
"""
return [DeltaLakeWriter().write(df, writer_config, "silver", extra_args)]
def _get_gold_writer(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = (),
) -> List[Dict]:
"""Get writer for gold layer.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
Returns:
metadata dictionary from the write operation
"""
return [DeltaLakeWriter().write(df, writer_config, "gold", extra_args)]
def _get_file_writer(
self, df: DataFrame, writer_config: Dict, extra_args: Optional[Tuple] = ()
) -> List[Dict]:
"""Get file writer.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
Returns:
metadata dictionary from the write operation
"""
return [FileWriter().write(df, writer_config, extra_args)]
def _get_jdbc_writer(
self, df: DataFrame, writer_config: Dict, extra_args: Optional[Tuple] = ()
) -> List[Dict]:
"""Get JDBC writer.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
Returns:
metadata dictionary from the write operation
"""
return [JDBCWriter().write(df, writer_config, extra_args)]
def _get_snowflake_writer(
self, df: DataFrame, writer_config: Dict, extra_args: Optional[Tuple] = ()
) -> List[Dict]:
"""Get Snowflake writer.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
Returns:
metadata dictionary from the write operation
"""
return [SnowflakeWriter().write(df, writer_config, extra_args)]
End File"""ReaderConfig class."""
from abc import ABC, abstractmethod
from typing import Dict, List
import pyspark.sql.functions as f
from pyspark.sql import DataFrame, SparkSession
from lakehouse_engine.core.definitions import TABLE_NAME
from lakehouse_engine.utils.logging_handler import LoggingHandler
class Reader(ABC, LoggingHandler):
"""Abstract Reader class."""
def __init__(self, spark: SparkSession):
"""Construct Reader.
Args:
spark: spark session
"""
super().__init__(__name__)
self.spark = spark
def read(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data from a storage location.
Args:
read_from_path: path or identifier for the data source.
**kwargs: keyword arguments to be passed to the read method.
Returns:
DataFrame with read data
"""
self.log.debug(f"Reader: path={read_from_path}, kwargs={kwargs}")
return self._read_data(read_from_path, **kwargs)
@abstractmethod
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data implementation to be implemented by concrete readers.
Args:
read_from_path: path or identifier for the data source.
**kwargs: keyword arguments to be passed to the read method.
Returns:
DataFrame with read data
"""
raise NotImplementedError("Method _read_data must be implemented by subclasses")
class DeltaLakeReader(Reader):
"""Reader for Delta Lake tables."""
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data from a Delta Lake table.
Args:
read_from_path: path or identifier for the delta table.
**kwargs: keyword arguments to be passed to the read method.
Returns:
DataFrame with read data from delta table
"""
if read_from_path.startswith("select "):
# Handle SQL query
return self.spark.sql(read_from_path)
elif "." in read_from_path and not read_from_path.startswith("/"):
# Handle catalog table reference
return self.spark.table(read_from_path)
else:
# Handle path reference
reader = self.spark.read.format("delta")
# Apply any additional options
for key, value in kwargs.items():
if key != "table_name": # Skip table_name, it's not an option
reader = reader.option(key, value)
return reader.load(read_from_path)
class FileReader(Reader):
"""Reader for file types."""
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data from a file.
Args:
read_from_path: path to the file(s).
**kwargs: keyword arguments to be passed to the read method.
Must contain 'format' specifying the file format (csv, json, etc.)
Returns:
DataFrame with read data from file
"""
format_type = kwargs.pop("format", "csv")
reader = self.spark.read.format(format_type)
# Apply any additional options
for key, value in kwargs.items():
if key != TABLE_NAME: # Skip table_name, it's not an option
reader = reader.option(key, value)
return reader.load(read_from_path)
class JDBCReader(Reader):
"""Reader for JDBC sources."""
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data from a JDBC source.
Args:
read_from_path: table name or query.
**kwargs: keyword arguments to be passed to the read method.
Must contain JDBC connection parameters.
Returns:
DataFrame with read data from JDBC source
"""
# Basic JDBC options
jdbc_url = kwargs.pop("url")
connection_properties = {}
# Extract specific JDBC connection properties
for key in ["user", "password", "driver"]:
if key in kwargs:
connection_properties[key] = kwargs.pop(key)
# Determine if this is a query or table name
        if read_from_path.strip().lower().startswith("select "):
            # It's a query: use the 'query' option, which replaces 'dbtable'
            reader = self.spark.read.format("jdbc").option("url", jdbc_url)
            reader = reader.option("query", read_from_path)
            for key, value in connection_properties.items():
                reader = reader.option(key, value)
            for key, value in kwargs.items():
                if key != TABLE_NAME:  # Skip table_name, it's not an option
                    reader = reader.option(key, value)
            return reader.load()
else:
# It's a table name
# Apply any additional options to reader
reader = self.spark.read.format("jdbc").option("url", jdbc_url)
reader = reader.option("dbtable", read_from_path)
for key, value in kwargs.items():
if key != TABLE_NAME: # Skip table_name, it's not an option
reader = reader.option(key, value)
# Add connection properties
for key, value in connection_properties.items():
reader = reader.option(key, value)
return reader.load()
class SnowflakeReader(Reader):
"""Reader for Snowflake."""
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data from Snowflake.
Args:
read_from_path: table name, view name, or query.
**kwargs: keyword arguments to be passed to the read method.
Must contain Snowflake connection parameters.
Returns:
DataFrame with read data from Snowflake
"""
# Configure reader with Snowflake connection params
reader = self.spark.read.format("net.snowflake.spark.snowflake")
# Add all options
for key, value in kwargs.items():
if key != TABLE_NAME: # Skip table_name, it's not an option
reader = reader.option(key, value)
# Determine if this is a query or table name
if read_from_path.strip().lower().startswith("select "):
# It's a query
reader = reader.option("query", read_from_path)
else:
# It's a table name - make sure dbtable is set
reader = reader.option("dbtable", read_from_path)
return reader.load()
class MultiReader(Reader):
"""Reader for combining multiple data sources."""
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:
"""Read data from multiple sources and combine them.
Args:
read_from_path: Not used in MultiReader.
**kwargs: keyword arguments to be passed to the read method.
Must contain 'sources' list with source configurations.
Returns:
Combined DataFrame from all sources
"""
sources = kwargs.get("sources", [])
if not sources:
raise ValueError("MultiReader requires 'sources' parameter with source configurations")
# Initialize readers for each source type
readers = {
"delta": DeltaLakeReader(self.spark),
"file": FileReader(self.spark),
"jdbc": JDBCReader(self.spark),
"snowflake": SnowflakeReader(self.spark)
}
# Read from each source
dfs = []
for source in sources:
source_type = source.get("type", "delta")
path = source.get("path")
options = source.get("options", {})
if source_type not in readers:
raise ValueError(f"Unsupported source type: {source_type}")
df = readers[source_type].read(path, **options)
# Apply optional transformations to the source
if "select" in source:
df = df.selectExpr(source["select"])
if "where" in source:
df = df.filter(source["where"])
if "alias" in source:
df = df.alias(source["alias"])
dfs.append(df)
# Combine the DataFrames based on the combine_method
combine_method = kwargs.get("combine_method", "union")
return self._combine_dataframes(dfs, combine_method, kwargs.get("join_conditions", {}))
def _combine_dataframes(
self, dataframes: List[DataFrame], method: str, join_options: Dict
) -> DataFrame:
"""Combine multiple DataFrames using the specified method.
Args:
dataframes: List of DataFrames to combine
method: Method to use for combining ('union', 'join')
join_options: Options for join operation
Returns:
Combined DataFrame
"""
        if not dataframes:
            raise ValueError("No DataFrames to combine")
if len(dataframes) == 1:
return dataframes[0]
if method == "union":
result = dataframes[0]
for df in dataframes[1:]:
result = result.unionByName(df, allowMissingColumns=True)
return result
elif method == "join":
if not join_options:
raise ValueError("Join method requires join_conditions")
result = dataframes[0]
for i, df in enumerate(dataframes[1:], 1):
condition = join_options.get(f"condition_{i}", None)
join_type = join_options.get(f"type_{i}", "inner")
if not condition:
raise ValueError(f"Join condition missing for DataFrame {i}")
result = result.join(df, on=f.expr(condition), how=join_type)
return result
else:
raise ValueError(f"Unsupported combine method: {method}")
End File# adidas/lakehouse-engine
"""Writers utils."""
from typing import Dict, List, Optional, Union
from pyspark.sql.types import (
ArrayType,
BooleanType,
ByteType,
DataType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
MapType,
ShortType,
StringType,
StructField,
StructType,
TimestampType,
)
Maps pyspark types to corresponding Avro types
SPARK_TO_AVRO_TYPE = {
StringType: "string",
IntegerType: "int",
LongType: "long",
FloatType: "float",
DoubleType: "double",
BooleanType: "boolean",
DateType: "int", # Date is represented as int (days since epoch) in Avro
TimestampType: "long", # Timestamp as long (milliseconds since epoch) in Avro
ByteType: "bytes",
ShortType: "int", # No direct short type in Avro
DecimalType: "bytes", # Decimal as bytes in Avro
}
def spark_schema_to_dict(schema: DataType) -> Dict:
"""Converts a Spark schema to a dictionary representation (compatible with Avro).
Args:
schema: Spark schema to convert
Returns:
Dictionary representation of the schema
"""
if isinstance(schema, StructType):
fields = []
for field in schema.fields:
fields.append(struct_field_to_dict(field))
return {
"type": "record",
"name": "Record",
"fields": fields
}
else:
# Handle non-struct types
return {"type": spark_type_to_avro_type(schema)}
def struct_field_to_dict(field: StructField) -> Dict:
"""Converts a StructField to a dictionary.
Args:
field: StructField to convert
Returns:
Dictionary representation of the field
"""
field_dict = {
"name": field.name,
"type": get_field_type_dict(field.dataType, field.nullable)
}
# Add nullable option if field is nullable
if field.nullable:
field_dict["type"] = ["null", field_dict["type"]]
return field_dict
def get_field_type_dict(data_type: DataType, nullable: bool) -> Union[str, Dict, List]:
"""Gets the type representation for a field's data type.
Args:
data_type: The data type of the field
nullable: Whether the field is nullable
Returns:
Type representation (string, dict, or list for union types)
"""
# Handle complex types
if isinstance(data_type, StructType):
fields = []
for field in data_type.fields:
fields.append(struct_field_to_dict(field))
return {
"type": "record",
"name": "NestedRecord", # This name can be improved
"fields": fields
}
elif isinstance(data_type, ArrayType):
item_type = get_field_type_dict(data_type.elementType, data_type.containsNull)
return {
"type": "array",
"items": item_type
}
elif isinstance(data_type, MapType):
# Avro only supports string keys
value_type = get_field_type_dict(data_type.valueType, data_type.valueContainsNull)
return {
"type": "map",
"values": value_type
}
else:
# Handle primitive types
avro_type = spark_type_to_avro_type(data_type)
if nullable:
return ["null", avro_type]
return avro_type
def spark_type_to_avro_type(data_type: DataType) -> str:
"""Maps a Spark data type to its Avro equivalent.
Args:
data_type: Spark data type
Returns:
Corresponding Avro type string
"""
# Use the mapping for direct type matches
for spark_type, avro_type in SPARK_TO_AVRO_TYPE.items():
if isinstance(data_type, spark_type):
# Special handling for logical types like Decimal
if isinstance(data_type, DecimalType):
return {
"type": "bytes",
"logicalType": "decimal",
"precision": data_type.precision,
"scale": data_type.scale
}
return avro_type
# Fallback
return "string"
def get_write_mode(mode: Optional[str] = None) -> str:
"""Get the write mode to be used.
Args:
mode: write mode to be used. Defaults to None.
Returns:
Write mode to be used.
"""
return "overwrite" if mode is None else mode
End File# adidas/lakehouse-engine
lakehouse_engine/io/readers_writers/writers_utils.py
"""Writers utils."""
from typing import Dict, List, Optional, Tuple, Union
from pyspark.sql.types import (
ArrayType,
BooleanType,
ByteType,
DataType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
MapType,
ShortType,
StringType,
StructField,
StructType,
TimestampType,
)
from lakehouse_engine.core.definitions import (
CDC_CONFIG,
CDC_ENABLED,
CDC_KEY_COLUMNS,
CDC_WATERMARK_COLUMN,
KEY_COLUMN,
OVERWRITE_AT_EXECUTION_TIME,
OVERWRITE_PARTITIONS,
OVERWRITE_SCHEMA,
PARTITION_COLUMNS,
PATH,
PROCESSING_TIME,
WATERMARK_COLUMN,
Z_ORDER_BY,
)
# Maps pyspark types to corresponding Avro types
SPARK_TO_AVRO_TYPE = {
StringType: "string",
IntegerType: "int",
LongType: "long",
FloatType: "float",
DoubleType: "double",
BooleanType: "boolean",
DateType: "int", # Date is represented as int (days since epoch) in Avro
TimestampType: "long", # Timestamp as long (milliseconds since epoch) in Avro
ByteType: "bytes",
ShortType: "int", # No direct short type in Avro
DecimalType: "bytes", # Decimal as bytes in Avro
}
def spark_schema_to_dict(schema: DataType) -> Dict:
"""Converts a Spark schema to a dictionary representation (compatible with Avro).
Args:
schema: Spark schema to convert
Returns:
Dictionary representation of the schema
"""
if isinstance(schema, StructType):
fields = []
for field in schema.fields:
fields.append(struct_field_to_dict(field))
return {
"type": "record",
"name": "Record",
"fields": fields
}
else:
# Handle non-struct types
return {"type": spark_type_to_avro_type(schema)}
def struct_field_to_dict(field: StructField) -> Dict:
"""Converts a StructField to a dictionary.
Args:
field: StructField to convert
Returns:
Dictionary representation of the field
"""
    field_type = get_field_type_dict(field.dataType, field.nullable)
    # get_field_type_dict already wraps nullable primitive types in a ["null", ...] union,
    # so only wrap complex (dict) types here to avoid producing a nested union
    if field.nullable and not isinstance(field_type, list):
        field_type = ["null", field_type]
    return {"name": field.name, "type": field_type}
def get_field_type_dict(data_type: DataType, nullable: bool) -> Union[str, Dict, List]:
"""Gets the type representation for a field's data type.
Args:
data_type: The data type of the field
nullable: Whether the field is nullable
Returns:
Type representation (string, dict, or list for union types)
"""
# Handle complex types
if isinstance(data_type, StructType):
fields = []
for field in data_type.fields:
fields.append(struct_field_to_dict(field))
return {
"type": "record",
"name": "NestedRecord", # This name can be improved
"fields": fields
}
elif isinstance(data_type, ArrayType):
item_type = get_field_type_dict(data_type.elementType, data_type.containsNull)
return {
"type": "array",
"items": item_type
}
elif isinstance(data_type, MapType):
# Avro only supports string keys
value_type = get_field_type_dict(data_type.valueType, data_type.valueContainsNull)
return {
"type": "map",
"values": value_type
}
else:
# Handle primitive types
avro_type = spark_type_to_avro_type(data_type)
if nullable:
return ["null", avro_type]
return avro_type
def spark_type_to_avro_type(data_type: DataType) -> Union[str, Dict]:
    """Maps a Spark data type to its Avro equivalent.
    Args:
        data_type: Spark data type
    Returns:
        Corresponding Avro type string, or a dict for logical types such as decimal
    """
# Use the mapping for direct type matches
for spark_type, avro_type in SPARK_TO_AVRO_TYPE.items():
if isinstance(data_type, spark_type):
# Special handling for logical types like Decimal
if isinstance(data_type, DecimalType):
return {
"type": "bytes",
"logicalType": "decimal",
"precision": data_type.precision,
"scale": data_type.scale
}
return avro_type
# Fallback
return "string"
def get_write_mode(mode: Optional[str] = None) -> str:
"""Get the write mode to be used.
Args:
mode: write mode to be used. Defaults to None.
Returns:
Write mode to be used.
"""
return "overwrite" if mode is None else mode
def get_z_order_by_statement_for_delta(z_order_by_cols: Union[str, List]) -> str:
"""Get Z-ORDER BY statement for delta table.
Args:
z_order_by_cols: Z-ORDER BY columns.
Returns:
Statement for Z-ORDER BY.
"""
    if z_order_by_cols:
        z_order_by_cols_as_list = (
            [z_order_by_cols] if isinstance(z_order_by_cols, str) else z_order_by_cols
        )
        z_order_by_cols_joined = ", ".join(
            [f"`{col}`" for col in z_order_by_cols_as_list]
        )
        # Escape the braces so {0} survives as a str.format placeholder for the target path
        return f"OPTIMIZE delta.`{{0}}` ZORDER BY ({z_order_by_cols_joined})"
return ""
def get_overwrite_partitions(
writer_config: Dict, output: Optional[str] = None
) -> List[str]:
"""Get list of partition values for conditional overwrite.
Args:
writer_config: writer configuration.
output: output path.
Returns:
list of partition values
"""
# Handle partitions that are provided by the writer itself at runtime
# e.g.: current execution time
output_path = writer_config.get(PATH, "") if output is None else output
if (
OVERWRITE_AT_EXECUTION_TIME in writer_config
and writer_config[OVERWRITE_AT_EXECUTION_TIME]
):
if PROCESSING_TIME in writer_config and writer_config[PROCESSING_TIME]:
return [writer_config[PROCESSING_TIME].__str__()]
# return parameters for overwritePartitions
if OVERWRITE_PARTITIONS in writer_config and writer_config[OVERWRITE_PARTITIONS]:
return writer_config[OVERWRITE_PARTITIONS]
return []
def get_args_for_merge_to_delta(
writer_config: Dict,
) -> Tuple[bool, str, List[str], List[str], List[str]]:
"""Get arguments for merge statement in Delta Lake.
Args:
writer_config: writer configuration.
Returns:
        CDC enabled flag, watermark column, key columns, partition columns, Z-ORDER BY columns
"""
cdc_enabled: bool = False
cdc_key_columns: List[str] = []
cdc_watermark_column: str = ""
partition_columns: List[str] = []
z_order_by_columns: List[str] = []
if CDC_CONFIG in writer_config and writer_config[CDC_CONFIG]:
cdc_config = writer_config[CDC_CONFIG]
cdc_enabled = bool(cdc_config.get(CDC_ENABLED, False))
cdc_key_columns = cdc_config.get(CDC_KEY_COLUMNS, [])
cdc_watermark_column = cdc_config.get(CDC_WATERMARK_COLUMN, "")
if PARTITION_COLUMNS in writer_config and writer_config[PARTITION_COLUMNS]:
partition_columns = writer_config[PARTITION_COLUMNS]
if WATERMARK_COLUMN in writer_config and writer_config[WATERMARK_COLUMN]:
cdc_watermark_column = writer_config[WATERMARK_COLUMN]
if KEY_COLUMN in writer_config and writer_config[KEY_COLUMN]:
cdc_key_columns = (
[writer_config[KEY_COLUMN]]
if isinstance(writer_config[KEY_COLUMN], str)
else writer_config[KEY_COLUMN]
)
if Z_ORDER_BY in writer_config and writer_config[Z_ORDER_BY]:
z_order_by_columns = writer_config[Z_ORDER_BY]
return (
cdc_enabled,
cdc_watermark_column,
cdc_key_columns,
partition_columns,
z_order_by_columns,
)
def should_overwrite_schema(writer_config: Dict) -> bool:
"""Check if schema should be overwritten.
Args:
writer_config: writer configuration.
Returns:
True if schema should be overwritten, False otherwise.
"""
return (
OVERWRITE_SCHEMA in writer_config
and writer_config[OVERWRITE_SCHEMA] is True
)
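# A small sketch showing the Avro-style dictionary produced by spark_schema_to_dict
# for a simple schema; the field names are made up for illustration.
if __name__ == "__main__":
    example_schema = StructType(
        [
            StructField("order_id", LongType(), nullable=False),
            StructField("amount", DecimalType(10, 2), nullable=True),
            StructField("tags", ArrayType(StringType(), containsNull=True), nullable=True),
        ]
    )
    # Expected shape: a "record" dict whose nullable fields become ["null", <type>] unions
    print(spark_schema_to_dict(example_schema))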
End File# adidas/lakehouse-engine
"""File Writer."""
from typing import Dict, List, Optional, Tuple
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from lakehouse_engine.core.definitions import (
FORMAT,
MODE,
OPTIONS,
OUTPUT_FORMAT,
OUTPUT_MODE,
OUTPUT_OPTIONS,
OUTPUT_PATH,
PARTITION_BY,
PATH,
)
from lakehouse_engine.io.readers_writers.writers_utils import get_write_mode
from lakehouse_engine.utils.logging_handler import LoggingHandler
class FileWriter(LoggingHandler):
"""A class for writing to file storage."""
def __init__(self):
"""Initialize the FileWriter."""
super().__init__(__name__)
def write(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = None,
) -> Dict:
"""Write data to file storage.
Args:
df: The DataFrame to write.
writer_config: Configuration for the writer.
extra_args: Extra arguments for the write operation (not used by this writer).
Returns:
Dictionary with metadata about the write operation.
"""
# Extract configuration
path = writer_config.get(PATH) or writer_config.get(OUTPUT_PATH, "")
format_type = writer_config.get(FORMAT) or writer_config.get(OUTPUT_FORMAT, "parquet")
mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")
options = writer_config.get(OPTIONS) or writer_config.get(OUTPUT_OPTIONS, {})
partition_by = writer_config.get(PARTITION_BY, [])
# Validate required parameters
if not path:
raise ValueError("Output path is required")
# Begin writing process
writer = df.write.format(format_type)
writer = writer.mode(get_write_mode(mode))
# Apply all options
for key, value in options.items():
writer = writer.option(key, value)
# Apply partitioning if specified
if partition_by:
if isinstance(partition_by, str):
partition_by = [partition_by]
writer = writer.partitionBy(partition_by)
# Perform the write operation
self.log.info(f"Writing data to path: {path} (format: {format_type}, mode: {mode})")
writer.save(path)
# Return metadata about the operation
num_rows = df.count()
return {
"path": path,
"format": format_type,
"mode": mode,
"partitions": partition_by if partition_by else None,
"options": options,
"num_rows": num_rows,
"success": True
}
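# A minimal usage sketch for FileWriter, assuming an active Spark session and a
# hypothetical output path; the "output_*" keys mirror the definitions constants,
# while "partition_by" is the assumed value of PARTITION_BY.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "2024-01-01"), (2, "2024-01-02")], ["id", "event_date"])
    metadata = FileWriter().write(
        df,
        writer_config={
            "output_path": "/tmp/lakehouse/exports/events",  # hypothetical location
            "output_format": "parquet",
            "output_mode": "overwrite",
            "output_options": {"compression": "snappy"},
            "partition_by": ["event_date"],
        },
    )
    print(metadata)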
End File# adidas/lakehouse-engine
"""JDBC Writer."""
from typing import Dict, List, Optional, Tuple
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from lakehouse_engine.core.definitions import (
MODE,
OPTIONS,
OUTPUT_MODE,
OUTPUT_OPTIONS,
TABLE,
URL,
)
from lakehouse_engine.io.readers_writers.writers_utils import get_write_mode
from lakehouse_engine.utils.logging_handler import LoggingHandler
class JDBCWriter(LoggingHandler):
"""A class for writing data to databases via JDBC."""
def __init__(self):
"""Initialize the JDBCWriter."""
super().__init__(__name__)
def write(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = None,
) -> Dict:
"""Write data to a JDBC destination.
Args:
df: The DataFrame to write.
writer_config: Configuration for the JDBC writer.
extra_args: Extra arguments for the write operation (not used by this writer).
Returns:
Dictionary with metadata about the write operation.
"""
# Extract configuration
jdbc_url = writer_config.get(URL)
table_name = writer_config.get(TABLE)
mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")
options = writer_config.get(OPTIONS) or writer_config.get(OUTPUT_OPTIONS, {})
# Validate required parameters
if not jdbc_url:
raise ValueError("JDBC URL is required")
if not table_name:
raise ValueError("Table name is required")
# Begin writing process
writer = df.write.format("jdbc")
writer = writer.mode(get_write_mode(mode))
# Set JDBC URL and table name
writer = writer.option("url", jdbc_url)
writer = writer.option("dbtable", table_name)
# Apply additional options
for key, value in options.items():
writer = writer.option(key, value)
# Perform the write operation
self.log.info(f"Writing data to JDBC table: {table_name}")
writer.save()
# Return metadata about the operation
num_rows = df.count()
return {
"jdbc_url": jdbc_url,
"table": table_name,
"mode": mode,
"options": options,
"num_rows": num_rows,
"success": True
}
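# A minimal usage sketch for JDBCWriter, assuming an active Spark session, a reachable
# database, and that URL and TABLE resolve to the "url"/"table" keys shown (assumed);
# the connection details are hypothetical.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "alice")], ["id", "name"])
    print(
        JDBCWriter().write(
            df,
            writer_config={
                "url": "jdbc:postgresql://localhost:5432/analytics",  # hypothetical
                "table": "public.users",
                "output_mode": "append",
                "output_options": {"user": "writer", "password": "secret", "driver": "org.postgresql.Driver"},
            },
        )
    )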
End File"""Delta Lake Writer."""
from typing import Dict, List, Optional, Tuple, Union
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from delta.tables import DeltaTable
from lakehouse_engine.core.definitions import (
MODE,
OPTIMIZE,
OUTPUT_MODE,
OUTPUT_PATH,
PARTITION_BY,
PATH,
TABLE_NAME,
)
from lakehouse_engine.io.readers_writers.writers_utils import (
get_args_for_merge_to_delta,
get_overwrite_partitions,
get_write_mode,
get_z_order_by_statement_for_delta,
should_overwrite_schema,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler
class DeltaLakeWriter(LoggingHandler):
"""A class for writing to Delta Lake format."""
def __init__(self):
"""Initialize the DeltaLakeWriter."""
super().__init__(__name__)
def write(
self,
df: DataFrame,
writer_config: Dict,
table_type: str,
extra_args: Optional[Tuple] = None,
) -> Dict:
"""Write data to Delta Lake format.
Args:
df: The DataFrame to write.
writer_config: Configuration for the writer.
table_type: Type of table (bronze, silver, gold).
extra_args: Extra arguments for the write operation (not used by this writer).
Returns:
Dictionary with metadata about the write operation.
"""
# Extract configuration
path = writer_config.get(PATH) or writer_config.get(OUTPUT_PATH, "")
table_name = writer_config.get(TABLE_NAME, "")
mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")
partition_by = writer_config.get(PARTITION_BY, [])
optimize = writer_config.get(OPTIMIZE, False)
# Validate required parameters
if not path and not table_name:
raise ValueError("Either path or table_name must be provided")
# Get settings for delta merge operations
(
cdc_enabled,
watermark_column,
key_columns,
partition_columns,
z_order_by,
) = get_args_for_merge_to_delta(writer_config)
target_path = path
# Determine if table exists
        table_exists = False
        try:
            if table_name:
                # Check if the table exists in the catalog
                table_exists = df.sparkSession.catalog.tableExists(table_name)
            elif path:
                # DeltaTable.forPath raises if there is no Delta table at the path
                DeltaTable.forPath(df.sparkSession, path)
                table_exists = True
        except Exception:
            table_exists = False
output_metadata = {
"table_type": table_type,
"table_name": table_name,
"path": path,
"mode": mode,
"partitions": partition_by if partition_by else None,
}
# Different write strategies based on mode and table existence
if not table_exists or mode in ("overwrite", "errorifexists"):
# Simple write for non-existing tables or overwrite mode
writer = df.write.format("delta")
writer = writer.mode(get_write_mode(mode))
# Apply partitioning if specified
if partition_by:
if isinstance(partition_by, str):
partition_by = [partition_by]
writer = writer.partitionBy(partition_by)
# Handle schema evolution
if should_overwrite_schema(writer_config):
writer = writer.option("overwriteSchema", "true")
# Perform the write operation
if table_name:
self.log.info(f"Writing to Delta table: {table_name}")
writer.saveAsTable(table_name)
else:
self.log.info(f"Writing to Delta path: {path}")
writer.save(path)
else:
# Handle updates to existing tables with CDC or merging
if cdc_enabled and key_columns:
self._handle_cdc_writes(
df,
writer_config,
table_name,
path,
key_columns,
watermark_column,
)
else:
self._handle_append_or_merge(
df,
writer_config,
table_name,
path,
mode,
partition_by,
)
# Optimization if requested
if optimize and z_order_by:
optimize_statement = get_z_order_by_statement_for_delta(z_order_by)
if optimize_statement:
optimize_statement = optimize_statement.format(target_path)
self.log.info(f"Running optimization: {optimize_statement}")
df.sparkSession.sql(optimize_statement)
output_metadata["optimized"] = True
# Count rows for metadata
num_rows = df.count()
output_metadata["num_rows"] = num_rows
output_metadata["success"] = True
return output_metadata
def _handle_cdc_writes(
self,
df: DataFrame,
writer_config: Dict,
table_name: str,
path: str,
key_columns: Union[str, List[str]],
watermark_column: str,
) -> None:
"""Handle CDC writes to Delta table.
Args:
df: The DataFrame to write.
writer_config: Writer configuration.
table_name: Name of the table.
path: Path to the table.
key_columns: Key column(s) for matching records.
watermark_column: Watermark column for tracking updates.
"""
# Ensure key_columns is a list
if isinstance(key_columns, str):
key_columns = [key_columns]
# Get the target table
target_table = None
if table_name:
target_table = DeltaTable.forName(df.sparkSession, table_name)
else:
target_table = DeltaTable.forPath(df.sparkSession, path)
if not target_table:
raise ValueError("Failed to locate target Delta table")
# Create the merge condition based on key columns
merge_condition = " AND ".join([f"source.{k} = target.{k}" for k in key_columns])
# If watermark column is provided, add it to the merge condition
if watermark_column:
merge_condition += f" AND source.{watermark_column} > target.{watermark_column}"
        # Build update and insert expressions covering all source columns
        target_columns = list(df.columns)
        update_expr = {col: f"source.{col}" for col in target_columns}
        insert_expr = {col: f"source.{col}" for col in target_columns}
self.log.info(f"Performing CDC merge with condition: {merge_condition}")
# Execute the merge
target_table.alias("target").merge(
df.alias("source"),
merge_condition
).whenMatchedUpdate(
condition=None,
set=update_expr
).whenNotMatchedInsert(
condition=None,
values=insert_expr
).execute()
def _handle_append_or_merge(
self,
df: DataFrame,
writer_config: Dict,
table_name: str,
path: str,
mode: str,
partition_by: Union[str, List[str]],
) -> None:
"""Handle append or merge operations to Delta table.
Args:
df: The DataFrame to write.
writer_config: Writer configuration.
table_name: Name of the table.
path: Path to the table.
mode: Write mode.
partition_by: Partition columns.
"""
writer = df.write.format("delta")
writer = writer.mode(mode)
# Apply partitioning if specified
if partition_by:
if isinstance(partition_by, str):
partition_by = [partition_by]
writer = writer.partitionBy(partition_by)
# Handle dynamic partition overwrite
overwrite_partitions = get_overwrite_partitions(writer_config, path)
if overwrite_partitions and mode == "overwrite":
# Use replaceWhere for partition overwrite
if isinstance(overwrite_partitions, list) and len(overwrite_partitions) > 0:
partition_col = partition_by[0] if isinstance(partition_by, list) else partition_by
where_condition = f"{partition_col} IN ('{overwrite_partitions[0]}')"
for val in overwrite_partitions[1:]:
where_condition += f", '{val}'"
where_condition += ")"
self.log.info(f"Replacing partitions with condition: {where_condition}")
writer = writer.option("replaceWhere", where_condition)
# Write to table or path
if table_name:
self.log.info(f"Writing to Delta table with mode {mode}: {table_name}")
writer.saveAsTable(table_name)
else:
self.log.info(f"Writing to Delta path with mode {mode}: {path}")
writer.save(path)
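# A minimal sketch of a CDC-style writer_config that exercises the merge branch above.
# The "output_path"/"output_mode" keys mirror the definitions constants; the
# "cdc_config", "partition_by", "z_order_by" and "optimize" key names are assumptions
# for what CDC_CONFIG, PARTITION_BY, Z_ORDER_BY and OPTIMIZE resolve to, and the
# location is hypothetical.
_example_cdc_writer_config = {
    "output_path": "/tmp/lakehouse/silver/customers",  # hypothetical Delta location
    "output_mode": "append",                           # existing table + CDC triggers the merge path
    "partition_by": ["country"],
    "cdc_config": {
        "enabled": True,
        "key_columns": ["customer_id"],
        "watermark_column": "updated_at",
    },
    "z_order_by": ["customer_id"],
    "optimize": True,
}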
End File"""Snowflake Writer."""
from typing import Dict, List, Optional, Tuple
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from lakehouse_engine.core.definitions import (
MODE,
OPTIONS,
OUTPUT_MODE,
OUTPUT_OPTIONS,
SAVE_MODE,
TABLE,
)
from lakehouse_engine.io.readers_writers.writers_utils import get_write_mode
from lakehouse_engine.utils.logging_handler import LoggingHandler
class SnowflakeWriter(LoggingHandler):
"""A class for writing data to Snowflake."""
def __init__(self):
"""Initialize the SnowflakeWriter."""
super().__init__(__name__)
def write(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = None,
) -> Dict:
"""Write data to Snowflake.
Args:
df: The DataFrame to write.
writer_config: Configuration for the Snowflake writer.
extra_args: Extra arguments for the write operation (not used by this writer).
Returns:
Dictionary with metadata about the write operation.
"""
# Extract configuration
table_name = writer_config.get(TABLE)
mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")
options = writer_config.get(OPTIONS) or writer_config.get(OUTPUT_OPTIONS, {})
# Set save mode if not in options
if SAVE_MODE not in options:
options[SAVE_MODE] = get_write_mode(mode)
# Validate required parameters
if not table_name:
raise ValueError("Snowflake table name is required")
# Begin writing process
writer = df.write.format("snowflake")
# Apply all options
for key, value in options.items():
writer = writer.option(key, value)
# Set the table parameter (dbtable)
writer = writer.option("dbtable", table_name)
# Perform the write operation
self.log.info(f"Writing data to Snowflake table: {table_name}")
writer.save()
# Return metadata about the operation
num_rows = df.count()
return {
"table": table_name,
"mode": mode,
"options": options,
"num_rows": num_rows,
"success": True
}
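# A minimal usage sketch for SnowflakeWriter, assuming an active Spark session and the
# Snowflake Spark connector; the "table" key is the assumed value of TABLE and the
# connection options are hypothetical.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, 9.99)], ["order_id", "amount"])
    print(
        SnowflakeWriter().write(
            df,
            writer_config={
                "table": "ANALYTICS.PUBLIC.ORDERS",
                "output_mode": "overwrite",
                "output_options": {
                    "sfURL": "example.snowflakecomputing.com",
                    "sfUser": "loader",
                    "sfPassword": "secret",
                    "sfDatabase": "ANALYTICS",
                    "sfSchema": "PUBLIC",
                    "sfWarehouse": "LOAD_WH",
                },
            },
        )
    )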
End File# adidas/lakehouse-engine
"""Reader module."""
from typing import Any, Dict, Optional, Union
from pyspark.sql import DataFrame, SparkSession
from lakehouse_engine.algorithms.exceptions import UnknownReaderException
from lakehouse_engine.core.definitions import INPUT_FORMAT, InputFormat, RESOURCE_PATH
from lakehouse_engine.io.readers_writers.readers import (
DeltaLakeReader,
FileReader,
JDBCReader,
MultiReader,
SnowflakeReader,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler
class Reader(LoggingHandler):
"""Reader class."""
def __init__(self, spark_session: SparkSession):
"""Construct Reader.
Args:
spark_session: Spark session.
"""
super().__init__(__name__)
self.spark = spark_session
self.readers = {
InputFormat.DELTA: DeltaLakeReader(spark_session),
InputFormat.CSV: FileReader(spark_session),
InputFormat.PARQUET: FileReader(spark_session),
InputFormat.AVRO: FileReader(spark_session),
InputFormat.JSON: FileReader(spark_session),
InputFormat.JDBC: JDBCReader(spark_session),
InputFormat.SNOWFLAKE: SnowflakeReader(spark_session),
InputFormat.MULTI: MultiReader(spark_session),
}
def read(
        self, reader_config: Dict[str, Any], reader_type: Optional[str] = None
) -> DataFrame:
"""Read data according to the reader configuration.
Args:
reader_config: Reader configuration.
reader_type: Reader type (format) to use. Defaults to None.
Returns:
DataFrame with read data
"""
read_from_path = reader_config.get(RESOURCE_PATH)
if read_from_path is None:
raise ValueError("No resource path provided in reader configuration")
# Determine the input format (either from specified reader_type or from config)
format_type = reader_type
if format_type is None:
format_type = reader_config.get(INPUT_FORMAT, InputFormat.DELTA).lower()
# Get the appropriate reader
reader = self.readers.get(format_type)
if reader is None:
raise UnknownReaderException(f"Unknown reader type: {format_type}")
# Prepare options - remove known keys that aren't options
options = reader_config.copy()
non_option_keys = [RESOURCE_PATH, INPUT_FORMAT]
for key in non_option_keys:
if key in options:
del options[key]
# Add format for file reader
if isinstance(reader, FileReader):
options["format"] = format_type
# Read and return the data
return reader.read(read_from_path, **options)
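# A minimal usage sketch for the Reader facade, assuming an active Spark session; with no
# input format in the config, the Delta reader is used by default and the path is hypothetical.
if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    reader_config = {"resource_path": "/tmp/lakehouse/bronze/events"}  # hypothetical location
    Reader(spark).read(reader_config).printSchema()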
End File# adidas/lakehouse-engine
"""Writer module."""
from typing import Dict, List, Optional, Tuple
from pyspark.sql import DataFrame
from lakehouse_engine.algorithms.exceptions import UnknownWriterException
from lakehouse_engine.core.definitions import OUTPUT_FORMAT, OutputFormat
from lakehouse_engine.io.readers_writers.delta_lake_writer import DeltaLakeWriter
from lakehouse_engine.io.readers_writers.file_writer import FileWriter
from lakehouse_engine.io.readers_writers.jdbc_writer import JDBCWriter
from lakehouse_engine.io.readers_writers.snowflake_writer import SnowflakeWriter
from lakehouse_engine.utils.logging_handler import LoggingHandler
class Writer(LoggingHandler):
"""Writer class."""
def __init__(self):
"""Construct Writer."""
super().__init__(__name__)
self.writers = {
OutputFormat.BRONZE: DeltaLakeWriter(),
OutputFormat.SILVER: DeltaLakeWriter(),
OutputFormat.GOLD: DeltaLakeWriter(),
OutputFormat.DELTA: DeltaLakeWriter(),
OutputFormat.CSV: FileWriter(),
OutputFormat.PARQUET: FileWriter(),
OutputFormat.AVRO: FileWriter(),
OutputFormat.JSON: FileWriter(),
OutputFormat.JDBC: JDBCWriter(),
OutputFormat.SNOWFLAKE: SnowflakeWriter(),
}
def write(
self,
df: DataFrame,
writer_config: Dict,
extra_args: Optional[Tuple] = None,
        output_format: Optional[str] = None,
) -> List[Dict]:
"""Write data according to the writer configuration.
Args:
df: dataframe to be written.
writer_config: writer configuration.
extra_args: extra arguments to be used for the writer.
output_format: explicit output format, overrides what's in the config.
Returns:
metadata dictionaries from the write operations
"""
# Determine the output format (either from specified output_format or from config)
format_type = output_format
if format_type is None:
format_type = writer_config.get(OUTPUT_FORMAT, OutputFormat.DELTA).lower()
# Get the appropriate writer
writer = self.writers.get(format_type)
if writer is None:
raise UnknownWriterException(f"Unknown writer format: {format_type}")
extra_args_unpacked = () if extra_args is None else extra_args
# For delta lake writers, we need to pass the table type
if isinstance(writer, DeltaLakeWriter):
# Map the format to a table type if it's a layer designation
if format_type in [OutputFormat.BRONZE, OutputFormat.SILVER, OutputFormat.GOLD]:
table_type = format_type
else:
table_type = "delta" # Generic delta type
return [writer.write(df, writer_config, table_type, extra_args_unpacked)]
else:
return [writer.write(df, writer_config, extra_args_unpacked)]
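# A minimal usage sketch for the Writer facade, assuming an active Spark session with Delta
# support; the "output_*" keys mirror the definitions constants and the location is hypothetical.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "2024-01-01")], ["id", "event_date"])
    results = Writer().write(
        df,
        writer_config={
            "output_format": "silver",                      # routed to DeltaLakeWriter as table_type "silver"
            "output_path": "/tmp/lakehouse/silver/events",  # hypothetical location
            "output_mode": "overwrite",
        },
    )
    print(results)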
End File""" A transformer class for SQL transformations. """
from typing import Any, Dict, List, Optional, Tuple, Union
import pyspark.sql.functions
from pyspark.sql import Column, DataFrame
from lakehouse_engine.utils.logging_handler import LoggingHandler
class SQLTransformer(LoggingHandler):
"""A transformer class for SQL transformations."""
def __init__(self, spark_session):
"""Construct SQLTransformer.
Args:
spark_session: Spark session.
"""
super().__init__(__name__)
self.spark = spark_session
def transform(
self,
input_dfs: Dict[str, DataFrame],
transformation_config: Dict[str, Any]
) -> DataFrame:
"""Transform the input dataframes using SQL.
Args:
input_dfs: Dictionary mapping dataframe aliases to dataframes.
transformation_config: Configuration for SQL transformation.
Returns:
Transformed dataframe.
"""
# Register temporary views for all input dataframes
for alias, df in input_dfs.items():
df.createOrReplaceTempView(alias)
sql_query = transformation_config.get("query")
if not sql_query:
raise ValueError("SQL query is required in transformation configuration")
# Execute the SQL query
self.log.info(f"Executing SQL transformation: {sql_query}")
result_df = self.spark.sql(sql_query)
# Clean up temporary views
for alias in input_dfs.keys():
self.spark.catalog.dropTempView(alias)
return result_df
class ColumnTransformer(LoggingHandler):
"""A transformer class for column operations."""
def __init__(self, spark_session):
"""Construct ColumnTransformer.
Args:
spark_session: Spark session.
"""
super().__init__(__name__)
self.spark = spark_session
# Map of operation names to methods that implement them
self.operations = {
"select": self._select,
"drop": self._drop,
"rename": self._rename,
"cast": self._cast,
"withColumn": self._with_column,
"withColumnRenamed": self._with_column_renamed,
"filter": self._filter,
"where": self._filter, # alias for filter
"orderBy": self._order_by,
"sort": self._order_by, # alias for orderBy
"groupBy": self._group_by,
"join": self._join,
"union": self._union,
"unionByName": self._union_by_name,
"limit": self._limit,
"distinct": self._distinct,
"dropDuplicates": self._drop_duplicates,
"sample": self._sample,
"explode": self._explode,
"pivot": self._pivot,
"unpivot": self._unpivot,
"summary": self._summary,
}
def transform(
self,
input_dfs: Dict[str, DataFrame],
transformation_config: Dict[str, Any]
) -> DataFrame:
"""Transform the input dataframes using column operations.
Args:
input_dfs: Dictionary mapping dataframe aliases to dataframes.
transformation_config: Configuration for column operations.
Format: {"operations": [{"op": "operation_name", "params": {...}}]}
Returns:
Transformed dataframe.
"""
# Get the primary input dataframe
primary_alias = transformation_config.get("primary_df", next(iter(input_dfs.keys())))
result_df = input_dfs[primary_alias]
# Process each operation in sequence
operations = transformation_config.get("operations", [])
for operation in operations:
op_name = operation.get("op")
if not op_name or op_name not in self.operations:
raise ValueError(f"Unknown or missing operation: {op_name}")
op_params = operation.get("params", {})
# Execute the operation
self.log.info(f"Executing column operation: {op_name}")
result_df = self.operations[op_name](result_df, op_params, input_dfs)
return result_df
def _select(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Select columns from dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"columns": ["col1", "col2", ...]}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with selected columns.
"""
columns = params.get("columns", [])
if not columns:
return df
return df.select(*columns)
def _drop(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Drop columns from dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"columns": ["col1", "col2", ...]}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with dropped columns.
"""
columns = params.get("columns", [])
if not columns:
return df
return df.drop(*columns)
def _rename(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Rename columns in dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"mappings": {"old_col1": "new_col1", ...}}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with renamed columns.
"""
mappings = params.get("mappings", {})
if not mappings:
return df
result_df = df
for old_name, new_name in mappings.items():
result_df = result_df.withColumnRenamed(old_name, new_name)
return result_df
def _cast(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Cast columns to specified types.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"columns": {"col1": "type1", ...}}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with cast columns.
"""
columns = params.get("columns", {})
if not columns:
return df
result_df = df
for col_name, col_type in columns.items():
result_df = result_df.withColumn(col_name, result_df[col_name].cast(col_type))
return result_df
def _with_column(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Add or replace column in dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"column": "col_name", "expr": "expression"}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with added/replaced column.
"""
column_name = params.get("column")
expression = params.get("expr")
if not column_name or not expression:
return df
# Convert string expression to Column expression
expr_col = pyspark.sql.functions.expr(expression)
return df.withColumn(column_name, expr_col)
def _with_column_renamed(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Rename a column in dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"existing": "old_col", "new": "new_col"}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with renamed column.
"""
existing = params.get("existing")
new = params.get("new")
if not existing or not new:
return df
return df.withColumnRenamed(existing, new)
def _filter(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Filter rows in dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"condition": "filter_condition"}
input_dfs: Dictionary of all input dataframes.
Returns:
Filtered dataframe.
"""
condition = params.get("condition")
if not condition:
return df
return df.filter(condition)
def _order_by(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Sort dataframe by columns.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"columns": ["col1", "col2 desc", ...]}
input_dfs: Dictionary of all input dataframes.
Returns:
Sorted dataframe.
"""
columns = params.get("columns", [])
if not columns:
return df
return df.orderBy(*columns)
def _group_by(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Group dataframe by columns and aggregate.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"columns": ["col1", "col2", ...],
"aggs": [{"col": "col3", "agg": "sum"}, ...]}
input_dfs: Dictionary of all input dataframes.
Returns:
Grouped and aggregated dataframe.
"""
columns = params.get("columns", [])
aggs = params.get("aggs", [])
if not columns or not aggs:
return df
# Prepare aggregation expressions
agg_exprs = []
for agg in aggs:
col_name = agg.get("col")
agg_func = agg.get("agg")
alias = agg.get("alias", f"{agg_func}_{col_name}")
agg_method = getattr(pyspark.sql.functions, agg_func, None)
if agg_method:
agg_exprs.append(agg_method(col_name).alias(alias))
# Group by columns and apply aggregations
return df.groupBy(*columns).agg(*agg_exprs)
def _join(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Join dataframe with another dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"right": "right_df_alias", "on": "join_condition",
"how": "join_type"}
input_dfs: Dictionary of all input dataframes.
Returns:
Joined dataframe.
"""
right_df_alias = params.get("right")
join_condition = params.get("on")
join_type = params.get("how", "inner")
if not right_df_alias or not join_condition:
return df
if right_df_alias not in input_dfs:
raise ValueError(f"Right dataframe alias '{right_df_alias}' not found")
right_df = input_dfs[right_df_alias]
# Handle different types of join conditions
if isinstance(join_condition, str):
# String expression
condition = pyspark.sql.functions.expr(join_condition)
elif isinstance(join_condition, list):
# List of columns to join on
condition = [df[col] == right_df[col] for col in join_condition]
else:
# Default to using the expression as is
condition = join_condition
return df.join(right_df, on=condition, how=join_type)
def _union(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Union dataframe with other dataframes.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"dfs": ["df_alias1", "df_alias2", ...]}
input_dfs: Dictionary of all input dataframes.
Returns:
Unioned dataframe.
"""
df_aliases = params.get("dfs", [])
if not df_aliases:
return df
result_df = df
for alias in df_aliases:
if alias not in input_dfs:
raise ValueError(f"Dataframe alias '{alias}' not found")
result_df = result_df.union(input_dfs[alias])
return result_df
def _union_by_name(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Union dataframes by column name.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"dfs": ["df_alias1", "df_alias2", ...],
"allowMissingColumns": true}
input_dfs: Dictionary of all input dataframes.
Returns:
Unioned dataframe.
"""
df_aliases = params.get("dfs", [])
allow_missing = params.get("allowMissingColumns", True)
if not df_aliases:
return df
result_df = df
for alias in df_aliases:
if alias not in input_dfs:
raise ValueError(f"Dataframe alias '{alias}' not found")
result_df = result_df.unionByName(input_dfs[alias], allowMissingColumns=allow_missing)
return result_df
def _limit(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Limit number of rows in dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"n": 100}
input_dfs: Dictionary of all input dataframes.
Returns:
Limited dataframe.
"""
n = params.get("n", 0)
if n <= 0:
return df
return df.limit(n)
def _distinct(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Return distinct rows from dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation (empty).
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with distinct rows.
"""
return df.distinct()
def _drop_duplicates(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Drop duplicate rows from dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"columns": ["col1", "col2", ...]}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with duplicates dropped.
"""
columns = params.get("columns", [])
if not columns:
return df.dropDuplicates()
return df.dropDuplicates(columns)
def _sample(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Sample rows from dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"fraction": 0.1, "withReplacement": false, "seed": 42}
input_dfs: Dictionary of all input dataframes.
Returns:
Sampled dataframe.
"""
fraction = params.get("fraction", 0.1)
with_replacement = params.get("withReplacement", False)
seed = params.get("seed", None)
return df.sample(fraction=fraction, withReplacement=with_replacement, seed=seed)
def _explode(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Explode an array column to multiple rows.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"column": "array_column"}
input_dfs: Dictionary of all input dataframes.
Returns:
Dataframe with exploded column.
"""
column = params.get("column")
if not column:
return df
return df.withColumn(column, pyspark.sql.functions.explode(df[column]))
def _pivot(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Pivot a dataframe.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"columns": ["col1", "col2"], "pivotColumn": "col3",
"values": ["val1", "val2"], "aggregation": "sum"}
input_dfs: Dictionary of all input dataframes.
Returns:
Pivoted dataframe.
"""
group_cols = params.get("columns", [])
pivot_col = params.get("pivotColumn")
values = params.get("values", None)
agg_func = params.get("aggregation", "count")
if not group_cols or not pivot_col:
return df
# Get the aggregation function
agg_method = getattr(pyspark.sql.functions, agg_func)
# Create the pivot
grouped = df.groupBy(*group_cols)
if values:
return grouped.pivot(pivot_col, values).agg(agg_method("*"))
else:
return grouped.pivot(pivot_col).agg(agg_method("*"))
def _unpivot(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Unpivot a dataframe. Note: This is a custom implementation.
Args:
df: Input dataframe.
params: Parameters for the operation.
Format: {"idColumns": ["id1", "id2"],
"pivotColumns": ["pivot1", "pivot2"],
"valueColumn": "value", "nameColumn": "name"}
input_dfs: Dictionary of all input dataframes.
Returns:
Unpivoted dataframe.
"""
id_columns = params.get("idColumns", [])
pivot_columns = params.get("pivotColumns", [])
value_column = params.get("valueColumn", "value")
name_column = params.get("nameColumn", "name")
if not pivot_columns:
return df
# Create expressions for stack function
stack_expr = ", ".join([f"'{col}', `{col}`" for col in pivot_columns])
select_expr = [f"`{col}`" for col in id_columns]
select_expr.append(f"stack({len(pivot_columns)}, {stack_expr}) as (`{name_column}`, `{value_column}`)")
return df.selectExpr(*select_expr)
def _summary(
self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]
) -> DataFrame:
"""Generate a summary of dataframe statistics.
Args:
df: Input dataframe.
params: Parameters for the operation. Format: {"statistics": ["count", "min", "max"]}
input_dfs: Dictionary of all input dataframes.
Returns:
Summary dataframe.
"""
statistics = params.get("statistics", ["count", "mean", "stddev", "min", "max"])
return df.summary(*statistics)
class UDFTransformer(LoggingHandler):
"""A transformer class for User-Defined Functions (UDFs)."""
def __init__(self, spark_session):
"""Construct UDFTransformer.
Args:
spark_session: Spark session.
"""
super().__init__(__name__)
self.spark = spark_session
def transform(
self,
input_dfs: Dict[str, DataFrame],
transformation_config: Dict[str, Any]
) -> DataFrame:
"""Transform using user-defined functions.
Args:
input_dfs: Dictionary mapping dataframe aliases to dataframes.
transformation_config: Configuration for UDF transformation.
Format: {
"primary_df": "main",
"code": "def my_udf(x): return x.upper()",
"register_udfs": [
{
"name": "upper_udf",
"function": "my_udf",
"return_type": "string"
}
],
"apply": [
{
"output_column": "upper_name",
"udf": "upper_udf",
"input_columns": ["name"]
}
]
}
Returns:
Transformed dataframe.
"""
# Get the primary input dataframe
primary_alias = transformation_config.get("primary_df", next(iter(input_dfs.keys())))
result_df = input_dfs[primary_alias]
# Execute the UDF code
udf_code = transformation_config.get("code", "")
if not udf_code:
self.log.warning("No UDF code provided in transformation config")
return result_df
# Create a new namespace for the UDFs
udf_namespace = {}
# Execute the code in the namespace
try:
exec(udf_code, globals(), udf_namespace)
except Exception as e:
self.log.error(f"Error executing UDF code: {e}")
raise
# Register each UDF
registered_udfs = {}
for udf_config in transformation_config.get("register_udfs", []):
udf_name = udf_config.get("name")
function_name = udf_config.get("function")
return_type = udf_config.get("return_type", "string")
if not udf_name or not function_name:
continue
if function_name not in udf_namespace:
self.log.error(f"Function {function_name} not found in UDF code")
continue
# Get the function from the namespace
func = udf_namespace[function_name]
# Register as Spark UDF
registered_udfs[udf_name] = pyspark.sql.functions.udf(func, return_type)
# Apply the UDFs to the dataframe
for apply_config in transformation_config.get("apply", []):
output_column = apply_config.get("output_column")
udf_name = apply_config.get("udf")
input_columns = apply_config.get("input_columns", [])
if not output_column or not udf_name or not input_columns:
continue
if udf_name not in registered_udfs:
self.log.error(f"UDF {udf_name} not registered")
continue
# Apply the UDF to create or replace the output column
udf = registered_udfs[udf_name]
input_cols = [pyspark.sql.functions.col(col) for col in input_columns]
result_df = result_df.withColumn(output_column, udf(*input_cols))
return result_df
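# A minimal usage sketch for ColumnTransformer, assuming an active Spark session; the
# operations follow the {"op": ..., "params": ...} format documented in transform above,
# and the data is made up for illustration.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    orders = spark.createDataFrame(
        [(1, "DE", 10.0), (2, "DE", 5.0), (3, "US", 7.5)], ["order_id", "country", "amount"]
    )
    config = {
        "primary_df": "orders",
        "operations": [
            {"op": "filter", "params": {"condition": "amount > 6"}},
            {"op": "groupBy", "params": {"columns": ["country"], "aggs": [{"col": "amount", "agg": "sum"}]}},
        ],
    }
    ColumnTransformer(spark).transform({"orders": orders}, config).show()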
End File# adidas/lakehouse-engine
lakehouse_engine/engine.py
"""The Lakehouse Engine module."""
from typing import Any, Dict, List, Optional, Tuple, Union
from pyspark.sql import DataFrame, SparkSession
from lakehouse_engine.core.definitions import (
INPUT_SPECS,
INPUT_TYPE,
OUTPUT_SPECS,
RESOURCE_PATH,
TABLE_NAME,
TRANSFORMATIONS,
)
from lakehouse_engine.io.reader import Reader
from lakehouse_engine.io.writer import Writer
from lakehouse_engine.transformers.transformers import (
ColumnTransformer,
SQLTransformer,
UDFTransformer,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler
class LakehouseEngine(LoggingHandler):
"""Main class that orchestrates the Lakehouse Engine operations."""
    def __init__(self, spark_session: Optional[SparkSession] = None):
"""Initialize the Lakehouse Engine.
Args:
spark_session: The Spark session to use. If None, a new session will be created.
"""
super().__init__(__name__)
# Initialize Spark session if not provided
self.spark = spark_session or SparkSession.builder.getOrCreate()
# Initialize components
self.reader = Reader(self.spark)
self.writer = Writer()
# Initialize transformers
self.transformers = {
"sql": SQLTransformer(self.spark),
"column": ColumnTransformer(self.spark),
"udf": UDFTransformer(self.spark),
}
self.log.info("Lakehouse Engine initialized")
def execute(self, config: Dict[str, Any]) -> List[Dict]:
"""Execute the data processing pipeline defined in the configuration.
Args:
config: The configuration dictionary with input specs, transformations, and output specs.
Returns:
A list of metadata dictionaries from the write operations.
"""
# 1. Read input data
input_dfs = self._read_input_data(config.get(INPUT_SPECS, []))
if not input_dfs:
self.log.warning("No input data to process")
return []
# 2. Apply transformations
transformed_df = self._apply_transformations(
input_dfs, config.get(TRANSFORMATIONS, {})
)
if transformed_df is None:
self.log.info("No transformations applied, using first input as output")
# Use the first input dataframe as the output if no transformations applied
transformed_df = next(iter(input_dfs.values()))
# 3. Write output data
output_specs = config.get(OUTPUT_SPECS, [])
metadata = []
if not output_specs:
self.log.warning("No output specifications provided, pipeline ends without writing data")
else:
for output_spec in output_specs:
output_metadata = self._write_output_data(transformed_df, output_spec)
metadata.extend(output_metadata)
self.log.info("Pipeline execution completed successfully")
return metadata
def _read_input_data(self, input_specs: List[Dict]) -> Dict[str, DataFrame]:
"""Read input data according to specifications.
Args:
input_specs: List of input specifications.
Returns:
Dictionary mapping input aliases to DataFrames.
"""
input_dfs = {}
for spec in input_specs:
# Get unique identifier for this input (table_name or path)
alias = spec.get(TABLE_NAME) or spec.get(RESOURCE_PATH, "input")
# Remove path from alias if it's a full path
if '/' in alias:
alias = alias.split('/')[-1]
self.log.info(f"Reading input data: {alias}")
try:
# Read the data
input_format = spec.get(INPUT_TYPE)
df = self.reader.read(spec, input_format)
# Store with alias
input_dfs[alias] = df
self.log.info(f"Successfully read input: {alias}, {df.count()} rows")
except Exception as e:
self.log.error(f"Error reading input {alias}: {e}")
raise
return input_dfs
def _apply_transformations(
self, input_dfs: Dict[str, DataFrame], transformations: Dict[str, Any]
) -> Optional[DataFrame]:
"""Apply transformations to input data.
Args:
input_dfs: Dictionary of input DataFrames.
transformations: Transformation specifications.
Returns:
Transformed DataFrame, or None if no transformations applied.
"""
if not transformations:
return None
# Get the transformer type
transformer_type = transformations.get("type", "sql")
if transformer_type not in self.transformers:
raise ValueError(f"Unknown transformer type: {transformer_type}")
# Get the appropriate transformer and apply it
transformer = self.transformers[transformer_type]
self.log.info(f"Applying {transformer_type} transformation")
try:
result_df = transformer.transform(input_dfs, transformations)
self.log.info(f"Transformation complete, resulting in {result_df.count()} rows")
return result_df
except Exception as e:
self.log.error(f"Error during transformation: {e}")
raise
def _write_output_data(
self, df: DataFrame, output_spec: Dict[str, Any]
) -> List[Dict]:
"""Write output data according to specification.
Args:
df: DataFrame to write.
output_spec: Output specification.
Returns:
List of metadata dictionaries from write operations.
"""
table_name = output_spec.get(TABLE_NAME, "")
if table_name:
self.log.info(f"Writing output to: {table_name}")
else:
self.log.info(f"Writing output data")
try:
# Write the data
metadata = self.writer.write(df, output_spec)
return metadata
except Exception as e:
self.log.error(f"Error writing output data: {e}")
raise
End File# adidas/lakehouse-engine
lakehouse_engine/utils/logging_handler.py
"""LoggingHandler module."""
import logging
import os
from typing import Callable, List, Optional, Set, Union
class LoggingHandler:
"""LoggingHandler class.
This class is used to handle logging for all the components of the lakehouse engine.
Each component inherits from this class and can use its log attribute to log messages.
Example:
>>> class MyComponent(LoggingHandler):
... def __init__(self):
... super().__init__(__name__)
...
... def do_something(self):
... self.log.info("Doing something")
... self.log.debug("debug message")
"""
log_level: str = os.environ.get("LAKEHOUSE_ENGINE_LOG_LEVEL", "INFO")
def __init__(
self,
name: str,
log_level: Optional[str] = None,
formatter: Optional[str] = None,
):
"""Initialize the LoggingHandler.
Args:
name: the name of the logger (usually __name__)
log_level: the log level for this handler (overrides the class level one if provided)
formatter: the log formatter for this handler
"""
# Create logger
self.log = logging.getLogger(name)
# Set log level from instance, class, or default
level = log_level or self.log_level or "INFO"
self.log.setLevel(self._get_log_level(level))
# If logger doesn't have handlers, add a new one
if not self.log.handlers:
handler = logging.StreamHandler()
handler.setLevel(self.log.level)
# Set formatter
if formatter:
handler.setFormatter(logging.Formatter(formatter))
else:
handler.setFormatter(
logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
)
# Add handler to logger
self.log.addHandler(handler)
def _get_log_level(self, level_name: str) -> int:
"""Get logging level from string.
Args:
level_name: the name of the log level
Returns:
The logging level as an int.
"""
levels = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
return levels.get(level_name.upper(), logging.INFO)
End File"""Core definitions of the lakehouse engine."""
from enum import Enum
class InputFormat(str, Enum):
"""Input format types."""
DELTA = "delta"
CSV = "csv"
PARQUET = "parquet"
AVRO = "avro"
JSON = "json"
JDBC = "jdbc"
SNOWFLAKE = "snowflake"
MULTI = "multi"
class OutputFormat(str, Enum):
"""Output format types."""
DELTA = "delta"
BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
CSV = "csv"
PARQUET = "parquet"
AVRO = "avro"
JSON = "json"
JDBC = "jdbc"
SNOWFLAKE = "snowflake"
class TableType(str, Enum):
"""Table types."""
BRONZE = "bronze"
SILVER = "silver"
GOLD = "gold"
# Input specs field definitions
INPUT_SPECS = "input_specs"
INPUT_TYPE = "input_type"
RESOURCE_PATH = "resource_path"
# Transformation field definitions
TRANSFORMATIONS = "transformations"
PRIMARY_KEYS = "primary_keys"
# Output specs field definitions
OUTPUT_SPECS = "output_specs"
OUTPUT_TYPE = "output_type"
OUTPUT_PATH = "output_path"
OUTPUT_FORMAT = "output_format"
OUTPUT_MODE = "output_mode"
OUTPUT_OPTIONS = "output_options"
OUTPUT_TO = "output_to"
# Field definitions used in both inputs and outputs
TABLE_NAME = "table_name"
TABLE = "table"
PATH = "path"
FORMAT = "format"
MODE = "mode"
OPTIONS = "options"
# Delta Lake specific fields
OPTIMIZE = "optimize"
Z_ORDER_BY = "z_order_by"
PARTITION_BY = "partition_by"
PARTITION_COLUMNS = "partition_columns"
WATERMARK_COLUMN = "watermark_column"
KEY_COLUMN = "key_column"
OVERWRITE_SCHEMA = "overwrite_schema"
OVERWRITE_PARTITIONS = "overwrite_partitions"
OVERWRITE_AT_EXECUTION_TIME = "overwrite_at_execution_time"
PROCESSING_TIME = "processing_time"
# CDC specific fields
CDC_CONFIG = "cdc_config"
CDC_ENABLED = "cdc_enabled"
CDC_KEY_COLUMNS = "cdc_key_columns"
CDC_WATERMARK_COLUMN = "cdc_watermark_column"
# JDBC specific fields
URL = "url"
DRIVER = "driver"
USER = "user"
PASSWORD = "password"
SAVE_MODE = "saveMode"
# Snowflake specific fields
SNOWFLAKE_OPTIONS = "snowflake_options"
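# Illustrative only: a pipeline config is a plain dictionary keyed by the constants above,
# for example (values are placeholders):
# {
#     INPUT_SPECS: [{INPUT_TYPE: "delta", TABLE_NAME: "bronze.orders"}],
#     TRANSFORMATIONS: {"type": "sql"},
#     OUTPUT_SPECS: [{TABLE_NAME: "silver.orders", FORMAT: "delta", MODE: "overwrite"}],
# }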
End File# adidas/lakehouse-engine
lakehouse_engine/algorithms/extract/delta_extract_pipeline.py
"""DeltaExtractPipeline class."""
from typing import Any, Dict, List, Optional
from pyspark.sql import DataFrame, SparkSession
from delta.tables import DeltaTable
from lakehouse_engine.algorithms.exceptions import (
ConfigurationError,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler
class DeltaExtractPipeline(LoggingHandler):
"""
A pipeline for extracting data changes from Delta tables.
This pipeline supports various extraction modes such as incremental, snapshot,
CDC (Change Data Capture), and time travel.
"""
def __init__(self, spark: SparkSession):
"""Initialize the DeltaExtractPipeline.
Args:
spark: Spark session.
"""
super().__init__(__name__)
self.spark = spark
def extract(
self, extract_config: Dict[str, Any]
) -> DataFrame:
"""
Extract data changes from a Delta table.
Args:
extract_config: Configuration for the extraction.
Required keys:
- source_table: The Delta table path or name to extract from
- mode: The extraction mode ('incremental', 'snapshot', 'cdc', 'time_travel')
Mode-specific keys:
- For 'incremental':
* watermark_column: Column to track changes (usually a timestamp)
* watermark_value: The last processed watermark value (optional)
- For 'cdc':
* version_column: Column tracking record versions
* operation_column: Column containing operation type (insert, update, delete)
* watermark_column: Same as incremental
* watermark_value: Same as incremental
- For 'time_travel':
* version: The table version to read
* timestamp: The timestamp to read (alternative to version)
Returns:
DataFrame with the extracted data.
Raises:
ConfigurationError: If the configuration is invalid.
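Example:
    An illustrative incremental extraction (table and column names are placeholders):

    >>> pipeline = DeltaExtractPipeline(spark)
    >>> df = pipeline.extract({
    ...     "source_table": "silver.orders",
    ...     "mode": "incremental",
    ...     "watermark_column": "updated_at",
    ...     "watermark_value": "2024-01-01 00:00:00",
    ... })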
"""
# Extract required configuration
source_table = extract_config.get("source_table")
mode = extract_config.get("mode", "snapshot").lower()
if not source_table:
raise ConfigurationError("source_table is required in the extract configuration")
# Check if table exists
table_exists = self._table_exists(source_table)
if not table_exists:
raise ConfigurationError(f"Delta table '{source_table}' does not exist")
# Process according to mode
if mode == "incremental":
return self._extract_incremental(source_table, extract_config)
elif mode == "snapshot":
return self._extract_snapshot(source_table)
elif mode == "cdc":
return self._extract_cdc(source_table, extract_config)
elif mode == "time_travel":
return self._extract_time_travel(source_table, extract_config)
else:
raise ConfigurationError(f"Unknown extraction mode: {mode}")
def _table_exists(self, table_path: str) -> bool:
"""
Check if a Delta table exists.
Args:
table_path: The table path or name.
Returns:
True if the table exists, False otherwise.
"""
try:
if table_path.startswith("delta.`") or "/" in table_path:
# It's a path
_ = DeltaTable.forPath(self.spark, table_path.replace("delta.`", "").replace("`", ""))
else:
# It's a table name
_ = DeltaTable.forName(self.spark, table_path)
return True
except Exception:
return False
def _extract_snapshot(self, source_table: str) -> DataFrame:
"""
Extract a snapshot of the Delta table.
Args:
source_table: The table path or name.
Returns:
DataFrame with the current snapshot.
"""
self.log.info(f"Extracting snapshot from {source_table}")
# Simply read the current state of the table
if source_table.startswith("delta.`") or "/" in source_table:
return self.spark.read.format("delta").load(
source_table.replace("delta.`", "").replace("`", "")
)
else:
return self.spark.table(source_table)
def _extract_incremental(
self, source_table: str, extract_config: Dict[str, Any]
) -> DataFrame:
"""
Extract incremental changes from a Delta table based on a watermark column.
Args:
source_table: The table path or name.
extract_config: The extraction configuration.
Returns:
DataFrame with incremental changes.
Raises:
ConfigurationError: If watermark_column is missing.
"""
watermark_column = extract_config.get("watermark_column")
watermark_value = extract_config.get("watermark_value")
if not watermark_column:
raise ConfigurationError("watermark_column is required for incremental extraction")
self.log.info(
f"Extracting incremental changes from {source_table} "
f"where {watermark_column} > {watermark_value}"
)
# Read the table
if source_table.startswith("delta.`") or "/" in source_table:
df = self.spark.read.format("delta").load(
source_table.replace("delta.`", "").replace("`", "")
)
else:
df = self.spark.table(source_table)
# Filter by watermark if provided
if watermark_value is not None:
df = df.filter(f"{watermark_column} > '{watermark_value}'")
return df
def _extract_cdc(
self, source_table: str, extract_config: Dict[str, Any]
) -> DataFrame:
"""
Extract CDC changes from a Delta table.
Args:
source_table: The table path or name.
extract_config: The extraction configuration.
Returns:
DataFrame with CDC changes.
Raises:
ConfigurationError: If required CDC columns are missing.
"""
version_column = extract_config.get("version_column")
operation_column = extract_config.get("operation_column")
watermark_column = extract_config.get("watermark_column")
watermark_value = extract_config.get("watermark_value")
if not version_column or not operation_column:
raise ConfigurationError(
"version_column and operation_column are required for CDC extraction"
)
self.log.info(f"Extracting CDC changes from {source_table}")
# Read the table
if source_table.startswith("delta.`") or "/" in source_table:
df = self.spark.read.format("delta").load(
source_table.replace("delta.`", "").replace("`", "")
)
else:
df = self.spark.table(source_table)
# Filter by watermark if provided
if watermark_column and watermark_value is not None:
df = df.filter(f"{watermark_column} > '{watermark_value}'")
return df
def _extract_time_travel(
self, source_table: str, extract_config: Dict[str, Any]
) -> DataFrame:
"""
Extract a specific version or timestamp of a Delta table.
Args:
source_table: The table path or name.
extract_config: The extraction configuration.
Returns:
DataFrame with the data at the specified version or timestamp.
Raises:
ConfigurationError: If neither version nor timestamp is provided.
"""
version = extract_config.get("version")
timestamp = extract_config.get("timestamp")
if version is None and timestamp is None:
raise ConfigurationError(
"Either version or timestamp is required for time travel extraction"
)
# Prepare the appropriate reader based on what was provided
reader = self.spark.read.format("delta")
if version is not None:
self.log.info(f"Extracting version {version} from {source_table}")
reader = reader.option("versionAsOf", version)
else:
self.log.info(f"Extracting timestamp {timestamp} from {source_table}")
reader = reader.option("timestampAsOf", timestamp)
# Read the table
if source_table.startswith("delta.`") or "/" in source_table:
return reader.load(source_table.replace("delta.`", "").replace("`", ""))
else:
# For catalog tables with time travel, we need to use different approach
if version is not None:
return self.spark.sql(f"SELECT * FROM {source_table} VERSION AS OF {version}")
else:
return self.spark.sql(f"SELECT * FROM {source_table} TIMESTAMP AS OF '{timestamp}'")
End File"""DataQuality class for data validation."""
from typing import Dict, List, Optional
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F
from lakehouse_engine.algorithms.exceptions import (
DataQualityCheckException,
DataQualityConfigurationError,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler
class DataQuality(LoggingHandler):
"""
Data quality validation class for Spark DataFrames.
This class provides data quality checks like completeness, uniqueness,
conformity, and custom validations for Spark DataFrames.
"""
def __init__(self, spark: SparkSession, fail_on_error: bool = True):
"""Initialize the DataQuality class.
Args:
spark: Spark session.
fail_on_error: Whether to raise an exception when checks fail (default True).
"""
super().__init__(__name__)
self.spark = spark
self.fail_on_error = fail_on_error
def validate(
self, df: DataFrame, dq_config: Dict
) -> Dict:
"""
Validate a DataFrame using the provided data quality configuration.
Args:
df: DataFrame to validate.
dq_config: Data quality configuration dictionary.
Should contain a 'checks' key with a list of check configurations.
Returns:
Dictionary with validation results.
Raises:
DataQualityConfigurationError: If the configuration is invalid.
DataQualityCheckException: If checks fail and fail_on_error is True.
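Example:
    A sketch of a dq_config using the check types handled by this class
    (column names and allowed values are placeholders):

    >>> dq = DataQuality(spark, fail_on_error=False)
    >>> results = dq.validate(df, {
    ...     "checks": [
    ...         {"type": "completeness", "columns": ["order_id"], "threshold": 0},
    ...         {"type": "uniqueness", "columns": ["order_id"]},
    ...         {"type": "conformity", "column": "status", "allowed_values": ["NEW", "SHIPPED"]},
    ...     ]
    ... })
    >>> results["passed"]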
"""
if not isinstance(dq_config, dict) or "checks" not in dq_config:
raise DataQualityConfigurationError(
"Data quality configuration must be a dictionary with a 'checks' key"
)
checks = dq_config["checks"]
if not isinstance(checks, list) or not checks:
raise DataQualityConfigurationError(
"The 'checks' key must contain a non-empty list of check configurations"
)
# Initialize results
all_results = {
"passed": True,
"total_checks": len(checks),
"passed_checks": 0,
"failed_checks": 0,
"check_results": []
}
# Run each check
for check in checks:
check_type = check.get("type")
if not check_type:
raise DataQualityConfigurationError("Each check must have a 'type' key")
check_result = self._run_check(df, check_type, check)
all_results["check_results"].append(check_result)
# Update summary statistics
if check_result["passed"]:
all_results["passed_checks"] += 1
else:
all_results["failed_checks"] += 1
all_results["passed"] = False
# Handle failure if needed
if not all_results["passed"] and self.fail_on_error:
failed_checks = [
f"{r['type']}: {r['message']}"
for r in all_results["check_results"]
if not r["passed"]
]
failed_message = ", ".join(failed_checks)
raise DataQualityCheckException(
f"Data quality checks failed: {failed_message}"
)
return all_results
def _run_check(
self, df: DataFrame, check_type: str, check_config: Dict
) -> Dict:
"""
Run a specific data quality check.
Args:
df: DataFrame to check.
check_type: Type of check to run.
check_config: Check configuration.
Returns:
Dictionary with check results.
"""
# Map check types to handler methods
check_handlers = {
"completeness": self._check_completeness,
"uniqueness": self._check_uniqueness,
"conformity": self._check_conformity,
"integrity": self._check_integrity,
"accuracy": self._check_accuracy,
"custom": self._check_custom,
"schema": self._check_schema,
}
# Get the handler for this check type
handler = check_handlers.get(check_type)
if not handler:
return {
"type": check_type,
"passed": False,
"message": f"Unknown check type: {check_type}",
}
# Initialize result with basic info
result = {
"type": check_type,
"name": check_config.get("name", check_type),
"description": check_config.get("description", ""),
"columns": check_config.get("columns", []),
"severity": check_config.get("severity", "error"),
}
try:
# Run the check
check_result = handler(df, check_config)
result.update(check_result)
# Log the result
if result["passed"]:
self.log.info(f"Check '{result['name']}' passed")
else:
log_method = self.log.error if result["severity"] == "error" else self.log.warning
log_method(f"Check '{result['name']}' failed: {result['message']}")
return result
except Exception as e:
# Handle errors in check execution
self.log.exception(f"Error running check '{check_type}': {str(e)}")
return {
"type": check_type,
"name": check_config.get("name", check_type),
"passed": False,
"message": f"Error executing check: {str(e)}",
"severity": check_config.get("severity", "error"),
}
def _check_completeness(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Check for null/empty values in specified columns.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- columns: List of columns to check
- threshold: Maximum allowed percentage of nulls (0-100)
Returns:
Dictionary with check results.
"""
columns = check_config.get("columns", [])
threshold = check_config.get("threshold", 0)
if not columns:
return {
"passed": False,
"message": "No columns specified for completeness check",
}
# Calculate nulls for each column
null_counts = {}
total_rows = df.count()
if total_rows == 0:
return {
"passed": True,
"message": "DataFrame is empty, completeness check skipped",
}
for col in columns:
null_count = df.filter(F.col(col).isNull() | (F.trim(F.col(col)) == "")).count()
null_percent = (null_count / total_rows) * 100
null_counts[col] = {"count": null_count, "percent": null_percent}
# Check if any column exceeds the threshold
failed_columns = {
col: stats for col, stats in null_counts.items()
if stats["percent"] > threshold
}
if failed_columns:
col_messages = [
f"{col}: {stats['percent']:.2f}% null ({stats['count']} rows)"
for col, stats in failed_columns.items()
]
return {
"passed": False,
"message": f"Columns exceed null threshold of {threshold}%: {', '.join(col_messages)}",
"details": null_counts,
}
else:
return {
"passed": True,
"message": f"All columns have nulls within threshold of {threshold}%",
"details": null_counts,
}
def _check_uniqueness(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Check for uniqueness of values in specified columns.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- columns: List of columns forming the unique key
- allow_duplicates: False to fail on any duplicates
Returns:
Dictionary with check results.
"""
columns = check_config.get("columns", [])
allow_duplicates = check_config.get("allow_duplicates", False)
if not columns:
return {
"passed": False,
"message": "No columns specified for uniqueness check",
}
# Count total and distinct values
total_rows = df.count()
distinct_rows = df.select(*columns).distinct().count()
duplicate_rows = total_rows - distinct_rows
if duplicate_rows == 0 or allow_duplicates:
return {
"passed": True,
"message": f"No duplicates found in columns: {', '.join(columns)}",
"details": {
"total_rows": total_rows,
"unique_rows": distinct_rows,
"duplicate_rows": duplicate_rows,
},
}
else:
return {
"passed": False,
"message": f"Found {duplicate_rows} duplicate values in columns: {', '.join(columns)}",
"details": {
"total_rows": total_rows,
"unique_rows": distinct_rows,
"duplicate_rows": duplicate_rows,
},
}
def _check_conformity(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Check that values conform to expected patterns or ranges.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- column: Column to check
- pattern: Regex pattern for the column values
OR
- min: Minimum allowed value
- max: Maximum allowed value
OR
- allowed_values: List of allowed values
Returns:
Dictionary with check results.
"""
column = check_config.get("column")
pattern = check_config.get("pattern")
min_value = check_config.get("min")
max_value = check_config.get("max")
allowed_values = check_config.get("allowed_values")
threshold = check_config.get("threshold", 0) # % of non-conforming values allowed
if not column:
return {
"passed": False,
"message": "No column specified for conformity check",
}
# Total rows
total_rows = df.count()
if total_rows == 0:
return {
"passed": True,
"message": "DataFrame is empty, conformity check skipped",
}
# Check based on the provided parameters
if pattern:
# Regex pattern check
non_conforming = df.filter(~F.col(column).rlike(pattern)).count()
elif min_value is not None and max_value is not None:
# Range check
non_conforming = df.filter(
(F.col(column) < min_value) | (F.col(column) > max_value)
).count()
elif allowed_values:
# Allowed values check
non_conforming = df.filter(~F.col(column).isin(allowed_values)).count()
else:
return {
"passed": False,
"message": "Conformity check requires pattern, min/max, or allowed_values",
}
# Calculate percentage of non-conforming values
non_conforming_percent = (non_conforming / total_rows) * 100
if non_conforming_percent <= threshold:
return {
"passed": True,
"message": f"Column {column} has {non_conforming_percent:.2f}% non-conforming values, within threshold of {threshold}%",
"details": {
"non_conforming_rows": non_conforming,
"total_rows": total_rows,
"non_conforming_percent": non_conforming_percent,
},
}
else:
return {
"passed": False,
"message": f"Column {column} has {non_conforming_percent:.2f}% non-conforming values, exceeds threshold of {threshold}%",
"details": {
"non_conforming_rows": non_conforming,
"total_rows": total_rows,
"non_conforming_percent": non_conforming_percent,
},
}
def _check_integrity(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Check referential integrity against a reference DataFrame.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- columns: Columns in this DataFrame forming the foreign key
- reference_table: Reference table or DataFrame name
- reference_columns: Columns in reference table forming the primary key
- allow_orphans: Whether to allow orphan records
Returns:
Dictionary with check results.
"""
columns = check_config.get("columns", [])
reference_table = check_config.get("reference_table")
reference_columns = check_config.get("reference_columns", columns)
allow_orphans = check_config.get("allow_orphans", False)
if not columns or not reference_table:
return {
"passed": False,
"message": "Integrity check requires columns and reference_table",
}
# Read the reference table
try:
ref_df = self.spark.table(reference_table)
except Exception as e:
return {
"passed": False,
"message": f"Failed to read reference table {reference_table}: {str(e)}",
}
# Check for orphan records (records in df not in ref_df)
# First, select only the relevant columns from both dataframes
df_keys = df.select(*columns)
ref_keys = ref_df.select(*reference_columns)
# Find orphans using except
orphans = df_keys.exceptAll(ref_keys)
orphan_count = orphans.count()
if orphan_count == 0 or allow_orphans:
return {
"passed": True,
"message": f"No orphan records found" if orphan_count == 0 else f"{orphan_count} orphan records found, but allowed",
"details": {
"orphan_count": orphan_count,
},
}
else:
return {
"passed": False,
"message": f"Found {orphan_count} orphan records",
"details": {
"orphan_count": orphan_count,
},
}
def _check_accuracy(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Check data accuracy using comparisons or derived values.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- column: Column to check
- relation: One of 'equal', 'not_equal', 'less_than', 'greater_than'
- value: Value to compare against
OR
- expression: SQL expression to evaluate
Returns:
Dictionary with check results.
"""
column = check_config.get("column")
relation = check_config.get("relation")
value = check_config.get("value")
expression = check_config.get("expression")
threshold = check_config.get("threshold", 0) # % of inaccurate values allowed
if not ((column and relation and value is not None) or expression):
return {
"passed": False,
"message": "Accuracy check requires either column+relation+value or expression",
}
# Total rows
total_rows = df.count()
if total_rows == 0:
return {
"passed": True,
"message": "DataFrame is empty, accuracy check skipped",
}
# Check based on the provided parameters
if expression:
# Use SQL expression
inaccurate = df.filter(~F.expr(expression)).count()
else:
# Use column relation value
if relation == "equal":
inaccurate = df.filter(F.col(column) != value).count()
elif relation == "not_equal":
inaccurate = df.filter(F.col(column) == value).count()
elif relation == "less_than":
inaccurate = df.filter(F.col(column) >= value).count()
elif relation == "greater_than":
inaccurate = df.filter(F.col(column) <= value).count()
else:
return {
"passed": False,
"message": f"Unknown relation: {relation}",
}
# Calculate percentage of inaccurate values
inaccurate_percent = (inaccurate / total_rows) * 100
if inaccurate_percent <= threshold:
return {
"passed": True,
"message": f"Accuracy check passed with {inaccurate_percent:.2f}% inaccurate values, within threshold of {threshold}%",
"details": {
"inaccurate_rows": inaccurate,
"total_rows": total_rows,
"inaccurate_percent": inaccurate_percent,
},
}
else:
return {
"passed": False,
"message": f"Accuracy check failed with {inaccurate_percent:.2f}% inaccurate values, exceeds threshold of {threshold}%",
"details": {
"inaccurate_rows": inaccurate,
"total_rows": total_rows,
"inaccurate_percent": inaccurate_percent,
},
}
def _check_custom(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Run a custom SQL expression check.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- condition: SQL expression that should evaluate to true for all valid rows
- name: Name of the check
Returns:
Dictionary with check results.
"""
condition = check_config.get("condition")
threshold = check_config.get("threshold", 0) # % of failing rows allowed
if not condition:
return {
"passed": False,
"message": "Custom check requires a condition expression",
}
# Register the dataframe as a temp view for the SQL expression
view_name = f"data_quality_check_view_{hash(condition) % 1000000}"
df.createOrReplaceTempView(view_name)
try:
# Count total and failing rows
total_rows = df.count()
if total_rows == 0:
return {
"passed": True,
"message": "DataFrame is empty, custom check skipped",
}
# Count rows that don't satisfy the condition
failing_rows = df.filter(~F.expr(condition)).count()
failing_percent = (failing_rows / total_rows) * 100
if failing_percent <= threshold:
return {
"passed": True,
"message": f"Custom check passed with {failing_percent:.2f}% failing rows, within threshold of {threshold}%",
"details": {
"failing_rows": failing_rows,
"total_rows": total_rows,
"failing_percent": failing_percent,
},
}
else:
return {
"passed": False,
"message": f"Custom check failed with {failing_percent:.2f}% failing rows, exceeds threshold of {threshold}%",
"details": {
"failing_rows": failing_rows,
"total_rows": total_rows,
"failing_percent": failing_percent,
},
}
finally:
# Clean up the temp view
self.spark.catalog.dropTempView(view_name)
def _check_schema(
self, df: DataFrame, check_config: Dict
) -> Dict:
"""
Check the DataFrame schema against expected columns and types.
Args:
df: DataFrame to check.
check_config: Check configuration.
Required keys:
- expected_columns: List of expected column names
OR
- expected_schema: List of {name, type} dictionaries
Returns:
Dictionary with check results.
"""
expected_columns = check_config.get("expected_columns", [])
expected_schema = check_config.get("expected_schema", [])
if not expected_columns and not expected_schema:
return {
"passed": False,
"message": "Schema check requires expected_columns or expected_schema",
}
# Get actual columns
actual_columns = df.columns
if expected_columns:
# Check column names only
missing_columns = [col for col in expected_columns if col not in actual_columns]
if missing_columns:
return {
"passed": False,
"message": f"Schema is missing expected columns: {', '.join(missing_columns)}",
"details": {
"expected_columns": expected_columns,
"actual_columns": actual_columns,
"missing_columns": missing_columns,
},
}
else:
return {
"passed": True,
"message": "All expected columns are present",
"details": {
"expected_columns": expected_columns,
"actual_columns": actual_columns,
},
}
else:
# Check column names and types
actual_schema = {
field.name: field.dataType.simpleString()
for field in df.schema.fields
}
# Check for missing columns and type mismatches
schema_issues = []
for expected in expected_schema:
col_name = expected.get("name")
col_type = expected.get("type")
if col_name not in actual_columns:
schema_issues.append(f"Column {col_name} is missing")
elif col_type and actual_schema[col_name] != col_type:
schema_issues.append(
f"Column {col_name} has type {actual_schema[col_name]}, expected {col_type}"
)
if schema_issues:
return {
"passed": False,
"message": f"Schema check failed: {', '.join(schema_issues)}",
"details": {
"expected_schema": expected_schema,
"actual_schema": [
{"name": name, "type": type_str}
for name, type_str in actual_schema.items()
],
},
}
else:
return {
"passed": True,
"message": "Schema matches expectations",
"details": {
"expected_schema": expected_schema,
"actual_schema": [
{"name": name, "type": type_str}
for name, type_str in actual_schema.items()
],
},
}
End File# adidas/lakehouse-engine
"""DataQuality class for data validation."""
from typing import Dict, List, Optional
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from lakehouse_engine.utils.logging_handler import LoggingHandler
class SchemaEvolution(LoggingHandler):
"""
Schema evolution class for managing schema changes in Delta tables.
This class provides methods for:
- Comparing schemas between dataframes
- Detecting new, removed, and changed columns
- Applying schema evolution strategies
"""
def __init__(self, spark_session, allow_type_widening=True, allow_column_addition=True):
"""Initialize the SchemaEvolution class.
Args:
spark_session: Spark session to use.
allow_type_widening: Whether to allow widening of column types (e.g. int to long)
allow_column_addition: Whether to allow adding new columns
"""
super().__init__(__name__)
self.spark = spark_session
self.allow_type_widening = allow_type_widening
self.allow_column_addition = allow_column_addition
# Type compatibility mappings for widening
self.type_widening_map = {
"byte": ["short", "int", "long", "float", "double", "decimal"],
"short": ["int", "long", "float", "double", "decimal"],
"int": ["long", "float", "double", "decimal"],
"long": ["float", "double", "decimal"],
"float": ["double", "decimal"],
"decimal": [], # Can only widen precision, not type
"string": [], # Can't widen string
"boolean": [], # Can't widen boolean
"date": ["timestamp"],
"timestamp": [], # Can't widen timestamp
}
def compare_schemas(
self, source_df: DataFrame, target_df: DataFrame
) -> Dict:
"""
Compare schemas between source and target DataFrames.
Args:
source_df: Source DataFrame with new schema.
target_df: Target DataFrame with existing schema.
Returns:
Dictionary with schema comparison results.
"""
self.log.info("Comparing schemas between source and target")
source_schema = {
field.name: field.dataType.simpleString()
for field in source_df.schema.fields
}
target_schema = {
field.name: field.dataType.simpleString()
for field in target_df.schema.fields
}
# Find new, removed, and changed columns
new_columns = [
{"name": name, "type": type_}
for name, type_ in source_schema.items()
if name not in target_schema
]
removed_columns = [
{"name": name, "type": type_}
for name, type_ in target_schema.items()
if name not in source_schema
]
changed_columns = [
{"name": name, "source_type": source_type, "target_type": target_schema[name]}
for name, source_type in source_schema.items()
if name in target_schema and source_type != target_schema[name]
]
comparison = {
"new_columns": new_columns,
"removed_columns": removed_columns,
"changed_columns": changed_columns,
"source_schema": source_schema,
"target_schema": target_schema,
"has_changes": bool(new_columns or removed_columns or changed_columns)
}
if comparison["has_changes"]:
self.log.info(f"Schema differences detected: "
f"{len(new_columns)} new, {len(removed_columns)} removed, "
f"{len(changed_columns)} changed columns")
else:
self.log.info("No schema differences detected")
return comparison
def validate_schema_changes(
self, schema_comparison: Dict, strict: bool = False
) -> Dict:
"""
Validate if schema changes are allowed according to the current settings.
Args:
schema_comparison: Schema comparison from compare_schemas().
strict: Whether to use strict validation (no changes allowed).
Returns:
Validation result dictionary.
"""
if strict:
# In strict mode, no changes are allowed
is_valid = not schema_comparison["has_changes"]
message = "Schema changes not allowed in strict mode" if not is_valid else "No schema changes detected"
return {
"is_valid": is_valid,
"message": message,
"invalid_changes": schema_comparison if not is_valid else {}
}
# Non-strict validation based on settings
invalid_changes = {
"new_columns": [],
"changed_columns": []
}
# Validate new columns
if not self.allow_column_addition:
invalid_changes["new_columns"] = schema_comparison["new_columns"]
# Validate type changes
for changed in schema_comparison["changed_columns"]:
source_type = changed["source_type"]
target_type = changed["target_type"]
# Check if type widening is valid
is_widening_valid = self._is_valid_type_widening(source_type, target_type)
if not is_widening_valid or not self.allow_type_widening:
invalid_changes["changed_columns"].append(changed)
# Check if we have any invalid changes
has_invalid_changes = (
invalid_changes["new_columns"] or
invalid_changes["changed_columns"]
)
result = {
"is_valid": not has_invalid_changes,
"message": "Schema changes are valid" if not has_invalid_changes else "Some schema changes are not allowed",
"invalid_changes": invalid_changes if has_invalid_changes else {}
}
if not result["is_valid"]:
self.log.warning(f"Invalid schema changes: {result['message']}")
return result
def apply_schema_evolution(
self, source_df: DataFrame, target_schema_df: DataFrame, evolution_mode: str = "additive"
) -> DataFrame:
"""
Apply schema evolution to make source DataFrame compatible with target schema.
Args:
source_df: Source DataFrame to evolve.
target_schema_df: Target DataFrame with desired schema.
evolution_mode: Mode of evolution:
- "additive": Adds missing columns, keeps existing ones
- "strict": Matches target schema exactly (drops extra columns)
- "relaxed": Adds missing columns, converts types where possible
Returns:
Evolved DataFrame.
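Example:
    A minimal sketch (the DataFrames and table name are placeholders):

    >>> evolution = SchemaEvolution(spark)
    >>> evolved_df = evolution.apply_schema_evolution(
    ...     source_df=new_batch_df,
    ...     target_schema_df=spark.table("silver.orders"),
    ...     evolution_mode="additive",
    ... )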
"""
self.log.info(f"Applying schema evolution in {evolution_mode} mode")
# Get schema comparison
comparison = self.compare_schemas(source_df, target_schema_df)
if not comparison["has_changes"]:
self.log.info("No schema evolution needed, schemas already match")
return source_df
# Create a copy of source DataFrame to evolve
evolved_df = source_df
if evolution_mode == "strict":
# In strict mode, we select only the columns from target schema
# This means dropping any extra columns in source
target_columns = list(comparison["target_schema"].keys())
evolved_df = evolved_df.select(*[
F.col(col) if col in evolved_df.columns else F.lit(None).alias(col)
for col in target_columns
])
elif evolution_mode in ["additive", "relaxed"]:
# Add columns that exist in the target schema but are missing from the source
for col in comparison["removed_columns"]:
col_name = col["name"]
evolved_df = evolved_df.withColumn(col_name, F.lit(None))
# Handle type conversions in relaxed mode
if evolution_mode == "relaxed":
for changed in comparison["changed_columns"]:
col_name = changed["name"]
target_type = changed["target_type"]
# Try to cast to target type
try:
evolved_df = evolved_df.withColumn(col_name,
F.col(col_name).cast(target_type))
except Exception as e:
self.log.warning(f"Could not cast column {col_name} to {target_type}: {str(e)}")
else:
raise ValueError(f"Unknown evolution mode: {evolution_mode}")
self.log.info(f"Schema evolution complete in {evolution_mode} mode")
return evolved_df
def generate_evolution_statements(
self, schema_comparison: Dict, table_name: str
) -> List[str]:
"""
Generate DDL statements for schema evolution.
Args:
schema_comparison: Schema comparison from compare_schemas().
table_name: Name of the table to evolve.
Returns:
List of SQL statements for schema evolution.
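Example:
    For a comparison reporting one new column (table and column names are
    placeholders), the generated DDL looks like:

    >>> evolution.generate_evolution_statements(
    ...     {"new_columns": [{"name": "discount", "type": "double"}], "changed_columns": []},
    ...     "silver.orders",
    ... )
    ['ALTER TABLE silver.orders ADD COLUMN discount double']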
"""
statements = []
# Generate ADD COLUMN statements for new columns
for col in schema_comparison["new_columns"]:
stmt = f"ALTER TABLE {table_name} ADD COLUMN {col['name']} {col['type']}"
statements.append(stmt)
# Generate ALTER COLUMN TYPE statements for changed columns
for changed in schema_comparison["changed_columns"]:
if self._is_valid_type_widening(changed["source_type"], changed["target_type"]):
stmt = f"ALTER TABLE {table_name} ALTER COLUMN {changed['name']} TYPE {changed['source_type']}"
statements.append(stmt)
return statements
def _is_valid_type_widening(self, source_type: str, target_type: str) -> bool:
"""
Check if source_type can be widened to target_type.
Args:
source_type: Source data type.
target_type: Target data type.
Returns:
True if valid widening, False otherwise.
"""
# Extract base type from complex types
base_source_type = source_type.split("(")[0].lower()
base_target_type = target_type.split("(")[0].lower()
# Check if source type can be widened to target type
if base_source_type in self.type_widening_map:
return base_target_type in self.type_widening_map[base_source_type]
return False
End File"""This module contains all exceptions that can be raised by the engine."""
class LakehouseEngineException(Exception):
"""Base exception class for all Lakehouse Engine exceptions."""
class DeprecationException(LakehouseEngineException):
"""Base exception class for deprecated features."""
class DataValidationException(LakehouseEngineException):
"""Base exception class for Data Validation errors."""
class ConfigurationError(LakehouseEngineException):
"""Exception raised when the configuration is invalid."""
class InvalidDataException(LakehouseEngineException):
"""Exception raised when the data doesn't meet validation requirements."""
class DataQualityCheckException(DataValidationException):
"""Exception raised when data quality checks fail."""
class DataQualityConfigurationError(ConfigurationError, DataValidationException):
"""Exception raised when data quality configuration is invalid."""
class SchemaExpectationsException(DataValidationException):
"""Exception raised when schema expectations are not met."""
class UnknownReaderException(ConfigurationError):
"""Exception raised when an unknown reader is specified."""
class UnknownWriterException(ConfigurationError):
"""Exception raised when an unknown writer is specified."""
class MissingFieldException(ConfigurationError):
"""Exception raised when a required field is missing in the configuration."""
class UnknownTransformationException(ConfigurationError):
"""Exception raised when an unknown transformation is specified."""
class ProcessingException(LakehouseEngineException):
"""Exception raised when there's an error in data processing."""
class ExecutionException(LakehouseEngineException):
"""Exception raised when there's an error during pipeline execution."""
End File
import base64
import json
import requests
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any
from lakehouse_engine.utils.logging_handler import LoggingHandler
class APIClient(LoggingHandler):
"""Client for interacting with REST APIs.
This class provides methods for making HTTP requests to REST APIs with
various authentication methods and handling common response patterns.
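Example:
    A sketch using token authentication (the URL and endpoint are placeholders):

    >>> client = APIClient(
    ...     base_url="https://api.example.com",
    ...     auth_type="token",
    ...     auth_config={"token": "my-token"},
    ... )
    >>> items = client.get_json("/v1/items", params={"limit": 10})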
"""
def __init__(
self,
base_url: str,
auth_type: Optional[str] = None,
auth_config: Optional[Dict[str, str]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: int = 30
):
"""Initialize the API client.
Args:
base_url: Base URL for API requests
auth_type: Type of authentication ('basic', 'token', 'oauth', etc.)
auth_config: Authentication configuration parameters
headers: Default headers to include in all requests
timeout: Default timeout for requests in seconds
"""
super().__init__(__name__)
self.base_url = base_url.rstrip('/')
self.timeout = timeout
self.headers = headers or {}
self.session = requests.Session()
# Apply authentication if provided
if auth_type and auth_config:
self._configure_auth(auth_type, auth_config)
def _configure_auth(self, auth_type: str, auth_config: Dict[str, str]) -> None:
"""Configure authentication for requests.
Args:
auth_type: Type of authentication
auth_config: Authentication configuration
"""
auth_type = auth_type.lower()
if auth_type == 'basic':
username = auth_config.get('username', '')
password = auth_config.get('password', '')
credentials = f"{username}:{password}"
encoded = base64.b64encode(credentials.encode()).decode()
self.headers['Authorization'] = f'Basic {encoded}'
elif auth_type == 'token':
token = auth_config.get('token', '')
prefix = auth_config.get('prefix', 'Bearer')
self.headers['Authorization'] = f'{prefix} {token}'
elif auth_type == 'oauth':
# Get token from OAuth provider
token_url = auth_config.get('token_url', '')
client_id = auth_config.get('client_id', '')
client_secret = auth_config.get('client_secret', '')
scope = auth_config.get('scope', '')
data = {
'grant_type': 'client_credentials',
'client_id': client_id,
'client_secret': client_secret
}
if scope:
data['scope'] = scope
try:
response = requests.post(token_url, data=data)
response.raise_for_status()
token_data = response.json()
self.headers['Authorization'] = f"Bearer {token_data.get('access_token')}"
# Save token expiration for refresh logic
self.token_expiry = datetime.now().timestamp() + token_data.get('expires_in', 3600)
self.token_config = auth_config
except Exception as e:
self.log.error(f"OAuth authentication failed: {str(e)}")
raise
elif auth_type == 'api_key':
key = auth_config.get('key', '')
key_name = auth_config.get('key_name', 'api-key')
in_header = auth_config.get('in_header', True)
if in_header:
self.headers[key_name] = key
else:
# Add as query parameter to all requests
self.api_key_param = (key_name, key)
else:
self.log.warning(f"Unknown authentication type: {auth_type}")
def request(
self,
method: str,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Any] = None,
json_data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
stream: bool = False
) -> requests.Response:
"""Make an HTTP request to the API.
Args:
method: HTTP method (GET, POST, PUT, DELETE, etc.)
endpoint: API endpoint (will be appended to base_url)
params: Query parameters
data: Request data (for form data)
json_data: Request data (will be JSON-encoded)
headers: Additional headers for this request
timeout: Request timeout in seconds
stream: Whether to stream the response
Returns:
Response object from the requests library
"""
# Build URL
url = f"{self.base_url}/{endpoint.lstrip('/')}"
# Include the API key as a query parameter when it was configured that way
if getattr(self, "api_key_param", None):
    key_name, key_value = self.api_key_param
    params = {**(params or {}), key_name: key_value}
# Combine headers
request_headers = {**self.headers}
if headers:
request_headers.update(headers)
# Set timeout
request_timeout = timeout if timeout is not None else self.timeout
# Make request
try:
response = self.session.request(
method=method.upper(),
url=url,
params=params,
data=data,
json=json_data,
headers=request_headers,
timeout=request_timeout,
stream=stream
)
self.log.debug(f"{method} {url} - Status: {response.status_code}")
return response
except requests.RequestException as e:
self.log.error(f"Request failed: {method} {url} - {str(e)}")
raise
def get(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
**kwargs
) -> requests.Response:
"""Make a GET request to the API.
Args:
endpoint: API endpoint
params: Query parameters
**kwargs: Additional arguments to pass to request()
Returns:
Response object
"""
return self.request('GET', endpoint, params=params, **kwargs)
def post(
self,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
data: Optional[Any] = None,
**kwargs
) -> requests.Response:
"""Make a POST request to the API.
Args:
endpoint: API endpoint
json_data: JSON data for the request body
data: Form data for the request body
**kwargs: Additional arguments to pass to request()
Returns:
Response object
"""
return self.request('POST', endpoint, json_data=json_data, data=data, **kwargs)
def put(
self,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
data: Optional[Any] = None,
**kwargs
) -> requests.Response:
"""Make a PUT request to the API.
Args:
endpoint: API endpoint
json_data: JSON data for the request body
data: Form data for the request body
**kwargs: Additional arguments to pass to request()
Returns:
Response object
"""
return self.request('PUT', endpoint, json_data=json_data, data=data, **kwargs)
def delete(
self,
endpoint: str,
**kwargs
) -> requests.Response:
"""Make a DELETE request to the API.
Args:
endpoint: API endpoint
**kwargs: Additional arguments to pass to request()
Returns:
Response object
"""
return self.request('DELETE', endpoint, **kwargs)
def get_json(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
**kwargs
) -> Any:
"""Make a GET request and parse the JSON response.
Args:
endpoint: API endpoint
params: Query parameters
**kwargs: Additional arguments to pass to request()
Returns:
Parsed JSON response
"""
response = self.get(endpoint, params=params, **kwargs)
response.raise_for_status()
return response.json()
def post_json(
self,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
**kwargs
) -> Any:
"""Make a POST request and parse the JSON response.
Args:
endpoint: API endpoint
json_data: JSON data for the request body
**kwargs: Additional arguments to pass to request()
Returns:
Parsed JSON response
"""
response = self.post(endpoint, json_data=json_data, **kwargs)
response.raise_for_status()
return response.json()
def paginate(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
page_param: str = 'page',
size_param: str = 'size',
page_size: int = 100,
max_pages: Optional[int] = None,
data_key: Optional[str] = None,
next_page_key: Optional[str] = None,
**kwargs
) -> List[Any]:
"""Paginate through API results.
Args:
endpoint: API endpoint
params: Query parameters
page_param: Name of page parameter for standard pagination
size_param: Name of size parameter for standard pagination
page_size: Number of items per page
max_pages: Maximum number of pages to retrieve (None for all)
data_key: JSON key containing the data array in response
next_page_key: Key in response that contains next page token/URL
**kwargs: Additional arguments to pass to request()
Returns:
List of all items from paginated responses
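Example:
    Standard page/size pagination (endpoint and data_key are placeholders):

    >>> records = client.paginate(
    ...     "/v1/orders",
    ...     page_size=200,
    ...     max_pages=5,
    ...     data_key="items",
    ... )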
"""
all_results = []
current_page = 1
request_params = params.copy() if params else {}
# If we use standard page/size pagination
if next_page_key is None:
request_params[page_param] = current_page
request_params[size_param] = page_size
while True:
response = self.get(endpoint, params=request_params, **kwargs)
response.raise_for_status()
data = response.json()
# Extract data based on data_key if provided
page_data = data.get(data_key, data) if data_key else data
# Add data to results
if isinstance(page_data, list):
all_results.extend(page_data)
else:
self.log.warning(f"Unexpected data format in pagination response: {type(page_data)}")
# Check if we need to stop
if max_pages and current_page >= max_pages:
break
# For cursor-based pagination
if next_page_key:
next_page_token = data.get(next_page_key)
if not next_page_token:
break
request_params['cursor'] = next_page_token
# For standard pagination
else:
current_page += 1
request_params[page_param] = current_page
# Check if we received fewer items than the page size
if isinstance(page_data, list) and len(page_data) < page_size:
break
return all_results
def download_file(
self,
endpoint: str,
output_path: str,
params: Optional[Dict[str, Any]] = None,
chunk_size: int = 8192,
**kwargs
) -> str:
"""Download a file from the API.
Args:
endpoint: API endpoint
output_path: Path to save the downloaded file
params: Query parameters
chunk_size: Size of chunks for streaming download
**kwargs: Additional arguments to pass to request()
Returns:
Path to the downloaded file
"""
response = self.get(endpoint, params=params, stream=True, **kwargs)
response.raise_for_status()
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
self.log.info(f"Downloaded file to {output_path}")
return output_path
def upload_file(
self,
endpoint: str,
file_path: str,
file_param: str = 'file',
additional_data: Optional[Dict[str, Any]] = None,
**kwargs
) -> requests.Response:
"""Upload a file to the API.
Args:
endpoint: API endpoint
file_path: Path to the file to upload
file_param: Name of the file parameter
additional_data: Additional form data to include
**kwargs: Additional arguments to pass to request()
Returns:
Response object
"""
# request() does not accept a files argument, so call the session directly
url = f"{self.base_url}/{endpoint.lstrip('/')}"
data = additional_data or {}
with open(file_path, 'rb') as file_obj:
    response = self.session.post(
        url, headers=self.headers, data=data,
        files={file_param: file_obj}, timeout=self.timeout, **kwargs
    )
response.raise_for_status()
self.log.info(f"Uploaded file {file_path} to {endpoint}")
return response
End File# adidas/lakehouse-engine
import os
import time
from typing import Dict, List, Optional, Any, Union
import requests
from lakehouse_engine.utils.logging_handler import LoggingHandler
class SlackClient(LoggingHandler):
"""Client for interacting with Slack API.
This class provides methods for sending messages, uploading files,
and working with channels in Slack.
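Example:
    A sketch assuming SLACK_API_TOKEN is set in the environment
    (the channel name is a placeholder):

    >>> slack = SlackClient()
    >>> slack.send_message("#data-alerts", "Pipeline finished successfully")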
"""
def __init__(self, token: Optional[str] = None):
"""Initialize the Slack client.
Args:
token: Slack API token (defaults to SLACK_API_TOKEN environment variable)
"""
super().__init__(__name__)
self.token = token or os.environ.get("SLACK_API_TOKEN")
if not self.token:
raise ValueError("Slack API token is required")
self.base_url = "https://slack.com/api"
self.headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def send_message(
self,
channel: str,
text: str,
blocks: Optional[List[Dict[str, Any]]] = None,
thread_ts: Optional[str] = None,
reply_broadcast: bool = False,
attachments: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
"""Send a message to a Slack channel.
Args:
channel: Channel ID or name to send the message to
text: Message text (fallback for clients that don't support blocks)
blocks: Rich message layout blocks
thread_ts: Thread timestamp to reply to
reply_broadcast: Whether to also send reply to the channel
attachments: Legacy message attachments
Returns:
Slack API response
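Example:
    A minimal Block Kit payload (the channel ID is a placeholder):

    >>> slack.send_message(
    ...     channel="C0123456789",
    ...     text="Data quality checks failed",
    ...     blocks=[{
    ...         "type": "section",
    ...         "text": {"type": "mrkdwn", "text": ":warning: *Data quality checks failed*"},
    ...     }],
    ... )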
"""
payload = {
"channel": channel,
"text": text
}
if blocks:
payload["blocks"] = blocks
if thread_ts:
payload["thread_ts"] = thread_ts
payload["reply_broadcast"] = reply_broadcast
if attachments:
payload["attachments"] = attachments
response = requests.post(
f"{self.base_url}/chat.postMessage",
headers=self.headers,
json=payload
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
self.log.error(f"Failed to send message: {data.get('error', 'Unknown error')}")
return data
def upload_file(
self,
channels: Union[str, List[str]],
file_path: Optional[str] = None,
content: Optional[str] = None,
filename: Optional[str] = None,
filetype: Optional[str] = None,
initial_comment: Optional[str] = None,
thread_ts: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file to Slack.
Args:
channels: Channel ID(s) to share the file in
file_path: Path to the file to upload
content: Content of the file as string (alternative to file_path)
filename: Name of the file
filetype: File type (e.g., 'csv', 'txt')
initial_comment: Initial comment for the file
thread_ts: Thread timestamp to share the file in
Returns:
Slack API response
Raises:
ValueError: If neither file_path nor content is provided
"""
if not file_path and not content:
raise ValueError("Either file_path or content must be provided")
# Prepare the channels string
if isinstance(channels, list):
channels_str = ",".join(channels)
else:
channels_str = channels
# Prepare request data
data = {"channels": channels_str}
if initial_comment:
data["initial_comment"] = initial_comment
if thread_ts:
data["thread_ts"] = thread_ts
if filename:
data["filename"] = filename
if filetype:
data["filetype"] = filetype
# Prepare files for upload
files = {}
if file_path:
files["file"] = open(file_path, "rb")
if not filename:
data["filename"] = os.path.basename(file_path)
else:
files["file"] = (filename or "file.txt", content)
# Update headers for file upload (remove Content-Type)
upload_headers = {
"Authorization": f"Bearer {self.token}"
}
try:
response = requests.post(
f"{self.base_url}/files.upload",
headers=upload_headers,
data=data,
files=files
)
response.raise_for_status()
result = response.json()
if not result.get("ok", False):
self.log.error(f"Failed to upload file: {result.get('error', 'Unknown error')}")
return result
finally:
# Close file if it was opened
if file_path and "file" in files:
files["file"].close()
def get_channel_id(self, channel_name: str) -> Optional[str]:
"""Get channel ID from channel name.
Args:
channel_name: Name of the channel (without #)
Returns:
Channel ID if found, None otherwise
"""
# Remove # if present at the beginning
if channel_name.startswith("#"):
channel_name = channel_name[1:]
# Try conversations.list approach first
response = requests.get(
f"{self.base_url}/conversations.list",
headers=self.headers,
params={"types": "public_channel,private_channel"}
)
response.raise_for_status()
data = response.json()
if data.get("ok", False):
for channel in data.get("channels", []):
if channel.get("name") == channel_name:
return channel.get("id")
# Fall back to a second listing without the type filter
response = requests.get(
f"{self.base_url}/conversations.list",
headers=self.headers,
params={"exclude_archived": True}
)
response.raise_for_status()
data = response.json()
if data.get("ok", False):
for channel in data.get("channels", []):
if channel.get("name") == channel_name:
return channel.get("id")
self.log.warning(f"Could not find channel with name: {channel_name}")
return None
def create_channel(
self,
name: str,
is_private: bool = False,
user_ids: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Create a new channel in Slack.
Args:
name: Name for the channel
is_private: Whether the channel should be private
user_ids: List of user IDs to invite to the channel
Returns:
Slack API response
"""
# Ensure channel name meets Slack requirements
# - Lowercase letters, numbers, hyphens, and underscores only
# - Maximum 80 characters
# - Cannot begin with a hyphen or underscore
sanitized_name = ''.join(
c for c in name.lower()
if c.isalnum() or c in ['-', '_']
)
if sanitized_name.startswith(('-', '_')):
sanitized_name = 'c' + sanitized_name[1:]
sanitized_name = sanitized_name[:80]
if sanitized_name != name:
self.log.info(f"Channel name sanitized: '{name}' → '{sanitized_name}'")
payload = {
"name": sanitized_name,
"is_private": is_private
}
# Create the channel
response = requests.post(
f"{self.base_url}/conversations.create",
headers=self.headers,
json=payload
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
error = data.get("error", "Unknown error")
self.log.error(f"Failed to create channel: {error}")
return data
# Invite users if specified
channel_id = data.get("channel", {}).get("id")
if channel_id and user_ids:
invite_payload = {
"channel": channel_id,
"users": ",".join(user_ids)
}
invite_response = requests.post(
f"{self.base_url}/conversations.invite",
headers=self.headers,
json=invite_payload
)
if not invite_response.json().get("ok", False):
self.log.warning(f"Failed to invite users to channel: {invite_response.json().get('error')}")
return data
def update_message(
self,
channel: str,
ts: str,
text: str,
blocks: Optional[List[Dict[str, Any]]] = None,
attachments: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
"""Update an existing message in Slack.
Args:
channel: Channel ID where the message exists
ts: Timestamp of the message to update
text: New message text
blocks: New rich message layout blocks
attachments: New message attachments
Returns:
Slack API response
"""
payload = {
"channel": channel,
"ts": ts,
"text": text
}
if blocks:
payload["blocks"] = blocks
if attachments:
payload["attachments"] = attachments
response = requests.post(
f"{self.base_url}/chat.update",
headers=self.headers,
json=payload
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
self.log.error(f"Failed to update message: {data.get('error', 'Unknown error')}")
return data
def add_reaction(
self,
channel: str,
timestamp: str,
reaction: str
) -> Dict[str, Any]:
"""Add a reaction to a message.
Args:
channel: Channel ID where the message exists
timestamp: Timestamp of the message
reaction: Emoji name (without colons)
Returns:
Slack API response
"""
# Remove colons if they exist
if reaction.startswith(':') and reaction.endswith(':'):
reaction = reaction[1:-1]
payload = {
"channel": channel,
"timestamp": timestamp,
"name": reaction
}
response = requests.post(
f"{self.base_url}/reactions.add",
headers=self.headers,
json=payload
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
self.log.error(f"Failed to add reaction: {data.get('error', 'Unknown error')}")
return data
def delete_message(self, channel: str, ts: str) -> Dict[str, Any]:
"""Delete a message from Slack.
Args:
channel: Channel ID where the message exists
ts: Timestamp of the message to delete
Returns:
Slack API response
"""
payload = {
"channel": channel,
"ts": ts
}
response = requests.post(
f"{self.base_url}/chat.delete",
headers=self.headers,
json=payload
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
self.log.error(f"Failed to delete message: {data.get('error', 'Unknown error')}")
return data
def get_user_info(self, user_id: str) -> Dict[str, Any]:
"""Get information about a user.
Args:
user_id: User ID to get information for
Returns:
User information
"""
response = requests.get(
f"{self.base_url}/users.info",
headers=self.headers,
params={"user": user_id}
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
self.log.error(f"Failed to get user info: {data.get('error', 'Unknown error')}")
return data
def search_messages(
self,
query: str,
count: int = 100,
page: int = 1,
sort: str = "timestamp",
sort_dir: str = "desc"
) -> Dict[str, Any]:
"""Search for messages in Slack.
Args:
query: Search query
count: Number of results per page
page: Page number
sort: Sort field ('score' or 'timestamp')
sort_dir: Sort direction ('asc' or 'desc')
Returns:
Search results
"""
params = {
"query": query,
"count": count,
"page": page,
"sort": sort,
"sort_dir": sort_dir
}
response = requests.get(
f"{self.base_url}/search.messages",
headers=self.headers,
params=params
)
response.raise_for_status()
data = response.json()
if not data.get("ok", False):
self.log.error(f"Failed to search messages: {data.get('error', 'Unknown error')}")
return data
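    # --- Usage sketch (illustrative only) ---
    # The client class name and constructor below are assumptions; only methods
    # defined in this module are exercised. Roughly:
    #
    #     client = SlackHelper(token="xoxb-...")   # hypothetical class / constructor
    #     created = client.create_channel("release-notes", user_ids=["U123"])
    #     channel_id = created.get("channel", {}).get("id")
    #     hits = client.search_messages("deploy failed", count=20)
    #     if channel_id and hits.get("ok"):
    #         info = client.get_user_info("U123")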
End File# adidas/lakehouse-engine
"""Tests for the JDBCReader class."""
import pytest
from unittest.mock import MagicMock, patch
from lakehouse_engine.io.readers_writers.readers import JDBCReader
class TestJDBCReader:
"""Test class for JDBCReader."""
    def test_read_data_query(self):
"""Test reading from a JDBC query."""
# Mock dependencies
mock_spark = MagicMock()
mock_jdbc_read = MagicMock()
mock_spark.read.jdbc.return_value = mock_jdbc_read
# Create reader instance
reader = JDBCReader(mock_spark)
# Mock a query read
query = "SELECT * FROM table"
kwargs = {
"url": "jdbc:mysql://localhost:3306/db",
"user": "user",
"password": "pass",
"driver": "com.mysql.jdbc.Driver"
}
reader._read_data(query, **kwargs)
# Verify JDBC was called correctly for a query
mock_spark.read.jdbc.assert_called_once()
call_args = mock_spark.read.jdbc.call_args[1]
assert call_args["url"] == "jdbc:mysql://localhost:3306/db"
assert call_args["table"] == "dummy" # Not used for queries
assert "query" in call_args["properties"]
assert call_args["properties"]["query"] == query
    def test_read_data_table(self):
"""Test reading from a JDBC table."""
# Mock dependencies
mock_spark = MagicMock()
mock_option = MagicMock()
mock_load = MagicMock()
# Set up chained mock methods
mock_spark.read.format.return_value = mock_option
mock_option.option.return_value = mock_option
mock_option.load.return_value = mock_load
# Create reader instance
reader = JDBCReader(mock_spark)
# Mock a table read
table_name = "my_table"
kwargs = {
"url": "jdbc:mysql://localhost:3306/db",
"user": "user",
"password": "pass",
"driver": "com.mysql.jdbc.Driver",
"fetchsize": "10000"
}
reader._read_data(table_name, **kwargs)
# Verify format was set to jdbc
mock_spark.read.format.assert_called_once_with("jdbc")
# Check that options were set correctly
option_calls = mock_option.option.call_args_list
expected_options = {
"url": "jdbc:mysql://localhost:3306/db",
"dbtable": "my_table",
"user": "user",
"password": "pass",
"driver": "com.mysql.jdbc.Driver",
"fetchsize": "10000"
}
# Verify each option was set
for key, value in expected_options.items():
assert (key, value) in [(call[0][0], call[0][1]) for call in option_calls]
# Verify load was called
mock_option.load.assert_called_once()
def test_read_with_invalid_params(self):
"""Test reading with invalid parameters."""
# Mock dependencies
mock_spark = MagicMock()
# Create reader instance
reader = JDBCReader(mock_spark)
# Call without required url
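        # The reader presumably indexes kwargs["url"] directly, hence the KeyError.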
with pytest.raises(KeyError):
reader._read_data("my_table", driver="com.mysql.jdbc.Driver", user="user", password="pass")
End File# tests/unit/io/readers_writers/test_delta_lake_writer.py
"""Tests for the DeltaLakeWriter class."""
import pytest
from unittest.mock import MagicMock, patch
from lakehouse_engine.io.readers_writers.delta_lake_writer import DeltaLakeWriter
class TestDeltaLakeWriter:
"""Test class for DeltaLakeWriter."""
def test_write_new_table(self):
"""Test writing to a new Delta table."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
# Create writer instance
writer = DeltaLakeWriter()
# Mock table existence check
with patch.object(writer, '_table_exists', return_value=False):
# Test writing to a new table by path
writer_config = {
"path": "/path/to/delta",
"mode": "overwrite"
}
result = writer.write(mock_df, writer_config, "bronze")
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("delta")
mock_writer.mode.assert_called_once_with("overwrite")
# Verify save was called with the path
mock_writer.save.assert_called_once_with("/path/to/delta")
# Verify the result contains the correct metadata
assert result["table_type"] == "bronze"
assert result["path"] == "/path/to/delta"
assert result["mode"] == "overwrite"
assert result["success"] == True
def test_write_with_table_name(self):
"""Test writing to a Delta table using table name."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
# Create writer instance
writer = DeltaLakeWriter()
# Mock table existence check
with patch.object(writer, '_table_exists', return_value=False):
# Test writing to a table by name
writer_config = {
"table_name": "my_database.my_table",
"mode": "overwrite"
}
result = writer.write(mock_df, writer_config, "silver")
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("delta")
mock_writer.mode.assert_called_once_with("overwrite")
# Verify saveAsTable was called with the table name
mock_writer.saveAsTable.assert_called_once_with("my_database.my_table")
# Verify the result contains the correct metadata
assert result["table_type"] == "silver"
assert result["table_name"] == "my_database.my_table"
assert result["mode"] == "overwrite"
assert result["success"] == True
def test_write_with_partitioning(self):
"""Test writing to a Delta table with partitioning."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_partitioned_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
mock_writer.partitionBy.return_value = mock_partitioned_writer
# Create writer instance
writer = DeltaLakeWriter()
# Mock table existence check
with patch.object(writer, '_table_exists', return_value=False):
# Test writing to a table with partitioning
writer_config = {
"path": "/path/to/delta",
"mode": "overwrite",
"partition_by": ["year", "month"]
}
result = writer.write(mock_df, writer_config, "bronze")
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("delta")
mock_writer.mode.assert_called_once_with("overwrite")
# Verify partitionBy was called with the correct columns
mock_writer.partitionBy.assert_called_once_with(["year", "month"])
# Verify save was called on the partitioned writer
mock_partitioned_writer.save.assert_called_once_with("/path/to/delta")
# Verify the result contains the correct metadata
assert result["table_type"] == "bronze"
assert result["path"] == "/path/to/delta"
assert result["mode"] == "overwrite"
assert result["partitions"] == ["year", "month"]
assert result["success"] == True
def test_write_with_schema_overwrite(self):
"""Test writing to a Delta table with schema overwrite."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_schema_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
mock_writer.option.return_value = mock_schema_writer
# Create writer instance
writer = DeltaLakeWriter()
# Mock table existence check
with patch.object(writer, '_table_exists', return_value=False):
# Test writing with schema overwrite
writer_config = {
"path": "/path/to/delta",
"mode": "overwrite",
"overwrite_schema": True
}
result = writer.write(mock_df, writer_config, "gold")
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("delta")
mock_writer.mode.assert_called_once_with("overwrite")
# Verify option was set for schema overwrite
mock_writer.option.assert_called_once_with("overwriteSchema", "true")
# Verify save was called on the schema writer
mock_schema_writer.save.assert_called_once_with("/path/to/delta")
# Verify the result contains the correct metadata
assert result["table_type"] == "gold"
assert result["path"] == "/path/to/delta"
assert result["mode"] == "overwrite"
assert result["success"] == True
def test_optimization(self):
"""Test writing to a Delta table with optimization."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_spark = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
mock_df.sparkSession = mock_spark
# Create writer instance
writer = DeltaLakeWriter()
# Mock table existence check
with patch.object(writer, '_table_exists', return_value=False):
# Test writing with optimization
writer_config = {
"path": "/path/to/delta",
"mode": "overwrite",
"optimize": True,
"z_order_by": ["id", "timestamp"]
}
result = writer.write(mock_df, writer_config, "silver")
# Verify SQL was executed for optimization
mock_spark.sql.assert_called_once()
sql_arg = mock_spark.sql.call_args[0][0]
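            # The statement is expected to resemble (assumed shape, not asserted verbatim):
            #   OPTIMIZE delta.`/path/to/delta` ZORDER BY (`id`, `timestamp`)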
assert "OPTIMIZE" in sql_arg
assert "ZORDER BY" in sql_arg
assert "`id`, `timestamp`" in sql_arg
# Verify the result contains the correct metadata
assert result["table_type"] == "silver"
assert result["path"] == "/path/to/delta"
assert result["mode"] == "overwrite"
assert result["optimized"] == True
assert result["success"] == True
def test_cdc_write(self):
"""Test CDC write to a Delta table."""
# Create mocks
mock_df = MagicMock()
mock_delta_table = MagicMock()
mock_merge = MagicMock()
# Mock the DeltaTable.forPath method
with patch('delta.tables.DeltaTable') as mock_delta:
mock_delta.forPath.return_value = mock_delta_table
mock_delta_table.alias.return_value = mock_delta_table
mock_delta_table.merge.return_value = mock_merge
mock_merge.whenMatchedUpdate.return_value = mock_merge
mock_merge.whenNotMatchedInsert.return_value = mock_merge
# Create writer instance
writer = DeltaLakeWriter()
# Mock table existence check
with patch.object(writer, '_table_exists', return_value=True):
# Test CDC write
writer_config = {
"path": "/path/to/delta",
"mode": "merge",
"cdc_config": {
"cdc_enabled": True,
"cdc_key_columns": ["id"],
"cdc_watermark_column": "updated_at"
}
}
# Call the private CDC method directly for testing
writer._handle_cdc_writes(
mock_df,
writer_config,
None, # table_name
"/path/to/delta", # path
["id"], # key_columns
"updated_at" # watermark_column
)
# Verify DeltaTable.forPath was called with correct path
mock_delta.forPath.assert_called_once()
# Verify merge was called with correct condition
mock_delta_table.merge.assert_called_once()
# Verify update and insert operations were called
mock_merge.whenMatchedUpdate.assert_called_once()
mock_merge.whenNotMatchedInsert.assert_called_once()
mock_merge.execute.assert_called_once()
def test_missing_required_params(self):
"""Test writing with missing required parameters."""
# Create mocks
mock_df = MagicMock()
# Create writer instance
writer = DeltaLakeWriter()
# Test with missing path and table_name
writer_config = {
"mode": "overwrite"
}
with pytest.raises(ValueError, match="Either path or table_name must be provided"):
writer.write(mock_df, writer_config, "bronze")
End File# tests/unit/io/readers_writers/test_file_writer.py
"""Tests for the FileWriter class."""
import pytest
from unittest.mock import MagicMock, patch
from lakehouse_engine.io.readers_writers.file_writer import FileWriter
class TestFileWriter:
"""Test class for FileWriter."""
def test_write_basic(self):
"""Test basic file writing functionality."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
mock_df.count.return_value = 100
# Create writer instance
writer = FileWriter()
# Test writing a parquet file
writer_config = {
"path": "/path/to/output",
"format": "parquet",
"mode": "overwrite"
}
result = writer.write(mock_df, writer_config)
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("parquet")
mock_writer.mode.assert_called_once_with("overwrite")
# Verify save was called with the correct path
mock_writer.save.assert_called_once_with("/path/to/output")
# Verify the result contains the correct metadata
assert result["path"] == "/path/to/output"
assert result["format"] == "parquet"
assert result["mode"] == "overwrite"
assert result["num_rows"] == 100
assert result["success"] == True
def test_write_with_options(self):
"""Test file writing with options."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
mock_writer.option.return_value = mock_writer
mock_df.count.return_value = 200
# Create writer instance
writer = FileWriter()
# Test writing CSV with options
writer_config = {
"path": "/path/to/csv",
"format": "csv",
"mode": "overwrite",
"options": {
"header": "true",
"delimiter": ",",
"quote": """
}
}
result = writer.write(mock_df, writer_config)
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("csv")
mock_writer.mode.assert_called_once_with("overwrite")
# Verify options were set correctly
mock_writer.option.assert_any_call("header", "true")
mock_writer.option.assert_any_call("delimiter", ",")
mock_writer.option.assert_any_call("quote", """)
# Verify save was called with the correct path
mock_writer.save.assert_called_once_with("/path/to/csv")
# Verify the result contains the correct metadata
assert result["path"] == "/path/to/csv"
assert result["format"] == "csv"
assert result["mode"] == "overwrite"
assert result["options"] == {"header": "true", "delimiter": ",", "quote": """}
assert result["num_rows"] == 200
assert result["success"] == True
def test_write_with_partitioning(self):
"""Test file writing with partitioning."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_partitioned_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
mock_writer.mode.return_value = mock_writer
mock_writer.partitionBy.return_value = mock_partitioned_writer
mock_df.count.return_value = 300
# Create writer instance
writer = FileWriter()
# Test writing with partitioning
writer_config = {
"path": "/path/to/data",
"format": "parquet",
"mode": "append",
"partition_by": ["year", "month", "day"]
}
result = writer.write(mock_df, writer_config)
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("parquet")
mock_writer.mode.assert_called_once_with("append")
# Verify partitionBy was called with the correct columns
mock_writer.partitionBy.assert_called_once_with(["year", "month", "day"])
# Verify save was called on the partitioned writer
mock_partitioned_writer.save.assert_called_once_with("/path/to/data")
# Verify the result contains the correct metadata
assert result["path"] == "/path/to/data"
assert result["format"] == "parquet"
assert result["mode"] == "append"
assert result["partitions"] == ["year", "month", "day"]
assert result["num_rows"] == 300
assert result["success"] == True
def test_write_alternate_config_keys(self):
"""Test file writing with alternate configuration keys."""
# Create mocks
mock_df = MagicMock()
mock_writer = MagicMock()
mock_df.write.format.return_value = mock_writer
        mock_writer.mode.return_value = mock_writer
        # Keep the chained builder pattern consistent with the other option tests,
        # so save() is asserted on the same mock after .option() calls.
        mock_writer.option.return_value = mock_writer
mock_df.count.return_value = 100
# Create writer instance
writer = FileWriter()
# Test writing using alternate config keys
writer_config = {
"output_path": "/path/to/alt",
"output_format": "json",
"output_mode": "errorifexists",
"output_options": {
"compression": "gzip"
}
}
result = writer.write(mock_df, writer_config)
# Verify correct format and mode were set
mock_df.write.format.assert_called_once_with("json")
mock_writer.mode.assert_called_once_with("errorifexists")
# Verify option was set correctly
mock_writer.option.assert_called_once_with("compression", "gzip")
# Verify save was called with the correct path
mock_writer.save.assert_called_once_with("/path/to/alt")
# Verify the result contains the correct metadata
assert result["path"] == "/path/to/alt"
assert result["format"] == "json"
assert result["mode"] == "errorifexists"
assert result["options"] == {"compression": "gzip"}
assert result["success"] == True
def test_missing_path(self):
"""Test writing without a path."""
# Create mocks
mock_df = MagicMock()
# Create writer