Source code for pymarine.utils.geographic

"""
Collection of functions dealing with geographical coordinates based on the *LatLon* module.
"""
import logging
import fastkml as kml
import numpy as np
import pandas as pd
from latloncalc.latlon import LatLon
from scipy.constants import nautical_mile


_logger = logging.getLogger(__name__)


[docs] class LocationCheck(object): """ Class to store a location in lat/lon and check if the distance of another location is out of range Parameters ---------- longitude : float Longitude of target location as a float latitude : float Latitude of target location as a float distance_deviation_allowed: float, optional Maximum allowed distance from initial location in km, Default = 10 km Notes ----- * Use this class to store a reference location and check later if the distance of another location is out of range given by the maximum `distance_deviation_allowed` input argument. * Class uses the LatLon package so distances are all in km """ def __init__(self, longitude, latitude, distance_deviation_allowed=10): self.target_location = LatLon(latitude, longitude) self.distance_deviation_allowed = float(distance_deviation_allowed) self.distance = None self.current_location = None
[docs] def out_of_range(self, current_latitude, current_longitude): """Check if the current location is outside the given distance from the target location Parameters ---------- current_latitude : float Latitude as a float of the current location current_longitude : float Longitude as a float of the current location Returns ------- bool True in case the current location is outside the distance from the target given by `distance_deviation_allowed` Examples -------- Initialise the location_check with a target location (52.37, 4.89). The we can use the method `out_of_range` to test if the current_location is out of range of the target location >>> location_check = LocationCheck(latitude=52.37, longitude=4.89, ... distance_deviation_allowed=10) >>> out_range = location_check.out_of_range(current_latitude=52.37, current_longitude=4.89) >>> print("Distance : {:.1f} km. Out of range : {}".format(location_check.distance, out_range)) Distance : 0.0 km. Out of range : False >>> out_range = location_check.out_of_range(current_latitude=52.31, current_longitude=4.84) >>> print("Distance : {:.1f} km. Out of range : {}".format(location_check.distance, out_range)) Distance : 7.5 km. Out of range : False >>> out_range = location_check.out_of_range(current_latitude=52.36, current_longitude=4.90) >>> print("Distance : {:.1f} km. Out of range : {}".format(location_check.distance, out_range)) Distance : 1.3 km. Out of range : False >>> out_range = location_check.out_of_range(current_latitude=52.31, current_longitude=4.77) >>> print("Distance : {:.1f} km. Out of range : {}".format(location_check.distance, out_range)) Distance : 10.6 km. Out of range : True Also check around the 0th and 180th meridians >>> greenwich = LocationCheck(latitude=51.48, longitude=-1, ... distance_deviation_allowed=10) >>> out_range = greenwich.out_of_range(current_latitude=51.48, current_longitude=1) >>> print("Distance : {:.1f} km. Out of range : {}".format(greenwich.distance, out_range)) Distance : 138.9 km. Out of range : True >>> out_range = greenwich.out_of_range(current_latitude=51.48, current_longitude=359) >>> print("Distance : {:.1f} km. Out of range : {}".format(greenwich.distance, out_range)) Distance : 0.0 km. Out of range : False >>> pacific_ocean = LocationCheck(latitude=0.0, longitude=179, ... distance_deviation_allowed=223) >>> out_range = pacific_ocean.out_of_range(current_latitude=0.0, current_longitude=-179) >>> print("Distance : {:.1f} km. Out of range : {}".format(pacific_ocean.distance, out_range)) Distance : 222.6 km. Out of range : False >>> out_range = pacific_ocean.out_of_range(current_latitude=0.0, current_longitude=181) >>> print("Distance : {:.1f} km. Out of range : {}".format(pacific_ocean.distance, out_range)) Distance : 222.6 km. Out of range : False Notes ----- The outcome of this routine has been checked with the Google Earth ruler """ self.current_location = LatLon(current_latitude, current_longitude) self.distance = self.current_location.distance(self.target_location) _logger.debug( "Distance forecast location {} with target location {}: {}" "".format(self.current_location, self.target_location, self.distance) ) if self.distance > self.distance_deviation_allowed: _logger.debug( "Distance {} out of range of {} with {} km" "".format(self.distance, self.target_location, self.distance) ) out_of_range = True else: out_of_range = False return out_of_range
[docs] def make_report(self): msg = "{:20s} : {} {}" _logger.info(msg.format("Target location set at", self.target_location, "")) _logger.info( msg.format( "Maximum distance deviation allowed", self.distance_deviation_allowed, " km", ) )
[docs] def get_speed_from_distance_and_time( trajectory, distance_name="travel_distance", speed_name="speed_sustained", datetime_name="DateTime", travel_time_name="travel_time", speed_max_clip=None, ): """Calculate the speed based on the travel distance and travel time. Parameters ---------- trajectory : dataframe Trajectory distance_name: str, optional Name of the distance column, Default value = "travel_distance" speed_name : str, optional Name of the column with the sustained speed, Default value = "speed_sustained" datetime_name: str, optional Name of the column with the date time, Default value = "DateTime" distance_name : str, optional Name of the column with the travel distance, Default value = "travel_distance travel_time_name : str, optional Name of the newly created column with the travel time, Default value = "travel_time" speed_max_clip : float, optional clip all values above this speed in knots. Default value = None, which means that the speed is not clipped Returns ------- DataFrame Update of the input DataFrame `trajectory` with an extra column named `speed_name` containing the calculated speed Raises ------ KeyError: Raised in case no travel time is available Examples -------- First create a data frame with some travel distance at and interval of 3h over 12 hours. >>> data = pd.DataFrame(index=pd.date_range(start="20160101", end="20160101T120000", freq="3h")) >>> data["travel_distance"] = np.linspace(start=0, stop=60, num=data.index.size, endpoint=True) Now calculate the speed based on the travel distance change between each time step >>> data = get_speed_from_distance_and_time(data) >>> data travel_distance travel_time speed_sustained 2016-01-01 00:00:00 0.0 0.0 5.0 2016-01-01 03:00:00 15.0 3.0 5.0 2016-01-01 06:00:00 30.0 6.0 5.0 2016-01-01 09:00:00 45.0 9.0 5.0 2016-01-01 12:00:00 60.0 12.0 5.0 Notes ----- * The output of `travel_distance_and_heading_from_coordinates` can be used as an input for `get_speed_from_distance_and_time` See Also -------- travel_distance_and_heading_from coordinates : Calculate the travel distance from coordinates """ delete_datetime_column_at_end = False try: date_time = trajectory[datetime_name] except KeyError: # we failed to get the datetime from the column. This means we have to copy it to a column # first trajectory[datetime_name] = trajectory.index date_time = trajectory[datetime_name] delete_datetime_column_at_end = True # make sure we are dealing with date/time, not strings date_time = pd.Series([pd.Timestamp(t) for t in date_time]) time_in_hour = (date_time - date_time[0]) / pd.Timedelta("1 hour") delta_time = time_in_hour.diff().fillna(1).values trajectory[travel_time_name] = time_in_hour.values trajectory[speed_name] = trajectory[distance_name].diff().fillna(0) / delta_time # with the diff method we can not get the first position. Just assume it to be the same as the # next one trajectory.ix[0, speed_name] = trajectory.ix[1, speed_name] trajectory[speed_name].fillna(0, inplace=True) if speed_max_clip is not None: # clip the speed above the threshold value with nan and then interpolate between the missing # values trajectory.ix[trajectory[speed_name] > speed_max_clip, speed_name] = np.nan trajectory[speed_name].bfill(inplace=True) if delete_datetime_column_at_end: # only delete the date time column if we have created it internally trajectory.drop(datetime_name, inplace=True, axis=1) return trajectory
[docs] def travel_distance_and_heading_from_coordinates( db, latitude_name="GPS_LATITUDE", longitude_name="GPS_LONGITUDE", heading_name="HEADING", travel_distance_name="travel_distance", ): """ Calculate the travel distance and optionally the heading based on the latitude and longitude columns in the DataFrame `db` Parameters ---------- db : DataFrame Data base carrying all the data (including the latitude and longitude) latitude_name : str, optional Name of the latitude column (Default value = "GPS_LATITUDE") longitude_name : str, optional Name of the longitude column (Default value = "GPS_LONGITUDE") heading_name : str, optional The heading name. It is assumed that the heading entry already exist and that only the missing values are added based on the coordinate displacement (Default = "HEADING") travel_distance_name : str, optional name of the newly created column with the travel distance in nautical miles (Default value = "travel_distance") Returns ------- DataFrame DataFrame with an extra column named `travel_distance_name` containing the travel distance in nautical miles and optionally an extra column named `heading_name` with the headings. Examples -------- First create a data frame with some latitude longitude values at an 3 hour interval over 12 hour >>> data = pd.DataFrame(index=pd.date_range(start="20160101", end="20160101T120000", freq="3h")) >>> data["GPS_LATITUDE"] = np.linspace(start=55.4, stop=54.4, num=data.index.size) >>> data["GPS_LONGITUDE"] = np.linspace(start=3.34, stop=3.14, num=data.index.size) Now use the `travel_distance_and_heading_from_coordinates` function to add the travel distance and heading to the data frame. A new data frame will be copied to the output >>> data = travel_distance_and_heading_from_coordinates(data) >>> data GPS_LATITUDE GPS_LONGITUDE travel_distance HEADING 2016-01-01 00:00:00 55.40 3.34 0.000000 186.534194 2016-01-01 03:00:00 55.15 3.29 15.125795 186.574900 2016-01-01 06:00:00 54.90 3.24 30.252197 186.615478 2016-01-01 09:00:00 54.65 3.19 45.379211 186.655928 2016-01-01 12:00:00 54.40 3.14 60.506836 186.655928 Check the 180 meridian as well. Split `linspace` for the longitude in two parts in order to introduce the discontinuity in longitude when passing the 180-meridian >>> data = pd.DataFrame(index=pd.date_range(start="20160101", end="20160101T120000", freq="3h")) >>> data["GPS_LATITUDE"] = np.linspace(start=0.0, stop=0, num=data.index.size) >>> data["GPS_LONGITUDE"] = np.append(np.linspace(start=-179.0, stop=-180, num=data.index.size//2, ... endpoint=False), ... np.linspace(start=180.0, stop=179, num=data.index.size//2+1)) >>> data = travel_distance_and_heading_from_coordinates(data) >>> data GPS_LATITUDE GPS_LONGITUDE travel_distance HEADING 2016-01-01 00:00:00 0.0 -179.0 0.000000 270.0 2016-01-01 03:00:00 0.0 -179.5 30.053858 270.0 2016-01-01 06:00:00 0.0 180.0 60.107716 270.0 2016-01-01 09:00:00 0.0 179.5 90.161575 270.0 2016-01-01 12:00:00 0.0 179.0 120.215433 270.0 Notes ----- * The DataFrame `db` must have at least two columns containing the latitude and longitude. In case more columns are present this is not a problem: all columns will be copied the the output * Based on the latitude and longitude values, the travel distance and heading is calculated using the LatLon package * In case the heading field already exists, only the missing values will be updated. * The column names of the latitude, longitude, distance and headding can be defined via the arguments of the function. See Also -------- get_speed_from_distance_and_time : calculate the speed based on the distance and time import_way_points : import a list of coordinates from a kml file generated in Google Earth """ n_rows = db.index.size last_coordinates = None # create a list with the current distances (starting with 0 for the first location) delta_distance = [0] try: headings = db[heading_name].values except KeyError: headings = np.zeros(n_rows) else: # check if headings is not empty if True in db[heading_name].isnull(): _logger.debug("empty heading. filling it with zeros") headings = np.zeros(n_rows) _logger.debug("Start loop {} ".format(LatLon)) # loop over all the lines for i in range(n_rows): # get the coordinates from the GPS_LATITUDE and GPS_LONGITUDE columns and turn into a LatLon # object latitude = db[latitude_name].values[i] longitude = db[longitude_name].values[i] _logger.debug("Converting {} {} with {}".format(latitude, longitude, LatLon)) coordinates = LatLon(latitude, longitude) if last_coordinates is None: # for the first row, the travel distance =0, just store the coordinates last_coordinates = coordinates else: # for all other row, calculate the travel distance wro the last position. Convert to # miles try: displacement = coordinates.distance(last_coordinates) / ( nautical_mile / 1000.0 ) except ValueError: displacement = 0 # now store the displacement and the current position to last_position for the next row delta_distance.append(displacement) # heading from last coordinate to coordinate try: headings[i - 1] = last_coordinates.heading_initial(coordinates) headings[i] = headings[i - 1] except ValueError: headings[i] = np.nan # finally, update the last_coordinates with the current last_coordinates = coordinates # turn the displacement array into a cumulative sum to get the total travel distance db[travel_distance_name] = np.array(delta_distance).cumsum() db[heading_name] = headings db[heading_name].bfill(inplace=True) db[heading_name] = db[heading_name].mod(360) return db
[docs] def import_way_points( file_name, latitude_name="latitude", longitude_name="longitude", heading_name="heading", travel_distance_name="travel_distance", n_distance_points=2000, start_wp=None, ): """Import the way points of a Google Earth kml file. Parameters ---------- file_name : str Name of the google earth kml file to import latitude_name : str, optional Name of the latitudes column (Default value = "latitude") longitude_name : str, optional Name of the longitudes column (Default value = "longitude") heading_name : str, optional Name of the heading column (Default value = "heading") travel_distance_name : str, optional Name of the travel distance column (Default value = "travel_distance") n_distance_points : int, optional Number of points to use to interpolate the kml data (Default value = 2000) start_wp : int, optional If given, the first `start_wp` pins are skipped (Default value = None) Returns ------- DataFrame New data frame with the latitude and longitude coordinates. The travel distance is used for the index of the DataFrame Notes ----- * The kml input data can be made in Google Earth by putting a list of way points (yellow pin points) to the map and store this into one directory. Then, with the right mouse button on this directory you can choose "Save Place As" and pick the kml format. * If the `n_distance_points` is set to None, only the coordinates of the pin locations as found in the kml file are imported. * If `n_distance_points` is specified, an equidistant set of coordinates obtained from the interpolated values between the pin location in the kml file is returned. * In case that interpolation is used, make sure that there is at least one sample point in between the pin locations as defined in the kml file. Examples -------- First import the pure coordinates as defined at the pin point >>> data = import_way_points("../data/madeira_ivory.kml", n_distance_points=None) >>> data.info() <class 'pandas.core.frame.DataFrame'> Float64Index: 15 entries, 0.0 to 2519.33685128 Data columns (total 3 columns): latitude 15 non-null float64 longitude 15 non-null float64 heading 15 non-null float64 dtypes: float64(3) memory usage: 480.0 bytes >>> data["latitude"].head() travel_distance 0.000000 32.400000 51.353449 31.800000 103.761006 31.090513 280.375912 28.358815 540.155420 24.033664 Name: latitude, dtype: float64 >>> data["longitude"].head() travel_distance 0.000000 -17.180000 51.353449 -17.900000 103.761006 -18.498083 280.375912 -19.776735 540.155420 -20.206784 Name: longitude, dtype: float64 Do the same import but interpolate in between the pin locations on a regular grid >>> data = import_way_points("../data/madeira_ivory.kml", n_distance_points=2000) >>> data["latitude"].head() travel_distance 0.000000 32.400000 2.441572 32.371429 3.662502 32.357143 4.883529 32.342857 6.104652 32.328571 Name: latitude, dtype: float64 >>> data["longitude"].head() travel_distance 0.000000 -17.180000 2.441572 -17.214286 3.662502 -17.231429 4.883529 -17.248571 6.104652 -17.265714 Name: longitude, dtype: float64 """ # open the google earth kml file. Do it as binary in order to avoid decoding issues with open(file_name, "rb") as fp: kml_string = fp.read() latitudes = list() longitudes = list() # k_object = kml.KML() k_object.from_string(kml_string) for k_level1 in k_object.features(): for k_level2 in k_level1.features(): for cnt, k_level3 in enumerate(k_level2.features()): point = k_level3.geometry if start_wp is not None and cnt < start_wp: _logger.debug( "Skipping Way point {}: lat = {} lon = {}" "".format(cnt, point.x, point.y) ) continue longitudes.append(point.x) latitudes.append(point.y) _logger.debug( "Adding Way point {}: lat = {} lon = {}" "".format(cnt, latitudes[-1], longitudes[-1]) ) # create data frame out of way point coordinates and return df = pd.DataFrame( data=np.vstack((latitudes, longitudes)).T, columns=[latitude_name, longitude_name], ) # calculate the travel distance df = travel_distance_and_heading_from_coordinates( df, latitude_name=latitude_name, longitude_name=longitude_name, heading_name=heading_name, travel_distance_name=travel_distance_name, ) if n_distance_points is not None: df.set_index(travel_distance_name, inplace=True, drop=False) distances = np.linspace(0, df.index[-1], n_distance_points, endpoint=True) df2 = pd.DataFrame( index=distances, columns=[latitude_name, longitude_name, heading_name] ) df2[travel_distance_name] = distances df3 = pd.concat([df, df2]).sort_index().interpolate() df3 = ( df3.reset_index() .drop_duplicates(keep="first", subset=travel_distance_name) .set_index(travel_distance_name) ) df3 = df3.reindex(df2.index) df3.reset_index(inplace=True, drop=True) df3.drop("index", axis=1, inplace=True) df = travel_distance_and_heading_from_coordinates( df3, latitude_name=latitude_name, longitude_name=longitude_name, heading_name=heading_name, travel_distance_name=travel_distance_name, ) df.set_index(travel_distance_name, inplace=True, drop=True) return df