from ast import List
from operator import ge
import os
import json
from itertools import chain
from copy import deepcopy
import re
from typing import Optional, Union
from shapely import (
Geometry, LineString, from_wkt, intersection, distance, buffer, intersects, convex_hull, reverse,
extract_unique_points, line_merge, intersection_all, is_simple)
from shapely import transform as sh_transform
from shapely.ops import nearest_points, split, snap, linemerge, transform, substring
from shapely.geometry import MultiPolygon, Polygon, MultiPoint, Point, LineString, shape, MultiLineString
from shapely.prepared import prep
import numpy as np
import networkx as nx
from pyproj import CRS, Transformer
from geoalchemy2.shape import from_shape, to_shape
from geoalchemy2.elements import WKBElement
from math import ceil
SWISS_SRID = 2056
GPS_SRID = 4326
[docs]
def get_point_side(line: LineString, point: Point) -> Optional[str]:
"""
Determine which side of a line a point is on.
Args:
point (Point): The point to check.
line (LineString): The line to check against.
Returns:
str: `start` if the point is to the beginning of the line, 1 if to the right,
and `end` if it is at the end.
"""
boundaries = list(line.boundary.geoms)
if point == boundaries[0]:
return "start"
elif point == boundaries[1]:
return "end"
else:
return None
[docs]
def get_nearest_point_within_distance(point: Point, point_list: MultiPoint, min_distance: float) -> Optional[str]:
"""
Find the nearest point within a specified distance from a given point.
Args:
point (Point): The reference point.
point_list (MultiPoint): The list of points to search.
min_distance (float): The minimum distance to search for the nearest point.
Returns:
str or None: The nearest point in wkt format within the specified distance, or None if no
point is found.
"""
nearest_points_list = nearest_points(point_list, point)
if distance(*nearest_points_list) < min_distance:
return nearest_points_list[0].wkt
return None
[docs]
def get_point_list_centroid(point_list: list[Point]) -> Point:
"""
Calculate the centroid of a list of points.
Args:
points (Point): The list of points.
Returns:
Point: The centroid of the list of points.
"""
return MultiPoint(point_list).centroid
[docs]
def get_multipoint_from_wkt_list(point_list: list[str]) -> MultiPoint:
"""
Convert a list of WKT representations of points into a MultiPoint geometry.
Args:
point_list (list[str])): The list of WKT points strings.
Returns:
MultiPoint: The MultiPoint geometry.
"""
return MultiPoint(list(map(from_wkt, point_list))) # type: ignore
[docs]
def get_multilinestring_from_wkt_list(linestring_list: list[str]) -> MultiPoint:
"""
Convert a list of WKT representations of linestring into a MultiLineString geometry.
Args:
linestring_list (list[str]): The list of WKT linestring strings.
Returns:
MultiLineString: The MultiLineString geometry.
"""
return MultiLineString(list(map(from_wkt, linestring_list))) # type: ignore
[docs]
def get_multipolygon_from_wkt_list(polygon_list: list[str]) -> MultiPolygon:
"""
Convert a list of WKT representations of polygons into a MultiPolygon geometry.
Args:
polygon_list (list[str])): The list of WKT polygons strings.
Returns:
MultiPolygon: The MultiPolygon geometry.
"""
return MultiPolygon(list(map(from_wkt, polygon_list))) # type: ignore
[docs]
def point_list_to_linestring(point_list_str: list[str]) -> str:
"""
Convert a list of WKT point strings to a WKT LineString.
Args:
point_list_str (list[str]): The list of WKT point strings.
Returns:
str: The WKT LineString.
"""
return LineString(list(map(from_wkt, point_list_str))).wkt # type: ignore
[docs]
def get_polygon_multipoint_intersection(polygon_str: str, multipoint: MultiPoint) -> Optional[list[str]]:
"""
Get the intersection points between a polygon and a multipoint.
Args:
polygon_str (str): The WKT string of the polygon.
multipoint (MultiPoint): The MultiPoint object.
Returns:
Optional[list[str]]: The list of WKT strings representing the intersection points.
"""
point_shape: Geometry = intersection(from_wkt(polygon_str), multipoint)
if isinstance(point_shape, MultiPoint):
return list(map(lambda x: x.wkt, point_shape.geoms))
if isinstance(point_shape, Point):
if point_shape.is_empty:
return []
return [point_shape.wkt]
return []
[docs]
def find_closest_node_from_list(data: dict, node_name: str, node_list_name: str) -> Optional[str]:
"""
Find the closest node ID from a given node ID geometry mapping.
Args:
data (dict): The data dictionary containing node information.
node_name (str): The key for the node ID.
node_list_name (str): The key for the list of node IDs.
Returns:
Optional[str]: The closest node ID.
"""
if data["node_id"] is None:
return None
if len(data[node_list_name]) == 0:
return None
if len(data[node_list_name]) == 1:
return data[node_list_name][0]
if len(data[node_list_name]) > 1:
return min(data[node_list_name], key=lambda x: from_wkt(data[node_name]).distance(from_wkt(x)))
return None
[docs]
def explode_multipolygon(geometry_str: str) -> list[Polygon]:
"""
Explode a MultiPolygon WKT string into a list of Polygon objects.
Args:
geometry_str (str): The WKT string of the MultiPolygon.
Returns:
list[Polygon]: The list of Polygon objects.
"""
geometry_shape: Geometry = from_wkt(geometry_str)
if isinstance(geometry_shape, Polygon):
return [geometry_shape]
if isinstance(geometry_shape, MultiPolygon):
return list(geometry_shape.geoms)
return []
[docs]
def geoalchemy2_to_shape(geo_str: str) -> Geometry:
"""
Convert a GeoAlchemy2 WKBElement string to a Shapely Geometry object.
Args:
geo_str (str): The GeoAlchemy2 WKBElement string.
Returns:
Geometry: The Shapely Geometry object.
"""
return to_shape(WKBElement(str(geo_str)))
[docs]
def shape_to_geoalchemy2(geo: Geometry, srid: int = GPS_SRID) -> str:
"""
Convert a Shapely Geometry object to a GeoAlchemy2 WKBElement string.
Args:
geo (Geometry): The Shapely Geometry object.
srid (int, optional): The spatial reference system identifier. Defaults to GPS_SRID = 4326.
Returns:
str: The GeoAlchemy2 WKBElement string.
"""
if isinstance(geo, Geometry):
return from_shape(geo, srid=srid).desc
return None
[docs]
def get_closest_point_from_multi_point(geo_str: str, multi_point: MultiPoint, max_distance: float=100) -> Optional[str]:
"""
Find the closest point within a maximum distance from a given geometry WKT string.
Args:
geo_str (str): The WKT string of the geometry.
multi_point (MultiPoint): The MultiPoint object.
max_distance (float, optional): The maximum distance. Defaults to 100.
Returns:
Optional[str]: The WKT string of the closest point.
"""
geo = from_wkt(geo_str)
_, closest_point = nearest_points(geo, multi_point)
if distance(geo, closest_point) < max_distance:
return closest_point.wkt
return None
[docs]
def remove_z_coordinates(geom: Geometry)->Geometry:
"""
Remove the Z coordinates from a geometry.
Args:
geom (Geometry): The Shapely Geometry object.
Returns:
Geometry: The Shapely Geometry object without Z coordinates.
"""
return transform(lambda x, y, z=None: (x, y), geom)
[docs]
def get_valid_polygon_str(polygon_str: dict) -> str:
"""
Get a valid polygon WKT string from a dictionary.
Args:
polygon_str (dict): The dictionary containing polygon information.
Returns:
str: The valid polygon WKT string.
"""
polygon: Polygon = list(polygon_str.wkt.geoms)[0] # type: ignore
if polygon.is_valid:
return polygon.wkt
return polygon.convex_hull.wkt
[docs]
def grid_bounds(geom, delta):
"""
Generate a grid of polygons within the bounds of a geometry.
Args:
geom (Geometry): The Shapely Geometry object.
delta (float): The grid cell size.
Returns:
list[Polygon]: The list of grid polygons.
"""
minx, miny, maxx, maxy = geom.bounds
nx = int(ceil((maxx - minx)/delta)) + 1
ny = int(ceil((maxy - miny)/delta)) + 1
gx, gy = np.linspace(minx,maxx,nx), np.linspace(miny,maxy,ny)
grid = []
for i in range(len(gx) - 1):
for j in range(len(gy) - 1):
poly_ij = Polygon([[gx[i],gy[j]],[gx[i],gy[j+1]],[gx[i+1],gy[j+1]],[gx[i+1],gy[j]]])
grid.append( poly_ij )
return grid
[docs]
def partition(geom: Polygon, delta: float) -> list:
"""
Partition a polygon into smaller polygons based on a grid.
Args:
geom (Polygon): The Shapely Polygon object.
delta (float): The grid cell size.
Returns:
list[Polygon]: The list of partitioned polygons.
"""
prepared_geom = prep(geom)
grid = list(filter(prepared_geom.intersects, grid_bounds(geom, delta)))
return grid
[docs]
def generate_valid_polygon(multipolygon_str: str) -> Optional[Union[Polygon, MultiPolygon]]:
"""
Generate a valid polygon from a MultiPolygon WKT string.
Args:
multipolygon_str (str): The WKT string of the MultiPolygon.
Returns:
Optional[Polygon]: The valid polygon.
"""
shape: Geometry = from_wkt(multipolygon_str)
if isinstance(shape, MultiPolygon):
return MultiPolygon(list(
map(
lambda x: x if x.is_valid
else x.convex_hull,
shape.geoms
)) # type: ignore
)
elif isinstance(shape, Polygon):
return shape if shape.is_valid else convex_hull(shape) # type: ignore
else:
return None
[docs]
def load_shape_from_geo_json(
file_name: str, srid_from: Optional[str] = None, srid_to: Optional[str]= None
) -> Geometry:
"""
Load a shape from a GeoJSON file and optionally transform its coordinates.
Args:
file_name (str): The path to the GeoJSON file.
srid_from (Optional[str], optional): The source spatial reference system identifier. Defaults to None.
srid_to (Optional[str], optional): The target spatial reference system identifier. Defaults to None.
Returns:
Geometry: The loaded shape.
Raises:
FileNotFoundError: If the GeoJSON file is not found.
ValueError: If only one of srid_from or srid_to is provided.
"""
if not os.path.exists(file_name):
raise FileNotFoundError(f"File {file_name} not found")
with open(file_name) as f:
loading_shape = json.load(f)
geo_shape: Geometry = shape(loading_shape["features"][0]["geometry"])
if (srid_from is None) | (srid_to is None):
return geo_shape
elif (srid_from is not None) and (srid_to is not None):
return shape_coordinate_transformer(geo_shape, srid_from=srid_from, srid_to=srid_to) # type: ignore
else:
raise ValueError("Both srid_from and srid_to must be provided or None.")
[docs]
def segment_list_from_multilinestring(multi_linestring: MultiLineString) -> list[LineString]:
"""
Generate a list of LineString segments from a MultiLineString.
Args:
multi_linestring (MultiLineString): The MultiLineString object.
Returns:
list[LineString]: The list of LineString segments.
"""
segments: MultiLineString = intersection(multi_linestring, multi_linestring) # type: ignore
return list(segments.geoms) # type: ignore
[docs]
def shape_list_to_wkt_list(shape_list: list[Geometry]) -> list[str]:
"""
Convert a list of Shapely Geometry objects to a list of WKT strings.
Args:
shape_list (list[Geometry]): The list of Shapely Geometry objects.
Returns:
list[str]: The list of WKT strings.
"""
return list(map(lambda x: x.wkt, shape_list)) # type: ignore
[docs]
def wkt_list_to_shape_list(str_list: list[str]) -> list[Geometry]:
"""
Convert a list of Shapely Geometry objects to a list of WKT strings.
Args:
shape_list (list[Geometry]): The list of Shapely Geometry objects.
Returns:
list[str]: The list of WKT strings.
"""
return list(map(from_wkt, str_list)) # type: ignore
[docs]
def multipoint_from_multilinestring(multilinestring: MultiLineString) -> MultiPoint:
"""
Generate a MultiPoint from a MultiLineString by extracting unique points.
Args:
multilinestring (MultiLineString): The MultiLineString object.
Returns:
MultiPoint: The MultiPoint object containing unique points from the MultiLineString.
"""
return MultiPoint(extract_unique_points(multilinestring))
[docs]
def move_geometry(data: dict) -> str:
return (
sh_transform(
geometry = from_wkt(data["geometry"]),
transformation=lambda x: x + np.array([np.cos(-data["angle"]), np.sin(-data["angle"])])*data["distance"], # type: ignore
).wkt
)
[docs]
def merge_multilinestring_creating_missing_segments(
mls: Optional[Union[MultiLineString, LineString]]) -> Optional[LineString]:
"""
Merge a MultiLineString into a single LineString, creating missing segments if necessary.
Args:
mls (Union[MultiLineString, LineString]): The MultiLineString or LineString to merge.
Returns:
LineString: The merged LineString.
Raises:
ValueError: If the merging process fails.
"""
if isinstance(mls, LineString):
return mls
if mls is None:
return None
multipoint_list= []
for line in mls.geoms:
multipoint_list.append(list(line.boundary.geoms))
new_segment = []
for i, multipoint in enumerate(multipoint_list):
remaining_list = deepcopy(multipoint_list)
remaining_list.pop(i)
new_nearest_points = nearest_points(g1=MultiPoint(multipoint), g2=MultiPoint(list(chain(*remaining_list))))
new_segment.append(LineString(sorted(LineString(new_nearest_points).coords)))
new_linestring = line_merge(MultiLineString(list(mls.geoms) + list(set(new_segment))))
# print(new_linestring)
if isinstance(new_linestring, MultiLineString):
new_linestring= merge_multilinestring_creating_missing_segments(new_linestring)
if isinstance(new_linestring, LineString):
return new_linestring
raise ValueError("Error in merge_multilinestring_creating_missing_segments")
[docs]
def linestring_splitter(linestring_str: Optional[str], nb_split: int) -> Optional[list[str]]:
"""
Split LineString in WKT format into a list of LineString with the same length.
Args:
linestring_str (Optional]): The LineString to split in in WKT format.
nb_split (int): The number of segments to split the LineString into.
Returns:
List[str]: The list of the splitted LineString.
"""
if linestring_str is None:
return None
line: Geometry = from_wkt(linestring_str)
if not isinstance(line, LineString):
return None
sub_line: list[str] = list(map(
lambda x :
substring(line, start_dist=x*line.length/nb_split, end_dist=(x+1)*line.length/nb_split).wkt, range(nb_split)
))
return sub_line
[docs]
def get_geometry_list_intersection(geometry_list: list) -> Optional[str]:
"""
Get the intersection of a list of geometries in WKT format.
Args:
geometry_list (list): The list of geometries in WKT format.
Returns:
str: The WKT string of the intersection of the geometries.
"""
multi_line= get_multilinestring_from_wkt_list(geometry_list)
multi_line = intersection_all(multi_line.geoms) # type: ignore
if isinstance(multi_line, MultiLineString):
multi_line = linemerge(multi_line)
if multi_line.is_empty:
return None
return multi_line.wkt
[docs]
def remove_linestring_circle_from_multilinestring(shape_str: str) -> str:
"""
Remove the circle in the multilinestring and merge results
Args:
shape_str (str): WKT representation of the geometry
Returns:
str: WKT representation of the geometry without the circle
1. Convert the WKT string to a Shapely geometry object.
2. Check if the geometry is a MultiLineString.
3. If it is, filter out the circular linestrings (those that are rings).
4. Merge the remaining linestrings using the `line_merge` function.
5. Return the WKT representation of the merged geometry.
"""
multi_line: Geometry = from_wkt(shape_str)
if isinstance(multi_line, MultiLineString):
# print(MultiLineString(list(filter(lambda x: not x.is_ring, multi_line.geoms))))
return line_merge(MultiLineString(list(filter(lambda x: not x.is_ring, multi_line.geoms)))).wkt # type: ignore
else:
return shape_str
[docs]
def simplify_linestring(linestring: LineString):
"""
Simplify a self-intersecting LineString.
Args:
linestring (LineString): The LineString to simplify.
Returns:
LineString: The simplified LineString.
"""
if is_simple(linestring):
return linestring
coord_list = linestring.coords
nx_graph = nx.Graph()
nx_graph.add_edges_from(zip(coord_list[:-1], coord_list[1:]))
path = list(nx.shortest_path(G=nx_graph, source=coord_list[0], target=coord_list[-1]))
if len(path) > 1:
return LineString(path)
else:
return LineString(path*2)
[docs]
def simplify_multilinestring(multilinestring_str: str, from_point_str: str, to_point_str: str):
"""
Simplify a self-intersecting MultiLineString in format.
Args:
multilinestring (str): The MultiLineString to simplify.
from_point (str): The starting point of the path.
to_point (str): The ending point of the path.
Returns:
MultiLineString: The simplified MultiLineString.
"""
multilinestring = from_wkt(multilinestring_str)
from_point = from_wkt(from_point_str)
to_point = from_wkt(to_point_str)
if is_simple(multilinestring):
return multilinestring
if isinstance(multilinestring, LineString):
multilinestring = MultiLineString([multilinestring])
nx_graph = nx.Graph()
for coord_list in list(map(lambda x: list(x.coords), list(multilinestring.geoms))): # type: ignore
nx_graph.add_edges_from(zip(coord_list[:-1], coord_list[1:]))
path = list(nx.shortest_path(G=nx_graph, source=list(from_point.coords)[0], target=list(to_point.coords)[0]))
if len(path) > 1:
return LineString(path)
else:
return LineString(path*2)
[docs]
def force_linestring_direction(first_point_str: str, linestring_str: str) -> str:
"""
Check if the point is in the direction of the linestring
Args:
first_point_str (str): The point to check in WKT format
linestring_str (str): The linestring to check in WKT format
Returns:
str: The linestring in the right direction in WKT format
Raises:
ValueError: If the point is not in the linestring boundaries
"""
first_point: Point = from_wkt(first_point_str) # type: ignore
linestring: LineString = from_wkt(linestring_str) # type: ignore
if linestring.boundary.geoms[0] == first_point:
return linestring_str
elif linestring.boundary.geoms[1] == first_point:
return reverse(linestring).wkt
else:
print(f"from_geo {first_point_str} not in linsestring {linestring}")
return linestring_str