6/22/2025
9 min read
Serenity Team
Kickstart Your Mind: How Non-Fiction Reading Transforms Discipline and Self-Growth

Discover how strategic non-fiction reading can transform your discipline, habits, and mindset. Learn how to leverage the Serenity app to track reading goals and implement life-changing insights from books.

Non-Fiction Reading
Personal Development
Reading Habits
Book Recommendations
E-Reader Benefits
Knowledge Implementation
Self-Improvement

Kickstart Your Mind: How Non-Fiction Reading Transforms Discipline and Self-Growth

Have you ever finished a powerful non-fiction book and felt that distinctive mental shift, as if your brain had been rewired with new possibilities? That's not just a feeling; it's the transformative power of strategic reading at work.

In our journey toward self-improvement, few tools are as consistently powerful as non-fiction reading. But not all reading approaches are created equal. Today, I'll share how to turn reading from a passive activity into a catalyst for genuine life transformation, and how the Serenity app can be your companion along the way.

The Science Behind How Reading Reshapes Your Brain

When you engage with non-fiction consistently, you're not just acquiring information; you're literally changing your brain. Neuroplasticity research shows that deep reading creates new neural pathways, enhancing critical thinking and emotional intelligence.

But the key difference between casual readers and those who experience transformation lies in implementation.

"Knowledge isn't power until it's applied." – Dale Carnegie

This is where most people struggle. They read inspiring books but fail to integrate the insights into their daily routines. The Serenity app addresses this gap by helping you turn reading insights into actionable steps that compound over time.

Creating Your Personal Development Reading Strategy

Rather than reading randomly, consider these core areas that create a foundation for transformation:

1. Physiological Understanding

Why it matters: Your body is the vehicle for all achievement. Understanding its mechanics gives you leverage for everything else.

Essential reads:

  • "Why We Sleep" by Matthew Walker
  • "Breath" by James Nestor
  • "The Hungry Brain" by Stephan Guyenet

How Serenity helps: After reading these foundational books, use Serenity's AI interface to create personalized sleep optimization protocols or breathing practice schedules. Tell the app, "I want to implement the sleep hygiene practices from 'Why We Sleep,'" and it will generate a customized checklist pinned to your dashboard.

2. Willpower & Motivation Engineering

Why it matters: Understanding the psychological mechanics of motivation transforms how you approach challenges.

Essential reads:

  • "Atomic Habits" by James Clear
  • "Drive" by Daniel Pink
  • "The Willpower Instinct" by Kelly McGonigal

Serenity implementation: Share your key takeaways from these books with Serenity, and the AI will help you develop a progressive willpower-building program tailored to your specific goals. The app tracks your consistency and provides accountability through regular check-ins.

3. Systematic Habit Building

Why it matters: Systems outlast motivation and create sustainable change.

Essential reads:

  • "The Power of Habit" by Charles Duhigg
  • "Tiny Habits" by BJ Fogg
  • "Four Thousand Weeks" by Oliver Burkeman

Serenity integration: After reading about habit systems, use Serenity to design habit stacks based on your unique context. The app will help you troubleshoot when you face implementation challenges, offering adjustments based on principles from the books you've read.

E-Readers: The Underrated Tool for Transformational Reading

While physical books have their charm, e-readers offer distinct advantages for those serious about transformational reading:

  1. Frictionless highlighting and note-taking - Capture insights without interrupting reading flow
  2. Searchable annotations - Quickly revisit key concepts when implementation questions arise
  3. Ambient reading opportunities - Leverage small time blocks throughout your day
  4. Reduced decision fatigue - Carry your entire library without choosing which book to bring
  5. Adjustable fonts - Optimize for reading comfort and reduced eye strain

Serenity tip: Import your Kindle or Kobo highlights directly into the Serenity app with our integration feature. Then ask the AI to organize these highlights into actionable categories and create implementation plans from your most significant insights.
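
If you're curious what an import like this works with, below is a minimal Python sketch that parses a Kindle "My Clippings.txt" export (entries are separated by lines of equals signs) into simple highlight records you could group by book or hand off to any notes tool. The Serenity upload step itself isn't shown; the in-app integration handles that for you, so treat this only as an illustration of the underlying data.

```python
# Minimal sketch: parse Kindle's "My Clippings.txt" into highlight records.
# Assumes the standard export layout: title/author line, metadata line,
# blank line, highlight text, then a "==========" separator.
from pathlib import Path

SEPARATOR = "=========="

def parse_kindle_clippings(path: str) -> list[dict]:
    """Return a list of {source, metadata, text} records from a clippings file."""
    entries = Path(path).read_text(encoding="utf-8-sig").split(SEPARATOR)
    highlights = []
    for entry in entries:
        lines = [line.strip() for line in entry.strip().splitlines() if line.strip()]
        if len(lines) < 3:
            continue  # skip bookmarks or malformed entries with no highlight text
        highlights.append({
            "source": lines[0],          # e.g. 'Atomic Habits (James Clear)'
            "metadata": lines[1],        # the page/location/date line
            "text": " ".join(lines[2:]), # the highlighted passage itself
        })
    return highlights

if __name__ == "__main__":
    for h in parse_kindle_clippings("My Clippings.txt"):
        print(f"{h['source']}: {h['text'][:80]}")
```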

The Implementation Gap: From Knowledge to Action

The difference between information collectors and true learners is implementation. Here's how to bridge that gap:

1. The 24-Hour Rule

Apply one small insight from each book within 24 hours of reading it. Acting while the material is fresh ties the knowledge to a concrete behavior, which makes it far more likely to stick.

2. Concept Triangulation

When you encounter an important concept, find it in at least three different books before treating it as validated. This keeps you from building your routines on one-off theories that might not stand up to scrutiny.

diegopereiraeng/junit-test-results

package io.harness.junit;

import io.harness.junit.controller.JUnitController;
import io.harness.junit.controller.JUnitResultController;
import io.harness.junit.model.JUnitDTO;
import io.harness.junit.model.JUnitResult;
import io.harness.junit.service.JUnitResultService;
import io.harness.junit.utils.JUnitServiceUtils;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Map;
import java.util.logging.Logger;

public class JUnitConverter {

private static JUnitResultService jUnitResultService = new JUnitResultService();
private static final Logger logger = Logger.getLogger(JUnitConverter.class.getName());

public static void main(String[] args) throws IOException {

if (args.length < 3) {  
  System.out.println("usage: java -jar junit-converter.jar <folderPath> <project> <environment>");  
  System.exit(1);  
}

String folderPath = args[0];  
String project = args[1];  
String environment = args[2];  
String harness = "false";  
if (args.length == 5) {  
  harness = args[4];  
}

run(folderPath, project, environment, null, harness);  
System.exit(0);

}

private static void run(String folderPath, String project, String environment, Map<String, String> commonProperties, String isHarness)
throws IOException {

Path folder = Path.of(folderPath);  
File[] files = folder.toFile().listFiles();  
if (files == null) {  
  throw new IllegalArgumentException("Invalid folder path: " + folderPath);  
}

boolean isHarnessBoolean = Boolean.parseBoolean(isHarness);

// Process all XML files  
for (File file : files) {  
  if (file.getName().toLowerCase().endsWith(".xml")) {  
    try {  
      processXmlFile(file, project, environment, commonProperties, isHarnessBoolean);  
    } catch (Exception e) {  
      System.err.println("Error processing file " + file.getName() + ": " + e.getMessage());  
      e.printStackTrace();  
      throw e;  
    }  
  }  
}  

}

static void processXmlFile(File file, String project, String environment, Map<String, String> commonProperties,
boolean isHarness) throws IOException {
byte[] content = Files.readAllBytes(file.toPath());
ByteArrayInputStream inputStream = new ByteArrayInputStream(content);

try {  
  JUnitDTO junitDTO = JUnitServiceUtils.convertToJUnitDTO(inputStream);  
  if (junitDTO == null) {  
    logger.warning("Failed to convert file to JUnitDTO: " + file.getName());  
    return;  
  }

  // Process and transform each testsuite  
  JUnitResultController junitResult = new JUnitResultController(junitDTO);  
  junitResult.applyFilter();  
  junitResult.applyMapping();  
  // We will convert the JunitDTO to JUnitResult here  
  JUnitResult result = junitResult.getJUnitResult(project, environment);  
  if (result == null) {  
    logger.warning("Failed to convert JUnitDTO to JUnitResult: " + file.getName());  
    return;  
  }

  // If there are common properties to add  
  if (commonProperties != null && !commonProperties.isEmpty()) {  
    result.getProperties().putAll(commonProperties);  
  }

  String executionId = "executionId";  
  boolean isExecutionPresent = false;  
  if (isHarness) {  
    // Check if Harness execution values are present in system properties  
    String[] harnessProperties = {  
        "harness.0.executionId",  
        "harness.0.pipelineId",  
        "harness.0.pipelineName",  
        "harness.0.triggerType",  
        "harness.0.buildNo",  
        "harness.0.stageName",  
        "harness.0.stepName",  
        "harness.0.workflowId"  
    };

    for (String property : harnessProperties) {  
      String value = System.getProperty(property);  
      if (value != null && !value.isEmpty()) {  
        result.getProperties().put(property, value);  
        if (property.equals("harness.0.executionId")) {  
          executionId = value;  
          isExecutionPresent = true;  
        }  
      }  
    }  
  }

  // Save the result  
  jUnitResultService.saveResults(result, isExecutionPresent, executionId);  
  // Print all properties for debugging  
  System.out.println("Processed file " + file.getName() + " with properties: " + result.getProperties());

} catch (Exception e) {  
  logger.severe("Error processing XML file " + file.getName() + ": " + e.getMessage());  
  throw new IOException("Failed to process XML file: " + file.getName(), e);  
} finally {  
  inputStream.close();  
}  

}
}
End Filepackage io.harness.junit;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

public class JUnitUploader {
private static final Logger logger = Logger.getLogger(JUnitUploader.class.getName());

private static class Args {
@Parameter(names = "--folder", description = "Folder path containing JUnit XML reports", required = true)
String folderPath;

@Parameter(names = "--project", description = "Project name", required = true)  
String project;

@Parameter(names = "--environment", description = "Environment name", required = true)  
String environment;

@Parameter(names = "--file-pattern", description = "Regex pattern to filter files")  
String filePattern = ".*.xml";

@Parameter(names = "--tags", description = "Common tags to add (format: key1=value1,key2=value2)")  
String tags;

@Parameter(names = "--harness", description = "Whether to extract Harness variables", arity = 1)  
boolean harness = false;

@Parameter(names = "--help", help = true)  
boolean help;  

}

public static void main(String[] args) {
Args commandArgs = new Args();
JCommander commander = JCommander.newBuilder()
.addObject(commandArgs)
.build();
commander.setProgramName("JUnit Uploader");

try {  
  commander.parse(args);  
  if (commandArgs.help) {  
    commander.usage();  
    return;  
  }

  // Execute the processor with provided arguments  
  process(commandArgs);  
} catch (Exception e) {  
  logger.log(Level.SEVERE, "Error processing command: " + e.getMessage(), e);  
  commander.usage();  
  System.exit(1);  
}  

}

private static void process(Args args) throws IOException {
Path folder = Paths.get(args.folderPath);
if (!Files.exists(folder) || !Files.isDirectory(folder)) {
throw new IllegalArgumentException("Invalid folder path: " + args.folderPath);
}

// Parse tags if provided  
Map<String, String> commonProperties = parseTagsString(args.tags);

// Compile file pattern  
Pattern pattern = Pattern.compile(args.filePattern);

// Get files and process them  
List<Path> files = Files.walk(folder)  
    .filter(Files::isRegularFile)  
    .filter(path -> pattern.matcher(path.getFileName().toString()).matches())  
    .toList();

if (files.isEmpty()) {  
  logger.warning("No matching files found in the provided directory");  
  return;  
}

int totalFiles = files.size();  
int processedFiles = 0;  
int errorFiles = 0;

for (Path filePath : files) {  
  try {  
    JUnitConverter.processXmlFile(  
        filePath.toFile(),  
        args.project,  
        args.environment,  
        commonProperties,  
        args.harness  
    );  
    processedFiles++;  
    logger.info(String.format("Processed file %d/%d: %s", processedFiles, totalFiles, filePath.getFileName()));  
  } catch (Exception e) {  
    errorFiles++;  
    logger.log(Level.SEVERE, "Error processing file: " + filePath.getFileName(), e);  
  }  
}

logger.info(String.format("Processing complete. Total: %d, Processed: %d, Failed: %d",  
    totalFiles, processedFiles, errorFiles));

if (errorFiles > 0) {  
  throw new IOException("Failed to process " + errorFiles + " files");  
}  

}

private static Map<String, String> parseTagsString(String tagsString) {
if (tagsString == null || tagsString.trim().isEmpty()) {
return Collections.emptyMap();
}

Map<String, String> tags = new HashMap<>();  
String[] pairs = tagsString.split(",");  
for (String pair : pairs) {  
  String[] keyValue = pair.split("=", 2);  
  if (keyValue.length == 2) {  
    tags.put(keyValue[0].trim(), keyValue[1].trim());  
  } else {  
    logger.warning("Invalid tag format (skipping): " + pair);  
  }  
}  
return tags;  

}
}
End File# diegopereiraeng/junit-test-results

src/main/java/io/harness/junit/controller/JUnitResultController.java

package io.harness.junit.controller;

import io.harness.junit.model.JUnitDTO;
import io.harness.junit.model.JUnitResult;
import io.harness.junit.model.PropertyDTO;
import io.harness.junit.model.TestCaseDTO;
import io.harness.junit.model.TestCaseResult;
import io.harness.junit.model.TestScenario;
import io.harness.junit.model.TestSuiteDTO;
import io.harness.junit.utils.DateTimeUtils;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import java.util.Optional;

public class JUnitResultController {
private JUnitDTO junitDTO;

// Constructor that initializes with junitDTO
public JUnitResultController(JUnitDTO junitDTO) {
this.junitDTO = junitDTO;
}

public void applyFilter() {
// No filtering for now, but it can be implemented here
}

public void applyMapping() {
// No mapping for now, just ensures the object is valid
}

public JUnitResult getJUnitResult(String project, String environment) {
if (junitDTO == null || junitDTO.getTest_suites() == null || junitDTO.getTest_suites().isEmpty()) {
return null;
}

// Create a new JUnitResult to store our results  
JUnitResult output = new JUnitResult(project, environment);

// Populate meta-level properties   
setResultMetadata(output);  
setTimestamps(output);  
transferProperties(output);

// Process each test suite  
List<TestCaseResult> allTestCases = processTestSuites(output);  
output.setTestCases(allTestCases);

return output;  

}

private void setResultMetadata(JUnitResult output) {
// Set default type if needed
if (output.getType() == null) {
output.setType("junit");
}

// Set timestamp if not already set  
if (output.getTimestamp() == 0) {  
  output.setTimestamp(Instant.now().toEpochMilli());  
}  

}

private void setTimestamps(JUnitResult output) {
// Set global timestamps from the first test suite if available
if (!junitDTO.getTest_suites().isEmpty()) {
TestSuiteDTO firstSuite = junitDTO.getTest_suites().get(0);

  if (firstSuite.getTimestamp() != null && !firstSuite.getTimestamp().isEmpty()) {  
    long timestamp = DateTimeUtils.parseIsoDateTimeToEpochMillis(firstSuite.getTimestamp());  
    if (timestamp > 0) {  
      output.setTimestamp(timestamp);  
    }  
  }  
}  

}

private void transferProperties(JUnitResult output) {
Map<String, String> properties = new HashMap<>();

// Add properties from all test suites  
for (TestSuiteDTO testsuite : junitDTO.getTest_suites()) {  
  if (testsuite.getProperties() != null) {  
    for (PropertyDTO property : testsuite.getProperties()) {  
      if (property.getName() != null && property.getValue() != null) {  
        properties.put(property.getName(), property.getValue());  
      }  
    }  
  }  
}  
  
// Transfer to output  
output.setProperties(properties);  

}

private List<TestCaseResult> processTestSuites(JUnitResult output) {
List<TestCaseResult> allTestCases = new ArrayList<>();
List<TestScenario> testScenarios = new ArrayList<>();

// Process each test suite  
for (TestSuiteDTO testsuite : junitDTO.getTest_suites()) {  
  // Process test cases within this suite  
  if (testsuite.getTestcases() != null) {  
    for (TestCaseDTO testcase : testsuite.getTestcases()) {  
      TestCaseResult result = convertTestCaseToResult(testsuite, testcase);  
      if (result != null) {  
        allTestCases.add(result);  
      }  
    }  
  }  
    
  // Create a test scenario for this suite  
  TestScenario scenario = createScenarioFromSuite(testsuite);  
  if (scenario != null) {  
    testScenarios.add(scenario);  
  }  
}  
  
// Set the test scenarios  
output.setTestSuites(testScenarios);  
  
return allTestCases;  

}

private TestCaseResult convertTestCaseToResult(TestSuiteDTO testsuite, TestCaseDTO testcase) {
if (testcase == null) {
return null;
}

TestCaseResult result = new TestCaseResult();  
  
// Set basic properties  
result.setName(testcase.getName());  
result.setClassname(testcase.getClassname());  
result.setStatus(getStatus(testcase));  
  
// Set test suite info  
if (testsuite != null) {  
  result.setTestSuite(testsuite.getName());  
  result.setPackage(testsuite.getPackage());  
}  
  
// Set duration (convert seconds to milliseconds)  
if (testcase.getTime() != null) {  
  try {  
    double timeInSeconds = Double.parseDouble(testcase.getTime());  
    result.setDuration((long) (timeInSeconds * 1000));  
  } catch (NumberFormatException e) {  
    result.setDuration(0L);  
  }  
}  
  
// Set error details  
setErrorDetails(result, testcase);

return result;  

}

private String getStatus(TestCaseDTO testcase) {
if (testcase.getSkipped() != null) {
return "SKIPPED";
} else if (testcase.getFailure() != null) {
return "FAILED";
} else if (testcase.getError() != null) {
return "ERROR";
} else {
return "PASSED";
}
}

private void setErrorDetails(TestCaseResult result, TestCaseDTO testcase) {
// Handle failure
if (testcase.getFailure() != null) {
result.setFailureMessage(testcase.getFailure().getMessage());
result.setFailureType(testcase.getFailure().getType());
result.setFailureContent(testcase.getFailure().getContent());
}

// Handle error  
if (testcase.getError() != null) {  
  result.setErrorMessage(testcase.getError().getMessage());  
  result.setErrorType(testcase.getError().getType());  
  result.setErrorContent(testcase.getError().getContent());  
}  

}

private TestScenario createScenarioFromSuite(TestSuiteDTO testsuite) {
if (testsuite == null) {
return null;
}

TestScenario scenario = new TestScenario();  
  
// Set basic properties  
scenario.setName(testsuite.getName());  
scenario.setPackageName(testsuite.getPackage());  
  
// Set counts  
scenario.setTests(parseIntSafely(testsuite.getTests()));  
scenario.setFailures(parseIntSafely(testsuite.getFailures()));  
scenario.setErrors(parseIntSafely(testsuite.getErrors()));  
scenario.setSkipped(parseIntSafely(testsuite.getSkipped()));  
  
// Calculate statistics  
int total = scenario.getTests();  
if (total > 0) {  
  scenario.setPassRate((double) (total - scenario.getFailures() - scenario.getErrors() - scenario.getSkipped()) / total);  
} else {  
  scenario.setPassRate(0);  
}  
  
// Set durations  
if (testsuite.getTime() != null) {  
  try {  
    double timeInSeconds = Double.parseDouble(testsuite.getTime());  
    scenario.setDuration((long) (timeInSeconds * 1000)); // Convert to milliseconds  
  } catch (NumberFormatException e) {  
    scenario.setDuration(0L);  
  }  
}  
  
return scenario;  

}

private int parseIntSafely(String value) {
if (value == null || value.isEmpty()) {
return 0;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return 0;
}
}
}
End File# diegopereiraeng/junit-test-results
package io.harness.junit.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.harness.junit.model.JUnitResult;
import io.harness.junit.utils.MapperUtils;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.logging.Logger;

public class JUnitResultService {

private static final Logger logger = Logger.getLogger(JUnitResultService.class.getName());
private static final String REMOTE_URL_ENV_VAR = "JUNIT_TEST_RESULTS_URL";

/**

  • Saves the results either to a remote endpoint or local file based on configuration.
  • @param result The JUnitResult to save
  • @param isExecutionPresent Whether the execution ID is present
  • @param executionId The execution ID if present
    */
    public void saveResults(JUnitResult result, boolean isExecutionPresent, String executionId) {
    // First try to send to remote endpoint
    try {
    sendToRemoteEndpoint(result);
    logger.info("Results successfully sent to remote endpoint");
    return;
    } catch (Exception e) {
    logger.warning("Failed to send results to remote endpoint: " + e.getMessage());
    // Fall back to local file if remote fails
    }
// If remote failed, save to local file  
try {  
  saveToLocalFile(result, isExecutionPresent, executionId);  
  logger.info("Results successfully saved to local file");  
} catch (Exception e) {  
  logger.severe("Failed to save results to local file: " + e.getMessage());  
  throw new RuntimeException("Failed to save test results", e);  
}  

}

private void sendToRemoteEndpoint(JUnitResult result) throws IOException {
String remoteUrl = System.getenv(REMOTE_URL_ENV_VAR);
if (remoteUrl == null || remoteUrl.trim().isEmpty()) {
throw new IOException("Remote URL not configured. Set JUNIT_TEST_RESULTS_URL environment variable.");
}

URL url = new URL(remoteUrl);  
HttpURLConnection connection = (HttpURLConnection) url.openConnection();  
connection.setRequestMethod("POST");  
connection.setRequestProperty("Content-Type", "application/json");  
connection.setDoOutput(true);

String jsonPayload = MapperUtils.getObjectMapper().writeValueAsString(result);  
try (OutputStream os = connection.getOutputStream()) {  
  byte[] input = jsonPayload.getBytes(StandardCharsets.UTF_8);  
  os.write(input, 0, input.length);  
}

int responseCode = connection.getResponseCode();  
if (responseCode < 200 || responseCode >= 300) {  
  throw new IOException("HTTP error code: " + responseCode);  
}  

}

private void saveToLocalFile(JUnitResult result, boolean isExecutionPresent, String executionId) throws IOException {
// Create directory if it doesn't exist
String baseDir = "./junit-results";
String project = result.getProject() != null ? result.getProject() : "unknown-project";
String environment = result.getEnvironment() != null ? result.getEnvironment() : "unknown-environment";

Path dirPath = Paths.get(baseDir, project, environment);  
Files.createDirectories(dirPath);  
  
// Generate filename based on current time or execution ID  
String filename;  
if (isExecutionPresent) {  
  filename = "result-" + executionId + ".json";  
} else {  
  String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"));  
  filename = "result-" + timestamp + ".json";  
}  
  
Path filePath = dirPath.resolve(filename);  
  
// Convert result to JSON and write to file  
String jsonContent = MapperUtils.getObjectMapper().writeValueAsString(result);  
Files.write(filePath, jsonContent.getBytes(StandardCharsets.UTF_8));  

}
}
author = 'Javier Ordóñez'

import os
import sys
import csv
import glob
import json
import string
import difflib
import pandas as pd
import numpy as np
from tqdm import tqdm
from unidecode import unidecode
from helper_functions import is_valid_string
from helper_functions import get_directory_path
from helper_functions import get_path_for_output_file
from helper_functions import get_directory_name_from_path
tqdm.pandas()
pd.options.mode.chained_assignment = None

def preprocess_name(name):
name = name.lower().strip()
for char in string.punctuation:
name = name.replace(char, '')
return unidecode(name)

def simplify_string_for_matching(input_string):
if not is_valid_string(input_string):
return ''
return preprocess_name(input_string)

def get_data_matching_score(source_string1, source_string2, preprocess=True):
if not is_valid_string(source_string1) or not is_valid_string(source_string2):
return 0.0
string1 = preprocess_name(source_string1) if preprocess else source_string1
string2 = preprocess_name(source_string2) if preprocess else source_string2
similarity = difflib.SequenceMatcher(None, string1, string2).ratio()
return similarity

def parse_master_data(file_path_master_data, limit, print_log=False):
df_master = None
if print_log:
print('Reading master data file {} ...'.format(file_path_master_data))
if file_path_master_data.lower().endswith('.json'):
with open(file_path_master_data, 'r') as f:
json_data = json.load(f)
df_master = pd.DataFrame(json_data)
else:
if limit is None:
df_master = pd.read_csv(file_path_master_data, encoding='utf-8')
else:
df_master = pd.read_csv(file_path_master_data, encoding='utf-8', nrows=limit)
df_master.columns = [x.lower() for x in df_master.columns]
# Sanity check of expected column names
column_name_brand = None
column_name_model = None
for cn in df_master.columns.tolist():
if 'brand' in cn:
column_name_brand = cn
if 'model' in cn:
column_name_model = cn
# Get brand and model data
df_master['brand'] = df_master[column_name_brand] if column_name_brand in df_master.columns else None
df_master['model'] = df_master[column_name_model] if column_name_model in df_master.columns else None
# Apply preprocessing of strings (lower case, no unicode, no punctuation)
df_master['brand_pp'] = df_master['brand'].progress_apply(lambda x: simplify_string_for_matching(x))
df_master['model_pp'] = df_master['model'].progress_apply(lambda x: simplify_string_for_matching(x))
if print_log:
print('Parse done. {} records read. Keeping values of {} and {} for references.'.format(
len(df_master), column_name_brand, column_name_model))
return df_master

def get_preprocessed_source_data(x):

fields = ['brand', 'model']  
result = x['product name'].lower()  
for p in fields:  
    result = result.replace(x[p].lower(), '') if is_valid_string(x[p]) else result  
for char in string.punctuation:  
    result = result.replace(char, '')  
result = unidecode(result)  
result = ' '.join(result.split())  
return result

def get_preprocessed_master_data(x):

fields = ['brand', 'model']  
result = ''  
for p in fields:  
    if is_valid_string(x[p]):  
        for char in string.punctuation:  
            result += ' ' + x[p].lower().replace(char, '')  
        result = unidecode(result)  
        result = ' '.join(result.split())  
return result

def get_best_match_from_master_data(df_source, df_master, idx):

brand = df_source.iloc[idx]['brand_pp']  
model = df_source.iloc[idx]['model_pp']  
product_data_pp = df_source.iloc[idx]['product name pp']  
minimum_score = 0.75  # Minimum score to consider a match  
matches = []  
for i, item in df_master.iterrows():  
    if len(item['brand_pp']) > 2 and brand == item['brand_pp']:  
        # Matching criteria: Same brand and similarity over threshold  
        score = get_data_matching_score(product_data_pp, item['master_data_pp'], preprocess=False)  
        if score > minimum_score:  
            matches.append({  
                'master_index': i,  
                'brand': item['brand'],  
                'model': item['model'],  
                'score': score})  
if len(matches) == 0:  
    # Failed to find a match  
    return None  
# Return the one having highest score  
best_match = max(matches, key=lambda m: m['score'])  
return best_match

def add_matches_to_source_product(df_source, df_master, print_log=False):
# Add columns to df_source for the normalized brand and model if match with master data
df_source['master_match_brand'] = None
df_source['master_match_model'] = None
df_source['master_match_score'] = None
if print_log:
print('Finding matches from master data for source products ...')
for i, row in tqdm(df_source.iterrows(), total=len(df_source)):
best_match = get_best_match_from_master_data(df_source, df_master, i)
if best_match is not None:
df_source.at[i, 'master_match_brand'] = best_match['brand']
df_source.at[i, 'master_match_model'] = best_match['model']
df_source.at[i, 'master_match_score'] = best_match['score']
return df_source

def match(source_data_filename, master_data_filename, output_file, limit=None, print_log=True):
source_directory = get_directory_path(source_data_filename)
csv_files = glob.glob(os.path.join(source_directory, '*-source-data.csv'))

for source_file in csv_files:

    # Get directory name for source file  
    source_dir_name = get_directory_name_from_path(source_file)  
    output_file = os.path.join(source_directory,  
                               source_dir_name + '-products-with-matches.csv')

    # Read and parse master data  
    df_master = parse_master_data(master_data_filename, limit, print_log)

    # Add additional fields  
    df_master['master_data_pp'] = df_master.progress_apply(  
        lambda x: get_preprocessed_master_data(x), axis=1)

    # Read source data  
    if print_log:  
        print('Reading source data file {} ...'.format(source_file))  
    df_source = pd.read_csv(source_file, encoding='utf-8')  
    # Sanity check of expected column names from source data  
    for cn in ['product name', 'brand', 'model']:  
        if cn not in df_source.columns.tolist():  
            if print_log:  
                print('Warning: {} field not found in source data'.format(cn))

    # Apply preprocessing of strings (lower case, no unicode, no punctuation)  
    df_source['brand_pp'] = df_source['brand'].progress_apply(lambda x: simplify_string_for_matching(x))  
    df_source['model_pp'] = df_source['model'].progress_apply(lambda x: simplify_string_for_matching(x))  
    # Add a field for preprocessed product name (removing brand and model reference if present)  
    df_source['product name pp'] = df_source.progress_apply(  
        lambda x: get_preprocessed_source_data(x), axis=1)  
      
    # Perform the match  
    df_source = add_matches_to_source_product(df_source, df_master, print_log)  
      
    # Write output file  
    if print_log:  
        print('Writing matches to {} ...'.format(output_file))  
    df_source.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)  
  
if print_log:  
    print('Done!')

if name == "main":

if len(sys.argv) < 3:  
    print('Error: missing arguments. Two arguments needed: [source data csv file] [master data csv file]')  
    print('Optional third argument: products limit (for debug purposes)')  
else:  
    file_path_source_data = sys.argv[1]  
    file_path_master_data = sys.argv[2]  
    products_limit = None if len(sys.argv) < 4 else sys.argv[3]  
    if os.path.exists(file_path_source_data) and os.path.exists(file_path_master_data):  
        print('Matching source data from {} with master data from {} ...'.format(file_path_source_data, file_path_master_data))  
        output_file = None  
        match(file_path_source_data, file_path_master_data, output_file, products_limit)  
    else:  
        print('Error: could not find specified files. {} {}'.format(file_path_source_data, file_path_master_data))  

End File# adidas/lakehouse-engine
author = 'Javier Ordóñez'

import os
import sys
import re
import csv
import time
import glob
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from bs4 import BeautifulSoup
from helper_functions import get_matching_json_files
from helper_functions import is_valid_string, is_valid_data
from helper_functions import get_directory_path, get_directory_name_from_path
tqdm.pandas()
pd.options.mode.chained_assignment = None

def get_brand_name_from_source_file():
if sys.argv[1].lower().find('bestbuy') != -1:
return 'bestbuy'
if sys.argv[1].lower().find('walmart') != -1:
return 'walmart'
return ''

def extract_brand_model_bestbuy(soup):
brand = ''
model = ''
try:
# Parse brand name
blocks = soup.select('div.sku-title')
if len(blocks) == 1 and len(blocks[0]) == 2: # div.sku-title>a+h1
brand = blocks[0].contents[0].text.strip()
# Parse model name
features_block = soup.select('div[class^="model product-data"]')
if len(features_block) == 1:
features_items = features_block[0].select('div.row-value')
if len(features_items) == 1:
model = features_items[0].text.strip()
# Parse model with different layout
if len(model) < 2:
features_block2 = soup.select('div[class^="model-number"]')
if len(features_block2) == 1:
model = features_block2[0].select('span')[-1].text.strip()
except:
pass
return brand, model

def extract_brand_model_walmart(soup):
brand = ''
model = ''
product_specs = {}
try:
# Parse brand name from link
brand_anchors = soup.select('a[link-identifier="brandName"]')
if not brand_anchors:
brand_anchors = soup.select('div[class="sans-serif mid-gray mb2"] > div > a')
if len(brand_anchors) == 1:
brand = brand_anchors[0].text.strip()

    # Extract from key specs  
    spec_containers = soup.select('div[data-testid="product-specs-container"]')  
    if spec_containers:  
        spec_items = spec_containers[0].select('div[data-testid="product-spec-item"]')  
        for item in spec_items:  
            labels = item.select('div[data-testid="product-spec-key"]')  
            values = item.select('div[data-testid="product-spec-value"]')  
            if len(labels) == 1 and len(values) == 1:  
                label = labels[0].text.strip().lower()  
                value = values[0].text.strip()  
                product_specs[label] = value  
                if label == 'brand':  
                    brand = value  
                if label == 'model number' or label == 'tv model name':  
                    model = value

    # Fallbacks for brand  
    if brand == '':  
        # Try to find text with "Sold by" prefix  
        sold_by_containers = soup.select('div[class^="mr3 ml3"] div[class="f6 gray"] a')  
        if sold_by_containers and len(sold_by_containers) > 0:  
            brand = sold_by_containers[0].text.strip()  
except:  
    pass  
return brand, model

def add_product_data_to_csv(product_id, product_name, brand, model, source_data):
source_data.append({
'product id': product_id,
'product name': product_name,
'brand': brand,
'model': model
})

def get_product_details_from_html(source_brand, json_object, soup):
product_id = json_object['id']
product_name = json_object['name']
brand = model = ''

# Extract brand and model data from HTML  
try:  
    if 'bestbuy' in source_brand.lower():  
        brand, model = extract_brand_model_bestbuy(soup)  
    elif 'walmart' in source_brand.lower():  
        brand, model = extract_brand_model_walmart(soup)  
except:  
    pass

# Brand and model (if not found previously, use data from JSON object)  
if not is_valid_string(brand) and 'brand_name' in json_object:  
    brand = json_object['brand_name']  
if not is_valid_string(model) and 'model_number' in json_object:  
    model = json_object['model_number']

# Fallback for product_name if needed  
if not is_valid_string(product_name) and 'title' in json_object:  
    product_name = json_object['title']

# Fallback for product_id if needed  
if not is_valid_string(product_id) and 'ID' in json_object:  
    product_id = json_object['ID']

# Return the extracted information  
return product_id, product_name, brand, model

def extract_product_details(input_file, print_log=True):
brand_name = get_brand_name_from_source_file()

if print_log:  
    print('Extracting product information ...')

json_files = get_matching_json_files(input_file)  
if len(json_files) == 0:  
    print('Error: no matching JSON files found related to {}'.format(input_file))  
    return

source_data = []  
total_files = len(json_files)  
t1 = time.time()

file_counter = 0  
for json_file in json_files:  
    file_counter += 1  
    if print_log and file_counter % 100 == 0:  
        progress = 100.0 * file_counter / float(total_files)  
        t2 = time.time()  
        eta = ((t2 - t1) / file_counter) * (total_files - file_counter)  
        print(f'Processed {file_counter} / {total_files} ({progress:.2f}%) - ETA: {eta:.0f}s', end='r')

    # Read and parse the JSON file  
    try:  
        with open(json_file, 'r') as f:  
            json_data = json.load(f)  
    except:  
        continue

    # Extract product data from HTML content  
    try:  
        html_content = json_data.get('html', '')  
        if html_content:  
            soup = BeautifulSoup(html_content, 'html.parser')  
            product_id, product_name, brand, model = get_product_details_from_html(brand_name, json_data, soup)  
            if product_id and product_name:  
                add_product_data_to_csv(product_id, product_name, brand, model, source_data)  
    except Exception as e:  
        if print_log:  
            print(f"Error processing file {json_file}: {e}")

# Create the output file  
if len(source_data) > 0:  
    output_file = get_directory_path(input_file) + '/' + get_directory_name_from_path(input_file) + '-source-data.csv'  
    df = pd.DataFrame(source_data)  
    df.to_csv(output_file, index=False, encoding='utf-8', quoting=csv.QUOTE_ALL)

    if print_log:  
        print(f"nExtraction completed. {len(source_data)} products extracted and saved to {output_file}")  
else:  
    if print_log:  
        print("nNo product data extracted from the provided files.")

if name == "main":
if len(sys.argv) < 2:
print('Error: missing arguments. One argument needed: [directory_path]')
else:
extract_product_details(sys.argv[1])
End File# adidas/lakehouse-engine
import abc
import os
import shutil
from logging import getLogger
from typing import Any, Dict, List, Optional

import pytest

from lakehouse_engine.core.definitions import (
INPUT_SPECS,
OUTPUT_SPECS,
TABLE_NAME,
TRANSFORMATIONS,
)

logger = getLogger(name)

class PySparkTest(abc.ABC):
"""A base class for general spark setup.

Attributes:  
    spark_session: a spark session that lives during the entire test class  
"""

spark_session: Any = None  
spark_context: Any = None

@classmethod  
def setup_class(cls):  
    """Setup before class initialization."""  
    from pyspark.sql import SparkSession  # pylint: disable=import-outside-toplevel

    os.environ["PYTHONHASHSEED"] = "42"

    # This will be overridden by test_config if test_config exists,  
    # but also allows the tests to run locally.  
    spark_builder = SparkSession.builder.appName("test")  
    spark_builder.config("spark.sql.sources.partitionOverwriteMode", "dynamic")  
    spark_builder.config("hive.exec.dynamic.partition", "true")  
    spark_builder.config("hive.exec.dynamic.partition.mode", "nonstrict")  
    spark_builder.config("spark.sql.session.timeZone", "Etc/UTC")  
    spark_builder.config("spark.ui.showConsoleProgress", "false")  
    spark_builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")  
    spark_builder.config("spark.eventLog.enabled", "false")  
    spark_builder.config("spark.sql.broadcastTimeout", "36000")

    spark_session = (  
        spark_builder  
        # .enableHiveSupport()  
        .master("local[*]")  
        .getOrCreate()  
    )  
    cls.spark_session = spark_session  
    cls.spark_context = cls.spark_session.sparkContext

@classmethod  
def teardown_class(cls):  
    """Teardown after class initialization."""  
    # Close the spark session when the tests are complete  
    if cls.spark_session:  
        cls.spark_session.stop()  
    # Try to remove temporary files created by Hadoop when the spark_context is created  
    if cls.spark_context:  
        # Remove _temporary_xxx directories created by spark  
        if os.path.exists("/tmp/spark-*"):  
            shutil.rmtree("/tmp/spark-*")  
        # Remove __pycache__ directory created by pytest  
        if os.path.exists("tests/__pycache__"):  
            shutil.rmtree("tests/__pycache__")

def cleanup_files(self, paths: List[str]) -> None:  
    """Clean up local files.

    Args:  
        paths: list of paths to clean up.  
    """  
    paths_to_remove = []  
    for path in paths:  
        # Add to paths to remove the path itself and the _SUCCESS file  
        success_path = os.path.join(path, "_SUCCESS")  
        if os.path.exists(success_path):  
            paths_to_remove.append(success_path)  
        if os.path.exists(path):  
            paths_to_remove.append(path)

    for p in paths_to_remove:  
        try:  
            if os.path.isfile(p):  
                os.remove(p)  
            elif os.path.isdir(p):  
                shutil.rmtree(p)  
        except Exception as e:  
            logger.warning(f"Failed to clean up path {p}: {e}")

class PySparkTestConfig(PySparkTest):
"""A base class for general spark setup for test configs.

Attributes:  
    spark_session: a spark session that lives during the entire test class  
"""

@pytest.fixture  
def cleanup_dir(self) -> None:  
    """Clean up directories after test."""  
    yield  
    self._clean_up_local_dirs()

def _clean_up_local_dirs(self) -> None:  
    """Clean up local directories."""  
    pass

class PySparkTestBase(PySparkTest):
"""A base class for pyspark tests.

Attributes:  
    spark_session: a spark session that lives during the entire test class  
"""

def before_test(self) -> None:  
    """Actions to perform before test implementation starts."""  
    pass

def after_test(self) -> None:  
    """Actions to perform after test implementation is concluded."""  
    pass

def test(self) -> None:  
    """Test implementation - runs before and after methods as well."""  
    try:  
        self.before_test()  
        self.run_test()  
    finally:  
        self.after_test()

@abc.abstractmethod  
def run_test(self) -> None:  
    """Method to implement by test case classes, with the actual test logic."""  
    raise NotImplementedError

class EngineBaseTestCase(PySparkTestBase):
"""A base class for pyspark tests.

Attributes:  
    spark_session: a spark session that lives during the entire test class  
"""

config_template: Optional[Dict[str, Any]] = None  
# Optionally define a list of files and/or directories to clean up after each test  
cleanup_paths: List[str] = []

def before_test(self) -> None:  
    """Method to override to setup environment before test execution.

    Example: populate delta tables, sample files, etc.  
    """  
    super().before_test()

def after_test(self) -> None:  
    """Method to override to cleanup after test execution.

    Example: delete sample files, reset an external service, etc.  
    """  
    self.cleanup_files(self.cleanup_paths)  
    super().after_test()

def create_empty_config(self) -> Dict[str, Any]:  
    """Create an empty engine config with all section fields.

    Returns:  
        An empty engine configuration dictionary.  
    """  
    return {  
        INPUT_SPECS: [],  
        TRANSFORMATIONS: {},  
        OUTPUT_SPECS: [],  
    }

def create_table_config(self, table_name: str) -> Dict[str, Any]:  
    """Create a basic table config.

    Args:  
        table_name: the base name for a table.

    Returns:  
        A basic table config.  
    """  
    return {TABLE_NAME: table_name}  

End File# adidas/lakehouse-engine
import json
import os
import shutil
from typing import Dict, List, Set, Union

from lakehouse_engine.algorithms.exceptions import SchemaExpectationsException
from lakehouse_engine.io.readers_writers import readers, writers_utils

def read_schema_expectations_json_paths(
json_schema_paths: Union[str, List[str]],
) -> List[Dict]:
"""Read schema expectations from provided json file paths.

Args:  
    json_schema_paths: the json file path, or a list of paths.

Returns:  
    The combined list of schema field dictionaries from all files.  
"""  
schema_fields = []

# Convert the input to a list if it's a single string  
if isinstance(json_schema_paths, str):  
    json_schema_paths = [json_schema_paths]

# Read all schema files and combine them  
for path in json_schema_paths:  
    with open(path, "r", encoding="utf-8") as schema_file:  
        schema = json.load(schema_file)  
        if isinstance(schema, list):  
            schema_fields.extend(schema)  
        elif "fields" in schema:  
            schema_fields.extend(schema["fields"])

return schema_fields

def get_schema_field_names_map(
schema_fields: List[Dict],
) -> Dict[str, Dict]:
"""Create a map of field names to their schema dictionaries.

Args:  
    schema_fields: List of schema field dictionaries.

Returns:  
    A dictionary mapping field names to their complete schema definitions.  
"""  
return {field["name"]: field for field in schema_fields}

def ensure_columns_exist(
dataframe,
expected_schema_fields_map: Dict[str, Dict],
) -> None:
"""Check if all expected columns exist in the DataFrame.

Args:  
    dataframe: The DataFrame to check.  
    expected_schema_fields_map: Dictionary of expected field names to their schema definitions.

Raises:  
    SchemaExpectationsException: If any expected column is missing.  
"""  
# Get the set of column names in the DataFrame  
df_columns = {col.lower() for col in dataframe.columns}  
# Get the set of expected column names (case-insensitive)  
expected_columns = {field_name.lower() for field_name in expected_schema_fields_map}

# Find expected columns that are missing from the DataFrame  
missing_columns = expected_columns - df_columns  
if missing_columns:  
    missing_cols_str = ", ".join(missing_columns)  
    raise SchemaExpectationsException(  
        f"Schema validation failed. The following columns are missing: {missing_cols_str}"  
    )

def check_column_type_compatibility(
dataframe, expected_schema_fields_map: Dict[str, Dict]
) -> None:
"""Check if DataFrame column types are compatible with expected types.

Args:  
    dataframe: The DataFrame to check.  
    expected_schema_fields_map: Dictionary of expected field names to their schema definitions.

Raises:  
    SchemaExpectationsException: If any column has an incompatible type.  
"""  
# Get current DataFrame schema as a dictionary  
df_schema = {  
    field.name.lower(): field.dataType.simpleString() for field in dataframe.schema  
}

type_errors = []  
for field_name, field_def in expected_schema_fields_map.items():  
    df_field_name = field_name.lower()  
    if df_field_name in df_schema:  
        expected_type = field_def.get("type", "unknown")  
        # Handle nullable types if they are specified as a list [null, type]  
        if isinstance(expected_type, list):  
            # Extract the non-null type  
            expected_type = next(  
                (t for t in expected_type if t != "null"), expected_type[0]  
            )

        # Normalize and simplify expected type  
        expected_type = simplify_type(expected_type)

        # Get actual type  
        actual_type = df_schema[df_field_name]

        # Check if types are compatible  
        if not are_types_compatible(expected_type, actual_type):  
            type_errors.append(  
                f"Column '{field_name}' has incompatible type. Expected: "  
                f"'{expected_type}', Actual: '{actual_type}'"  
            )

if type_errors:  
    error_message = "Schema type validation failed:n" + "n".join(type_errors)  
    raise SchemaExpectationsException(error_message)

def simplify_type(type_str: Union[str, dict]) -> str:
"""Simplify a schema type for comparison.

Args:  
    type_str: The type string or dict to simplify.

Returns:  
    A simplified type string.  
"""  
# Handle complex types in dictionary format  
if isinstance(type_str, dict):  
    if "type" in type_str:  
        base_type = type_str["type"]  
        if base_type == "array" and "items" in type_str:  
            items_type = simplify_type(type_str["items"])  
            return f"array<{items_type}>"  
        elif base_type == "record" and "fields" in type_str:  
            # For struct types, just return "struct" for simplicity  
            return "struct"  
        elif base_type in ["string", "boolean", "int", "long", "float", "double"]:  
            return base_type  
    return "complex"  # Default for unrecognized complex types

# Handle primitive types as strings  
type_mapping = {  
    "string": "string",  
    "boolean": "boolean",  
    "int": "integer",  
    "long": "long",  
    "integer": "integer",  
    "float": "float",  
    "double": "double",  
    "null": "null",  
}

return type_mapping.get(type_str.lower(), type_str.lower())

def are_types_compatible(expected_type: str, actual_type: str) -> bool:
"""Check if expected and actual types are compatible.

Implements a flexible type compatibility mapping for schema validation.

Args:  
    expected_type: The expected type.  
    actual_type: The actual type in the DataFrame.

Returns:  
    True if types are compatible, False otherwise.  
"""  
# Normalize both types to lowercase for comparison  
expected = expected_type.lower()  
actual = actual_type.lower()

# Exact match  
if expected == actual:  
    return True

# Type compatibility mappings  
compatibility_map = {  
    # String types  
    "string": {"varchar", "char", "text"},  
    # Integer types  
    "integer": {"int", "smallint", "tinyint", "integer"},  
    "int": {"integer", "smallint", "tinyint"},  
    # Floating point types  
    "float": {"double", "decimal", "float"},  
    "double": {"float", "decimal"},  
    # Decimal types  
    "decimal": {"float", "double"},  
    # Boolean types  
    "boolean": {"bit", "boolean"},  
    # Date/Time types  
    "date": {"timestamp", "date"},  
    "timestamp": {"date"},  
    # Complex types  
    "struct": {"struct"},  
    "array": {"array"},  
    "map": {"map"},  
}

# Check if the actual type is in the set of compatible types for the expected type  
if expected in compatibility_map:  
    if any(compat_type in actual for compat_type in compatibility_map[expected]):  
        return True

# Check if expected type is at start of complex actual type  
# e.g., "array" is compatible with "array<string>"  
if expected in ["array", "map", "struct"] and actual.startswith(expected):  
    return True

return False

def check_schema_expectations(
dataframe,
json_schema_paths: Union[str, List[str]],
check_extra_columns: bool = False,
) -> None:
"""Check if the DataFrame schema meets the expectations defined in the schema files.

Args:  
    dataframe: The DataFrame to check.  
    json_schema_paths: Path or list of paths to JSON schema files.  
    check_extra_columns: If True, raises an exception if the DataFrame has extra columns.

Raises:  
    SchemaExpectationsException: If schema expectations are not met.  
"""  
# Read schema expectations from JSON files  
expected_schema_fields = read_schema_expectations_json_paths(json_schema_paths)  
expected_schema_fields_map = get_schema_field_names_map(expected_schema_fields)

# Check if all expected columns exist  
ensure_columns_exist(dataframe, expected_schema_fields_map)

# Check if column types are compatible  
check_column_type_compatibility(dataframe, expected_schema_fields_map)

# Optionally check for extra columns  
if check_extra_columns:  
    df_columns_lower = {col.lower() for col in dataframe.columns}  
    expected_columns_lower = {  
        field_name.lower() for field_name in expected_schema_fields_map  
    }  
    extra_columns = df_columns_lower - expected_columns_lower  
    if extra_columns:  
        extra_cols_str = ", ".join(extra_columns)  
        raise SchemaExpectationsException(  
            f"Schema validation failed. The DataFrame contains extra columns: {extra_cols_str}"  
        )

def get_expected_columns(
json_schema_paths: Union[str, List[str]],
excluded_columns: Set[str] = None,
) -> Set[str]:
"""Get the set of expected column names from schema files.

Args:  
    json_schema_paths: Path or list of paths to JSON schema files.  
    excluded_columns: Optional set of column names to exclude.

Returns:  
    A set of expected column names.  
"""  
if excluded_columns is None:  
    excluded_columns = set()

# Read schema expectations from JSON files  
expected_schema_fields = read_schema_expectations_json_paths(json_schema_paths)  
expected_columns = {  
    field["name"] for field in expected_schema_fields if field["name"] not in excluded_columns  
}  
return expected_columns

def check_required_schema_files(
schema_dir: str, required_fields_file: str, actual_schema_file: str = None
) -> bool:
"""Check if required schema files exist.

Args:  
    schema_dir: Directory containing schema files.  
    required_fields_file: Filename of the required fields schema file.  
    actual_schema_file: Optional filename of the actual schema file.

Returns:  
    True if all required files exist, False otherwise.  
"""  
required_path = os.path.join(schema_dir, required_fields_file)  
if not os.path.exists(required_path):  
    return False

if actual_schema_file:  
    actual_path = os.path.join(schema_dir, actual_schema_file)  
    if not os.path.exists(actual_path):  
        return False

return True

def generate_schema_from_dataframe(
dataframe, output_path: str, format_type: str = "json"
) -> None:
"""Generate a schema file from a DataFrame.

Args:  
    dataframe: The DataFrame to generate schema from.  
    output_path: Path to save the schema file.  
    format_type: The format type of the schema ('json' or 'avro').  
"""  
# Create the directory if it doesn't exist  
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Generate the schema  
if format_type.lower() == "json":  
    schema_json = dataframe.schema.json()  
    with open(output_path, "w", encoding="utf-8") as f:  
        f.write(schema_json)  
elif format_type.lower() == "avro":  
    # For Avro schema, we need to convert Spark schema to Avro schema  
    # This is a simplified approach - for complex schemas, you might need  
    # a more sophisticated conversion  
    avro_schema = writers_utils.spark_schema_to_dict(dataframe.schema)  
    with open(output_path, "w", encoding="utf-8") as f:  
        json.dump(avro_schema, f, indent=2)  
else:  
    raise ValueError(f"Unsupported schema format: {format_type}")

def copy_schema_file(src_path: str, dest_path: str) -> None:
"""Copy a schema file from source to destination.

Args:  
    src_path: Source path of the schema file.  
    dest_path: Destination path for the schema file.  
"""  
# Create the directory if it doesn't exist  
os.makedirs(os.path.dirname(dest_path), exist_ok=True)  
shutil.copy2(src_path, dest_path)

def load_sample_data_to_validate_schema(
spark,
source_path: str,
format_type: str = "csv",
header: bool = True,
infer_schema: bool = True,
schema_path: str = None,
options: Dict = None,
) -> bool:
"""Load sample data from source to validate schema.

Args:  
    spark: Spark session.  
    source_path: Path to the source data.  
    format_type: Format of the source data.  
    header: Whether the source data has a header row (for CSV).  
    infer_schema: Whether to infer schema from the data.  
    schema_path: Optional path to schema file.  
    options: Additional options for reading the data.

Returns:  
    True if the data was loaded successfully, False otherwise.  
"""  
try:  
    reader_options = options or {}  
    if format_type.lower() == "csv":  
        reader_options.update({"header": str(header).lower(), "inferSchema": str(infer_schema).lower()})

    # If schema_path is provided, use it  
    if schema_path and os.path.exists(schema_path):  
        with open(schema_path, "r", encoding="utf-8") as f:  
            schema_str = f.read()  
        reader_options["schema"] = schema_str

    # Try to load the data  
    df = readers.DeltaLakeReader(spark).read(source_path, **reader_options)  
    return True  
except Exception as e:  
    print(f"Failed to load sample data: {e}")  
    return False  
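
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): how
# the schema helpers above could be chained together. The paths and the
# SparkSession setup are placeholders for the example only.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sample_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

    # Persist the DataFrame's schema as JSON, then verify the file is in place
    # before copying it to a backup location.
    generate_schema_from_dataframe(sample_df, "/tmp/schemas/sample.json", "json")
    if check_required_schema_files("/tmp/schemas", "sample.json"):
        copy_schema_file("/tmp/schemas/sample.json", "/tmp/schemas/backup/sample.json")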

End File"""Writer module."""
from typing import Dict, List, Optional, Tuple

from pyspark.sql import DataFrame

from lakehouse_engine.algorithms.exceptions import UnknownWriterException
from lakehouse_engine.core.definitions import OUTPUT_TO
from lakehouse_engine.core.definitions import TableType as TableTypeDef
from lakehouse_engine.io.readers_writers.delta_lake_writer import DeltaLakeWriter
from lakehouse_engine.io.readers_writers.file_writer import FileWriter
from lakehouse_engine.io.readers_writers.jdbc_writer import JDBCWriter
from lakehouse_engine.io.readers_writers.snowflake_writer import SnowflakeWriter
from lakehouse_engine.utils.logging_handler import LoggingHandler

class Writer(LoggingHandler):
"""Writer class."""

def __init__(self):  
    """Construct Writer."""  
    super().__init__(__name__)  
    self.writers: Dict = {  
        TableTypeDef.BRONZE: self._get_bronze_writer,  
        TableTypeDef.SILVER: self._get_silver_writer,  
        TableTypeDef.GOLD: self._get_gold_writer,  
    }

def write(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = None,  
    table_type: str = None,  
) -> List[Dict]:  
    """Write data according to the writer configuration.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.  
        table_type: explicit table type, used when we want to customize the writer  
            independently of what's in the spec, e.g. for tables outside delta lake.

    Returns:  
        metadata dictionaries from the write operations  
    """  
    if not table_type:  
        output_to = writer_config.get(OUTPUT_TO, "").lower()  
        table_type = output_to

    extra_args_unpacked = () if extra_args is None else extra_args  
    func = self.writers.get(table_type)

    if func is not None:  
        return func(df=df, writer_config=writer_config, extra_args=extra_args_unpacked)  
    raise UnknownWriterException(f"Writer not found for table type {table_type}")

def _get_bronze_writer(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = (),  
) -> List[Dict]:  
    """Get writer for bronze layer.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.

    Returns:  
        metadata dictionary from the write operation  
    """  
    return [DeltaLakeWriter().write(df, writer_config, "bronze", extra_args)]

def _get_silver_writer(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = (),  
) -> List[Dict]:  
    """Get writer for silver layer.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.

    Returns:  
        metadata dictionary from the write operation  
    """  
    return [DeltaLakeWriter().write(df, writer_config, "silver", extra_args)]

def _get_gold_writer(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = (),  
) -> List[Dict]:  
    """Get writer for gold layer.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.

    Returns:  
        metadata dictionary from the write operation  
    """  
    return [DeltaLakeWriter().write(df, writer_config, "gold", extra_args)]

def _get_file_writer(  
    self, df: DataFrame, writer_config: Dict, extra_args: Optional[Tuple] = ()  
) -> List[Dict]:  
    """Get file writer.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.

    Returns:  
        metadata dictionary from the write operation  
    """  
    return [FileWriter().write(df, writer_config, extra_args)]

def _get_jdbc_writer(  
    self, df: DataFrame, writer_config: Dict, extra_args: Optional[Tuple] = ()  
) -> List[Dict]:  
    """Get JDBC writer.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.

    Returns:  
        metadata dictionary from the write operation  
    """  
    return [JDBCWriter().write(df, writer_config, extra_args)]

def _get_snowflake_writer(  
    self, df: DataFrame, writer_config: Dict, extra_args: Optional[Tuple] = ()  
) -> List[Dict]:  
    """Get Snowflake writer.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.

    Returns:  
        metadata dictionary from the write operation  
    """  
    return [SnowflakeWriter().write(df, writer_config, extra_args)]  
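
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): routing
# a DataFrame to the bronze layer through the Writer facade above. The
# writer_config key shown here ("path") is a placeholder; the canonical key
# names come from lakehouse_engine.core.definitions.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a")], ["id", "value"])
    # Passing table_type explicitly selects the bronze DeltaLakeWriter path.
    metadata = Writer().write(
        df=df,
        writer_config={"path": "/tmp/bronze/sample"},  # placeholder config
        table_type=TableTypeDef.BRONZE,
    )
    print(metadata)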

End File"""ReaderConfig class."""
from abc import ABC, abstractmethod
from typing import Dict, List

import pyspark.sql.functions as f
from pyspark.sql import DataFrame, SparkSession

from lakehouse_engine.core.definitions import TABLE_NAME
from lakehouse_engine.utils.logging_handler import LoggingHandler

class Reader(ABC, LoggingHandler):
"""Abstract Reader class."""

def __init__(self, spark: SparkSession):  
    """Construct Reader.

    Args:  
        spark: spark session  
    """  
    super().__init__(__name__)  
    self.spark = spark

def read(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data from a storage location.

    Args:  
        read_from_path: path or identifier for the data source.  
        **kwargs: keyword arguments to be passed to the read method.

    Returns:  
        DataFrame with read data  
    """  
    self.log.debug(f"Reader: path={read_from_path}, kwargs={kwargs}")  
    return self._read_data(read_from_path, **kwargs)

@abstractmethod  
def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data implementation to be implemented by concrete readers.

    Args:  
        read_from_path: path or identifier for the data source.  
        **kwargs: keyword arguments to be passed to the read method.

    Returns:  
        DataFrame with read data  
    """  
    raise NotImplementedError("Method _read_data must be implemented by subclasses")

class DeltaLakeReader(Reader):
"""Reader for Delta Lake tables."""

def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data from a Delta Lake table.

    Args:  
        read_from_path: path or identifier for the delta table.  
        **kwargs: keyword arguments to be passed to the read method.

    Returns:  
        DataFrame with read data from delta table  
    """  
    if read_from_path.startswith("select "):  
        # Handle SQL query  
        return self.spark.sql(read_from_path)  
    elif "." in read_from_path and not read_from_path.startswith("/"):  
        # Handle catalog table reference  
        return self.spark.table(read_from_path)  
    else:  
        # Handle path reference  
        reader = self.spark.read.format("delta")  
        # Apply any additional options  
        for key, value in kwargs.items():  
            if key != "table_name":  # Skip table_name, it's not an option  
                reader = reader.option(key, value)  
        return reader.load(read_from_path)

class FileReader(Reader):
"""Reader for file types."""

def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data from a file.

    Args:  
        read_from_path: path to the file(s).  
        **kwargs: keyword arguments to be passed to the read method.  
            Must contain 'format' specifying the file format (csv, json, etc.)

    Returns:  
        DataFrame with read data from file  
    """  
    format_type = kwargs.pop("format", "csv")  
    reader = self.spark.read.format(format_type)  
      
    # Apply any additional options  
    for key, value in kwargs.items():  
        if key != TABLE_NAME:  # Skip table_name, it's not an option  
            reader = reader.option(key, value)  
              
    return reader.load(read_from_path)

class JDBCReader(Reader):
"""Reader for JDBC sources."""

def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data from a JDBC source.

    Args:  
        read_from_path: table name or query.  
        **kwargs: keyword arguments to be passed to the read method.  
            Must contain JDBC connection parameters.

    Returns:  
        DataFrame with read data from JDBC source  
    """  
    # Basic JDBC options  
    jdbc_url = kwargs.pop("url")  
    connection_properties = {}  
      
    # Extract specific JDBC connection properties  
    for key in ["user", "password", "driver"]:  
        if key in kwargs:  
            connection_properties[key] = kwargs.pop(key)  
      
    # Determine if this is a query or table name  
    if read_from_path.strip().lower().startswith("select "):  
        # It's a query: use the 'query' option directly (a JDBC read cannot
        # combine 'dbtable' and 'query', so avoid spark.read.jdbc here)
        reader = self.spark.read.format("jdbc").option("url", jdbc_url)
        reader = reader.option("query", read_from_path)
        for key, value in connection_properties.items():
            reader = reader.option(key, value)
        return reader.load()
    else:  
        # It's a table name  
        # Apply any additional options to reader  
        reader = self.spark.read.format("jdbc").option("url", jdbc_url)  
        reader = reader.option("dbtable", read_from_path)  
          
        for key, value in kwargs.items():  
            if key != TABLE_NAME:  # Skip table_name, it's not an option  
                reader = reader.option(key, value)  
                  
        # Add connection properties  
        for key, value in connection_properties.items():  
            reader = reader.option(key, value)  
              
        return reader.load()

class SnowflakeReader(Reader):
"""Reader for Snowflake."""

def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data from Snowflake.

    Args:  
        read_from_path: table name, view name, or query.  
        **kwargs: keyword arguments to be passed to the read method.  
            Must contain Snowflake connection parameters.

    Returns:  
        DataFrame with read data from Snowflake  
    """  
    # Configure reader with Snowflake connection params  
    reader = self.spark.read.format("net.snowflake.spark.snowflake")  
      
    # Add all options  
    for key, value in kwargs.items():  
        if key != TABLE_NAME:  # Skip table_name, it's not an option  
            reader = reader.option(key, value)  
      
    # Determine if this is a query or table name  
    if read_from_path.strip().lower().startswith("select "):  
        # It's a query  
        reader = reader.option("query", read_from_path)  
    else:  
        # It's a table name - make sure dbtable is set  
        reader = reader.option("dbtable", read_from_path)  
          
    return reader.load()

class MultiReader(Reader):
"""Reader for combining multiple data sources."""

def _read_data(self, read_from_path: str, **kwargs) -> DataFrame:  
    """Read data from multiple sources and combine them.

    Args:  
        read_from_path: Not used in MultiReader.  
        **kwargs: keyword arguments to be passed to the read method.  
            Must contain 'sources' list with source configurations.

    Returns:  
        Combined DataFrame from all sources  
    """  
    sources = kwargs.get("sources", [])  
    if not sources:  
        raise ValueError("MultiReader requires 'sources' parameter with source configurations")  
      
    # Initialize readers for each source type  
    readers = {  
        "delta": DeltaLakeReader(self.spark),  
        "file": FileReader(self.spark),  
        "jdbc": JDBCReader(self.spark),  
        "snowflake": SnowflakeReader(self.spark)  
    }  
      
    # Read from each source  
    dfs = []  
    for source in sources:  
        source_type = source.get("type", "delta")  
        path = source.get("path")   
        options = source.get("options", {})  
          
        if source_type not in readers:  
            raise ValueError(f"Unsupported source type: {source_type}")  
          
        df = readers[source_type].read(path, **options)  
          
        # Apply optional transformations to the source  
        if "select" in source:  
            df = df.selectExpr(source["select"])  
        if "where" in source:  
            df = df.filter(source["where"])  
        if "alias" in source:  
            df = df.alias(source["alias"])  
          
        dfs.append(df)  
      
    # Combine the DataFrames based on the combine_method  
    combine_method = kwargs.get("combine_method", "union")  
    return self._combine_dataframes(dfs, combine_method, kwargs.get("join_conditions", {}))  
  
def _combine_dataframes(  
    self, dataframes: List[DataFrame], method: str, join_options: Dict  
) -> DataFrame:  
    """Combine multiple DataFrames using the specified method.

    Args:  
        dataframes: List of DataFrames to combine  
        method: Method to use for combining ('union', 'join')  
        join_options: Options for join operation

    Returns:  
        Combined DataFrame  
    """  
    if not dataframes:  
        raise ValueError("No DataFrames were provided to combine")  
      
    if len(dataframes) == 1:  
        return dataframes[0]  
      
    if method == "union":  
        result = dataframes[0]  
        for df in dataframes[1:]:  
            result = result.unionByName(df, allowMissingColumns=True)  
        return result  
      
    elif method == "join":  
        if not join_options:  
            raise ValueError("Join method requires join_conditions")  
          
        result = dataframes[0]  
        for i, df in enumerate(dataframes[1:], 1):  
            condition = join_options.get(f"condition_{i}", None)  
            join_type = join_options.get(f"type_{i}", "inner")  
              
            if not condition:  
                raise ValueError(f"Join condition missing for DataFrame {i}")  
              
            result = result.join(df, on=f.expr(condition), how=join_type)  
        return result  
      
    else:  
        raise ValueError(f"Unsupported combine method: {method}")  

End File# adidas/lakehouse-engine
"""Writers utils."""
from typing import Dict, List, Optional, Union

from pyspark.sql.types import (
ArrayType,
BooleanType,
ByteType,
DataType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
MapType,
ShortType,
StringType,
StructField,
StructType,
TimestampType,
)

Maps pyspark types to corresponding Avro types

SPARK_TO_AVRO_TYPE = {
StringType: "string",
IntegerType: "int",
LongType: "long",
FloatType: "float",
DoubleType: "double",
BooleanType: "boolean",
DateType: "int", # Date is represented as int (days since epoch) in Avro
TimestampType: "long", # Timestamp as long (milliseconds since epoch) in Avro
ByteType: "bytes",
ShortType: "int", # No direct short type in Avro
DecimalType: "bytes", # Decimal as bytes in Avro
}

def spark_schema_to_dict(schema: DataType) -> Dict:
"""Converts a Spark schema to a dictionary representation (compatible with Avro).

Args:  
    schema: Spark schema to convert

Returns:  
    Dictionary representation of the schema  
"""  
if isinstance(schema, StructType):  
    fields = []  
    for field in schema.fields:  
        fields.append(struct_field_to_dict(field))  
      
    return {  
        "type": "record",  
        "name": "Record",  
        "fields": fields  
    }  
else:  
    # Handle non-struct types  
    return {"type": spark_type_to_avro_type(schema)}

def struct_field_to_dict(field: StructField) -> Dict:
"""Converts a StructField to a dictionary.

Args:  
    field: StructField to convert

Returns:  
    Dictionary representation of the field  
"""  
field_dict = {  
    "name": field.name,  
    "type": get_field_type_dict(field.dataType, field.nullable)  
}  
  
# Add nullable option if field is nullable  
if field.nullable:  
    field_dict["type"] = ["null", field_dict["type"]]  
  
return field_dict

def get_field_type_dict(data_type: DataType, nullable: bool) -> Union[str, Dict, List]:
"""Gets the type representation for a field's data type.

Args:  
    data_type: The data type of the field  
    nullable: Whether the field is nullable

Returns:  
    Type representation (string, dict, or list for union types)  
"""  
# Handle complex types  
if isinstance(data_type, StructType):  
    fields = []  
    for field in data_type.fields:  
        fields.append(struct_field_to_dict(field))  
      
    return {  
        "type": "record",  
        "name": "NestedRecord",  # This name can be improved  
        "fields": fields  
    }  
elif isinstance(data_type, ArrayType):  
    item_type = get_field_type_dict(data_type.elementType, data_type.containsNull)  
    return {  
        "type": "array",  
        "items": item_type  
    }  
elif isinstance(data_type, MapType):  
    # Avro only supports string keys  
    value_type = get_field_type_dict(data_type.valueType, data_type.valueContainsNull)  
    return {  
        "type": "map",  
        "values": value_type  
    }  
else:  
    # Handle primitive types  
    avro_type = spark_type_to_avro_type(data_type)  
    if nullable:  
        return ["null", avro_type]  
    return avro_type

def spark_type_to_avro_type(data_type: DataType) -> str:
"""Maps a Spark data type to its Avro equivalent.

Args:  
    data_type: Spark data type

Returns:  
    Corresponding Avro type string  
"""  
# Use the mapping for direct type matches  
for spark_type, avro_type in SPARK_TO_AVRO_TYPE.items():  
    if isinstance(data_type, spark_type):  
        # Special handling for logical types like Decimal  
        if isinstance(data_type, DecimalType):  
            return {  
                "type": "bytes",  
                "logicalType": "decimal",  
                "precision": data_type.precision,  
                "scale": data_type.scale  
            }  
        return avro_type  
  
# Fallback  
return "string"

def get_write_mode(mode: Optional[str] = None) -> str:
"""Get the write mode to be used.

Args:  
    mode: write mode to be used. Defaults to None.

Returns:  
    Write mode to be used.  
"""  
return "overwrite" if mode is None else mode  

End File# adidas/lakehouse-engine

lakehouse_engine/io/readers_writers/writers_utils.py

"""Writers utils."""
from typing import Dict, List, Optional, Tuple, Union

from pyspark.sql.types import (
ArrayType,
BooleanType,
ByteType,
DataType,
DateType,
DecimalType,
DoubleType,
FloatType,
IntegerType,
LongType,
MapType,
ShortType,
StringType,
StructField,
StructType,
TimestampType,
)

from lakehouse_engine.core.definitions import (
CDC_CONFIG,
CDC_ENABLED,
CDC_KEY_COLUMNS,
CDC_WATERMARK_COLUMN,
KEY_COLUMN,
OVERWRITE_AT_EXECUTION_TIME,
OVERWRITE_PARTITIONS,
OVERWRITE_SCHEMA,
PARTITION_COLUMNS,
PATH,
PROCESSING_TIME,
WATERMARK_COLUMN,
Z_ORDER_BY,
)

# Maps pyspark types to corresponding Avro types

SPARK_TO_AVRO_TYPE = {
StringType: "string",
IntegerType: "int",
LongType: "long",
FloatType: "float",
DoubleType: "double",
BooleanType: "boolean",
DateType: "int", # Date is represented as int (days since epoch) in Avro
TimestampType: "long", # Timestamp as long (milliseconds since epoch) in Avro
ByteType: "bytes",
ShortType: "int", # No direct short type in Avro
DecimalType: "bytes", # Decimal as bytes in Avro
}

def spark_schema_to_dict(schema: DataType) -> Dict:
"""Converts a Spark schema to a dictionary representation (compatible with Avro).

Args:  
    schema: Spark schema to convert

Returns:  
    Dictionary representation of the schema  
"""  
if isinstance(schema, StructType):  
    fields = []  
    for field in schema.fields:  
        fields.append(struct_field_to_dict(field))  
      
    return {  
        "type": "record",  
        "name": "Record",  
        "fields": fields  
    }  
else:  
    # Handle non-struct types  
    return {"type": spark_type_to_avro_type(schema)}

def struct_field_to_dict(field: StructField) -> Dict:
"""Converts a StructField to a dictionary.

Args:  
    field: StructField to convert

Returns:  
    Dictionary representation of the field  
"""  
field_dict = {  
    "name": field.name,  
    "type": get_field_type_dict(field.dataType, field.nullable)  
}  

# Wrap nullable fields in a ["null", ...] union, unless get_field_type_dict  
# already produced one (it does so for nullable primitive types)  
if field.nullable and not isinstance(field_dict["type"], list):  
    field_dict["type"] = ["null", field_dict["type"]]  

return field_dict

def get_field_type_dict(data_type: DataType, nullable: bool) -> Union[str, Dict, List]:
"""Gets the type representation for a field's data type.

Args:  
    data_type: The data type of the field  
    nullable: Whether the field is nullable

Returns:  
    Type representation (string, dict, or list for union types)  
"""  
# Handle complex types  
if isinstance(data_type, StructType):  
    fields = []  
    for field in data_type.fields:  
        fields.append(struct_field_to_dict(field))  
      
    return {  
        "type": "record",  
        "name": "NestedRecord",  # This name can be improved  
        "fields": fields  
    }  
elif isinstance(data_type, ArrayType):  
    item_type = get_field_type_dict(data_type.elementType, data_type.containsNull)  
    return {  
        "type": "array",  
        "items": item_type  
    }  
elif isinstance(data_type, MapType):  
    # Avro only supports string keys  
    value_type = get_field_type_dict(data_type.valueType, data_type.valueContainsNull)  
    return {  
        "type": "map",  
        "values": value_type  
    }  
else:  
    # Handle primitive types  
    avro_type = spark_type_to_avro_type(data_type)  
    if nullable:  
        return ["null", avro_type]  
    return avro_type

def spark_type_to_avro_type(data_type: DataType) -> Union[str, Dict]:
"""Maps a Spark data type to its Avro equivalent.

Args:  
    data_type: Spark data type

Returns:  
    Corresponding Avro type (a string, or a dict for logical types such as decimal)  
"""  
# Use the mapping for direct type matches  
for spark_type, avro_type in SPARK_TO_AVRO_TYPE.items():  
    if isinstance(data_type, spark_type):  
        # Special handling for logical types like Decimal  
        if isinstance(data_type, DecimalType):  
            return {  
                "type": "bytes",  
                "logicalType": "decimal",  
                "precision": data_type.precision,  
                "scale": data_type.scale  
            }  
        return avro_type  
  
# Fallback  
return "string"

def get_write_mode(mode: Optional[str] = None) -> str:
"""Get the write mode to be used.

Args:  
    mode: write mode to be used. Defaults to None.

Returns:  
    Write mode to be used.  
"""  
return "overwrite" if mode is None else mode

def get_z_order_by_statement_for_delta(z_order_by_cols: Union[str, List]) -> str:
"""Get Z-ORDER BY statement for delta table.

Args:  
    z_order_by_cols: Z-ORDER BY columns.

Returns:  
    Statement for Z-ORDER BY.  
"""  
if z_order_by_cols:  
    z_order_by_cols_as_list = (  
        [z_order_by_cols] if isinstance(z_order_by_cols, str) else z_order_by_cols  
    )  
    z_order_by_cols_joined = ", ".join(f"`{col}`" for col in z_order_by_cols_as_list)  
    # The braces are escaped so "{0}" stays a literal placeholder for the table  
    # path, which the caller fills in later via str.format().  
    return f"OPTIMIZE delta.`{{0}}` ZORDER BY ({z_order_by_cols_joined})"

return ""

def get_overwrite_partitions(
writer_config: Dict, output: Optional[str] = None
) -> List[str]:
"""Get list of partition values for conditional overwrite.

Args:  
    writer_config: writer configuration.  
    output: output path.

Returns:  
    list of partition values  
"""  
# Handle partitions that are provided by the writer itself at runtime  
# e.g.: current execution time  
output_path = writer_config.get(PATH, "") if output is None else output  
if (  
    OVERWRITE_AT_EXECUTION_TIME in writer_config  
    and writer_config[OVERWRITE_AT_EXECUTION_TIME]  
):  
    if PROCESSING_TIME in writer_config and writer_config[PROCESSING_TIME]:  
        return [str(writer_config[PROCESSING_TIME])]  
# return parameters for overwritePartitions  
if OVERWRITE_PARTITIONS in writer_config and writer_config[OVERWRITE_PARTITIONS]:  
    return writer_config[OVERWRITE_PARTITIONS]  
return []

def get_args_for_merge_to_delta(
writer_config: Dict,
) -> Tuple[bool, str, List[str], List[str], List[str]]:
"""Get arguments for merge statement in Delta Lake.

Args:  
    writer_config: writer configuration.

Returns:  
    CDC enabled flag, watermark column, key columns, partition columns, Z-ORDER BY columns  
"""  
cdc_enabled: bool = False  
cdc_key_columns: List[str] = []  
cdc_watermark_column: str = ""  
partition_columns: List[str] = []  
z_order_by_columns: List[str] = []

if CDC_CONFIG in writer_config and writer_config[CDC_CONFIG]:  
    cdc_config = writer_config[CDC_CONFIG]  
    cdc_enabled = bool(cdc_config.get(CDC_ENABLED, False))  
    cdc_key_columns = cdc_config.get(CDC_KEY_COLUMNS, [])  
    cdc_watermark_column = cdc_config.get(CDC_WATERMARK_COLUMN, "")

if PARTITION_COLUMNS in writer_config and writer_config[PARTITION_COLUMNS]:  
    partition_columns = writer_config[PARTITION_COLUMNS]

if WATERMARK_COLUMN in writer_config and writer_config[WATERMARK_COLUMN]:  
    cdc_watermark_column = writer_config[WATERMARK_COLUMN]

if KEY_COLUMN in writer_config and writer_config[KEY_COLUMN]:  
    cdc_key_columns = (  
        [writer_config[KEY_COLUMN]]  
        if isinstance(writer_config[KEY_COLUMN], str)  
        else writer_config[KEY_COLUMN]  
    )

if Z_ORDER_BY in writer_config and writer_config[Z_ORDER_BY]:  
    z_order_by_columns = writer_config[Z_ORDER_BY]

return (  
    cdc_enabled,  
    cdc_watermark_column,  
    cdc_key_columns,  
    partition_columns,  
    z_order_by_columns,  
)

def should_overwrite_schema(writer_config: Dict) -> bool:
"""Check if schema should be overwritten.

Args:  
    writer_config: writer configuration.

Returns:  
    True if schema should be overwritten, False otherwise.  
"""  
return (  
    OVERWRITE_SCHEMA in writer_config  
    and writer_config[OVERWRITE_SCHEMA] is True  
)  
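
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module):
# converting a Spark schema to its Avro-style dict and building an OPTIMIZE
# statement. The schema, columns, and path are placeholders.
if __name__ == "__main__":
    import json

    schema = StructType(
        [
            StructField("id", LongType(), nullable=False),
            StructField("name", StringType(), nullable=True),
        ]
    )
    print(json.dumps(spark_schema_to_dict(schema), indent=2))

    # "{0}" in the returned statement is filled with the table path by the caller.
    statement = get_z_order_by_statement_for_delta(["id", "name"])
    print(statement.format("/tmp/delta/sample"))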

End File# adidas/lakehouse-engine
"""File Writer."""
from typing import Dict, List, Optional, Tuple

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

from lakehouse_engine.core.definitions import (
FORMAT,
MODE,
OPTIONS,
OUTPUT_FORMAT,
OUTPUT_MODE,
OUTPUT_OPTIONS,
OUTPUT_PATH,
PARTITION_BY,
PATH,
)
from lakehouse_engine.io.readers_writers.writers_utils import get_write_mode
from lakehouse_engine.utils.logging_handler import LoggingHandler

class FileWriter(LoggingHandler):
"""A class for writing to file storage."""

def __init__(self):  
    """Initialize the FileWriter."""  
    super().__init__(__name__)

def write(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = None,  
) -> Dict:  
    """Write data to file storage.

    Args:  
        df: The DataFrame to write.  
        writer_config: Configuration for the writer.  
        extra_args: Extra arguments for the write operation (not used by this writer).

    Returns:  
        Dictionary with metadata about the write operation.  
    """  
    # Extract configuration  
    path = writer_config.get(PATH) or writer_config.get(OUTPUT_PATH, "")  
    format_type = writer_config.get(FORMAT) or writer_config.get(OUTPUT_FORMAT, "parquet")  
    mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")  
    options = writer_config.get(OPTIONS) or writer_config.get(OUTPUT_OPTIONS, {})  
    partition_by = writer_config.get(PARTITION_BY, [])

    # Validate required parameters  
    if not path:  
        raise ValueError("Output path is required")

    # Begin writing process  
    writer = df.write.format(format_type)  
    writer = writer.mode(get_write_mode(mode))

    # Apply all options  
    for key, value in options.items():  
        writer = writer.option(key, value)

    # Apply partitioning if specified  
    if partition_by:  
        if isinstance(partition_by, str):  
            partition_by = [partition_by]  
        writer = writer.partitionBy(partition_by)

    # Perform the write operation  
    self.log.info(f"Writing data to path: {path} (format: {format_type}, mode: {mode})")  
    writer.save(path)

    # Return metadata about the operation  
    num_rows = df.count()  
    return {  
        "path": path,  
        "format": format_type,  
        "mode": mode,  
        "partitions": partition_by if partition_by else None,  
        "options": options,  
        "num_rows": num_rows,  
        "success": True  
    }  
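
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module). The
# writer_config keys ("path", "format", "mode", "partition_by") are assumptions
# about the constant values defined in lakehouse_engine.core.definitions.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "2024-01-01"), (2, "2024-01-02")], ["id", "day"])
    result = FileWriter().write(
        df,
        writer_config={
            "path": "/tmp/out/events",   # assumed value of PATH
            "format": "parquet",         # assumed value of FORMAT
            "mode": "overwrite",         # assumed value of MODE
            "partition_by": ["day"],     # assumed value of PARTITION_BY
        },
    )
    print(result)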

End File# adidas/lakehouse-engine
"""JDBC Writer."""
from typing import Dict, List, Optional, Tuple

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

from lakehouse_engine.core.definitions import (
MODE,
OPTIONS,
OUTPUT_MODE,
OUTPUT_OPTIONS,
TABLE,
URL,
)
from lakehouse_engine.io.readers_writers.writers_utils import get_write_mode
from lakehouse_engine.utils.logging_handler import LoggingHandler

class JDBCWriter(LoggingHandler):
"""A class for writing data to databases via JDBC."""

def __init__(self):  
    """Initialize the JDBCWriter."""  
    super().__init__(__name__)

def write(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = None,  
) -> Dict:  
    """Write data to a JDBC destination.

    Args:  
        df: The DataFrame to write.  
        writer_config: Configuration for the JDBC writer.  
        extra_args: Extra arguments for the write operation (not used by this writer).

    Returns:  
        Dictionary with metadata about the write operation.  
    """  
    # Extract configuration  
    jdbc_url = writer_config.get(URL)  
    table_name = writer_config.get(TABLE)  
    mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")  
    options = writer_config.get(OPTIONS) or writer_config.get(OUTPUT_OPTIONS, {})

    # Validate required parameters  
    if not jdbc_url:  
        raise ValueError("JDBC URL is required")  
    if not table_name:  
        raise ValueError("Table name is required")

    # Begin writing process  
    writer = df.write.format("jdbc")  
    writer = writer.mode(get_write_mode(mode))  
      
    # Set JDBC URL and table name  
    writer = writer.option("url", jdbc_url)  
    writer = writer.option("dbtable", table_name)  
      
    # Apply additional options  
    for key, value in options.items():  
        writer = writer.option(key, value)

    # Perform the write operation  
    self.log.info(f"Writing data to JDBC table: {table_name}")  
    writer.save()

    # Return metadata about the operation  
    num_rows = df.count()  
    return {  
        "jdbc_url": jdbc_url,  
        "table": table_name,  
        "mode": mode,  
        "options": options,  
        "num_rows": num_rows,  
        "success": True  
    }  
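
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module). The
# "url", "table", "mode", and "options" keys are assumptions about the constants
# in lakehouse_engine.core.definitions; the JDBC URL and credentials are
# placeholders.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a")], ["id", "value"])
    JDBCWriter().write(
        df,
        writer_config={
            "url": "jdbc:postgresql://localhost:5432/demo",
            "table": "public.sample",
            "mode": "append",
            "options": {"user": "demo", "password": "demo", "driver": "org.postgresql.Driver"},
        },
    )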

End File"""Delta Lake Writer."""
from typing import Dict, List, Optional, Tuple, Union

from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from delta.tables import DeltaTable

from lakehouse_engine.core.definitions import (
MODE,
OPTIMIZE,
OUTPUT_MODE,
OUTPUT_PATH,
PARTITION_BY,
PATH,
TABLE_NAME,
)
from lakehouse_engine.io.readers_writers.writers_utils import (
get_args_for_merge_to_delta,
get_overwrite_partitions,
get_write_mode,
get_z_order_by_statement_for_delta,
should_overwrite_schema,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler

class DeltaLakeWriter(LoggingHandler):
"""A class for writing to Delta Lake format."""

def __init__(self):  
    """Initialize the DeltaLakeWriter."""  
    super().__init__(__name__)

def write(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    table_type: str,  
    extra_args: Optional[Tuple] = None,  
) -> Dict:  
    """Write data to Delta Lake format.

    Args:  
        df: The DataFrame to write.  
        writer_config: Configuration for the writer.  
        table_type: Type of table (bronze, silver, gold).  
        extra_args: Extra arguments for the write operation (not used by this writer).

    Returns:  
        Dictionary with metadata about the write operation.  
    """  
    # Extract configuration  
    path = writer_config.get(PATH) or writer_config.get(OUTPUT_PATH, "")  
    table_name = writer_config.get(TABLE_NAME, "")  
    mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")  
    partition_by = writer_config.get(PARTITION_BY, [])  
    optimize = writer_config.get(OPTIMIZE, False)  
      
    # Validate required parameters  
    if not path and not table_name:  
        raise ValueError("Either path or table_name must be provided")

    # Get settings for delta merge operations  
    (  
        cdc_enabled,  
        watermark_column,  
        key_columns,  
        partition_columns,  
        z_order_by,  
    ) = get_args_for_merge_to_delta(writer_config)  
      
    target_path = path  
      
    # Determine if the target table already exists  
    table_exists = False  
    try:  
        if table_name:  
            # Check the catalog through the public API instead of _jcatalog  
            table_exists = df.sparkSession.catalog.tableExists(table_name)  
        elif path:  
            # Check whether a Delta table exists at the path (raises if not)  
            DeltaTable.forPath(df.sparkSession, path)  
            table_exists = True  
    except Exception:  
        table_exists = False  
      
    output_metadata = {  
        "table_type": table_type,  
        "table_name": table_name,  
        "path": path,  
        "mode": mode,  
        "partitions": partition_by if partition_by else None,  
    }  
      
    # Different write strategies based on mode and table existence  
    if not table_exists or mode in ("overwrite", "errorifexists"):  
        # Simple write for non-existing tables or overwrite mode  
        writer = df.write.format("delta")  
        writer = writer.mode(get_write_mode(mode))  
          
        # Apply partitioning if specified  
        if partition_by:  
            if isinstance(partition_by, str):  
                partition_by = [partition_by]  
            writer = writer.partitionBy(partition_by)  
          
        # Handle schema evolution  
        if should_overwrite_schema(writer_config):  
            writer = writer.option("overwriteSchema", "true")  
          
        # Perform the write operation  
        if table_name:  
            self.log.info(f"Writing to Delta table: {table_name}")  
            writer.saveAsTable(table_name)  
        else:  
            self.log.info(f"Writing to Delta path: {path}")  
            writer.save(path)  
              
    else:  
        # Handle updates to existing tables with CDC or merging  
        if cdc_enabled and key_columns:  
            self._handle_cdc_writes(  
                df,  
                writer_config,  
                table_name,  
                path,  
                key_columns,  
                watermark_column,  
            )  
        else:  
            self._handle_append_or_merge(  
                df,  
                writer_config,  
                table_name,  
                path,  
                mode,  
                partition_by,  
            )  
      
    # Optimization if requested  
    if optimize and z_order_by:  
        optimize_statement = get_z_order_by_statement_for_delta(z_order_by)  
        if optimize_statement:  
            optimize_statement = optimize_statement.format(target_path)  
            self.log.info(f"Running optimization: {optimize_statement}")  
            df.sparkSession.sql(optimize_statement)  
            output_metadata["optimized"] = True  
      
    # Count rows for metadata  
    num_rows = df.count()  
    output_metadata["num_rows"] = num_rows  
    output_metadata["success"] = True  
      
    return output_metadata

def _handle_cdc_writes(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    table_name: str,  
    path: str,  
    key_columns: Union[str, List[str]],  
    watermark_column: str,  
) -> None:  
    """Handle CDC writes to Delta table.

    Args:  
        df: The DataFrame to write.  
        writer_config: Writer configuration.  
        table_name: Name of the table.  
        path: Path to the table.  
        key_columns: Key column(s) for matching records.  
        watermark_column: Watermark column for tracking updates.  
    """  
    # Ensure key_columns is a list  
    if isinstance(key_columns, str):  
        key_columns = [key_columns]  
          
    # Get the target table  
    target_table = None  
    if table_name:  
        target_table = DeltaTable.forName(df.sparkSession, table_name)  
    else:  
        target_table = DeltaTable.forPath(df.sparkSession, path)  
          
    if not target_table:  
        raise ValueError("Failed to locate target Delta table")  
      
    # Create the merge condition based on key columns  
    merge_condition = " AND ".join([f"source.{k} = target.{k}" for k in key_columns])  
      
    # If watermark column is provided, add it to the merge condition  
    if watermark_column:  
        merge_condition += f" AND source.{watermark_column} > target.{watermark_column}"  
          
    # Build update/insert expressions from all source columns  
    target_columns = list(df.columns)  
    update_expr = {f"target.{col}": f"source.{col}" for col in target_columns}  
    insert_expr = {col: f"source.{col}" for col in target_columns}  
      
    self.log.info(f"Performing CDC merge with condition: {merge_condition}")  
      
    # Execute the merge  
    target_table.alias("target").merge(  
        df.alias("source"),  
        merge_condition  
    ).whenMatchedUpdate(  
        condition=None,  
        set=update_expr  
    ).whenNotMatchedInsert(  
        condition=None,  
        values=insert_expr  
    ).execute()

def _handle_append_or_merge(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    table_name: str,  
    path: str,  
    mode: str,  
    partition_by: Union[str, List[str]],  
) -> None:  
    """Handle append or merge operations to Delta table.

    Args:  
        df: The DataFrame to write.  
        writer_config: Writer configuration.  
        table_name: Name of the table.  
        path: Path to the table.  
        mode: Write mode.  
        partition_by: Partition columns.  
    """  
    writer = df.write.format("delta")  
    writer = writer.mode(mode)  
      
    # Apply partitioning if specified  
    if partition_by:  
        if isinstance(partition_by, str):  
            partition_by = [partition_by]  
        writer = writer.partitionBy(partition_by)  
      
    # Handle dynamic partition overwrite via replaceWhere  
    overwrite_partitions = get_overwrite_partitions(writer_config, path)  
    if overwrite_partitions and mode == "overwrite" and partition_by:  
        partition_col = partition_by[0] if isinstance(partition_by, list) else partition_by  
        # Build a single IN (...) clause over all partition values  
        values_in = ", ".join(f"'{val}'" for val in overwrite_partitions)  
        where_condition = f"{partition_col} IN ({values_in})"  
        self.log.info(f"Replacing partitions with condition: {where_condition}")  
        writer = writer.option("replaceWhere", where_condition)  
      
    # Write to table or path  
    if table_name:  
        self.log.info(f"Writing to Delta table with mode {mode}: {table_name}")  
        writer.saveAsTable(table_name)  
    else:  
        self.log.info(f"Writing to Delta path with mode {mode}: {path}")  
        writer.save(path)  
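
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): a CDC
# merge into an existing silver table. The literal config key names ("path",
# "mode", "cdc_config", "enabled", "key_columns", "watermark_column") are
# assumptions; the canonical names live in lakehouse_engine.core.definitions.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    updates = spark.createDataFrame(
        [(1, "a", "2024-01-02")], ["id", "value", "updated_at"]
    )
    metadata = DeltaLakeWriter().write(
        df=updates,
        writer_config={
            "path": "/tmp/delta/silver/sample",    # assumed PATH key
            "mode": "append",                      # assumed MODE key
            "cdc_config": {                        # assumed CDC_CONFIG key
                "enabled": True,                   # assumed CDC_ENABLED key
                "key_columns": ["id"],             # assumed CDC_KEY_COLUMNS key
                "watermark_column": "updated_at",  # assumed CDC_WATERMARK_COLUMN key
            },
        },
        table_type="silver",
    )
    print(metadata)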

End File"""Snowflake Writer."""
from typing import Dict, List, Optional, Tuple

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

from lakehouse_engine.core.definitions import (
MODE,
OPTIONS,
OUTPUT_MODE,
OUTPUT_OPTIONS,
SAVE_MODE,
TABLE,
)
from lakehouse_engine.io.readers_writers.writers_utils import get_write_mode
from lakehouse_engine.utils.logging_handler import LoggingHandler

class SnowflakeWriter(LoggingHandler):
"""A class for writing data to Snowflake."""

def __init__(self):  
    """Initialize the SnowflakeWriter."""  
    super().__init__(__name__)

def write(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = None,  
) -> Dict:  
    """Write data to Snowflake.

    Args:  
        df: The DataFrame to write.  
        writer_config: Configuration for the Snowflake writer.  
        extra_args: Extra arguments for the write operation (not used by this writer).

    Returns:  
        Dictionary with metadata about the write operation.  
    """  
    # Extract configuration  
    table_name = writer_config.get(TABLE)  
    mode = writer_config.get(MODE) or writer_config.get(OUTPUT_MODE, "overwrite")  
    options = writer_config.get(OPTIONS) or writer_config.get(OUTPUT_OPTIONS, {})  
      
    # Set save mode if not in options  
    if SAVE_MODE not in options:  
        options[SAVE_MODE] = get_write_mode(mode)

    # Validate required parameters  
    if not table_name:  
        raise ValueError("Snowflake table name is required")

    # Begin writing process  
    writer = df.write.format("snowflake")  
      
    # Apply all options  
    for key, value in options.items():  
        writer = writer.option(key, value)  
      
    # Set the table parameter (dbtable)  
    writer = writer.option("dbtable", table_name)  
      
    # Perform the write operation   
    self.log.info(f"Writing data to Snowflake table: {table_name}")  
    writer.save()

    # Return metadata about the operation  
    num_rows = df.count()  
    return {  
        "table": table_name,  
        "mode": mode,  
        "options": options,  
        "num_rows": num_rows,  
        "success": True  
    }  
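
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module). The
# config keys and the Snowflake connection options are placeholders; in
# practice, credentials should come from a secret store rather than literals.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a")], ["id", "value"])
    SnowflakeWriter().write(
        df,
        writer_config={
            "table": "ANALYTICS.PUBLIC.SAMPLE",
            "mode": "overwrite",
            "options": {
                "sfURL": "myaccount.snowflakecomputing.com",
                "sfUser": "demo_user",
                "sfDatabase": "ANALYTICS",
                "sfSchema": "PUBLIC",
                "sfWarehouse": "COMPUTE_WH",
            },
        },
    )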

End File# adidas/lakehouse-engine
"""Reader module."""
from typing import Any, Dict, Union

from pyspark.sql import DataFrame, SparkSession

from lakehouse_engine.algorithms.exceptions import UnknownReaderException
from lakehouse_engine.core.definitions import INPUT_FORMAT, InputFormat, RESOURCE_PATH
from lakehouse_engine.io.readers_writers.readers import (
DeltaLakeReader,
FileReader,
JDBCReader,
MultiReader,
SnowflakeReader,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler

class Reader(LoggingHandler):
"""Reader class."""

def __init__(self, spark_session: SparkSession):  
    """Construct Reader.

    Args:  
        spark_session: Spark session.  
    """  
    super().__init__(__name__)  
    self.spark = spark_session  
    self.readers = {  
        InputFormat.DELTA: DeltaLakeReader(spark_session),  
        InputFormat.CSV: FileReader(spark_session),  
        InputFormat.PARQUET: FileReader(spark_session),  
        InputFormat.AVRO: FileReader(spark_session),  
        InputFormat.JSON: FileReader(spark_session),  
        InputFormat.JDBC: JDBCReader(spark_session),  
        InputFormat.SNOWFLAKE: SnowflakeReader(spark_session),  
        InputFormat.MULTI: MultiReader(spark_session),  
    }

def read(  
    self, reader_config: Dict[str, Any], reader_type: str = None  
) -> DataFrame:  
    """Read data according to the reader configuration.

    Args:  
        reader_config: Reader configuration.  
        reader_type: Reader type (format) to use. Defaults to None.

    Returns:  
        DataFrame with read data  
    """  
    read_from_path = reader_config.get(RESOURCE_PATH)  
    if read_from_path is None:  
        raise ValueError("No resource path provided in reader configuration")  
      
    # Determine the input format (either from specified reader_type or from config)  
    format_type = reader_type  
    if format_type is None:  
        format_type = reader_config.get(INPUT_FORMAT, InputFormat.DELTA).lower()

    # Get the appropriate reader  
    reader = self.readers.get(format_type)  
    if reader is None:  
        raise UnknownReaderException(f"Unknown reader type: {format_type}")  
      
    # Prepare options - remove known keys that aren't options  
    options = reader_config.copy()  
    non_option_keys = [RESOURCE_PATH, INPUT_FORMAT]  
    for key in non_option_keys:  
        if key in options:  
            del options[key]  
      
    # Add format for file reader  
    if isinstance(reader, FileReader):  
        options["format"] = format_type  
      
    # Read and return the data  
    return reader.read(read_from_path, **options)  
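
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): reading
# a Delta table and a CSV file through the Reader facade, using the imported
# constants as config keys. The paths are placeholders.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    reader = Reader(spark)
    orders = reader.read({RESOURCE_PATH: "/tmp/delta/orders", INPUT_FORMAT: InputFormat.DELTA})
    raw = reader.read(
        {RESOURCE_PATH: "/tmp/raw/events.csv", INPUT_FORMAT: InputFormat.CSV, "header": "true"}
    )
    orders.printSchema()
    raw.show(5)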

End File# adidas/lakehouse-engine
"""Writer module."""
from typing import Dict, List, Optional, Tuple

from pyspark.sql import DataFrame

from lakehouse_engine.algorithms.exceptions import UnknownWriterException
from lakehouse_engine.core.definitions import OUTPUT_FORMAT, OutputFormat
from lakehouse_engine.io.readers_writers.delta_lake_writer import DeltaLakeWriter
from lakehouse_engine.io.readers_writers.file_writer import FileWriter
from lakehouse_engine.io.readers_writers.jdbc_writer import JDBCWriter
from lakehouse_engine.io.readers_writers.snowflake_writer import SnowflakeWriter
from lakehouse_engine.utils.logging_handler import LoggingHandler

class Writer(LoggingHandler):
"""Writer class."""

def __init__(self):  
    """Construct Writer."""  
    super().__init__(__name__)  
    self.writers = {  
        OutputFormat.BRONZE: DeltaLakeWriter(),  
        OutputFormat.SILVER: DeltaLakeWriter(),  
        OutputFormat.GOLD: DeltaLakeWriter(),  
        OutputFormat.DELTA: DeltaLakeWriter(),  
        OutputFormat.CSV: FileWriter(),  
        OutputFormat.PARQUET: FileWriter(),  
        OutputFormat.AVRO: FileWriter(),  
        OutputFormat.JSON: FileWriter(),  
        OutputFormat.JDBC: JDBCWriter(),  
        OutputFormat.SNOWFLAKE: SnowflakeWriter(),  
    }

def write(  
    self,  
    df: DataFrame,  
    writer_config: Dict,  
    extra_args: Optional[Tuple] = None,  
    output_format: str = None,  
) -> List[Dict]:  
    """Write data according to the writer configuration.

    Args:  
        df: dataframe to be written.  
        writer_config: writer configuration.  
        extra_args: extra arguments to be used for the writer.  
        output_format: explicit output format, overrides what's in the config.

    Returns:  
        metadata dictionaries from the write operations  
    """  
    # Determine the output format (either from specified output_format or from config)  
    format_type = output_format  
    if format_type is None:  
        format_type = writer_config.get(OUTPUT_FORMAT, OutputFormat.DELTA).lower()

    # Get the appropriate writer  
    writer = self.writers.get(format_type)  
    if writer is None:  
        raise UnknownWriterException(f"Unknown writer format: {format_type}")

    extra_args_unpacked = () if extra_args is None else extra_args

    # For delta lake writers, we need to pass the table type  
    if isinstance(writer, DeltaLakeWriter):  
        # Map the format to a table type if it's a layer designation  
        if format_type in [OutputFormat.BRONZE, OutputFormat.SILVER, OutputFormat.GOLD]:  
            table_type = format_type  
        else:  
            table_type = "delta"  # Generic delta type  
        return [writer.write(df, writer_config, table_type, extra_args_unpacked)]  
    else:  
        return [writer.write(df, writer_config, extra_args_unpacked)]  
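
# --------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module): writing
# the same DataFrame as parquet files and as a gold Delta table, selecting the
# writer by output format. The "path" key and the paths themselves are
# placeholders.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a")], ["id", "value"])
    writer = Writer()
    writer.write(df, {OUTPUT_FORMAT: OutputFormat.PARQUET, "path": "/tmp/out/parquet"})
    writer.write(df, {"path": "/tmp/delta/gold/sample"}, output_format=OutputFormat.GOLD)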

End File""" A transformer class for SQL transformations. """
from typing import Any, Dict, List, Optional, Tuple, Union

import pyspark.sql.functions
from pyspark.sql import Column, DataFrame

from lakehouse_engine.utils.logging_handler import LoggingHandler

class SQLTransformer(LoggingHandler):
"""A transformer class for SQL transformations."""

def __init__(self, spark_session):  
    """Construct SQLTransformer.

    Args:  
        spark_session: Spark session.  
    """  
    super().__init__(__name__)  
    self.spark = spark_session

def transform(  
    self,   
    input_dfs: Dict[str, DataFrame],   
    transformation_config: Dict[str, Any]  
) -> DataFrame:  
    """Transform the input dataframes using SQL.

    Args:  
        input_dfs: Dictionary mapping dataframe aliases to dataframes.  
        transformation_config: Configuration for SQL transformation.

    Returns:  
        Transformed dataframe.  
    """  
    # Register temporary views for all input dataframes  
    for alias, df in input_dfs.items():  
        df.createOrReplaceTempView(alias)  
          
    sql_query = transformation_config.get("query")  
    if not sql_query:  
        raise ValueError("SQL query is required in transformation configuration")  
          
    # Execute the SQL query  
    self.log.info(f"Executing SQL transformation: {sql_query}")  
    result_df = self.spark.sql(sql_query)  
      
    # Clean up temporary views  
    for alias in input_dfs.keys():  
        self.spark.catalog.dropTempView(alias)  
          
    return result_df
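
# Illustrative helper (an addition, not in the original module): shows how the
# SQLTransformer above could be driven. The aliases, column names, and query
# are placeholders.
def _sql_transformer_example(spark_session, orders_df, customers_df):
    """Sketch: enrich orders with customer names via a SQL join."""
    transformer = SQLTransformer(spark_session)
    return transformer.transform(
        input_dfs={"orders": orders_df, "customers": customers_df},
        transformation_config={
            "query": (
                "SELECT o.*, c.name AS customer_name "
                "FROM orders o JOIN customers c ON o.customer_id = c.id"
            )
        },
    )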

class ColumnTransformer(LoggingHandler):
"""A transformer class for column operations."""

def __init__(self, spark_session):  
    """Construct ColumnTransformer.

    Args:  
        spark_session: Spark session.  
    """  
    super().__init__(__name__)  
    self.spark = spark_session  
      
    # Map of operation names to methods that implement them  
    self.operations = {  
        "select": self._select,  
        "drop": self._drop,  
        "rename": self._rename,  
        "cast": self._cast,  
        "withColumn": self._with_column,  
        "withColumnRenamed": self._with_column_renamed,  
        "filter": self._filter,  
        "where": self._filter,  # alias for filter  
        "orderBy": self._order_by,  
        "sort": self._order_by,  # alias for orderBy  
        "groupBy": self._group_by,  
        "join": self._join,  
        "union": self._union,  
        "unionByName": self._union_by_name,  
        "limit": self._limit,  
        "distinct": self._distinct,  
        "dropDuplicates": self._drop_duplicates,  
        "sample": self._sample,  
        "explode": self._explode,  
        "pivot": self._pivot,  
        "unpivot": self._unpivot,  
        "summary": self._summary,  
    }

def transform(  
    self,   
    input_dfs: Dict[str, DataFrame],   
    transformation_config: Dict[str, Any]  
) -> DataFrame:  
    """Transform the input dataframes using column operations.

    Args:  
        input_dfs: Dictionary mapping dataframe aliases to dataframes.  
        transformation_config: Configuration for column operations.  
            Format: {"operations": [{"op": "operation_name", "params": {...}}]}

    Returns:  
        Transformed dataframe.  
    """  
    # Get the primary input dataframe  
    primary_alias = transformation_config.get("primary_df", next(iter(input_dfs.keys())))  
    result_df = input_dfs[primary_alias]  
      
    # Process each operation in sequence  
    operations = transformation_config.get("operations", [])  
    for operation in operations:  
        op_name = operation.get("op")  
        if not op_name or op_name not in self.operations:  
            raise ValueError(f"Unknown or missing operation: {op_name}")  
              
        op_params = operation.get("params", {})  
          
        # Execute the operation  
        self.log.info(f"Executing column operation: {op_name}")  
        result_df = self.operations[op_name](result_df, op_params, input_dfs)  
          
    return result_df

def _select(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Select columns from dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"columns": ["col1", "col2", ...]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with selected columns.  
    """  
    columns = params.get("columns", [])  
    if not columns:  
        return df  
          
    return df.select(*columns)

def _drop(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Drop columns from dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"columns": ["col1", "col2", ...]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with dropped columns.  
    """  
    columns = params.get("columns", [])  
    if not columns:  
        return df  
          
    return df.drop(*columns)

def _rename(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Rename columns in dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"mappings": {"old_col1": "new_col1", ...}}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with renamed columns.  
    """  
    mappings = params.get("mappings", {})  
    if not mappings:  
        return df  
          
    result_df = df  
    for old_name, new_name in mappings.items():  
        result_df = result_df.withColumnRenamed(old_name, new_name)  
          
    return result_df

def _cast(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Cast columns to specified types.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"columns": {"col1": "type1", ...}}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with cast columns.  
    """  
    columns = params.get("columns", {})  
    if not columns:  
        return df  
          
    result_df = df  
    for col_name, col_type in columns.items():  
        result_df = result_df.withColumn(col_name, result_df[col_name].cast(col_type))  
          
    return result_df

def _with_column(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Add or replace column in dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"column": "col_name", "expr": "expression"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with added/replaced column.  
    """  
    column_name = params.get("column")  
    expression = params.get("expr")  
      
    if not column_name or not expression:  
        return df  
          
    # Convert string expression to Column expression  
    expr_col = pyspark.sql.functions.expr(expression)  
    return df.withColumn(column_name, expr_col)

def _with_column_renamed(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Rename a column in dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"existing": "old_col", "new": "new_col"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with renamed column.  
    """  
    existing = params.get("existing")  
    new = params.get("new")  
      
    if not existing or not new:  
        return df  
          
    return df.withColumnRenamed(existing, new)

def _filter(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Filter rows in dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"condition": "filter_condition"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Filtered dataframe.  
    """  
    condition = params.get("condition")  
    if not condition:  
        return df  
          
    return df.filter(condition)

def _order_by(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Sort dataframe by columns.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"columns": ["col1", "col2 desc", ...]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Sorted dataframe.  
    """  
    columns = params.get("columns", [])  
    if not columns:  
        return df  
          
    return df.orderBy(*columns)

def _group_by(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Group dataframe by columns and aggregate.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"columns": ["col1", "col2", ...],   
                    "aggs": [{"col": "col3", "agg": "sum"}, ...]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Grouped and aggregated dataframe.  
    """  
    columns = params.get("columns", [])  
    aggs = params.get("aggs", [])  
      
    if not columns or not aggs:  
        return df  
          
    # Prepare aggregation expressions  
    agg_exprs = []  
    for agg in aggs:  
        col_name = agg.get("col")  
        agg_func = agg.get("agg")  
        alias = agg.get("alias", f"{agg_func}_{col_name}")  
          
        agg_method = getattr(pyspark.sql.functions, agg_func, None)  
        if agg_method:  
            agg_exprs.append(agg_method(col_name).alias(alias))  
      
    # Group by columns and apply aggregations  
    return df.groupBy(*columns).agg(*agg_exprs)
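# Illustrative "group_by" params (hypothetical columns):
#   {"columns": ["country"],
#    "aggs": [{"col": "amount", "agg": "sum", "alias": "total_amount"}]}
# is equivalent to df.groupBy("country").agg(sum("amount").alias("total_amount")),
# with the aggregation function resolved from pyspark.sql.functions by name.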

def _join(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Join dataframe with another dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"right": "right_df_alias", "on": "join_condition",   
                    "how": "join_type"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Joined dataframe.  
    """  
    right_df_alias = params.get("right")  
    join_condition = params.get("on")  
    join_type = params.get("how", "inner")  
      
    if not right_df_alias or not join_condition:  
        return df  
          
    if right_df_alias not in input_dfs:  
        raise ValueError(f"Right dataframe alias '{right_df_alias}' not found")  
          
    right_df = input_dfs[right_df_alias]  
      
    # Handle different types of join conditions  
    if isinstance(join_condition, str):  
        # String expression  
        condition = pyspark.sql.functions.expr(join_condition)  
    elif isinstance(join_condition, list):  
        # List of columns to join on  
        condition = [df[col] == right_df[col] for col in join_condition]  
    else:  
        # Default to using the expression as is  
        condition = join_condition  
      
    return df.join(right_df, on=condition, how=join_type)
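# Illustrative "join" params (the "customers" alias and column names are hypothetical):
#   {"right": "customers", "on": ["customer_id"], "how": "left"}
# joins the primary dataframe with input_dfs["customers"] on equality of customer_id;
# a string "on" value is instead parsed as a SQL expression via pyspark.sql.functions.expr.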

def _union(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Union dataframe with other dataframes.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"dfs": ["df_alias1", "df_alias2", ...]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Unioned dataframe.  
    """  
    df_aliases = params.get("dfs", [])  
    if not df_aliases:  
        return df  
          
    result_df = df  
    for alias in df_aliases:  
        if alias not in input_dfs:  
            raise ValueError(f"Dataframe alias '{alias}' not found")  
        result_df = result_df.union(input_dfs[alias])  
          
    return result_df

def _union_by_name(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Union dataframes by column name.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"dfs": ["df_alias1", "df_alias2", ...],   
                    "allowMissingColumns": true}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Unioned dataframe.  
    """  
    df_aliases = params.get("dfs", [])  
    allow_missing = params.get("allowMissingColumns", True)  
      
    if not df_aliases:  
        return df  
          
    result_df = df  
    for alias in df_aliases:  
        if alias not in input_dfs:  
            raise ValueError(f"Dataframe alias '{alias}' not found")  
        result_df = result_df.unionByName(input_dfs[alias], allowMissingColumns=allow_missing)  
          
    return result_df

def _limit(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Limit number of rows in dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"n": 100}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Limited dataframe.  
    """  
    n = params.get("n", 0)  
    if n <= 0:  
        return df  
          
    return df.limit(n)

def _distinct(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Return distinct rows from dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation (empty).  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with distinct rows.  
    """  
    return df.distinct()

def _drop_duplicates(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Drop duplicate rows from dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"columns": ["col1", "col2", ...]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with duplicates dropped.  
    """  
    columns = params.get("columns", [])  
    if not columns:  
        return df.dropDuplicates()  
      
    return df.dropDuplicates(columns)

def _sample(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Sample rows from dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"fraction": 0.1, "withReplacement": false, "seed": 42}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Sampled dataframe.  
    """  
    fraction = params.get("fraction", 0.1)  
    with_replacement = params.get("withReplacement", False)  
    seed = params.get("seed", None)  
      
    return df.sample(fraction=fraction, withReplacement=with_replacement, seed=seed)

def _explode(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Explode an array column to multiple rows.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"column": "array_column"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Dataframe with exploded column.  
    """  
    column = params.get("column")  
    if not column:  
        return df  
          
    return df.withColumn(column, pyspark.sql.functions.explode(df[column]))

def _pivot(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Pivot a dataframe.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"columns": ["col1", "col2"], "pivotColumn": "col3",   
                    "values": ["val1", "val2"], "aggregation": "sum"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Pivoted dataframe.  
    """  
    group_cols = params.get("columns", [])  
    pivot_col = params.get("pivotColumn")  
    values = params.get("values", None)  
    agg_func = params.get("aggregation", "count")  
      
    if not group_cols or not pivot_col:  
        return df  
          
    # Get the aggregation function  
    agg_method = getattr(pyspark.sql.functions, agg_func)  
      
    # Create the pivot  
    grouped = df.groupBy(*group_cols)  
    if values:  
        return grouped.pivot(pivot_col, values).agg(agg_method("*"))  
    else:  
        return grouped.pivot(pivot_col).agg(agg_method("*"))

def _unpivot(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Unpivot a dataframe. Note: This is a custom implementation.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation.   
            Format: {"idColumns": ["id1", "id2"],   
                    "pivotColumns": ["pivot1", "pivot2"],   
                    "valueColumn": "value", "nameColumn": "name"}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Unpivoted dataframe.  
    """  
    id_columns = params.get("idColumns", [])  
    pivot_columns = params.get("pivotColumns", [])  
    value_column = params.get("valueColumn", "value")  
    name_column = params.get("nameColumn", "name")  
      
    if not pivot_columns:  
        return df  
          
    # Create expressions for stack function  
    stack_expr = ", ".join([f"'{col}', `{col}`" for col in pivot_columns])  
    select_expr = [f"`{col}`" for col in id_columns]  
    select_expr.append(f"stack({len(pivot_columns)}, {stack_expr}) as (`{name_column}`, `{value_column}`)")  
      
    return df.selectExpr(*select_expr)
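# Sketch of the selectExpr generated for hypothetical params
#   {"idColumns": ["id"], "pivotColumns": ["q1", "q2"],
#    "nameColumn": "quarter", "valueColumn": "sales"}:
#   SELECT `id`, stack(2, 'q1', `q1`, 'q2', `q2`) AS (`quarter`, `sales`) FROM df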

def _summary(  
    self, df: DataFrame, params: Dict[str, Any], input_dfs: Dict[str, DataFrame]  
) -> DataFrame:  
    """Generate a summary of dataframe statistics.

    Args:  
        df: Input dataframe.  
        params: Parameters for the operation. Format: {"statistics": ["count", "min", "max"]}  
        input_dfs: Dictionary of all input dataframes.

    Returns:  
        Summary dataframe.  
    """  
    statistics = params.get("statistics", ["count", "mean", "stddev", "min", "max"])  
    return df.summary(*statistics)

class UDFTransformer(LoggingHandler):
"""A transformer class for User-Defined Functions (UDFs)."""

def __init__(self, spark_session):  
    """Construct UDFTransformer.

    Args:  
        spark_session: Spark session.  
    """  
    super().__init__(__name__)  
    self.spark = spark_session

def transform(  
    self,   
    input_dfs: Dict[str, DataFrame],   
    transformation_config: Dict[str, Any]  
) -> DataFrame:  
    """Transform using user-defined functions.

    Args:  
        input_dfs: Dictionary mapping dataframe aliases to dataframes.  
        transformation_config: Configuration for UDF transformation.  
            Format: {  
                "primary_df": "main",  
                "code": "def my_udf(x): return x.upper()",  
                "register_udfs": [  
                    {  
                        "name": "upper_udf",  
                        "function": "my_udf",  
                        "return_type": "string"  
                    }  
                ],  
                "apply": [  
                    {  
                        "output_column": "upper_name",  
                        "udf": "upper_udf",  
                        "input_columns": ["name"]  
                    }  
                ]  
            }

    Returns:  
        Transformed dataframe.  
    """  
    # Get the primary input dataframe  
    primary_alias = transformation_config.get("primary_df", next(iter(input_dfs.keys())))  
    result_df = input_dfs[primary_alias]  
      
    # Execute the UDF code  
    udf_code = transformation_config.get("code", "")  
    if not udf_code:  
        self.log.warning("No UDF code provided in transformation config")  
        return result_df  
          
    # Create a new namespace for the UDFs  
    udf_namespace = {}  
      
    # Execute the code with the namespace as globals, so that functions defined in  
    # the snippet can reference each other and any modules they import  
    try:  
        exec(udf_code, udf_namespace)  
    except Exception as e:  
        self.log.error(f"Error executing UDF code: {e}")  
        raise  
          
    # Register each UDF  
    registered_udfs = {}  
    for udf_config in transformation_config.get("register_udfs", []):  
        udf_name = udf_config.get("name")  
        function_name = udf_config.get("function")  
        return_type = udf_config.get("return_type", "string")  
          
        if not udf_name or not function_name:  
            continue  
              
        if function_name not in udf_namespace:  
            self.log.error(f"Function {function_name} not found in UDF code")  
            continue  
              
        # Get the function from the namespace  
        func = udf_namespace[function_name]  
          
        # Register as Spark UDF  
        registered_udfs[udf_name] = pyspark.sql.functions.udf(func, return_type)  
          
    # Apply the UDFs to the dataframe  
    for apply_config in transformation_config.get("apply", []):  
        output_column = apply_config.get("output_column")  
        udf_name = apply_config.get("udf")  
        input_columns = apply_config.get("input_columns", [])  
          
        if not output_column or not udf_name or not input_columns:  
            continue  
              
        if udf_name not in registered_udfs:  
            self.log.error(f"UDF {udf_name} not registered")  
            continue  
              
        # Apply the UDF to create or replace the output column  
        udf = registered_udfs[udf_name]  
        input_cols = [pyspark.sql.functions.col(col) for col in input_columns]  
        result_df = result_df.withColumn(output_column, udf(*input_cols))  
          
    return result_df  
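# Minimal usage sketch (illustrative only; assumes a SparkSession named `spark` and a
# dataframe registered under the alias "main" with a string column "name"):
#   transformer = UDFTransformer(spark)
#   out_df = transformer.transform(
#       {"main": df},
#       {
#           "primary_df": "main",
#           "code": "def to_upper(x):\n    return x.upper() if x else x",
#           "register_udfs": [
#               {"name": "upper_udf", "function": "to_upper", "return_type": "string"}
#           ],
#           "apply": [
#               {"output_column": "name_upper", "udf": "upper_udf", "input_columns": ["name"]}
#           ],
#       },
#   )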

End File# adidas/lakehouse-engine

lakehouse_engine/engine.py

"""The Lakehouse Engine module."""
from typing import Any, Dict, List, Optional, Tuple, Union

from pyspark.sql import DataFrame, SparkSession

from lakehouse_engine.core.definitions import (
INPUT_SPECS,
INPUT_TYPE,
OUTPUT_SPECS,
RESOURCE_PATH,
TABLE_NAME,
TRANSFORMATIONS,
)
from lakehouse_engine.io.reader import Reader
from lakehouse_engine.io.writer import Writer
from lakehouse_engine.transformers.transformers import (
ColumnTransformer,
SQLTransformer,
UDFTransformer,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler

class LakehouseEngine(LoggingHandler):
"""Main class that orchestrates the Lakehouse Engine operations."""

def __init__(self, spark_session: SparkSession = None):  
    """Initialize the Lakehouse Engine.

    Args:  
        spark_session: The Spark session to use. If None, a new session will be created.  
    """  
    super().__init__(__name__)  
    # Initialize Spark session if not provided  
    self.spark = spark_session or SparkSession.builder.getOrCreate()  
      
    # Initialize components  
    self.reader = Reader(self.spark)  
    self.writer = Writer()  
      
    # Initialize transformers  
    self.transformers = {  
        "sql": SQLTransformer(self.spark),  
        "column": ColumnTransformer(self.spark),  
        "udf": UDFTransformer(self.spark),  
    }  
      
    self.log.info("Lakehouse Engine initialized")

def execute(self, config: Dict[str, Any]) -> List[Dict]:  
    """Execute the data processing pipeline defined in the configuration.

    Args:  
        config: The configuration dictionary with input specs, transformations, and output specs.

    Returns:  
        A list of metadata dictionaries from the write operations.  
    """  
    # 1. Read input data  
    input_dfs = self._read_input_data(config.get(INPUT_SPECS, []))  
    if not input_dfs:  
        self.log.warning("No input data to process")  
        return []

    # 2. Apply transformations  
    transformed_df = self._apply_transformations(  
        input_dfs, config.get(TRANSFORMATIONS, {})  
    )  
    if transformed_df is None:  
        self.log.info("No transformations applied, using first input as output")  
        # Use the first input dataframe as the output if no transformations applied  
        transformed_df = next(iter(input_dfs.values()))

    # 3. Write output data  
    output_specs = config.get(OUTPUT_SPECS, [])  
    metadata = []  
      
    if not output_specs:  
        self.log.warning("No output specifications provided, pipeline ends without writing data")  
    else:  
        for output_spec in output_specs:  
            output_metadata = self._write_output_data(transformed_df, output_spec)  
            metadata.extend(output_metadata)

    self.log.info("Pipeline execution completed successfully")  
    return metadata
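# Illustrative end-to-end call (all table names are hypothetical; the exact fields
# accepted by the input/output specs live in Reader and Writer, and the shape of the
# "transformations" block depends on the chosen transformer type):
#   engine = LakehouseEngine()
#   engine.execute({
#       "input_specs": [{"input_type": "delta", "table_name": "bronze.orders"}],
#       "transformations": {"type": "sql", ...},  # or "column" / "udf"; see each transformer
#       "output_specs": [{"table_name": "silver.orders"}],
#   })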

def _read_input_data(self, input_specs: List[Dict]) -> Dict[str, DataFrame]:  
    """Read input data according to specifications.

    Args:  
        input_specs: List of input specifications.

    Returns:  
        Dictionary mapping input aliases to DataFrames.  
    """  
    input_dfs = {}  
      
    for spec in input_specs:  
        # Get unique identifier for this input (table_name or path)  
        alias = spec.get(TABLE_NAME) or spec.get(RESOURCE_PATH, "input")  
        # Remove path from alias if it's a full path  
        if '/' in alias:  
            alias = alias.split('/')[-1]  
          
        self.log.info(f"Reading input data: {alias}")  
          
        try:  
            # Read the data  
            input_format = spec.get(INPUT_TYPE)  
            df = self.reader.read(spec, input_format)  
              
            # Store with alias  
            input_dfs[alias] = df  
              
            self.log.info(f"Successfully read input: {alias}, {df.count()} rows")  
        except Exception as e:  
            self.log.error(f"Error reading input {alias}: {e}")  
            raise  
              
    return input_dfs

def _apply_transformations(  
    self, input_dfs: Dict[str, DataFrame], transformations: Dict[str, Any]  
) -> Optional[DataFrame]:  
    """Apply transformations to input data.

    Args:  
        input_dfs: Dictionary of input DataFrames.  
        transformations: Transformation specifications.

    Returns:  
        Transformed DataFrame, or None if no transformations applied.  
    """  
    if not transformations:  
        return None  
          
    # Get the transformer type  
    transformer_type = transformations.get("type", "sql")  
      
    if transformer_type not in self.transformers:  
        raise ValueError(f"Unknown transformer type: {transformer_type}")  
          
    # Get the appropriate transformer and apply it  
    transformer = self.transformers[transformer_type]  
    self.log.info(f"Applying {transformer_type} transformation")  
      
    try:  
        result_df = transformer.transform(input_dfs, transformations)  
        self.log.info(f"Transformation complete, resulting in {result_df.count()} rows")  
        return result_df  
    except Exception as e:  
        self.log.error(f"Error during transformation: {e}")  
        raise

def _write_output_data(  
    self, df: DataFrame, output_spec: Dict[str, Any]  
) -> List[Dict]:  
    """Write output data according to specification.

    Args:  
        df: DataFrame to write.  
        output_spec: Output specification.

    Returns:  
        List of metadata dictionaries from write operations.  
    """  
    table_name = output_spec.get(TABLE_NAME, "")  
    if table_name:  
        self.log.info(f"Writing output to: {table_name}")  
    else:  
        self.log.info("Writing output data")  
          
    try:  
        # Write the data  
        metadata = self.writer.write(df, output_spec)  
        return metadata  
    except Exception as e:  
        self.log.error(f"Error writing output data: {e}")  
        raise  

End File# adidas/lakehouse-engine

lakehouse_engine/utils/logging_handler.py

"""LoggingHandler module."""
import logging
import os
from typing import Optional

class LoggingHandler:
"""LoggingHandler class.

This class is used to handle logging for all the components of the lakehouse engine.  
Each component inherits from this class and can use its log attribute to log messages.

Example:  
    >>> class MyComponent(LoggingHandler):  
    ...     def __init__(self):  
    ...         super().__init__(__name__)  
    ...  
    ...     def do_something(self):  
    ...         self.log.info("Doing something")  
    ...         self.log.debug("debug message")  
"""

log_level: str = os.environ.get("LAKEHOUSE_ENGINE_LOG_LEVEL", "INFO")

def __init__(  
    self,  
    name: str,  
    log_level: Optional[str] = None,  
    formatter: Optional[str] = None,  
):  
    """Initialize the LoggingHandler.

    Args:  
        name: the name of the logger (usually __name__)  
        log_level: the log level for this handler (overrides the class level one if provided)  
        formatter: the log formatter for this handler  
    """  
    # Create logger  
    self.log = logging.getLogger(name)

    # Set log level from instance, class, or default  
    level = log_level or self.log_level or "INFO"  
    self.log.setLevel(self._get_log_level(level))

    # If logger doesn't have handlers, add a new one  
    if not self.log.handlers:  
        handler = logging.StreamHandler()  
        handler.setLevel(self.log.level)

        # Set formatter  
        if formatter:  
            handler.setFormatter(logging.Formatter(formatter))  
        else:  
            handler.setFormatter(  
                logging.Formatter(  
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"  
                )  
            )

        # Add handler to logger  
        self.log.addHandler(handler)

def _get_log_level(self, level_name: str) -> int:  
    """Get logging level from string.

    Args:  
        level_name: the name of the log level

    Returns:  
        The logging level as an int.  
    """  
    levels = {  
        "DEBUG": logging.DEBUG,  
        "INFO": logging.INFO,  
        "WARNING": logging.WARNING,  
        "ERROR": logging.ERROR,  
        "CRITICAL": logging.CRITICAL,  
    }  
    return levels.get(level_name.upper(), logging.INFO)  

End File"""Core definitions of the lakehouse engine."""
from enum import Enum

class InputFormat(str, Enum):
"""Input format types."""

DELTA = "delta"  
CSV = "csv"  
PARQUET = "parquet"  
AVRO = "avro"  
JSON = "json"  
JDBC = "jdbc"  
SNOWFLAKE = "snowflake"  
MULTI = "multi"

class OutputFormat(str, Enum):
"""Output format types."""

DELTA = "delta"  
BRONZE = "bronze"  
SILVER = "silver"  
GOLD = "gold"  
CSV = "csv"  
PARQUET = "parquet"  
AVRO = "avro"  
JSON = "json"  
JDBC = "jdbc"  
SNOWFLAKE = "snowflake"

class TableType(str, Enum):
"""Table types."""

BRONZE = "bronze"  
SILVER = "silver"  
GOLD = "gold"

# Input specs field definitions

INPUT_SPECS = "input_specs"
INPUT_TYPE = "input_type"
RESOURCE_PATH = "resource_path"

# Transformation field definitions

TRANSFORMATIONS = "transformations"
PRIMARY_KEYS = "primary_keys"

# Output specs field definitions

OUTPUT_SPECS = "output_specs"
OUTPUT_TYPE = "output_type"
OUTPUT_PATH = "output_path"
OUTPUT_FORMAT = "output_format"
OUTPUT_MODE = "output_mode"
OUTPUT_OPTIONS = "output_options"
OUTPUT_TO = "output_to"

# Field definitions used in both inputs and outputs

TABLE_NAME = "table_name"
TABLE = "table"
PATH = "path"
FORMAT = "format"
MODE = "mode"
OPTIONS = "options"

# Delta Lake specific fields

OPTIMIZE = "optimize"
Z_ORDER_BY = "z_order_by"
PARTITION_BY = "partition_by"
PARTITION_COLUMNS = "partition_columns"
WATERMARK_COLUMN = "watermark_column"
KEY_COLUMN = "key_column"
OVERWRITE_SCHEMA = "overwrite_schema"
OVERWRITE_PARTITIONS = "overwrite_partitions"
OVERWRITE_AT_EXECUTION_TIME = "overwrite_at_execution_time"
PROCESSING_TIME = "processing_time"

# CDC specific fields

CDC_CONFIG = "cdc_config"
CDC_ENABLED = "cdc_enabled"
CDC_KEY_COLUMNS = "cdc_key_columns"
CDC_WATERMARK_COLUMN = "cdc_watermark_column"

# JDBC specific fields

URL = "url"
DRIVER = "driver"
USER = "user"
PASSWORD = "password"
SAVE_MODE = "saveMode"

# Snowflake specific fields

SNOWFLAKE_OPTIONS = "snowflake_options"
End File# adidas/lakehouse-engine

lakehouse_engine/algorithms/extract/delta_extract_pipeline.py

"""DeltaExtractPipeline class."""
from typing import Any, Dict

from pyspark.sql import DataFrame, SparkSession
from delta.tables import DeltaTable

from lakehouse_engine.algorithms.exceptions import (
ConfigurationError,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler

class DeltaExtractPipeline(LoggingHandler):
"""
A pipeline for extracting data changes from Delta tables.

This pipeline supports various extraction modes such as incremental, snapshot,   
CDC (Change Data Capture), and time travel.  
"""

def __init__(self, spark: SparkSession):  
    """Initialize the DeltaExtractPipeline.

    Args:  
        spark: Spark session.  
    """  
    super().__init__(__name__)  
    self.spark = spark

def extract(  
    self, extract_config: Dict[str, Any]  
) -> DataFrame:  
    """  
    Extract data changes from a Delta table.

    Args:  
        extract_config: Configuration for the extraction.  
            Required keys:  
            - source_table: The Delta table path or name to extract from  
            - mode: The extraction mode ('incremental', 'snapshot', 'cdc', 'time_travel')  
              
            Mode-specific keys:  
            - For 'incremental':  
              * watermark_column: Column to track changes (usually a timestamp)  
              * watermark_value: The last processed watermark value (optional)  
                
            - For 'cdc':  
              * version_column: Column tracking record versions  
              * operation_column: Column containing operation type (insert, update, delete)  
              * watermark_column: Same as incremental  
              * watermark_value: Same as incremental  
                
            - For 'time_travel':  
              * version: The table version to read  
              * timestamp: The timestamp to read (alternative to version)  
              
    Returns:  
        DataFrame with the extracted data.  
      
    Raises:  
        ConfigurationError: If the configuration is invalid.  
    """  
    # Extract required configuration  
    source_table = extract_config.get("source_table")  
    mode = extract_config.get("mode", "snapshot").lower()  
      
    if not source_table:  
        raise ConfigurationError("source_table is required in the extract configuration")  
          
    # Check if table exists  
    table_exists = self._table_exists(source_table)  
    if not table_exists:  
        raise ConfigurationError(f"Delta table '{source_table}' does not exist")  
          
    # Process according to mode  
    if mode == "incremental":  
        return self._extract_incremental(source_table, extract_config)  
    elif mode == "snapshot":  
        return self._extract_snapshot(source_table)  
    elif mode == "cdc":  
        return self._extract_cdc(source_table, extract_config)  
    elif mode == "time_travel":  
        return self._extract_time_travel(source_table, extract_config)  
    else:  
        raise ConfigurationError(f"Unknown extraction mode: {mode}")  
  
def _table_exists(self, table_path: str) -> bool:  
    """  
    Check if a Delta table exists.

    Args:  
        table_path: The table path or name.

    Returns:  
        True if the table exists, False otherwise.  
    """  
    try:  
        if table_path.startswith("delta.`") or "/" in table_path:  
            # It's a path  
            _ = DeltaTable.forPath(self.spark, table_path.replace("delta.`", "").replace("`", ""))  
        else:  
            # It's a table name  
            _ = DeltaTable.forName(self.spark, table_path)  
        return True  
    except Exception:  
        return False  
  
def _extract_snapshot(self, source_table: str) -> DataFrame:  
    """  
    Extract a snapshot of the Delta table.

    Args:  
        source_table: The table path or name.

    Returns:  
        DataFrame with the current snapshot.  
    """  
    self.log.info(f"Extracting snapshot from {source_table}")  
      
    # Simply read the current state of the table  
    if source_table.startswith("delta.`") or "/" in source_table:  
        return self.spark.read.format("delta").load(  
            source_table.replace("delta.`", "").replace("`", "")  
        )  
    else:  
        return self.spark.table(source_table)  
  
def _extract_incremental(  
    self, source_table: str, extract_config: Dict[str, Any]  
) -> DataFrame:  
    """  
    Extract incremental changes from a Delta table based on a watermark column.

    Args:  
        source_table: The table path or name.  
        extract_config: The extraction configuration.

    Returns:  
        DataFrame with incremental changes.  
          
    Raises:  
        ConfigurationError: If watermark_column is missing.  
    """  
    watermark_column = extract_config.get("watermark_column")  
    watermark_value = extract_config.get("watermark_value")  
      
    if not watermark_column:  
        raise ConfigurationError("watermark_column is required for incremental extraction")  
          
    self.log.info(  
        f"Extracting incremental changes from {source_table} "  
        f"where {watermark_column} > {watermark_value}"  
    )  
      
    # Read the table  
    if source_table.startswith("delta.`") or "/" in source_table:  
        df = self.spark.read.format("delta").load(  
            source_table.replace("delta.`", "").replace("`", "")  
        )  
    else:  
        df = self.spark.table(source_table)  
      
    # Filter by watermark if provided  
    if watermark_value is not None:  
        df = df.filter(f"{watermark_column} > '{watermark_value}'")  
          
    return df  
  
def _extract_cdc(  
    self, source_table: str, extract_config: Dict[str, Any]  
) -> DataFrame:  
    """  
    Extract CDC changes from a Delta table.

    Args:  
        source_table: The table path or name.  
        extract_config: The extraction configuration.

    Returns:  
        DataFrame with CDC changes.  
          
    Raises:  
        ConfigurationError: If required CDC columns are missing.  
    """  
    version_column = extract_config.get("version_column")  
    operation_column = extract_config.get("operation_column")  
    watermark_column = extract_config.get("watermark_column")  
    watermark_value = extract_config.get("watermark_value")  
      
    if not version_column or not operation_column:  
        raise ConfigurationError(  
            "version_column and operation_column are required for CDC extraction"  
        )  
          
    self.log.info(f"Extracting CDC changes from {source_table}")  
      
    # Read the table  
    if source_table.startswith("delta.`") or "/" in source_table:  
        df = self.spark.read.format("delta").load(  
            source_table.replace("delta.`", "").replace("`", "")  
        )  
    else:  
        df = self.spark.table(source_table)  
      
    # Filter by watermark if provided  
    if watermark_column and watermark_value is not None:  
        df = df.filter(f"{watermark_column} > '{watermark_value}'")  
          
    return df  
  
def _extract_time_travel(  
    self, source_table: str, extract_config: Dict[str, Any]  
) -> DataFrame:  
    """  
    Extract a specific version or timestamp of a Delta table.

    Args:  
        source_table: The table path or name.  
        extract_config: The extraction configuration.

    Returns:  
        DataFrame with the data at the specified version or timestamp.  
          
    Raises:  
        ConfigurationError: If neither version nor timestamp is provided.  
    """  
    version = extract_config.get("version")  
    timestamp = extract_config.get("timestamp")  
      
    if version is None and timestamp is None:  
        raise ConfigurationError(  
            "Either version or timestamp is required for time travel extraction"  
        )  
          
    # Prepare the appropriate reader based on what was provided  
    reader = self.spark.read.format("delta")  
      
    if version is not None:  
        self.log.info(f"Extracting version {version} from {source_table}")  
        reader = reader.option("versionAsOf", version)  
    else:  
        self.log.info(f"Extracting timestamp {timestamp} from {source_table}")  
        reader = reader.option("timestampAsOf", timestamp)  
          
    # Read the table  
    if source_table.startswith("delta.`") or "/" in source_table:  
        return reader.load(source_table.replace("delta.`", "").replace("`", ""))  
    else:  
        # For catalog tables with time travel, we need to use a different approach  
        if version is not None:  
            return self.spark.sql(f"SELECT * FROM {source_table} VERSION AS OF {version}")  
        else:  
            return self.spark.sql(f"SELECT * FROM {source_table} TIMESTAMP AS OF '{timestamp}'")  

End File"""DataQuality class for data validation."""
from typing import Dict, List, Optional

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F

from lakehouse_engine.algorithms.exceptions import (
DataQualityCheckException,
DataQualityConfigurationError,
)
from lakehouse_engine.utils.logging_handler import LoggingHandler

class DataQuality(LoggingHandler):
"""
Data quality validation class for Spark DataFrames.

This class provides data quality checks like completeness, uniqueness,   
conformity, and custom validations for Spark DataFrames.  
"""

def __init__(self, spark: SparkSession, fail_on_error: bool = True):  
    """Initialize the DataQuality class.

    Args:  
        spark: Spark session.  
        fail_on_error: Whether to raise an exception when checks fail (default True).  
    """  
    super().__init__(__name__)  
    self.spark = spark  
    self.fail_on_error = fail_on_error

def validate(  
    self, df: DataFrame, dq_config: Dict  
) -> Dict:  
    """  
    Validate a DataFrame using the provided data quality configuration.

    Args:  
        df: DataFrame to validate.  
        dq_config: Data quality configuration dictionary.  
            Should contain a 'checks' key with a list of check configurations.

    Returns:  
        Dictionary with validation results.  
          
    Raises:  
        DataQualityConfigurationError: If the configuration is invalid.  
        DataQualityCheckException: If checks fail and fail_on_error is True.  
    """  
    if not isinstance(dq_config, dict) or "checks" not in dq_config:  
        raise DataQualityConfigurationError(  
            "Data quality configuration must be a dictionary with a 'checks' key"  
        )  
          
    checks = dq_config["checks"]  
    if not isinstance(checks, list) or not checks:  
        raise DataQualityConfigurationError(  
            "The 'checks' key must contain a non-empty list of check configurations"  
        )  
          
    # Initialize results  
    all_results = {  
        "passed": True,  
        "total_checks": len(checks),  
        "passed_checks": 0,  
        "failed_checks": 0,  
        "check_results": []  
    }  
      
    # Run each check  
    for check in checks:  
        check_type = check.get("type")  
        if not check_type:  
            raise DataQualityConfigurationError("Each check must have a 'type' key")  
              
        check_result = self._run_check(df, check_type, check)  
        all_results["check_results"].append(check_result)  
          
        # Update summary statistics  
        if check_result["passed"]:  
            all_results["passed_checks"] += 1  
        else:  
            all_results["failed_checks"] += 1  
            all_results["passed"] = False  
              
    # Handle failure if needed  
    if not all_results["passed"] and self.fail_on_error:  
        failed_checks = [  
            f"{r['type']}: {r['message']}"   
            for r in all_results["check_results"]   
            if not r["passed"]  
        ]  
        failed_message = ", ".join(failed_checks)  
        raise DataQualityCheckException(  
            f"Data quality checks failed: {failed_message}"  
        )  
          
    return all_results  
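# Illustrative data quality configuration (column names and allowed values are hypothetical):
#   dq = DataQuality(spark, fail_on_error=False)
#   results = dq.validate(df, {
#       "checks": [
#           {"type": "completeness", "columns": ["order_id", "customer_id"], "threshold": 0},
#           {"type": "uniqueness", "columns": ["order_id"]},
#           {"type": "conformity", "column": "status",
#            "allowed_values": ["NEW", "SHIPPED", "CANCELLED"]},
#       ]
#   })
#   print(results["passed"], results["failed_checks"])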
  
def _run_check(  
    self, df: DataFrame, check_type: str, check_config: Dict  
) -> Dict:  
    """  
    Run a specific data quality check.

    Args:  
        df: DataFrame to check.  
        check_type: Type of check to run.  
        check_config: Check configuration.

    Returns:  
        Dictionary with check results.  
    """  
    # Map check types to handler methods  
    check_handlers = {  
        "completeness": self._check_completeness,  
        "uniqueness": self._check_uniqueness,   
        "conformity": self._check_conformity,  
        "integrity": self._check_integrity,  
        "accuracy": self._check_accuracy,  
        "custom": self._check_custom,  
        "schema": self._check_schema,  
    }  
      
    # Get the handler for this check type  
    handler = check_handlers.get(check_type)  
    if not handler:  
        return {  
            "type": check_type,  
            "passed": False,  
            "message": f"Unknown check type: {check_type}",  
        }  
          
    # Initialize result with basic info  
    result = {  
        "type": check_type,  
        "name": check_config.get("name", check_type),  
        "description": check_config.get("description", ""),  
        "columns": check_config.get("columns", []),  
        "severity": check_config.get("severity", "error"),  
    }  
      
    try:  
        # Run the check  
        check_result = handler(df, check_config)  
        result.update(check_result)  
          
        # Log the result  
        if result["passed"]:  
            self.log.info(f"Check '{result['name']}' passed")  
        else:  
            log_method = self.log.error if result["severity"] == "error" else self.log.warning  
            log_method(f"Check '{result['name']}' failed: {result['message']}")  
              
        return result  
    except Exception as e:  
        # Handle errors in check execution  
        self.log.exception(f"Error running check '{check_type}': {str(e)}")  
        return {  
            "type": check_type,  
            "name": check_config.get("name", check_type),  
            "passed": False,  
            "message": f"Error executing check: {str(e)}",  
            "severity": check_config.get("severity", "error"),  
        }  
  
def _check_completeness(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Check for null/empty values in specified columns.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - columns: List of columns to check  
            - threshold: Maximum allowed percentage of nulls (0-100)

    Returns:  
        Dictionary with check results.  
    """  
    columns = check_config.get("columns", [])  
    threshold = check_config.get("threshold", 0)  
      
    if not columns:  
        return {  
            "passed": False,  
            "message": "No columns specified for completeness check",  
        }  
          
    # Calculate nulls for each column  
    null_counts = {}  
    total_rows = df.count()  
      
    if total_rows == 0:  
        return {  
            "passed": True,  
            "message": "DataFrame is empty, completeness check skipped",  
        }  
          
    for col in columns:  
        null_count = df.filter(F.col(col).isNull() | (F.trim(F.col(col)) == "")).count()  
        null_percent = (null_count / total_rows) * 100  
        null_counts[col] = {"count": null_count, "percent": null_percent}  
          
    # Check if any column exceeds the threshold  
    failed_columns = {  
        col: stats for col, stats in null_counts.items()   
        if stats["percent"] > threshold  
    }  
      
    if failed_columns:  
        col_messages = [  
            f"{col}: {stats['percent']:.2f}% null ({stats['count']} rows)"   
            for col, stats in failed_columns.items()  
        ]  
        return {  
            "passed": False,  
            "message": f"Columns exceed null threshold of {threshold}%: {', '.join(col_messages)}",  
            "details": null_counts,  
        }  
    else:  
        return {  
            "passed": True,  
            "message": f"All columns have nulls within threshold of {threshold}%",  
            "details": null_counts,  
        }  
  
def _check_uniqueness(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Check for uniqueness of values in specified columns.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - columns: List of columns forming the unique key  
            - allow_duplicates: False to fail on any duplicates

    Returns:  
        Dictionary with check results.  
    """  
    columns = check_config.get("columns", [])  
    allow_duplicates = check_config.get("allow_duplicates", False)  
      
    if not columns:  
        return {  
            "passed": False,  
            "message": "No columns specified for uniqueness check",  
        }  
          
    # Count total and distinct values  
    total_rows = df.count()  
    distinct_rows = df.select(*columns).distinct().count()  
    duplicate_rows = total_rows - distinct_rows  
      
    if duplicate_rows == 0 or allow_duplicates:  
        return {  
            "passed": True,  
            "message": f"No duplicates found in columns: {', '.join(columns)}",  
            "details": {  
                "total_rows": total_rows,  
                "unique_rows": distinct_rows,  
                "duplicate_rows": duplicate_rows,  
            },  
        }  
    else:  
        return {  
            "passed": False,  
            "message": f"Found {duplicate_rows} duplicate values in columns: {', '.join(columns)}",  
            "details": {  
                "total_rows": total_rows,  
                "unique_rows": distinct_rows,  
                "duplicate_rows": duplicate_rows,  
            },  
        }  
  
def _check_conformity(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Check that values conform to expected patterns or ranges.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - column: Column to check  
            - pattern: Regex pattern for the column values  
              OR  
            - min: Minimum allowed value  
            - max: Maximum allowed value  
              OR  
            - allowed_values: List of allowed values

    Returns:  
        Dictionary with check results.  
    """  
    column = check_config.get("column")  
    pattern = check_config.get("pattern")  
    min_value = check_config.get("min")  
    max_value = check_config.get("max")  
    allowed_values = check_config.get("allowed_values")  
    threshold = check_config.get("threshold", 0)  # % of non-conforming values allowed  
      
    if not column:  
        return {  
            "passed": False,  
            "message": "No column specified for conformity check",  
        }  
          
    # Total rows  
    total_rows = df.count()  
    if total_rows == 0:  
        return {  
            "passed": True,  
            "message": "DataFrame is empty, conformity check skipped",  
        }  
          
    # Check based on the provided parameters  
    if pattern:  
        # Regex pattern check  
        non_conforming = df.filter(~F.col(column).rlike(pattern)).count()  
    elif min_value is not None and max_value is not None:  
        # Range check  
        non_conforming = df.filter(  
            (F.col(column) < min_value) | (F.col(column) > max_value)  
        ).count()  
    elif allowed_values:  
        # Allowed values check  
        non_conforming = df.filter(~F.col(column).isin(allowed_values)).count()  
    else:  
        return {  
            "passed": False,  
            "message": "Conformity check requires pattern, min/max, or allowed_values",  
        }  
          
    # Calculate percentage of non-conforming values  
    non_conforming_percent = (non_conforming / total_rows) * 100  
      
    if non_conforming_percent <= threshold:  
        return {  
            "passed": True,  
            "message": f"Column {column} has {non_conforming_percent:.2f}% non-conforming values, within threshold of {threshold}%",  
            "details": {  
                "non_conforming_rows": non_conforming,  
                "total_rows": total_rows,  
                "non_conforming_percent": non_conforming_percent,  
            },  
        }  
    else:  
        return {  
            "passed": False,  
            "message": f"Column {column} has {non_conforming_percent:.2f}% non-conforming values, exceeds threshold of {threshold}%",  
            "details": {  
                "non_conforming_rows": non_conforming,  
                "total_rows": total_rows,  
                "non_conforming_percent": non_conforming_percent,  
            },  
        }  
  
def _check_integrity(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Check referential integrity against a reference DataFrame.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - columns: Columns in this DataFrame forming the foreign key  
            - reference_table: Reference table or DataFrame name  
            - reference_columns: Columns in reference table forming the primary key  
            - allow_orphans: Whether to allow orphan records

    Returns:  
        Dictionary with check results.  
    """  
    columns = check_config.get("columns", [])  
    reference_table = check_config.get("reference_table")  
    reference_columns = check_config.get("reference_columns", columns)  
    allow_orphans = check_config.get("allow_orphans", False)  
      
    if not columns or not reference_table:  
        return {  
            "passed": False,  
            "message": "Integrity check requires columns and reference_table",  
        }  
          
    # Read the reference table  
    try:  
        ref_df = self.spark.table(reference_table)  
    except Exception as e:  
        return {  
            "passed": False,  
            "message": f"Failed to read reference table {reference_table}: {str(e)}",  
        }  
          
    # Check for orphan records (records in df not in ref_df)  
    # First, select only the relevant columns from both dataframes  
    df_keys = df.select(*columns)  
    ref_keys = ref_df.select(*reference_columns)  
      
    # Find orphans using except  
    orphans = df_keys.exceptAll(ref_keys)  
    orphan_count = orphans.count()  
      
    if orphan_count == 0 or allow_orphans:  
        return {  
            "passed": True,  
            "message": f"No orphan records found" if orphan_count == 0 else f"{orphan_count} orphan records found, but allowed",  
            "details": {  
                "orphan_count": orphan_count,  
            },  
        }  
    else:  
        return {  
            "passed": False,  
            "message": f"Found {orphan_count} orphan records",  
            "details": {  
                "orphan_count": orphan_count,  
            },  
        }  
  
def _check_accuracy(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Check data accuracy using comparisons or derived values.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - column: Column to check  
            - relation: One of 'equal', 'not_equal', 'less_than', 'greater_than'  
            - value: Value to compare against  
              OR  
            - expression: SQL expression to evaluate

    Returns:  
        Dictionary with check results.  
    """  
    column = check_config.get("column")  
    relation = check_config.get("relation")  
    value = check_config.get("value")  
    expression = check_config.get("expression")  
    threshold = check_config.get("threshold", 0)  # % of inaccurate values allowed  
      
    if not ((column and relation and value is not None) or expression):  
        return {  
            "passed": False,  
            "message": "Accuracy check requires either column+relation+value or expression",  
        }  
          
    # Total rows  
    total_rows = df.count()  
    if total_rows == 0:  
        return {  
            "passed": True,  
            "message": "DataFrame is empty, accuracy check skipped",  
        }  
          
    # Check based on the provided parameters  
    if expression:  
        # Use SQL expression  
        inaccurate = df.filter(~F.expr(expression)).count()  
    else:  
        # Use column relation value  
        if relation == "equal":  
            inaccurate = df.filter(F.col(column) != value).count()  
        elif relation == "not_equal":  
            inaccurate = df.filter(F.col(column) == value).count()  
        elif relation == "less_than":  
            inaccurate = df.filter(F.col(column) >= value).count()  
        elif relation == "greater_than":  
            inaccurate = df.filter(F.col(column) <= value).count()  
        else:  
            return {  
                "passed": False,  
                "message": f"Unknown relation: {relation}",  
            }  
              
    # Calculate percentage of inaccurate values  
    inaccurate_percent = (inaccurate / total_rows) * 100  
      
    if inaccurate_percent <= threshold:  
        return {  
            "passed": True,  
            "message": f"Accuracy check passed with {inaccurate_percent:.2f}% inaccurate values, within threshold of {threshold}%",  
            "details": {  
                "inaccurate_rows": inaccurate,  
                "total_rows": total_rows,  
                "inaccurate_percent": inaccurate_percent,  
            },  
        }  
    else:  
        return {  
            "passed": False,  
            "message": f"Accuracy check failed with {inaccurate_percent:.2f}% inaccurate values, exceeds threshold of {threshold}%",  
            "details": {  
                "inaccurate_rows": inaccurate,  
                "total_rows": total_rows,  
                "inaccurate_percent": inaccurate_percent,  
            },  
        }  
  
def _check_custom(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Run a custom SQL expression check.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - condition: SQL expression that should evaluate to true for all valid rows  
            - name: Name of the check

    Returns:  
        Dictionary with check results.  
    """  
    condition = check_config.get("condition")  
    threshold = check_config.get("threshold", 0)  # % of failing rows allowed  
      
    if not condition:  
        return {  
            "passed": False,  
            "message": "Custom check requires a condition expression",  
        }  
          
    # Register the dataframe as a temp view for the SQL expression  
    view_name = f"data_quality_check_view_{hash(condition) % 1000000}"  
    df.createOrReplaceTempView(view_name)  
      
    try:  
        # Count total and failing rows  
        total_rows = df.count()  
        if total_rows == 0:  
            return {  
                "passed": True,  
                "message": "DataFrame is empty, custom check skipped",  
            }  
              
        # Count rows that don't satisfy the condition  
        failing_rows = df.filter(~F.expr(condition)).count()  
        failing_percent = (failing_rows / total_rows) * 100  
          
        if failing_percent <= threshold:  
            return {  
                "passed": True,  
                "message": f"Custom check passed with {failing_percent:.2f}% failing rows, within threshold of {threshold}%",  
                "details": {  
                    "failing_rows": failing_rows,  
                    "total_rows": total_rows,  
                    "failing_percent": failing_percent,  
                },  
            }  
        else:  
            return {  
                "passed": False,  
                "message": f"Custom check failed with {failing_percent:.2f}% failing rows, exceeds threshold of {threshold}%",  
                "details": {  
                    "failing_rows": failing_rows,  
                    "total_rows": total_rows,  
                    "failing_percent": failing_percent,  
                },  
            }  
    finally:  
        # Clean up the temp view  
        self.spark.catalog.dropTempView(view_name)  
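# Illustrative custom check (column name hypothetical): fail the check if more than
# 1% of rows have a negative amount:
#   {"type": "custom", "name": "non_negative_amount",
#    "condition": "amount >= 0", "threshold": 1}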
  
def _check_schema(  
    self, df: DataFrame, check_config: Dict  
) -> Dict:  
    """  
    Check the DataFrame schema against expected columns and types.

    Args:  
        df: DataFrame to check.  
        check_config: Check configuration.  
            Required keys:  
            - expected_columns: List of expected column names  
              OR  
            - expected_schema: List of {name, type} dictionaries

    Returns:  
        Dictionary with check results.  
    """  
    expected_columns = check_config.get("expected_columns", [])  
    expected_schema = check_config.get("expected_schema", [])  
      
    if not expected_columns and not expected_schema:  
        return {  
            "passed": False,  
            "message": "Schema check requires expected_columns or expected_schema",  
        }  
          
    # Get actual columns  
    actual_columns = df.columns  
      
    if expected_columns:  
        # Check column names only  
        missing_columns = [col for col in expected_columns if col not in actual_columns]  
          
        if missing_columns:  
            return {  
                "passed": False,  
                "message": f"Schema is missing expected columns: {', '.join(missing_columns)}",  
                "details": {  
                    "expected_columns": expected_columns,  
                    "actual_columns": actual_columns,  
                    "missing_columns": missing_columns,  
                },  
            }  
        else:  
            return {  
                "passed": True,  
                "message": "All expected columns are present",  
                "details": {  
                    "expected_columns": expected_columns,  
                    "actual_columns": actual_columns,  
                },  
            }  
              
    else:  
        # Check column names and types  
        actual_schema = {  
            field.name: field.dataType.simpleString()   
            for field in df.schema.fields  
        }  
          
        # Check for missing columns and type mismatches  
        schema_issues = []  
          
        for expected in expected_schema:  
            col_name = expected.get("name")  
            col_type = expected.get("type")  
              
            if col_name not in actual_columns:  
                schema_issues.append(f"Column {col_name} is missing")  
            elif col_type and actual_schema[col_name] != col_type:  
                schema_issues.append(  
                    f"Column {col_name} has type {actual_schema[col_name]}, expected {col_type}"  
                )  
                  
        if schema_issues:  
            return {  
                "passed": False,  
                "message": f"Schema check failed: {', '.join(schema_issues)}",  
                "details": {  
                    "expected_schema": expected_schema,  
                    "actual_schema": [  
                        {"name": name, "type": type_str}   
                        for name, type_str in actual_schema.items()  
                    ],  
                },  
            }  
        else:  
            return {  
                "passed": True,  
                "message": "Schema matches expectations",  
                "details": {  
                    "expected_schema": expected_schema,  
                    "actual_schema": [  
                        {"name": name, "type": type_str}   
                        for name, type_str in actual_schema.items()  
                    ],  
                },  
            }  
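# Illustrative schema check (hypothetical columns; types use Spark's simpleString names):
#   {"type": "schema",
#    "expected_schema": [{"name": "order_id", "type": "bigint"},
#                        {"name": "amount", "type": "double"}]}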

End File# adidas/lakehouse-engine
"""DataQuality class for data validation."""
from typing import Dict, List, Optional

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

from lakehouse_engine.utils.logging_handler import LoggingHandler

class SchemaEvolution(LoggingHandler):
"""
Schema evolution class for managing schema changes in Delta tables.

This class provides methods for:  
- Comparing schemas between dataframes  
- Detecting new, removed, and changed columns  
- Applying schema evolution strategies  
"""

def __init__(self, spark_session, allow_type_widening=True, allow_column_addition=True):  
    """Initialize the SchemaEvolution class.

    Args:  
        spark_session: Spark session to use.  
        allow_type_widening: Whether to allow widening of column types (e.g. int to long)  
        allow_column_addition: Whether to allow adding new columns  
    """  
    super().__init__(__name__)  
    self.spark = spark_session  
    self.allow_type_widening = allow_type_widening  
    self.allow_column_addition = allow_column_addition  
      
    # Type compatibility mappings for widening  
    self.type_widening_map = {  
        "byte": ["short", "int", "long", "float", "double", "decimal"],  
        "short": ["int", "long", "float", "double", "decimal"],  
        "int": ["long", "float", "double", "decimal"],  
        "long": ["float", "double", "decimal"],  
        "float": ["double", "decimal"],  
        "decimal": [],  # Can only widen precision, not type  
        "string": [],  # Can't widen string  
        "boolean": [],  # Can't widen boolean  
        "date": ["timestamp"],  
        "timestamp": [],  # Can't widen timestamp  
    }

def compare_schemas(  
    self, source_df: DataFrame, target_df: DataFrame  
) -> Dict:  
    """  
    Compare schemas between source and target DataFrames.

    Args:  
        source_df: Source DataFrame with new schema.  
        target_df: Target DataFrame with existing schema.

    Returns:  
        Dictionary with schema comparison results.  
    """  
    self.log.info("Comparing schemas between source and target")  
      
    source_schema = {  
        field.name: field.dataType.simpleString()   
        for field in source_df.schema.fields  
    }  
      
    target_schema = {  
        field.name: field.dataType.simpleString()   
        for field in target_df.schema.fields  
    }  
      
    # Find new, removed, and changed columns  
    new_columns = [  
        {"name": name, "type": type_}  
        for name, type_ in source_schema.items()  
        if name not in target_schema  
    ]  
      
    removed_columns = [  
        {"name": name, "type": type_}  
        for name, type_ in target_schema.items()  
        if name not in source_schema  
    ]  
      
    changed_columns = [  
        {"name": name, "source_type": source_type, "target_type": target_schema[name]}  
        for name, source_type in source_schema.items()  
        if name in target_schema and source_type != target_schema[name]  
    ]  
      
    comparison = {  
        "new_columns": new_columns,  
        "removed_columns": removed_columns,  
        "changed_columns": changed_columns,  
        "source_schema": source_schema,  
        "target_schema": target_schema,  
        "has_changes": bool(new_columns or removed_columns or changed_columns)  
    }  
      
    if comparison["has_changes"]:  
        self.log.info(f"Schema differences detected: "  
                     f"{len(new_columns)} new, {len(removed_columns)} removed, "  
                     f"{len(changed_columns)} changed columns")  
    else:  
        self.log.info("No schema differences detected")  
          
    return comparison
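# Illustrative sketch (an editorial addition): how compare_schemas() might be driven
# from a notebook. The SparkSession `spark` and the DataFrames `incoming_df` /
# `existing_df` are assumed names, not part of the engine.
#
#   evolution = SchemaEvolution(spark)
#   comparison = evolution.compare_schemas(incoming_df, existing_df)
#   if comparison["has_changes"]:
#       # e.g. [{"name": "discount", "type": "double"}]
#       print(comparison["new_columns"])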

def validate_schema_changes(  
    self, schema_comparison: Dict, strict: bool = False  
) -> Dict:  
    """  
    Validate if schema changes are allowed according to the current settings.

    Args:  
        schema_comparison: Schema comparison from compare_schemas().  
        strict: Whether to use strict validation (no changes allowed).

    Returns:  
        Validation result dictionary.  
    """  
    if strict:  
        # In strict mode, no changes are allowed  
        is_valid = not schema_comparison["has_changes"]  
        message = "Schema changes not allowed in strict mode" if not is_valid else "No schema changes detected"  
        return {  
            "is_valid": is_valid,  
            "message": message,  
            "invalid_changes": schema_comparison if not is_valid else {}  
        }  
      
    # Non-strict validation based on settings  
    invalid_changes = {  
        "new_columns": [],  
        "changed_columns": []  
    }  
      
    # Validate new columns  
    if not self.allow_column_addition:  
        invalid_changes["new_columns"] = schema_comparison["new_columns"]  
      
    # Validate type changes  
    for changed in schema_comparison["changed_columns"]:  
        source_type = changed["source_type"]  
        target_type = changed["target_type"]  
          
        # Check if type widening is valid  
        is_widening_valid = self._is_valid_type_widening(source_type, target_type)  
          
        if not is_widening_valid or not self.allow_type_widening:  
            invalid_changes["changed_columns"].append(changed)  
      
    # Check if we have any invalid changes  
    has_invalid_changes = (  
        invalid_changes["new_columns"] or   
        invalid_changes["changed_columns"]  
    )  
      
    result = {  
        "is_valid": not has_invalid_changes,  
        "message": "Schema changes are valid" if not has_invalid_changes else "Some schema changes are not allowed",  
        "invalid_changes": invalid_changes if has_invalid_changes else {}  
    }  
      
    if not result["is_valid"]:  
        self.log.warning(f"Invalid schema changes: {result['message']}")  
          
    return result
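# Illustrative sketch (an editorial addition): validating the comparison produced
# above before deciding whether to load. `comparison` is the dict returned by
# compare_schemas().
#
#   validation = evolution.validate_schema_changes(comparison, strict=False)
#   if not validation["is_valid"]:
#       raise ValueError(validation["message"])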

def apply_schema_evolution(  
    self, source_df: DataFrame, target_schema_df: DataFrame, evolution_mode: str = "additive"  
) -> DataFrame:  
    """  
    Apply schema evolution to make source DataFrame compatible with target schema.

    Args:  
        source_df: Source DataFrame to evolve.  
        target_schema_df: Target DataFrame with desired schema.  
        evolution_mode: Mode of evolution:  
            - "additive": Adds missing columns, keeps existing ones  
            - "strict": Matches target schema exactly (drops extra columns)  
            - "relaxed": Adds missing columns, converts types where possible

    Returns:  
        Evolved DataFrame.  
    """  
    self.log.info(f"Applying schema evolution in {evolution_mode} mode")  
      
    # Get schema comparison  
    comparison = self.compare_schemas(source_df, target_schema_df)  
      
    if not comparison["has_changes"]:  
        self.log.info("No schema evolution needed, schemas already match")  
        return source_df  
          
    # Create a copy of source DataFrame to evolve  
    evolved_df = source_df  
      
    if evolution_mode == "strict":  
        # In strict mode, we select only the columns from target schema  
        # This means dropping any extra columns in source  
        target_columns = list(comparison["target_schema"].keys())  
        evolved_df = evolved_df.select(*[
            # Cast the null literal so missing columns get the target type, not NullType
            F.col(col) if col in evolved_df.columns
            else F.lit(None).cast(comparison["target_schema"][col]).alias(col)
            for col in target_columns
        ])
          
    elif evolution_mode in ["additive", "relaxed"]:  
        # Add missing columns  
        for col in comparison["removed_columns"]:  
            col_name = col["name"]  
            evolved_df = evolved_df.withColumn(col_name, F.lit(None))  
              
        # Handle type conversions in relaxed mode  
        if evolution_mode == "relaxed":  
            for changed in comparison["changed_columns"]:  
                col_name = changed["name"]  
                target_type = changed["target_type"]  
                  
                # Try to cast to target type  
                try:  
                    evolved_df = evolved_df.withColumn(col_name,   
                                                      F.col(col_name).cast(target_type))  
                except Exception as e:  
                    self.log.warning(f"Could not cast column {col_name} to {target_type}: {str(e)}")  
                      
    else:  
        raise ValueError(f"Unknown evolution mode: {evolution_mode}")  
          
    self.log.info(f"Schema evolution complete in {evolution_mode} mode")  
    return evolved_df
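# Illustrative sketch (an editorial addition): conforming incoming data to the schema
# of an existing table. The Delta path is a placeholder; "relaxed" also attempts type
# casts, while "strict" drops columns not present in the target.
#
#   evolved_df = evolution.apply_schema_evolution(
#       incoming_df, existing_df, evolution_mode="relaxed"
#   )
#   evolved_df.write.format("delta").mode("append").save("/placeholder/path")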

def generate_evolution_statements(  
    self, schema_comparison: Dict, table_name: str  
) -> List[str]:  
    """  
    Generate DDL statements for schema evolution.

    Args:  
        schema_comparison: Schema comparison from compare_schemas().  
        table_name: Name of the table to evolve.

    Returns:  
        List of SQL statements for schema evolution.  
    """  
    statements = []  
      
    # Generate ADD COLUMN statements for new columns  
    for col in schema_comparison["new_columns"]:  
        stmt = f"ALTER TABLE {table_name} ADD COLUMN {col['name']} {col['type']}"  
        statements.append(stmt)  
          
    # Generate ALTER COLUMN TYPE statements for changed columns where the existing
    # (target) type can be widened to the incoming (source) type
    for changed in schema_comparison["changed_columns"]:
        if self._is_valid_type_widening(changed["target_type"], changed["source_type"]):
            stmt = f"ALTER TABLE {table_name} ALTER COLUMN {changed['name']} TYPE {changed['source_type']}"
            statements.append(stmt)
              
    return statements
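# Illustrative sketch (an editorial addition): executing the generated DDL. The table
# name is a placeholder and `spark` is an assumed SparkSession.
#
#   for stmt in evolution.generate_evolution_statements(comparison, "analytics.orders"):
#       spark.sql(stmt)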

def _is_valid_type_widening(self, source_type: str, target_type: str) -> bool:  
    """  
    Check if source_type can be widened to target_type.

    Args:  
        source_type: Source data type.  
        target_type: Target data type.

    Returns:  
        True if valid widening, False otherwise.  
    """  
    # Extract base type from complex types  
    base_source_type = source_type.split("(")[0].lower()  
    base_target_type = target_type.split("(")[0].lower()  
      
    # Check if source type can be widened to target type  
    if base_source_type in self.type_widening_map:  
        return base_target_type in self.type_widening_map[base_source_type]  
          
    return False  

End File"""This module contains all exceptions that can be raised by the engine."""

class LakehouseEngineException(Exception):
    """Base exception class for all Lakehouse Engine exceptions."""

class DeprecationException(LakehouseEngineException):
    """Base exception class for deprecated features."""

class DataValidationException(LakehouseEngineException):
    """Base exception class for Data Validation errors."""

class ConfigurationError(LakehouseEngineException):
    """Exception raised when the configuration is invalid."""

class InvalidDataException(LakehouseEngineException):
    """Exception raised when the data doesn't meet validation requirements."""

class DataQualityCheckException(DataValidationException):
    """Exception raised when data quality checks fail."""

class DataQualityConfigurationError(ConfigurationError, DataValidationException):
    """Exception raised when data quality configuration is invalid."""

class SchemaExpectationsException(DataValidationException):
    """Exception raised when schema expectations are not met."""

class UnknownReaderException(ConfigurationError):
    """Exception raised when an unknown reader is specified."""

class UnknownWriterException(ConfigurationError):
    """Exception raised when an unknown writer is specified."""

class MissingFieldException(ConfigurationError):
    """Exception raised when a required field is missing in the configuration."""

class UnknownTransformationException(ConfigurationError):
    """Exception raised when an unknown transformation is specified."""

class ProcessingException(LakehouseEngineException):
    """Exception raised when there's an error in data processing."""

class ExecutionException(LakehouseEngineException):
    """Exception raised when there's an error during pipeline execution."""
End File
import base64
import json
import requests
import logging
from datetime import datetime
from typing import Dict, List, Optional, Any

from lakehouse_engine.utils.logging_handler import LoggingHandler

class APIClient(LoggingHandler):
"""Client for interacting with REST APIs.

This class provides methods for making HTTP requests to REST APIs with   
various authentication methods and handling common response patterns.  
"""  
  
def __init__(  
    self,   
    base_url: str,  
    auth_type: Optional[str] = None,  
    auth_config: Optional[Dict[str, str]] = None,  
    headers: Optional[Dict[str, str]] = None,  
    timeout: int = 30  
):  
    """Initialize the API client.  
      
    Args:  
        base_url: Base URL for API requests  
        auth_type: Type of authentication ('basic', 'token', 'oauth', etc.)  
        auth_config: Authentication configuration parameters  
        headers: Default headers to include in all requests  
        timeout: Default timeout for requests in seconds  
    """  
    super().__init__(__name__)  
    self.base_url = base_url.rstrip('/')  
    self.timeout = timeout  
    self.headers = headers or {}  
    self.session = requests.Session()  

    # Apply authentication if provided  
    if auth_type and auth_config:  
        self._configure_auth(auth_type, auth_config)

def _configure_auth(self, auth_type: str, auth_config: Dict[str, str]) -> None:  
    """Configure authentication for requests.  
      
    Args:  
        auth_type: Type of authentication  
        auth_config: Authentication configuration  
    """  
    auth_type = auth_type.lower()  
      
    if auth_type == 'basic':  
        username = auth_config.get('username', '')  
        password = auth_config.get('password', '')  
        credentials = f"{username}:{password}"  
        encoded = base64.b64encode(credentials.encode()).decode()  
        self.headers['Authorization'] = f'Basic {encoded}'  
          
    elif auth_type == 'token':  
        token = auth_config.get('token', '')  
        prefix = auth_config.get('prefix', 'Bearer')  
        self.headers['Authorization'] = f'{prefix} {token}'  
          
    elif auth_type == 'oauth':  
        # Get token from OAuth provider  
        token_url = auth_config.get('token_url', '')  
        client_id = auth_config.get('client_id', '')  
        client_secret = auth_config.get('client_secret', '')  
        scope = auth_config.get('scope', '')  
          
        data = {  
            'grant_type': 'client_credentials',  
            'client_id': client_id,  
            'client_secret': client_secret  
        }  
        if scope:  
            data['scope'] = scope  
              
        try:  
            response = requests.post(token_url, data=data)  
            response.raise_for_status()  
            token_data = response.json()  
            self.headers['Authorization'] = f"Bearer {token_data.get('access_token')}"  
              
            # Save token expiration for refresh logic  
            self.token_expiry = datetime.now().timestamp() + token_data.get('expires_in', 3600)  
            self.token_config = auth_config  
              
        except Exception as e:  
            self.log.error(f"OAuth authentication failed: {str(e)}")  
            raise  
              
    elif auth_type == 'api_key':  
        key = auth_config.get('key', '')  
        key_name = auth_config.get('key_name', 'api-key')  
        in_header = auth_config.get('in_header', True)  
          
        if in_header:  
            self.headers[key_name] = key  
        else:  
            # Add as query parameter to all requests  
            self.api_key_param = (key_name, key)  
              
    else:  
        self.log.warning(f"Unknown authentication type: {auth_type}")

def request(  
    self,   
    method: str,   
    endpoint: str,   
    params: Optional[Dict[str, Any]] = None,  
    data: Optional[Any] = None,  
    json_data: Optional[Dict[str, Any]] = None,  
    headers: Optional[Dict[str, str]] = None,  
    timeout: Optional[int] = None,  
    stream: bool = False,
    files: Optional[Dict[str, Any]] = None
) -> requests.Response:  
    """Make an HTTP request to the API.  
      
    Args:  
        method: HTTP method (GET, POST, PUT, DELETE, etc.)  
        endpoint: API endpoint (will be appended to base_url)  
        params: Query parameters  
        data: Request data (for form data)  
        json_data: Request data (will be JSON-encoded)  
        headers: Additional headers for this request  
        timeout: Request timeout in seconds  
        stream: Whether to stream the response
        files: Files for a multipart/form-data upload (forwarded to requests)
          
    Returns:  
        Response object from the requests library  
    """  
    # Build URL  
    url = f"{self.base_url}/{endpoint.lstrip('/')}"  
      
    # Combine headers  
    request_headers = {**self.headers}  
    if headers:  
        request_headers.update(headers)  
          
    # Set timeout  
    request_timeout = timeout if timeout is not None else self.timeout  
      
    # Make request  
    try:  
        response = self.session.request(  
            method=method.upper(),  
            url=url,  
            params=params,  
            data=data,  
            json=json_data,  
            headers=request_headers,  
            timeout=request_timeout,  
            stream=stream,
            files=files
        )  
          
        self.log.debug(f"{method} {url} - Status: {response.status_code}")  
        return response  
          
    except requests.RequestException as e:  
        self.log.error(f"Request failed: {method} {url} - {str(e)}")  
        raise

def get(  
    self,   
    endpoint: str,   
    params: Optional[Dict[str, Any]] = None,  
    **kwargs  
) -> requests.Response:  
    """Make a GET request to the API.  
      
    Args:  
        endpoint: API endpoint  
        params: Query parameters  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Response object  
    """  
    return self.request('GET', endpoint, params=params, **kwargs)

def post(  
    self,   
    endpoint: str,   
    json_data: Optional[Dict[str, Any]] = None,  
    data: Optional[Any] = None,  
    **kwargs  
) -> requests.Response:  
    """Make a POST request to the API.  
      
    Args:  
        endpoint: API endpoint  
        json_data: JSON data for the request body  
        data: Form data for the request body  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Response object  
    """  
    return self.request('POST', endpoint, json_data=json_data, data=data, **kwargs)

def put(  
    self,   
    endpoint: str,   
    json_data: Optional[Dict[str, Any]] = None,  
    data: Optional[Any] = None,  
    **kwargs  
) -> requests.Response:  
    """Make a PUT request to the API.  
      
    Args:  
        endpoint: API endpoint  
        json_data: JSON data for the request body  
        data: Form data for the request body  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Response object  
    """  
    return self.request('PUT', endpoint, json_data=json_data, data=data, **kwargs)

def delete(  
    self,   
    endpoint: str,   
    **kwargs  
) -> requests.Response:  
    """Make a DELETE request to the API.  
      
    Args:  
        endpoint: API endpoint  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Response object  
    """  
    return self.request('DELETE', endpoint, **kwargs)

def get_json(  
    self,   
    endpoint: str,   
    params: Optional[Dict[str, Any]] = None,  
    **kwargs  
) -> Any:  
    """Make a GET request and parse the JSON response.  
      
    Args:  
        endpoint: API endpoint  
        params: Query parameters  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Parsed JSON response  
    """  
    response = self.get(endpoint, params=params, **kwargs)  
    response.raise_for_status()  
    return response.json()

def post_json(  
    self,   
    endpoint: str,   
    json_data: Optional[Dict[str, Any]] = None,  
    **kwargs  
) -> Any:  
    """Make a POST request and parse the JSON response.  
      
    Args:  
        endpoint: API endpoint  
        json_data: JSON data for the request body  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Parsed JSON response  
    """  
    response = self.post(endpoint, json_data=json_data, **kwargs)  
    response.raise_for_status()  
    return response.json()  
      
def paginate(  
    self,   
    endpoint: str,   
    params: Optional[Dict[str, Any]] = None,  
    page_param: str = 'page',  
    size_param: str = 'size',  
    page_size: int = 100,  
    max_pages: Optional[int] = None,  
    data_key: Optional[str] = None,  
    next_page_key: Optional[str] = None,  
    **kwargs  
) -> List[Any]:  
    """Paginate through API results.  
      
    Args:  
        endpoint: API endpoint  
        params: Query parameters  
        page_param: Name of page parameter for standard pagination  
        size_param: Name of size parameter for standard pagination  
        page_size: Number of items per page  
        max_pages: Maximum number of pages to retrieve (None for all)  
        data_key: JSON key containing the data array in response  
        next_page_key: Key in response that contains next page token/URL  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        List of all items from paginated responses  
    """  
    all_results = []  
    current_page = 1  
    request_params = params.copy() if params else {}  
      
    # If we use standard page/size pagination  
    if next_page_key is None:  
        request_params[page_param] = current_page  
        request_params[size_param] = page_size  
      
    while True:  
        response = self.get(endpoint, params=request_params, **kwargs)  
        response.raise_for_status()  
        data = response.json()  
          
        # Extract data based on data_key if provided  
        page_data = data.get(data_key, data) if data_key else data  
          
        # Add data to results  
        if isinstance(page_data, list):  
            all_results.extend(page_data)  
        else:  
            self.log.warning(f"Unexpected data format in pagination response: {type(page_data)}")  
          
        # Check if we need to stop  
        if max_pages and current_page >= max_pages:  
            break  
              
        # For cursor-based pagination  
        if next_page_key:  
            next_page_token = data.get(next_page_key)  
            if not next_page_token:  
                break  
            request_params['cursor'] = next_page_token  
        # For standard pagination  
        else:  
            current_page += 1  
            request_params[page_param] = current_page  
              
            # Check if we received fewer items than the page size  
            if isinstance(page_data, list) and len(page_data) < page_size:  
                break  
      
    return all_results
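# Illustrative sketch (an editorial addition): the two pagination styles supported
# above. Endpoints and response keys are placeholders; cursor-based APIs must accept
# the token via a `cursor` query parameter, as implemented above.
#
#   client = APIClient("https://api.example.com")
#   orders = client.paginate("orders", data_key="items", page_size=200, max_pages=10)
#   events = client.paginate("events", data_key="data", next_page_key="next_cursor")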

def download_file(  
    self,   
    endpoint: str,   
    output_path: str,   
    params: Optional[Dict[str, Any]] = None,  
    chunk_size: int = 8192,  
    **kwargs  
) -> str:  
    """Download a file from the API.  
      
    Args:  
        endpoint: API endpoint  
        output_path: Path to save the downloaded file  
        params: Query parameters  
        chunk_size: Size of chunks for streaming download  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Path to the downloaded file  
    """  
    response = self.get(endpoint, params=params, stream=True, **kwargs)  
    response.raise_for_status()  
      
    with open(output_path, 'wb') as f:  
        for chunk in response.iter_content(chunk_size=chunk_size):  
            if chunk:  # filter out keep-alive new chunks  
                f.write(chunk)  
      
    self.log.info(f"Downloaded file to {output_path}")  
    return output_path

def upload_file(  
    self,   
    endpoint: str,   
    file_path: str,   
    file_param: str = 'file',  
    additional_data: Optional[Dict[str, Any]] = None,  
    **kwargs  
) -> requests.Response:  
    """Upload a file to the API.  
      
    Args:  
        endpoint: API endpoint  
        file_path: Path to the file to upload  
        file_param: Name of the file parameter  
        additional_data: Additional form data to include  
        **kwargs: Additional arguments to pass to request()  
          
    Returns:  
        Response object  
    """  
    data = additional_data or {}

    # Use a context manager so the file handle is always closed after the upload
    with open(file_path, 'rb') as file_obj:
        files = {file_param: file_obj}
        response = self.post(endpoint, data=data, files=files, **kwargs)

    response.raise_for_status()

    self.log.info(f"Uploaded file {file_path} to {endpoint}")
    return response

End File
# adidas/lakehouse-engine
import os
import time
from typing import Dict, List, Optional, Any, Union

import requests
from lakehouse_engine.utils.logging_handler import LoggingHandler

class SlackClient(LoggingHandler):
"""Client for interacting with Slack API.

This class provides methods for sending messages, uploading files,  
and working with channels in Slack.  
"""  
  
def __init__(self, token: Optional[str] = None):  
    """Initialize the Slack client.  
      
    Args:  
        token: Slack API token (defaults to SLACK_API_TOKEN environment variable)  
    """  
    super().__init__(__name__)  
    self.token = token or os.environ.get("SLACK_API_TOKEN")  
    if not self.token:  
        raise ValueError("Slack API token is required")  
          
    self.base_url = "https://slack.com/api"  
    self.headers = {  
        "Authorization": f"Bearer {self.token}",  
        "Content-Type": "application/json"  
    }  
  
def send_message(  
    self,   
    channel: str,   
    text: str,   
    blocks: Optional[List[Dict[str, Any]]] = None,  
    thread_ts: Optional[str] = None,   
    reply_broadcast: bool = False,  
    attachments: Optional[List[Dict[str, Any]]] = None  
) -> Dict[str, Any]:  
    """Send a message to a Slack channel.  
      
    Args:  
        channel: Channel ID or name to send the message to  
        text: Message text (fallback for clients that don't support blocks)  
        blocks: Rich message layout blocks  
        thread_ts: Thread timestamp to reply to  
        reply_broadcast: Whether to also send reply to the channel  
        attachments: Legacy message attachments  
          
    Returns:  
        Slack API response  
    """  
    payload = {  
        "channel": channel,  
        "text": text  
    }  
      
    if blocks:  
        payload["blocks"] = blocks  
          
    if thread_ts:  
        payload["thread_ts"] = thread_ts  
        payload["reply_broadcast"] = reply_broadcast  
          
    if attachments:  
        payload["attachments"] = attachments  
          
    response = requests.post(  
        f"{self.base_url}/chat.postMessage",   
        headers=self.headers,   
        json=payload  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        self.log.error(f"Failed to send message: {data.get('error', 'Unknown error')}")  
          
    return data  
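# Illustrative sketch (an editorial addition): sending a Block Kit message. The
# channel name and text are placeholders.
#
#   client.send_message(
#       channel="#data-alerts",
#       text="Data quality report",
#       blocks=[{
#           "type": "section",
#           "text": {"type": "mrkdwn", "text": "*Data quality report*: 12/12 checks passed"},
#       }],
#   )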
      
def upload_file(  
    self,   
    channels: Union[str, List[str]],  
    file_path: Optional[str] = None,  
    content: Optional[str] = None,  
    filename: Optional[str] = None,  
    filetype: Optional[str] = None,  
    initial_comment: Optional[str] = None,  
    thread_ts: Optional[str] = None  
) -> Dict[str, Any]:  
    """Upload a file to Slack.  
      
    Args:  
        channels: Channel ID(s) to share the file in  
        file_path: Path to the file to upload  
        content: Content of the file as string (alternative to file_path)  
        filename: Name of the file  
        filetype: File type (e.g., 'csv', 'txt')  
        initial_comment: Initial comment for the file  
        thread_ts: Thread timestamp to share the file in  
          
    Returns:  
        Slack API response  
          
    Raises:  
        ValueError: If neither file_path nor content is provided  
    """  
    if not file_path and not content:  
        raise ValueError("Either file_path or content must be provided")  
          
    # Prepare the channels string  
    if isinstance(channels, list):  
        channels_str = ",".join(channels)  
    else:  
        channels_str = channels  
          
    # Prepare request data  
    data = {"channels": channels_str}  
      
    if initial_comment:  
        data["initial_comment"] = initial_comment  
          
    if thread_ts:  
        data["thread_ts"] = thread_ts  
          
    if filename:  
        data["filename"] = filename  
          
    if filetype:  
        data["filetype"] = filetype  
          
    # Prepare files for upload  
    files = {}  
    if file_path:  
        files["file"] = open(file_path, "rb")  
        if not filename:  
            data["filename"] = os.path.basename(file_path)  
    else:  
        files["file"] = (filename or "file.txt", content)  
          
    # Update headers for file upload (remove Content-Type)  
    upload_headers = {  
        "Authorization": f"Bearer {self.token}"  
    }  
      
    try:  
        response = requests.post(  
            f"{self.base_url}/files.upload",  
            headers=upload_headers,  
            data=data,  
            files=files  
        )  
        response.raise_for_status()  
        result = response.json()  
          
        if not result.get("ok", False):  
            self.log.error(f"Failed to upload file: {result.get('error', 'Unknown error')}")  
              
        return result  
    finally:  
        # Close file if it was opened  
        if file_path and "file" in files:  
            files["file"].close()  
              
def get_channel_id(self, channel_name: str) -> Optional[str]:  
    """Get channel ID from channel name.  
      
    Args:  
        channel_name: Name of the channel (without #)  
          
    Returns:  
        Channel ID if found, None otherwise  
    """  
    # Remove # if present at the beginning  
    if channel_name.startswith("#"):  
        channel_name = channel_name[1:]  
          
    # Try conversations.list approach first  
    response = requests.get(  
        f"{self.base_url}/conversations.list",  
        headers=self.headers,  
        params={"types": "public_channel,private_channel"}  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if data.get("ok", False):  
        for channel in data.get("channels", []):  
            if channel.get("name") == channel_name:  
                return channel.get("id")  
                  
    # Fall back to a second listing attempt, excluding archived channels
    response = requests.get(  
        f"{self.base_url}/conversations.list",  
        headers=self.headers,  
        params={"exclude_archived": True}  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if data.get("ok", False):  
        for channel in data.get("channels", []):  
            if channel.get("name") == channel_name:  
                return channel.get("id")  
                  
    self.log.warning(f"Could not find channel with name: {channel_name}")  
    return None  
      
def create_channel(  
    self,   
    name: str,   
    is_private: bool = False,   
    user_ids: Optional[List[str]] = None  
) -> Dict[str, Any]:  
    """Create a new channel in Slack.  
      
    Args:  
        name: Name for the channel  
        is_private: Whether the channel should be private  
        user_ids: List of user IDs to invite to the channel  
          
    Returns:  
        Slack API response  
    """  
    # Ensure channel name meets Slack requirements  
    # - Lowercase letters, numbers, hyphens, and underscores only  
    # - Maximum 80 characters  
    # - Cannot begin with a hyphen or underscore  
    sanitized_name = ''.join(  
        c for c in name.lower()   
        if c.isalnum() or c in ['-', '_']  
    )  
    if sanitized_name.startswith(('-', '_')):  
        sanitized_name = 'c' + sanitized_name[1:]  
    sanitized_name = sanitized_name[:80]  
      
    if sanitized_name != name:  
        self.log.info(f"Channel name sanitized: '{name}' → '{sanitized_name}'")  
          
    payload = {  
        "name": sanitized_name,  
        "is_private": is_private  
    }  
      
    # Create the channel  
    response = requests.post(  
        f"{self.base_url}/conversations.create",  
        headers=self.headers,  
        json=payload  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        error = data.get("error", "Unknown error")  
        self.log.error(f"Failed to create channel: {error}")  
        return data  
          
    # Invite users if specified  
    channel_id = data.get("channel", {}).get("id")  
    if channel_id and user_ids:  
        invite_payload = {  
            "channel": channel_id,  
            "users": ",".join(user_ids)  
        }  
          
        invite_response = requests.post(  
            f"{self.base_url}/conversations.invite",  
            headers=self.headers,  
            json=invite_payload  
        )  
          
        if not invite_response.json().get("ok", False):  
            self.log.warning(f"Failed to invite users to channel: {invite_response.json().get('error')}")  
              
    return data

def update_message(  
    self,   
    channel: str,   
    ts: str,   
    text: str,   
    blocks: Optional[List[Dict[str, Any]]] = None,  
    attachments: Optional[List[Dict[str, Any]]] = None  
) -> Dict[str, Any]:  
    """Update an existing message in Slack.  
      
    Args:  
        channel: Channel ID where the message exists  
        ts: Timestamp of the message to update  
        text: New message text  
        blocks: New rich message layout blocks  
        attachments: New message attachments  
          
    Returns:  
        Slack API response  
    """  
    payload = {  
        "channel": channel,  
        "ts": ts,  
        "text": text  
    }  
      
    if blocks:  
        payload["blocks"] = blocks  
          
    if attachments:  
        payload["attachments"] = attachments  
          
    response = requests.post(  
        f"{self.base_url}/chat.update",   
        headers=self.headers,   
        json=payload  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        self.log.error(f"Failed to update message: {data.get('error', 'Unknown error')}")  
          
    return data  
      
def add_reaction(  
    self,   
    channel: str,   
    timestamp: str,   
    reaction: str  
) -> Dict[str, Any]:  
    """Add a reaction to a message.  
      
    Args:  
        channel: Channel ID where the message exists  
        timestamp: Timestamp of the message  
        reaction: Emoji name (without colons)  
          
    Returns:  
        Slack API response  
    """  
    # Remove colons if they exist  
    if reaction.startswith(':') and reaction.endswith(':'):  
        reaction = reaction[1:-1]  
          
    payload = {  
        "channel": channel,  
        "timestamp": timestamp,  
        "name": reaction  
    }  
      
    response = requests.post(  
        f"{self.base_url}/reactions.add",   
        headers=self.headers,   
        json=payload  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        self.log.error(f"Failed to add reaction: {data.get('error', 'Unknown error')}")  
          
    return data

def delete_message(self, channel: str, ts: str) -> Dict[str, Any]:  
    """Delete a message from Slack.  
      
    Args:  
        channel: Channel ID where the message exists  
        ts: Timestamp of the message to delete  
          
    Returns:  
        Slack API response  
    """  
    payload = {  
        "channel": channel,  
        "ts": ts  
    }  
      
    response = requests.post(  
        f"{self.base_url}/chat.delete",   
        headers=self.headers,   
        json=payload  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        self.log.error(f"Failed to delete message: {data.get('error', 'Unknown error')}")  
          
    return data  
      
def get_user_info(self, user_id: str) -> Dict[str, Any]:  
    """Get information about a user.  
      
    Args:  
        user_id: User ID to get information for  
          
    Returns:  
        User information  
    """  
    response = requests.get(  
        f"{self.base_url}/users.info",  
        headers=self.headers,  
        params={"user": user_id}  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        self.log.error(f"Failed to get user info: {data.get('error', 'Unknown error')}")  
          
    return data  
      
def search_messages(  
    self,   
    query: str,   
    count: int = 100,   
    page: int = 1,  
    sort: str = "timestamp",  
    sort_dir: str = "desc"  
) -> Dict[str, Any]:  
    """Search for messages in Slack.  
      
    Args:  
        query: Search query  
        count: Number of results per page  
        page: Page number  
        sort: Sort field ('score' or 'timestamp')  
        sort_dir: Sort direction ('asc' or 'desc')  
          
    Returns:  
        Search results  
    """  
    params = {  
        "query": query,  
        "count": count,  
        "page": page,  
        "sort": sort,  
        "sort_dir": sort_dir  
    }  
      
    response = requests.get(  
        f"{self.base_url}/search.messages",  
        headers=self.headers,  
        params=params  
    )  
    response.raise_for_status()  
    data = response.json()  
      
    if not data.get("ok", False):  
        self.log.error(f"Failed to search messages: {data.get('error', 'Unknown error')}")  
          
    return data  
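# Illustrative usage sketch (an editorial addition, not part of the original module).
# Requires SLACK_API_TOKEN in the environment; the channel name and message text are
# placeholders.
#
#   client = SlackClient()
#   channel_id = client.get_channel_id("data-alerts")
#   if channel_id:
#       result = client.send_message(channel_id, "Nightly load finished successfully")
#       client.add_reaction(channel_id, result["ts"], "white_check_mark")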

End File
# adidas/lakehouse-engine
"""Tests for the JDBCReader class."""
import pytest
from unittest.mock import MagicMock, patch

from lakehouse_engine.io.readers_writers.readers import JDBCReader

class TestJDBCReader:
"""Test class for JDBCReader."""

def test_read_data_query(self, mocker):  
    """Test reading from a JDBC query."""  
    # Mock dependencies  
    mock_spark = MagicMock()  
    mock_jdbc_read = MagicMock()  
    mock_spark.read.jdbc.return_value = mock_jdbc_read

    # Create reader instance  
    reader = JDBCReader(mock_spark)

    # Mock a query read  
    query = "SELECT * FROM table"  
    kwargs = {  
        "url": "jdbc:mysql://localhost:3306/db",  
        "user": "user",  
        "password": "pass",  
        "driver": "com.mysql.jdbc.Driver"  
    }

    reader._read_data(query, **kwargs)

    # Verify JDBC was called correctly for a query  
    mock_spark.read.jdbc.assert_called_once()  
    call_args = mock_spark.read.jdbc.call_args[1]  
    assert call_args["url"] == "jdbc:mysql://localhost:3306/db"  
    assert call_args["table"] == "dummy"  # Not used for queries  
    assert "query" in call_args["properties"]  
    assert call_args["properties"]["query"] == query

def test_read_data_table(self, mocker):  
    """Test reading from a JDBC table."""  
    # Mock dependencies  
    mock_spark = MagicMock()  
    mock_option = MagicMock()  
    mock_load = MagicMock()  
      
    # Set up chained mock methods  
    mock_spark.read.format.return_value = mock_option  
    mock_option.option.return_value = mock_option  
    mock_option.load.return_value = mock_load

    # Create reader instance  
    reader = JDBCReader(mock_spark)

    # Mock a table read  
    table_name = "my_table"  
    kwargs = {  
        "url": "jdbc:mysql://localhost:3306/db",  
        "user": "user",  
        "password": "pass",  
        "driver": "com.mysql.jdbc.Driver",  
        "fetchsize": "10000"  
    }

    reader._read_data(table_name, **kwargs)

    # Verify format was set to jdbc  
    mock_spark.read.format.assert_called_once_with("jdbc")  
      
    # Check that options were set correctly  
    option_calls = mock_option.option.call_args_list  
    expected_options = {  
        "url": "jdbc:mysql://localhost:3306/db",  
        "dbtable": "my_table",  
        "user": "user",  
        "password": "pass",  
        "driver": "com.mysql.jdbc.Driver",  
        "fetchsize": "10000"  
    }  
      
    # Verify each option was set  
    for key, value in expected_options.items():  
        assert (key, value) in [(call[0][0], call[0][1]) for call in option_calls]  
      
    # Verify load was called  
    mock_option.load.assert_called_once()

def test_read_with_invalid_params(self):  
    """Test reading with invalid parameters."""  
    # Mock dependencies  
    mock_spark = MagicMock()  
      
    # Create reader instance  
    reader = JDBCReader(mock_spark)  
      
    # Call without required url  
    with pytest.raises(KeyError):  
        reader._read_data("my_table", driver="com.mysql.jdbc.Driver", user="user", password="pass")  

End File
# tests/unit/io/readers_writers/test_delta_lake_writer.py
"""Tests for the DeltaLakeWriter class."""
import pytest
from unittest.mock import MagicMock, patch

from lakehouse_engine.io.readers_writers.delta_lake_writer import DeltaLakeWriter

class TestDeltaLakeWriter:
"""Test class for DeltaLakeWriter."""

def test_write_new_table(self):  
    """Test writing to a new Delta table."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
      
    # Create writer instance  
    writer = DeltaLakeWriter()  
      
    # Mock table existence check  
    with patch.object(writer, '_table_exists', return_value=False):  
        # Test writing to a new table by path  
        writer_config = {  
            "path": "/path/to/delta",  
            "mode": "overwrite"  
        }  
          
        result = writer.write(mock_df, writer_config, "bronze")  
          
        # Verify correct format and mode were set  
        mock_df.write.format.assert_called_once_with("delta")  
        mock_writer.mode.assert_called_once_with("overwrite")  
          
        # Verify save was called with the path  
        mock_writer.save.assert_called_once_with("/path/to/delta")  
          
        # Verify the result contains the correct metadata  
        assert result["table_type"] == "bronze"  
        assert result["path"] == "/path/to/delta"  
        assert result["mode"] == "overwrite"  
        assert result["success"] == True

def test_write_with_table_name(self):  
    """Test writing to a Delta table using table name."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
      
    # Create writer instance  
    writer = DeltaLakeWriter()  
      
    # Mock table existence check  
    with patch.object(writer, '_table_exists', return_value=False):  
        # Test writing to a table by name  
        writer_config = {  
            "table_name": "my_database.my_table",  
            "mode": "overwrite"  
        }  
          
        result = writer.write(mock_df, writer_config, "silver")  
          
        # Verify correct format and mode were set  
        mock_df.write.format.assert_called_once_with("delta")  
        mock_writer.mode.assert_called_once_with("overwrite")  
          
        # Verify saveAsTable was called with the table name  
        mock_writer.saveAsTable.assert_called_once_with("my_database.my_table")  
          
        # Verify the result contains the correct metadata  
        assert result["table_type"] == "silver"  
        assert result["table_name"] == "my_database.my_table"  
        assert result["mode"] == "overwrite"  
        assert result["success"] == True

def test_write_with_partitioning(self):  
    """Test writing to a Delta table with partitioning."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_partitioned_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_writer.partitionBy.return_value = mock_partitioned_writer  
      
    # Create writer instance  
    writer = DeltaLakeWriter()  
      
    # Mock table existence check  
    with patch.object(writer, '_table_exists', return_value=False):  
        # Test writing to a table with partitioning  
        writer_config = {  
            "path": "/path/to/delta",  
            "mode": "overwrite",  
            "partition_by": ["year", "month"]  
        }  
          
        result = writer.write(mock_df, writer_config, "bronze")  
          
        # Verify correct format and mode were set  
        mock_df.write.format.assert_called_once_with("delta")  
        mock_writer.mode.assert_called_once_with("overwrite")  
          
        # Verify partitionBy was called with the correct columns  
        mock_writer.partitionBy.assert_called_once_with(["year", "month"])  
          
        # Verify save was called on the partitioned writer  
        mock_partitioned_writer.save.assert_called_once_with("/path/to/delta")  
          
        # Verify the result contains the correct metadata  
        assert result["table_type"] == "bronze"  
        assert result["path"] == "/path/to/delta"  
        assert result["mode"] == "overwrite"  
        assert result["partitions"] == ["year", "month"]  
        assert result["success"] == True

def test_write_with_schema_overwrite(self):  
    """Test writing to a Delta table with schema overwrite."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_schema_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_writer.option.return_value = mock_schema_writer  
      
    # Create writer instance  
    writer = DeltaLakeWriter()  
      
    # Mock table existence check  
    with patch.object(writer, '_table_exists', return_value=False):  
        # Test writing with schema overwrite  
        writer_config = {  
            "path": "/path/to/delta",  
            "mode": "overwrite",  
            "overwrite_schema": True  
        }  
          
        result = writer.write(mock_df, writer_config, "gold")  
          
        # Verify correct format and mode were set  
        mock_df.write.format.assert_called_once_with("delta")  
        mock_writer.mode.assert_called_once_with("overwrite")  
          
        # Verify option was set for schema overwrite  
        mock_writer.option.assert_called_once_with("overwriteSchema", "true")  
          
        # Verify save was called on the schema writer  
        mock_schema_writer.save.assert_called_once_with("/path/to/delta")  
          
        # Verify the result contains the correct metadata  
        assert result["table_type"] == "gold"  
        assert result["path"] == "/path/to/delta"  
        assert result["mode"] == "overwrite"  
        assert result["success"] == True

def test_optimization(self):  
    """Test writing to a Delta table with optimization."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_spark = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_df.sparkSession = mock_spark  
      
    # Create writer instance  
    writer = DeltaLakeWriter()  
      
    # Mock table existence check  
    with patch.object(writer, '_table_exists', return_value=False):  
        # Test writing with optimization  
        writer_config = {  
            "path": "/path/to/delta",  
            "mode": "overwrite",  
            "optimize": True,  
            "z_order_by": ["id", "timestamp"]  
        }  
          
        result = writer.write(mock_df, writer_config, "silver")  
          
        # Verify SQL was executed for optimization  
        mock_spark.sql.assert_called_once()  
        sql_arg = mock_spark.sql.call_args[0][0]  
        assert "OPTIMIZE" in sql_arg  
        assert "ZORDER BY" in sql_arg  
        assert "`id`, `timestamp`" in sql_arg  
          
        # Verify the result contains the correct metadata  
        assert result["table_type"] == "silver"  
        assert result["path"] == "/path/to/delta"  
        assert result["mode"] == "overwrite"  
        assert result["optimized"] == True  
        assert result["success"] == True

def test_cdc_write(self):  
    """Test CDC write to a Delta table."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_delta_table = MagicMock()  
    mock_merge = MagicMock()  
      
    # Mock the DeltaTable.forPath method  
    with patch('delta.tables.DeltaTable') as mock_delta:  
        mock_delta.forPath.return_value = mock_delta_table  
        mock_delta_table.alias.return_value = mock_delta_table  
        mock_delta_table.merge.return_value = mock_merge  
        mock_merge.whenMatchedUpdate.return_value = mock_merge  
        mock_merge.whenNotMatchedInsert.return_value = mock_merge  
          
        # Create writer instance  
        writer = DeltaLakeWriter()  
          
        # Mock table existence check  
        with patch.object(writer, '_table_exists', return_value=True):  
            # Test CDC write  
            writer_config = {  
                "path": "/path/to/delta",  
                "mode": "merge",  
                "cdc_config": {  
                    "cdc_enabled": True,  
                    "cdc_key_columns": ["id"],  
                    "cdc_watermark_column": "updated_at"  
                }  
            }  
              
            # Call the private CDC method directly for testing  
            writer._handle_cdc_writes(  
                mock_df,  
                writer_config,  
                None,  # table_name  
                "/path/to/delta",  # path  
                ["id"],  # key_columns  
                "updated_at"  # watermark_column  
            )  
              
            # Verify DeltaTable.forPath was called with correct path  
            mock_delta.forPath.assert_called_once()  
              
            # Verify merge was called with correct condition  
            mock_delta_table.merge.assert_called_once()  
              
            # Verify update and insert operations were called  
            mock_merge.whenMatchedUpdate.assert_called_once()  
            mock_merge.whenNotMatchedInsert.assert_called_once()  
            mock_merge.execute.assert_called_once()

def test_missing_required_params(self):  
    """Test writing with missing required parameters."""  
    # Create mocks  
    mock_df = MagicMock()  
      
    # Create writer instance  
    writer = DeltaLakeWriter()  
      
    # Test with missing path and table_name  
    writer_config = {  
        "mode": "overwrite"  
    }  
      
    with pytest.raises(ValueError, match="Either path or table_name must be provided"):  
        writer.write(mock_df, writer_config, "bronze")  

End File
# tests/unit/io/readers_writers/test_file_writer.py
"""Tests for the FileWriter class."""
import pytest
from unittest.mock import MagicMock, patch

from lakehouse_engine.io.readers_writers.file_writer import FileWriter

class TestFileWriter:
"""Test class for FileWriter."""

def test_write_basic(self):  
    """Test basic file writing functionality."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_df.count.return_value = 100  
      
    # Create writer instance  
    writer = FileWriter()  
      
    # Test writing a parquet file  
    writer_config = {  
        "path": "/path/to/output",  
        "format": "parquet",  
        "mode": "overwrite"  
    }  
      
    result = writer.write(mock_df, writer_config)  
      
    # Verify correct format and mode were set  
    mock_df.write.format.assert_called_once_with("parquet")  
    mock_writer.mode.assert_called_once_with("overwrite")  
      
    # Verify save was called with the correct path  
    mock_writer.save.assert_called_once_with("/path/to/output")  
      
    # Verify the result contains the correct metadata  
    assert result["path"] == "/path/to/output"  
    assert result["format"] == "parquet"  
    assert result["mode"] == "overwrite"  
    assert result["num_rows"] == 100  
    assert result["success"] == True  
  
def test_write_with_options(self):  
    """Test file writing with options."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_writer.option.return_value = mock_writer  
    mock_df.count.return_value = 200  
      
    # Create writer instance  
    writer = FileWriter()  
      
    # Test writing CSV with options  
    writer_config = {  
        "path": "/path/to/csv",  
        "format": "csv",  
        "mode": "overwrite",  
        "options": {  
            "header": "true",  
            "delimiter": ",",  
            "quote": """  
        }  
    }  
      
    result = writer.write(mock_df, writer_config)  
      
    # Verify correct format and mode were set  
    mock_df.write.format.assert_called_once_with("csv")  
    mock_writer.mode.assert_called_once_with("overwrite")  
      
    # Verify options were set correctly  
    mock_writer.option.assert_any_call("header", "true")  
    mock_writer.option.assert_any_call("delimiter", ",")  
    mock_writer.option.assert_any_call("quote", """)  
      
    # Verify save was called with the correct path  
    mock_writer.save.assert_called_once_with("/path/to/csv")  
      
    # Verify the result contains the correct metadata  
    assert result["path"] == "/path/to/csv"  
    assert result["format"] == "csv"  
    assert result["mode"] == "overwrite"  
    assert result["options"] == {"header": "true", "delimiter": ",", "quote": """}  
    assert result["num_rows"] == 200  
    assert result["success"] == True  
  
def test_write_with_partitioning(self):  
    """Test file writing with partitioning."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_partitioned_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_writer.partitionBy.return_value = mock_partitioned_writer  
    mock_df.count.return_value = 300  
      
    # Create writer instance  
    writer = FileWriter()  
      
    # Test writing with partitioning  
    writer_config = {  
        "path": "/path/to/data",  
        "format": "parquet",  
        "mode": "append",  
        "partition_by": ["year", "month", "day"]  
    }  
      
    result = writer.write(mock_df, writer_config)  
      
    # Verify correct format and mode were set  
    mock_df.write.format.assert_called_once_with("parquet")  
    mock_writer.mode.assert_called_once_with("append")  
      
    # Verify partitionBy was called with the correct columns  
    mock_writer.partitionBy.assert_called_once_with(["year", "month", "day"])  
      
    # Verify save was called on the partitioned writer  
    mock_partitioned_writer.save.assert_called_once_with("/path/to/data")  
      
    # Verify the result contains the correct metadata  
    assert result["path"] == "/path/to/data"  
    assert result["format"] == "parquet"  
    assert result["mode"] == "append"  
    assert result["partitions"] == ["year", "month", "day"]  
    assert result["num_rows"] == 300  
    assert result["success"] == True  
  
def test_write_alternate_config_keys(self):  
    """Test file writing with alternate configuration keys."""  
    # Create mocks  
    mock_df = MagicMock()  
    mock_writer = MagicMock()  
    mock_df.write.format.return_value = mock_writer  
    mock_writer.mode.return_value = mock_writer  
    mock_df.count.return_value = 100  
      
    # Create writer instance  
    writer = FileWriter()  
      
    # Test writing using alternate config keys  
    writer_config = {  
        "output_path": "/path/to/alt",  
        "output_format": "json",  
        "output_mode": "errorifexists",  
        "output_options": {  
            "compression": "gzip"  
        }  
    }  
      
    result = writer.write(mock_df, writer_config)  
      
    # Verify correct format and mode were set  
    mock_df.write.format.assert_called_once_with("json")  
    mock_writer.mode.assert_called_once_with("errorifexists")  
      
    # Verify option was set correctly  
    mock_writer.option.assert_called_once_with("compression", "gzip")  
      
    # Verify save was called with the correct path  
    mock_writer.save.assert_called_once_with("/path/to/alt")  
      
    # Verify the result contains the correct metadata  
    assert result["path"] == "/path/to/alt"  
    assert result["format"] == "json"  
    assert result["mode"] == "errorifexists"  
    assert result["options"] == {"compression": "gzip"}  
    assert result["success"] == True

def test_missing_path(self):  
    """Test writing without a path."""  
    # Create mocks  
    mock_df = MagicMock()  
      
    # Create writer