from typing import Optional, Union
from shapely import LineString, from_wkt, buffer, intersects, union_all, Geometry, extract_unique_points
from shapely.ops import nearest_points
from shapely.geometry import MultiPolygon, Polygon, MultiPoint, Point, LineString, shape, MultiLineString
import numpy as np
from itertools import batched
import polars as pl
from polars import col as c
from pyproj import CRS, Transformer
from shapely_function import (
shape_to_geoalchemy2, geoalchemy2_to_shape, point_list_to_linestring, shape_coordinate_transformer,
get_multipoint_from_wkt_list, get_multilinestring_from_wkt_list, get_nearest_point_within_distance,
move_geometry
)
[docs]
def get_coordinates_list_from_col(df: pl.DataFrame, col_name: str = "geometry") -> list[tuple[float, float]]:
    """
    Collect the distinct vertices of all geometries in a column as ``(x, y)`` tuples.

    Args:
        df (pl.DataFrame): The DataFrame holding the geometry column.
        col_name (str): Name of the column containing WKT geometries. Defaults to "geometry".

    Returns:
        list[tuple[float, float]]: One ``(x, y)`` pair per unique point of the combined geometry.
    """
    # Merge the whole column into a single multi-geometry before extracting its unique points.
    merged = get_multigeometry_from_col(df=df, col_name=col_name)
    unique_points = extract_unique_points(merged)
    return [(pt.x, pt.y) for pt in unique_points.geoms]
[docs]
def get_coordinates_col(col: pl.Expr) -> pl.Expr:
    """
    Build an expression yielding the coordinate sequence of each WKT geometry.

    Args:
        col (pl.Expr): Expression over a column of WKT strings.

    Returns:
        pl.Expr: Expression of dtype ``List(List(Float64))`` holding the coordinates of each geometry.

    Note:
        Each geometry's ``.coords`` attribute is read, so only geometry types that
        expose it can be processed.
    """
    def _coords(geom):
        return list(geom.coords)

    coords_dtype = pl.List(pl.List(pl.Float64))
    return wkt_to_shape_col(col).map_elements(_coords, return_dtype=coords_dtype)
[docs]
def get_nearest_point_within_distance_col(point: pl.Expr, point_list: MultiPoint, min_distance: float) -> pl.Expr:
    """
    Look up, for every WKT point in a column, a nearby point taken from ``point_list``.

    Each row is parsed from WKT and handed to ``get_nearest_point_within_distance``
    together with ``point_list`` and ``min_distance``; the helper's result is stored
    in a string column.

    Args:
        point (pl.Expr): Expression over the column of reference points (WKT).
        point_list (MultiPoint): Candidate points to search.
        min_distance (float): Distance threshold forwarded unchanged to the helper.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with the helper's result for each row.

    Example:
        >>> df = pl.DataFrame({'geometry': [Point(0, 0).wkt, Point(2, 2).wkt, Point(4, 4).wkt]})
        ... multi_point = MultiPoint([Point(1, 1), Point(10, 1), Point(2, 1)])
        ... df.with_columns(
        ...     get_nearest_point_within_distance_col(pl.col('geometry'), multi_point, 2).alias('nearest_point')
        ... )
    """
    def _lookup(geom):
        return get_nearest_point_within_distance(point=geom, point_list=point_list, min_distance=min_distance)

    shapes = wkt_to_shape_col(point)
    return shapes.map_elements(_lookup, return_dtype=pl.Utf8)
[docs]
def get_multipoint_from_wkt_list_col(point_list: pl.Expr) -> pl.Expr:
    """
    Apply ``get_multipoint_from_wkt_list`` to every element of a column.

    Args:
        point_list (pl.Expr): Expression over a column whose elements are lists of points
            (presumably WKT strings, as the helper's name suggests).

    Returns:
        pl.Expr: Expression of dtype ``pl.Object`` holding the helper's result for each row.
    """
    converter = get_multipoint_from_wkt_list
    return point_list.map_elements(converter, return_dtype=pl.Object)
[docs]
def get_multilinestring_from_wkt_list_col(linestring_list: pl.Expr) -> pl.Expr:
    """
    Apply ``get_multilinestring_from_wkt_list`` to every element of a column.

    Args:
        linestring_list (pl.Expr): Expression over a column whose elements are lists of
            linestrings (presumably WKT strings, as the helper's name suggests).

    Returns:
        pl.Expr: Expression of dtype ``pl.Object`` holding the helper's result for each row.
    """
    converter = get_multilinestring_from_wkt_list
    return linestring_list.map_elements(converter, return_dtype=pl.Object)
