Source code for polars_function

import re
import uuid
import json
from datetime import timedelta, datetime

from typing import Optional, Union

import polars as pl
from polars import col as c
import numpy as np

from general_function import modify_string, generate_log, generate_uuid


# Global variable
log = generate_log(name=__name__)

[docs] def cum_count_duplicates(cols_names: Union[str, list[str]]) -> pl.Expr: """ Calculate the cumulative count of duplicate values in a specified column of a DataFrame, assigning half of the count as strict positive values and the other half as strict negative values. Parameters: cols_names (Union[str, list[str]]): The name of the column to check for duplicates. Returns: pl.Expr: A polar expression showing the cumulative count of duplicates. Example: ~~~~~~~~ >>> df = pl.DataFrame({"a": [1, 1, 2, 3, 4, 4, 4]}) ... df.with_columns( ... cum_count_duplicates(cols_names="a").alias("cum_count") ... ) shape: (7, 2) ┌─────┬────────────┐ │ id ┆ cum_count │ │ --- ┆ --- │ │ i64 ┆ i64 │ ╞═════╪════════════╡ │ 1 ┆ -1 │ │ 1 ┆ 1 │ │ 2 ┆ 1 │ │ 3 ┆ 1 │ │ 4 ┆ -1 │ │ 4 ┆ 1 │ │ 4 ┆ 2 │ └─────┴────────────┘ """ if isinstance(cols_names, str): cols_names = [cols_names] cum_count_col: pl.Expr = ( c(cols_names[0]).cum_count().cast(pl.Int32) - c(cols_names[0]).count() // 2 -1).over(cols_names) return pl.when(cum_count_col < 0).then(cum_count_col).otherwise(cum_count_col+1)
[docs] def generate_uuid_col( col: pl.Expr, base_uuid: Optional[uuid.UUID] = None, added_string: str = "") -> pl.Expr: """ Generate UUIDs for a column based on a base UUID and an optional added string. Args: col (pl.Expr): The column to generate UUIDs for. base_uuid (uuid.UUID, optional): The base UUID for generating the UUIDs. added_string (str, optional): The optional added string. Defaults to "". Returns: pl.Expr: The column with generated UUIDs. """ return ( col.cast(pl.Utf8) .map_elements(lambda x: generate_uuid(base_value=x, base_uuid=base_uuid, added_string=added_string), pl.Utf8) )
[docs] def cast_float(float_str: pl.Expr) -> pl.Expr: """ Cast a string column to float, modifying the string format as needed. Args: float_str (pl.Expr): The string column to cast. Returns: pl.Expr: The casted float column. """ format_str = {r'^,': "0.", ',': "."} return float_str.pipe(modify_string_col, format_str=format_str).cast(pl.Float64)
[docs] def cast_boolean(col: pl.Expr) -> pl.Expr: """ Cast a column to boolean based on predefined replacements. Args: col (pl.Expr): The column to cast. Returns: pl.Expr: The casted boolean column. """ format_str = { "1": True, "true": True , "oui": True, "0": False, "false": False, "vrai": True, "non": False, "off": False, "on": True} return col.str.to_lowercase().replace_strict(format_str, default=False).cast(pl.Boolean)
[docs] def modify_string_col(string_col: pl.Expr, format_str: dict) -> pl.Expr: """ Modify string columns based on a given format dictionary. Args: string_col (pl.Expr): The string column to modify. format_str (dict): The format dictionary containing the string modifications. Returns: pl.Expr: The modified string column. """ return ( string_col.map_elements( lambda x: modify_string(string=x, format_str=format_str), return_dtype=pl.Utf8, skip_nulls=True) )
[docs] def parse_date(date_str: Optional[str], default_date: datetime) -> datetime: """ Parse a date string and return a datetime object. Args: date_str (str, optional): The date string to parse. default_date (datetime): The default date to return if the date string is None. Returns: datetime: The parsed datetime object. Raises: ValueError: If the date format is not recognized. """ if date_str is None: return default_date if bool(re.match(r"[0-9]{5}", date_str)): return datetime(1899, 12, 30) + timedelta(days=int(date_str)) format_str: dict[str, str] = {r"[-:.//]": "_"} date_str = modify_string(date_str, format_str) if bool(re.match(r"[0-9]{4}_[0-9]{2}_[0-9]{2}", date_str)): return datetime.strptime(date_str, '%Y_%m_%d') if bool(re.match(r"[0-9]{2}_[0-9]{2}_[0-9]{4}", date_str)): return datetime.strptime(date_str, '%d_%m_%Y') raise ValueError("Date format not recognized")
[docs] def parse_timestamp( timestamp_str: pl.Expr, item: Optional[str], keep_string_format: bool= False, convert_to_utc: bool = False, initial_time_zone: str = "Europe/Zurich" ) -> pl.Expr: """ Parse a timestamp column based on a given item. Args: timestamp_str (pl.Expr): The timestamp column. item (str, optional): The item to parse. keep_string_format (bool, optional): Whether to keep the string format. Defaults to False. convert_to_utc (bool, optional): Whether to convert the timestamp to UTC. Defaults to False. initial_time_zone (str, optional): The initial time zone of the timestamps. Defaults to "Europe/Zurich". Returns: pl.Expr: The parsed timestamp column. Raises: ValueError: If the timestamp format is not recognized. """ format_str: dict[str, str] = {r"[-:\.//]": "_"} if item is None: return pl.lit(None) item = modify_string(item, format_str) if bool(re.match(r"[0-9]{5}", item)): timestamp: pl.Expr = (3.6e6*24*timestamp_str.cast(pl.Int32)).cast(pl.Duration("ms")) + datetime(1899, 12, 30) else: if bool(re.match(r"[0-9]{2}_[0-9]{2}_[0-9]{4}\s[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{3}", item)): format_str: dict[str, str] = { r"[-:.//]": "_", r"_[0-9]{3}$": ""} format_timestamp: str = "%d_%m_%Y %H_%M_%S" elif bool(re.match(r"[0-9]{4}_[0-9]{2}_[0-9]{2}\s[0-9]{2}_[0-9]{2}_[0-9]{2}_[0-9]{3}", item)): format_str = { r"[-:.//]": "_", r"_[0-9]{3}$": ""} format_timestamp: str = "%Y_%m_%d %H_%M_%S" elif bool(re.match(r"[0-9]{4}_[0-9]{2}_[0-9]{2}\s[0-9]{2}_[0-9]{2}_[0-9]{2}", item)): format_timestamp: str = "%Y_%m_%d %H_%M_%S" elif bool(re.match(r"[0-9]{2}_[0-9]{2}_[0-9]{4}\s[0-9]{2}_[0-9]{2}_[0-9]{2}", item)): format_timestamp: str ="%d_%m_%Y %H_%M_%S" elif bool(re.match(r"[0-9]{2}_[0-9]{2}_[0-9]{2}\s[0-9]{2}_[0-9]{2}_[0-9]{2}", item)): format_timestamp: str ="%d_%m_%y %H_%M_%S" elif bool(re.match(r"[0-9]{4}_[0-9]{2}_[0-9]{2}", item)): timestamp_str = timestamp_str + " 00_00_00" format_timestamp: str ="%Y_%m_%d %H_%M_%S" elif bool(re.match(r"[0-9]{2}_[0-9]{2}_[0-9]{4}", item)): timestamp_str = timestamp_str + " 00_00_00" format_timestamp: str ="%d_%m_%Y %H_%M_%S" else: raise ValueError("Timestamp format not recognized") timestamp: pl.Expr = ( modify_string_col(timestamp_str, format_str) .str.strptime(pl.Datetime, format_timestamp) .dt.cast_time_unit(time_unit="us") ) if keep_string_format: return timestamp.dt.strftime("%Y/%m/%d %H:%M:%S") elif convert_to_utc: return timestamp.pipe(cast_to_utc_timestamp, initial_time_zone=initial_time_zone) return timestamp
[docs] def cast_to_utc_timestamp(timestamp: pl.Expr, initial_time_zone: str = "Europe/Zurich") -> pl.Expr: """ Convert a timestamp column to UTC from the specified initial time zone. Args: timestamp (pl.Expr): The timestamp column to convert. initial_time_zone (str, optional): The initial time zone of the timestamps. Defaults to "Europe/Zurich". Returns: pl.Expr: The timestamp column converted to UTC. """ return ( pl.when(timestamp.is_first_distinct()) .then(timestamp.dt.replace_time_zone(initial_time_zone, ambiguous='earliest')) .otherwise(timestamp.dt.replace_time_zone(initial_time_zone, ambiguous='latest')) .dt.convert_time_zone("UTC") )
[docs] def generate_random_uuid(col: pl.Expr) -> pl.Expr: """ Generate a random UUID. Returns: str: The generated UUID. """ return col.map_elements(lambda x: str(uuid.uuid4()), return_dtype=pl.Utf8, skip_nulls=False)
[docs] def get_meta_data_string(metadata: pl.Expr) -> pl.Expr: """ Convert metadata to a JSON string, excluding keys with None values. Args: metadata (pl.Expr): The metadata column. Returns: pl.Expr: The metadata column as JSON strings. """ return ( metadata.map_elements( lambda x: json.dumps({key: value for key, value in x.items() if value is not None}, ensure_ascii=False), return_dtype=pl.Utf8) ).replace({"{}": None})
[docs] def digitize_col(col: pl.Expr, min: float, max: float, nb_state: int) -> pl.Expr: """ Digitize a column into discrete states based on the specified number of states. Args: col (pl.Expr): The column to digitize. min (float): The minimum value of the column. max (float): The maximum value of the column. nb_state (int): The number of discrete states. Returns: pl.Expr: The digitized column. """ bins = np.linspace(min, max, nb_state + 1) return ( col.map_elements(lambda x: np.digitize(x, bins), return_dtype=pl.Int64) )
[docs] def get_transfo_impedance(rated_v: pl.Expr, rated_s: pl.Expr, voltage_ratio: pl.Expr) -> pl.Expr: """ Get the transformer impedance (or resistance if real part) based on the short-circuit tests. Args: rated_v (pl.Expr): The rated voltage column indicates which side of the transformer the parameters are associated with (usually lv side).[V]. rated_s (pl.Expr): The rated power column [VA]. voltage_ratio (pl.Expr): The ratio between the applied input voltage to get rated current when transformer secondary is short-circuited and the rated voltage [%]. Returns: pl.Expr: The transformer impedance column [Ohm]. """ return voltage_ratio / 100 * (rated_v**2)/ rated_s
[docs] def get_transfo_admittance(rated_v: pl.Expr, rated_s: pl.Expr, oc_current_ratio: pl.Expr) -> pl.Expr: """ Get the transformer admittance based on the open circuit test Args: rated_v (pl.Expr): The rated voltage column indicates which side of the transformer the parameters are associated with (usually lv side).[V]. rated_s (pl.Expr): The rated power column [VA]. oc_current_ratio (pl.Expr): The ratio between the measured current when transformer secondary is opened and the rated current [%]. Returns: pl.Expr: The transformer admittance column [Simens]. """ return oc_current_ratio / 100 * rated_s / (rated_v **2)
[docs] def get_transfo_conductance(rated_v: pl.Expr, iron_losses: pl.Expr) -> pl.Expr: """ Get the transformer conductance based on iron losses measurement. Args: rated_v (pl.Expr): The rated voltage column indicates which side of the transformer the parameters are associated with (usually lv side).[V]. iron_losses (pl.Expr): The iron losses column [W]. Returns: pl.Expr: The transformer conductance column [Simens]. """ return iron_losses /(rated_v**2)
[docs] def get_transfo_resistance(rated_v: pl.Expr, rated_s: pl.Expr, copper_losses: pl.Expr) -> pl.Expr: """ Get the transformer resistance based on copper losses measurement. Args: rated_v (pl.Expr): The rated voltage column indicates which side of the transformer the parameters are associated with (usually lv side).[V]. rated_s (pl.Expr): The rated power column [VA]. copper_losses (pl.Expr): The copper losses column [W]. Returns: pl.Expr: The transformer resistance column [Ohm]. """ return copper_losses * ((rated_v/rated_s)**2)
[docs] def get_transfo_imaginary_component(module: pl.Expr, real: pl.Expr) -> pl.Expr: """ Get the transformer imaginary component based on the module and real component. Args: module (pl.Expr): The module column [Ohm or Simens]. real (pl.Expr): The real component column [Ohm or Simens]. Returns: pl.Expr: The transformer imaginary component column [Ohm or Simens]. """ return (np.sqrt(module ** 2 - real ** 2))
[docs] def concat_list_of_list(col_list: pl.Expr) -> pl.Expr: """ Concatenate a column of lists into a list containing sublist. Args: col_list (pl.Expr): The column of lists to concatenate. Returns: pl.Expr: The concatenated list column. """ return pl.concat_list( col_list.map_elements(lambda x: [x], return_dtype=pl.List(pl.List(pl.Float64))) )
[docs] def linear_interpolation_for_bound(x_col: pl.Expr, y_col: pl.Expr) -> pl.Expr: """ Perform linear interpolation for boundary values in a column. Args: x_col (pl.Expr): The x-axis column. y_col (pl.Expr): The y-axis column to interpolate. Returns: pl.Expr: The interpolated y-axis column. """ a_diff: pl.Expr = y_col.diff()/x_col.diff() x_diff: pl.Expr = x_col.diff().backward_fill() y_diff: pl.Expr = pl.coalesce( pl.when(y_col.is_null().or_(y_col.is_nan())) .then(a_diff.forward_fill()*x_diff) .otherwise(pl.lit(0)).cum_sum(), pl.when(y_col.is_null().or_(y_col.is_nan())) .then(-a_diff.backward_fill()*x_diff) .otherwise(pl.lit(0)).cum_sum(reverse=True) ) return y_col.backward_fill().forward_fill() + y_diff
[docs] def linear_interpolation_using_cols( df: pl.DataFrame, x_col: str, y_col: Union[list[str], str] ) -> pl.DataFrame: """ Perform linear interpolation on specified columns of a DataFrame. Args: df (pl.DataFrame): The DataFrame containing the data. x_col (str): The name of the x-axis column. y_col (Union[list[str], str]): The name(s) of the y-axis column(s) to interpolate. Returns: pl.DataFrame: The DataFrame with interpolated y-axis columns. """ df = df.sort(x_col) x = df[x_col].to_numpy() if isinstance(y_col, str): y_col = [y_col] for col in y_col: y = df[col].to_numpy() mask = ~np.isnan(y) df = df.with_columns( pl.Series(np.interp(x, x[mask], y[mask], left=np.nan, right=np.nan)).fill_nan(None).alias(col) ).with_columns( linear_interpolation_for_bound(x_col=c(x_col), y_col=c(col)).alias(col) ) return df