Filtering by attributes and relations

Now that we know how geometries relate to each other, we can search for points using these relations. We already have the means to import the points and the polygons that represent whatever boundaries interest us.

The data that comes with the book files contains examples of world countries' boundaries, but you are free to search the Internet for any data that is significant to you. Just remember that the coordinates must be in latitude and longitude and that each feature needs a name field.

For our tests, I prepared a special set of geocaching points that spans the whole globe. As an exercise, we will filter these points by country.

The proposed workflow is as follows:

  • Import the points and boundaries
  • Find the boundary that we want to use
  • Filter the points by that boundary
  • Return the points to the user

To find the points that we want, we will iterate over the data and test each item. Iteration can be costly in terms of processing, depending on the amount of data and on the operations performed in each loop. Let's keep this in mind.

The first step in the workflow is already done, so let's write the code to find the boundary that interests us. If you are using the data provided, you can find the boundary of your country as follows:

  1. Go to the BoundaryCollection class and add a new method get_by_name:
    class BoundaryCollection(BaseGeoCollection):
        """This class represents a collection of
        geographic boundaries.
        """
        def _parse_data(self, features):
            for feature in features:
                geom = feature['geometry']['coordinates']
                attributes = feature['properties']
                polygon = wkt.loads(geom)
                boundary = Boundary(geometry=polygon,
                                    attributes=attributes)
                self.data.append(boundary)
                
        def get_by_name(self, name):
            """Finds an object by its name attribute and returns it."""
            for item in self.data:
                if item.get_attribute('name') == name:
                    return item
            raise LookupError(
                "Object not found with the name: {}".format(name))

    This very simple method iterates over the data. When it finds the first boundary whose name property matches the name passed as an argument, the function execution stops and the object is returned. If nothing is found, LookupError will be raised.
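
The first-match behavior can be tried in isolation, without importing any file. The following is only a minimal sketch: FakeBoundary and find_by_name are hypothetical stand-ins that mimic the get_attribute interface used above; they are not the book's classes.

```python
class FakeBoundary:
    """Hypothetical stand-in that only exposes get_attribute()."""

    def __init__(self, **attributes):
        self.attributes = attributes

    def get_attribute(self, name):
        return self.attributes[name]


def find_by_name(data, name):
    """Return the first item whose 'name' attribute equals name."""
    for item in data:
        if item.get_attribute('name') == name:
            return item  # Stop iterating at the first match.
    raise LookupError(
        "Object not found with the name: {}".format(name))


countries = [FakeBoundary(name='Brazil', code='BR'),
             FakeBoundary(name='Chile', code='CL'),
             FakeBoundary(name='Brazil', code='duplicate')]

match = find_by_name(countries, 'Brazil')
print(match.get_attribute('code'))  # BR (the first match wins)

try:
    find_by_name(countries, 'Foo')
except LookupError as error:
    print(error)  # Object not found with the name: Foo
```

Because the loop returns as soon as a match is found, the second 'Brazil' entry is never inspected.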

  2. Let's play with it. Go to the if __name__ == '__main__': block at the end of the file and edit it:
    if __name__ == '__main__':    
        world = BoundaryCollection("../data/world_borders_simple.shp")
        print(world.get_by_name('Brazil'))
  3. Try different country names and see the results. If the name is found, you should see output similar to this:
    File imported: ../data/world_borders_simple.shp
    Brazil
    
    Process finished with exit code 0
  4. If it's not found, you should get a nice exception:
    Traceback (most recent call last):
      File "Chapter 4/code/models.py", line 153, in <module>
        print(world.get_by_name('Foo'))
      File "Chapter 4/code/models.py", line 148, in get_by_name
        'Object not found with the name: {}'.format(name))
    LookupError: Object not found with the name: Foo
    
    Process finished with exit code 1

    Very well. Our method works nicely, and it comes with an (almost) unexpected feature: it is not specific to boundaries; it can be used to find any type of GeoObject. Notice how it only uses properties that are available in our base classes.
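
To see why the lookup is not tied to boundaries, consider the sketch below. The GeoObject, Boundary, Geocache, and Collection classes here are deliberately simplified stand-ins with an assumed shape (no geometry, no file import); only the get_by_name logic is taken from the text.

```python
class GeoObject:
    """Simplified stand-in for the base object (assumed shape)."""

    def __init__(self, attributes):
        self.attributes = attributes

    def get_attribute(self, name):
        return self.attributes[name]


class Boundary(GeoObject):
    pass


class Geocache(GeoObject):
    pass


class Collection:
    """Simplified stand-in for a collection, without file import."""

    def __init__(self, data):
        self.data = list(data)

    def get_by_name(self, name):
        # Only get_attribute() is needed, so any GeoObject works.
        for item in self.data:
            if item.get_attribute('name') == name:
                return item
        raise LookupError(
            "Object not found with the name: {}".format(name))


boundaries = Collection([Boundary({'name': 'Brazil'})])
caches = Collection([Geocache({'name': 'LaSalle Park No 1'})])

print(type(boundaries.get_by_name('Brazil')).__name__)         # Boundary
print(type(caches.get_by_name('LaSalle Park No 1')).__name__)  # Geocache
```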

  5. Move the get_by_name method to the BaseGeoCollection class and test your code again. Remember that the order of the methods inside a class is irrelevant to the class's behavior, but best practices recommend putting the magic methods first, then the private ones, and then the others. Your complete BaseGeoCollection class should be as follows:
    class BaseGeoCollection(object):
        """This class represents a collection of spatial data."""
        def __init__(self, file_path=None):
            self.data = []
            self.epsg = None
    
            if file_path:
                self.import_data(file_path)
    
        def __add__(self, other):
            self.data += other.data
            return self
    
        def _parse_data(self, features):
            raise NotImplementedError
    
        def import_data(self, file_path):
            """Opens a vector file compatible with OGR and parses
            the data.
    
            :param str file_path: The full path to the file.
            """
            features, metadata = open_vector_file(file_path)
            self._parse_data(features)
            self.epsg = metadata['epsg']
            print("File imported: {}".format(file_path))
    
        def describe(self):
            print("SRS EPSG code: {}".format(self.epsg))
            print("Number of features: {}".format(len(self.data)))
    
        def get_by_name(self, name):
            """Finds an object by its name attribute and returns it."""
            for item in self.data:
                if item.get_attribute('name') == name:
                    return item
            raise LookupError(
                "Object not found with the name: {}".format(name))

    In the next step, we will search for the points that are within the boundary that we found. This time, we will create the method directly inside the BaseGeoCollection class, so that it becomes available to the PointCollection and BoundaryCollection classes through inheritance. As a bonus, we will also be able to filter boundaries by another boundary.

  6. Go to the BaseGeoCollection class and add the method filter_by_boundary:
    #... 
        def filter_by_boundary(self, boundary):
            """Filters the data by a given boundary"""
            result = []
            for item in self.data:
                if item.geom.within(boundary.geom):
                    result.append(item)
            return result

    Here, we created a variable, result, containing a list to store the objects that pass the test. The within predicate tests whether each item is inside the boundary that is passed as an argument. In this case, if nothing is found, no exception is raised and an empty list is returned.
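
The filtering logic can be exercised without any geographic library. The sketch below is an illustration under stated assumptions: Box and Point are toy stand-ins for Shapely geometries, Item is a hypothetical object with a geom attribute, and the loop is rewritten as an equivalent list comprehension. Shapely's within also excludes points lying exactly on the boundary, which the toy test mimics with strict comparisons.

```python
class Box:
    """Axis-aligned rectangle; a toy stand-in for a polygon."""

    def __init__(self, min_x, min_y, max_x, max_y):
        self.bounds = (min_x, min_y, max_x, max_y)


class Point:
    """Toy stand-in for a point geometry."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def within(self, box):
        """Return True if the point lies strictly inside the box."""
        min_x, min_y, max_x, max_y = box.bounds
        return min_x < self.x < max_x and min_y < self.y < max_y


class Item:
    """Hypothetical object carrying a name and a geom attribute."""

    def __init__(self, name, geom):
        self.name = name
        self.geom = geom


def filter_by_boundary(data, boundary):
    """Keep the items whose geometry is within the boundary."""
    return [item for item in data if item.geom.within(boundary.geom)]


boundary = Item('Toy country', Box(0, 0, 10, 10))
points = [Item('inside', Point(5, 5)),
          Item('outside', Point(20, 5)),
          Item('on the edge', Point(0, 5))]

print([item.name for item in filter_by_boundary(points, boundary)])
# ['inside']
print(filter_by_boundary(points, Item('Empty', Box(50, 50, 60, 60))))
# []
```

The second call shows the behavior described above: when nothing matches, the result is simply an empty list.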

  7. Edit the testing code in the if __name__ == '__main__': block:
    if __name__ == '__main__':
        gdal.PushErrorHandler('CPLQuietErrorHandler')
        world = BoundaryCollection("../data/world_borders_simple.shp")
        geocaching_points = PointCollection("../data/geocaching.gpx")
        usa_boundary = world.get_by_name('United States')
        result = geocaching_points.filter_by_boundary(usa_boundary)
        for item in result:
            print(item)

    While testing, two instances are created: one from the BoundaryCollection class and one from the PointCollection class, with the data files passed as arguments. The call to gdal.PushErrorHandler('CPLQuietErrorHandler') silences GDAL's error messages on the console. Then, the country of interest is found and stored in the usa_boundary variable, which is passed to the filter_by_boundary method.

  8. Run the code. You should see a long list of geocaches as follows:
    -78.90175 42.89648  -  LaSalle Park No 1
    -78.89818 42.89293  -  LaSalle Park No 2
    -78.47808 43.02617  -  A Unique Walk in Akron
    -78.93865 42.95982  -  A view of Strawberry Island
    -78.90007 42.7484  -  A View to a Windmill
    -79.07533 43.08133  -  A Virtual Made in the Mist
    -74.43207 43.86942  -  Adirondack Museum Guestbook
    ...
    
    Process finished with exit code 0

    As expected, it prints a list of Geocache objects, each represented by its coordinates and name, as defined by the __repr__ method.
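
For reference, a __repr__ method producing this kind of output could look like the sketch below. The Geocache class shown here is a hypothetical minimal version whose attribute names (lon, lat, name) are assumptions; the real class in the book's code may store its data differently.

```python
class Geocache:
    """Hypothetical minimal geocache with coordinates and a name."""

    def __init__(self, lon, lat, name):
        self.lon = lon
        self.lat = lat
        self.name = name

    def __repr__(self):
        # print() falls back to __repr__ when __str__ is not defined.
        return "{} {}  -  {}".format(self.lon, self.lat, self.name)


cache = Geocache(-78.90175, 42.89648, "LaSalle Park No 1")
print(cache)    # -78.90175 42.89648  -  LaSalle Park No 1
print([cache])  # [-78.90175 42.89648  -  LaSalle Park No 1]
```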