[docs]
def get_point_list_centroid_col(point_list: pl.Expr) -> pl.Expr:
    """
    Compute, for each row, the centroid of a list of points and return it as WKT.

    Each row is first converted with ``get_multipoint_from_wkt_list_col``; the
    ``centroid`` of the resulting geometry is then serialised to WKT.

    Args:
        point_list (pl.Expr): Expression over the column containing the lists of points.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with the centroid of each row in WKT.

    Example:
        Assuming the helper builds a MultiPoint from the WKT list:

        >>> df = pl.DataFrame({'points': [[Point(0, 0).wkt, Point(1, 1).wkt, Point(2, 2).wkt]]})
        ... df.select(get_point_list_centroid_col(pl.col('points')).alias('centroid'))
        shape: (1, 1)
        ┌─────────────┐
        │ centroid    │
        │ ---         │
        │ str         │
        ╞═════════════╡
        │ POINT (1 1) │
        └─────────────┘
    """
    def _centroid_wkt(multi_point):
        return multi_point.centroid.wkt

    multipoints = get_multipoint_from_wkt_list_col(point_list)
    return multipoints.map_elements(_centroid_wkt, return_dtype=pl.Utf8)
[docs]
def generate_linestring_from_nearest_points_col(point: pl.Expr, multi_point: MultiPoint) -> pl.Expr:
    """
    Build, for each WKT geometry in a column, the segment joining it to ``multi_point``.

    ``shapely.ops.nearest_points`` returns the pair of closest points between the row's
    geometry and ``multi_point``; that pair becomes a two-vertex LineString, serialised
    to WKT. No distance limit is applied.

    Args:
        point (pl.Expr): Expression over the column containing the reference points (WKT).
        multi_point (MultiPoint): The MultiPoint geometry to connect each row to.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with the connecting LineString in WKT.

    Example:
        >>> data = {'geometry': [Point(0, 0).wkt, Point(2, 2).wkt]}
        ... df = pl.DataFrame(data)
        ... multi_point = MultiPoint([Point(0, 1), Point(1, 1), Point(2, 1)])
        ... df.with_columns(
        ...     c('geometry').pipe(generate_linestring_from_nearest_points_col, multi_point=multi_point)
        ...     .alias('linestring')
        ... )
        shape: (2, 2)
        ┌─────────────┬───────────────────────┐
        │ geometry    ┆ linestring            │
        │ ---         ┆ ---                   │
        │ str         ┆ str                   │
        ╞═════════════╪═══════════════════════╡
        │ POINT (0 0) ┆ LINESTRING (0 0, 0 1) │
        │ POINT (2 2) ┆ LINESTRING (2 2, 2 1) │
        └─────────────┴───────────────────────┘
    """
    def _connect(geom):
        # nearest_points yields (point on geom, point on multi_point), in that order.
        return LineString(nearest_points(geom, multi_point)).wkt

    return point.pipe(wkt_to_shape_col).map_elements(_connect, return_dtype=pl.Utf8)
[docs]
def linestring_is_ring_col(linestring: pl.Expr) -> pl.Expr:
    """
    Flag which WKT LineStrings in a column are closed.

    The check reads each geometry's ``is_closed`` attribute; no other property of the
    geometry is consulted.

    Args:
        linestring (pl.Expr): Expression over the column containing LineString geometries (WKT).

    Returns:
        pl.Expr: Boolean expression, true where the LineString is closed.

    Example:
        >>> data = {
        ...     'geometry': [
        ...         LineString([(0, 0), (1, 1), (1, 0), (0, 0)]).wkt,
        ...         LineString([(0, 0), (1, 1), (1, 0)]).wkt
        ...     ]}
        ... df = pl.DataFrame(data)
        ... df.with_columns(linestring_is_ring_col(pl.col('geometry')).alias('is_ring'))
        shape: (2, 2)
        ┌─────────────────────────────────┬─────────┐
        │ geometry                        ┆ is_ring │
        │ ---                             ┆ ---     │
        │ str                             ┆ bool    │
        ╞═════════════════════════════════╪═════════╡
        │ LINESTRING (0 0, 1 1, 1 0, 0 0) ┆ true    │
        │ LINESTRING (0 0, 1 1, 1 0)      ┆ false   │
        └─────────────────────────────────┴─────────┘
    """
    def _is_closed(geom):
        return geom.is_closed

    shapes = wkt_to_shape_col(linestring)
    return shapes.map_elements(_is_closed, return_dtype=pl.Boolean)
[docs]
def shape_intersect_shape_col(geo_str: pl.Expr, geometry: Geometry) -> pl.Expr:
    """
    Test each WKT geometry in a column for intersection with a fixed geometry.

    Args:
        geo_str (pl.Expr): The Polars expression containing geometries in WKT format.
        geometry (Geometry): The geometry every row is tested against.

    Returns:
        pl.Expr: Boolean expression, true where the row's geometry intersects ``geometry``.
    """
    def _hits(candidate):
        return intersects(candidate, geometry)

    return wkt_to_shape_col(geo_str).map_elements(_hits, return_dtype=pl.Boolean)
[docs]
def shape_intersect_polygon(geo_str: pl.Expr, polygon: Polygon) -> pl.Expr:
    """
    Test each WKT geometry in a column for intersection with a fixed polygon.

    Args:
        geo_str (pl.Expr): The Polars expression containing geometries in WKT format.
        polygon (Polygon): The polygon every row is tested against.

    Returns:
        pl.Expr: Boolean expression, true where the row's geometry intersects ``polygon``.
    """
    def _hits_polygon(candidate):
        return intersects(candidate, polygon)

    shapes = wkt_to_shape_col(geo_str)
    return shapes.map_elements(_hits_polygon, return_dtype=pl.Boolean)
[docs]
def get_linestring_boundaries_col(line_str: pl.Expr) -> pl.Expr:
    """
    List the boundary points of each WKT linestring in a column.

    Args:
        line_str (pl.Expr): The Polars expression containing linestrings in WKT format.

    Returns:
        pl.Expr: Expression of dtype ``List(Utf8)``; each row holds the WKT of every
        member of the geometry's ``boundary``.
    """
    def _boundary_wkts(geom):
        return [node.wkt for node in geom.boundary.geoms]

    shapes = wkt_to_shape_col(line_str)
    return shapes.map_elements(_boundary_wkts, return_dtype=pl.List(pl.Utf8))
[docs]
def get_geometry_list(df: pl.DataFrame, col_name: str = "geometry") -> list[Union[Point, LineString, Polygon]]:
    """
    Parse a WKT column of a DataFrame into a Python list of shapely geometries.

    Args:
        df (pl.DataFrame): The Polars DataFrame.
        col_name (str, optional): The name of the geometry column. Defaults to "geometry".

    Returns:
        list[Union[Point, LineString, Polygon]]: The parsed geometries, in row order.
    """
    parsed = df.select(wkt_to_shape_col(c(col_name)))
    return parsed[col_name].to_list()
[docs]
def get_multigeometry_from_col(
    df: pl.DataFrame, col_name: str = "geometry"
) -> Union[MultiPoint, MultiLineString, MultiPolygon]:
    """
    Gather every geometry of a DataFrame column into a single multi-geometry.

    The container type is chosen from the first geometry only: ``MultiPoint`` for a
    Point, ``MultiLineString`` for a LineString, and ``MultiPolygon`` for anything else.
    Because the first element is inspected, an empty column raises ``IndexError``.

    Args:
        df (pl.DataFrame): The Polars DataFrame.
        col_name (str, optional): The name of the geometry column. Defaults to "geometry".

    Returns:
        Union[MultiPoint, MultiLineString, MultiPolygon]: The combined geometry.
    """
    geometries: list[Union[Point, LineString, Polygon]] = get_geometry_list(df=df, col_name=col_name)
    first = geometries[0]
    if isinstance(first, Point):
        return MultiPoint(geometries)  # type: ignore
    if isinstance(first, LineString):
        return MultiLineString(geometries)  # type: ignore
    return MultiPolygon(geometries)  # type: ignore
[docs]
def add_buffer(geo_str: pl.Expr, buffer_size: float) -> pl.Expr:
    """
    Buffer each WKT geometry in a column by a fixed size.

    Args:
        geo_str (pl.Expr): The Polars expression containing geometries in WKT format.
        buffer_size (float): The distance passed to ``shapely.buffer``.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with the buffered geometries in WKT format.
    """
    def _buffered_wkt(geom):
        return buffer(geom, buffer_size).wkt

    return wkt_to_shape_col(geo_str).map_elements(_buffered_wkt, return_dtype=pl.Utf8)
[docs]
def calculate_line_length(line_str: pl.Expr) -> pl.Expr:
    """
    Measure the length of each WKT geometry in a column.

    Args:
        line_str (pl.Expr): The Polars expression containing LineString geometries in WKT format.

    Returns:
        pl.Expr: ``Float64`` expression holding each geometry's ``length``.
    """
    def _length(geom):
        return geom.length

    shapes = wkt_to_shape_col(line_str)
    return shapes.map_elements(_length, return_dtype=pl.Float64)
[docs]
def generate_point_from_coordinates(x: pl.Expr, y: pl.Expr) -> pl.Expr:
    """
    Combine two coordinate expressions into WKT Point geometries.

    Args:
        x (pl.Expr): The Polars expression containing x coordinates.
        y (pl.Expr): The Polars expression containing y coordinates.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with one Point per row in WKT format.
    """
    def _point_wkt(pair):
        # ``pair`` is the per-row [x, y] list produced by concat_list.
        return Point(*pair).wkt

    xy = pl.concat_list([x, y])
    return xy.map_elements(_point_wkt, return_dtype=pl.Utf8)
[docs]
def generate_linestring_from_coordinates_list(coord_list: pl.Expr) -> pl.Expr:
    """
    Turn flat coordinate lists into WKT LineString geometries.

    Each row's values are grouped two at a time, and each group becomes one vertex
    of the LineString.

    Args:
        coord_list (pl.Expr): The Polars expression containing coordinate lists.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with LineString geometries in WKT format.
    """
    def _linestring_wkt(flat_coords):
        vertices = batched(flat_coords, 2)
        return LineString(vertices).wkt

    return coord_list.map_elements(_linestring_wkt, return_dtype=pl.Utf8)
[docs]
def get_linestring_from_point_list(point_list_str: pl.Expr) -> pl.Expr:
    """
    Apply ``point_list_to_linestring`` to every element of a column.

    Args:
        point_list_str (pl.Expr): The Polars expression containing point lists in WKT format.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression holding the helper's result for each row.
    """
    converter = point_list_to_linestring
    return point_list_str.map_elements(converter, return_dtype=pl.Utf8)
[docs]
def combine_shape(geometry_list_str: pl.Expr) -> pl.Expr:
    """
    Union each row's list of WKT geometries into a single geometry.

    Args:
        geometry_list_str (pl.Expr): The Polars expression containing geometry lists in WKT format.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with the unioned geometry of each row in WKT format.
    """
    def _union_wkt(wkt_items):
        parts = [from_wkt(item) for item in wkt_items]
        return union_all(parts).wkt

    return geometry_list_str.map_elements(_union_wkt, return_dtype=pl.Utf8)
[docs]
def shape_to_wkt_col(geometry: pl.Expr) -> pl.Expr:
    """
    Serialise each geometry object in a column to its WKT string.

    Args:
        geometry (pl.Expr): The Polars expression containing geometries.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with each geometry's ``wkt``.
    """
    def _wkt(geom):
        return geom.wkt

    return geometry.map_elements(_wkt, return_dtype=pl.Utf8)
[docs]
def wkt_to_shape_col(geometry: pl.Expr) -> pl.Expr:
    """
    Parse each WKT string in a column into a shapely geometry.

    Args:
        geometry (pl.Expr): The Polars expression containing WKT strings.

    Returns:
        pl.Expr: Expression of dtype ``pl.Object`` holding the parsed geometries.
    """
    parser = from_wkt
    return geometry.map_elements(parser, return_dtype=pl.Object)
[docs]
def geojson_to_wkt_col(geometry: pl.Expr) -> pl.Expr:
    """
    Convert GeoJSON geometries in a Polars expression to WKT format.

    Each element may be either a GeoJSON mapping (for example a struct row, which
    reaches the callback as a dict) or a GeoJSON document encoded as a string.
    Strings are decoded with ``json.loads`` first, because ``shapely.geometry.shape``
    only accepts mappings or objects exposing ``__geo_interface__``.

    Args:
        geometry (pl.Expr): The Polars expression containing GeoJSON geometries.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with geometries in WKT format.

    Raises:
        json.JSONDecodeError: If a string element is not valid JSON.
    """
    import json

    def _geojson_to_wkt(geojson):
        # shape() calls ``.get("type")`` on its argument, which a raw string lacks.
        if isinstance(geojson, str):
            geojson = json.loads(geojson)
        return shape(geojson).wkt

    return geometry.map_elements(_geojson_to_wkt, return_dtype=pl.Utf8)
[docs]
def shape_to_geoalchemy2_col(geo: pl.Expr) -> pl.Expr:
    """
    Apply ``shape_to_geoalchemy2`` to every geometry in a column.

    Args:
        geo (pl.Expr): The Polars expression containing geometries.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression with geometries in GeoAlchemy2 format.
    """
    converter = shape_to_geoalchemy2
    return geo.map_elements(converter, return_dtype=pl.Utf8)
[docs]
def geoalchemy2_to_shape_col(geo_str: pl.Expr) -> pl.Expr:
    """
    Apply ``geoalchemy2_to_shape`` to every element of a column.

    Args:
        geo_str (pl.Expr): The Polars expression containing GeoAlchemy2 strings.

    Returns:
        pl.Expr: Expression of dtype ``pl.Object`` holding the resulting geometries.
    """
    converter = geoalchemy2_to_shape
    return geo_str.map_elements(converter, return_dtype=pl.Object)
[docs]
def wkt_to_geoalchemy_col(geo_str: pl.Expr, srid_from: Optional[int], srid_to: Optional[int]) -> pl.Expr:
    """
    Convert WKT strings to GeoAlchemy2 format, optionally reprojecting on the way.

    When both SRIDs are given, the parsed geometries are passed through
    ``shape_coordinate_transformer_col`` before serialisation; when both are ``None``
    that step is skipped.

    Args:
        geo_str (pl.Expr): The Polars expression containing WKT strings.
        srid_from (Optional[int]): The source spatial reference system identifier.
        srid_to (Optional[int]): The target spatial reference system identifier.

    Returns:
        pl.Expr: A Polars expression with geometries in GeoAlchemy2 format.

    Raises:
        ValueError: If only one of srid_from or srid_to is provided.
    """
    # Exactly one SRID given is the only invalid combination.
    if (srid_from is None) != (srid_to is None):
        raise ValueError("Both srid_from and srid_to must be provided or None.")

    shapes = wkt_to_shape_col(geo_str)
    if srid_from is not None:
        # NOTE(review): shape_coordinate_transformer_col is neither defined nor imported
        # in the visible part of this module — confirm it exists at runtime.
        shapes = shape_coordinate_transformer_col(shapes, srid_from=srid_from, srid_to=srid_to)
    return shape_to_geoalchemy2_col(shapes)
[docs]
def geoalchemy2_to_wkt_col(geo_str: pl.Expr, srid_from: Optional[int], srid_to: Optional[int]) -> pl.Expr:
    """
    Convert GeoAlchemy2 strings to WKT format, optionally reprojecting on the way.

    When both SRIDs are given, the decoded geometries are passed through
    ``shape_coordinate_transformer_col`` before serialisation; when both are ``None``
    that step is skipped.

    Args:
        geo_str (pl.Expr): The Polars expression containing GeoAlchemy2 strings.
        srid_from (Optional[int]): The source spatial reference system identifier.
        srid_to (Optional[int]): The target spatial reference system identifier.

    Returns:
        pl.Expr: A Polars expression with geometries in WKT format.

    Raises:
        ValueError: If only one of srid_from or srid_to is provided.
    """
    # Exactly one SRID given is the only invalid combination.
    if (srid_from is None) != (srid_to is None):
        raise ValueError("Both srid_from and srid_to must be provided or None.")

    shapes = geoalchemy2_to_shape_col(geo_str)
    if srid_from is not None:
        # NOTE(review): shape_coordinate_transformer_col is neither defined nor imported
        # in the visible part of this module — confirm it exists at runtime.
        shapes = shape_coordinate_transformer_col(shapes, srid_from=srid_from, srid_to=srid_to)
    return shape_to_wkt_col(shapes)
[docs]
def move_geometry_col(geometry: pl.Expr, angle: pl.Expr, distance: pl.Expr) -> pl.Expr:
    """
    Apply ``move_geometry`` row by row to a struct of angle, geometry and distance.

    The three expressions are packed into one struct with fields ``"angle"``,
    ``"geometry"`` and ``"distance"``; each struct row is passed as a single argument
    to ``move_geometry``.

    Args:
        geometry (pl.Expr): Expression providing the ``"geometry"`` field.
        angle (pl.Expr): Expression providing the ``"angle"`` field.
        distance (pl.Expr): Expression providing the ``"distance"`` field.

    Returns:
        pl.Expr: String (``pl.Utf8``) expression holding the helper's result for each row.
    """
    fields = [angle.alias("angle"), geometry.alias("geometry"), distance.alias("distance")]
    packed = pl.struct(*fields)
    return packed.map_elements(move_geometry, return_dtype=pl.Utf8)