You may not have even noticed, but geospatial data has become an indispensable part of our lives. We use maps and GPS trackers almost every day, generating or consuming lots of data with coordinates in one way or another. Therefore, leveraging data science to analyze this data is of interest to many individuals and organizations. Is this the case for you?
As Dataiku software engineers, we have recently developed new features that provide our users with a simple and smooth way to get the most out of their geospatial data, and we want you to benefit from this experience.
In a recent blog post, we listed everything that we would have liked to know about geospatial analysis and the tools it requires just before starting our project. Now it is time to explain how we built our geo join feature: what decisions were made, why, and how it’s performing now.
Geo Join Recipe
Considering the number of requests for improvements to our geo capabilities, we decided to implement several geospatial features in the recent Dataiku 10 updates. One of these features is a new type of visual recipe called geo join.
The goal of this recipe is to join datasets based on geospatial columns by defining matching conditions between them:
Implementation
We considered many different options for implementing this recipe in Dataiku. Since we knew that a lot of people store their geospatial data in databases like PostGIS, we wanted to support multiple execution engines in order to offload the actual join logic to the underlying databases where possible. However, when that is not possible, we wanted Dataiku itself to be able to run the join operation locally.
The join operation is widely used in Dataiku, so even though we were adding a new join recipe specific to geo data, we wanted to provide the same user experience as the regular Join recipe. However, while the UI is very similar, the underlying execution was designed and implemented from scratch.
Dataiku's Built-In Engine
The core of the Dataiku product is implemented in Java. This means that when a user selects the Dataiku engine for a visual recipe execution (for example, Group or Window), the internal Java recipe implementation is invoked. The regular Join recipe also uses an internal H2 database to perform the join. Unfortunately, H2 doesn’t support geospatial data types, so we couldn’t use it. That’s why we had to implement the geo join logic ourselves.
What tools did we have available? Fortunately, there’s a de facto standard Java library for working with geo data called JTS. This library allows you to read WKT (Well Known Text), calculate distances between objects, check if one geometry contains or intersects another, create geospatial indexes, and much more. Most of the time we worked with JTS through a higher-level library called GeoTools.
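To give a flavor of what these libraries offer, here is a minimal, self-contained snippet (our own illustration, not Dataiku code) that uses JTS to parse WKT, evaluate spatial predicates, and build an in-memory spatial index:

```java
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.strtree.STRtree;
import org.locationtech.jts.io.WKTReader;

public class JtsBasics {
    public static void main(String[] args) throws Exception {
        WKTReader reader = new WKTReader();

        // Parse geometries from WKT (coordinates are longitude/latitude)
        Geometry point = reader.read("POINT (2.3522 48.8566)");
        Geometry polygon = reader.read("POLYGON ((2.2 48.8, 2.4 48.8, 2.4 48.9, 2.2 48.9, 2.2 48.8))");

        // Spatial predicates and planar distance in the coordinates' own units
        System.out.println(polygon.contains(point));     // true
        System.out.println(polygon.intersects(point));   // true
        System.out.println(polygon.distance(point));     // 0.0, since the point is inside

        // In-memory spatial index (R-tree) keyed by bounding boxes
        STRtree index = new STRtree();
        index.insert(polygon.getEnvelopeInternal(), polygon);
        System.out.println(index.query(point.getEnvelopeInternal())); // candidate geometries
    }
}
```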
Join Algorithm
So, let’s see how the join operation actually works. Imagine that we’re trying to join two datasets, A and B. Each of these datasets contains at least one geospatial column (points, polygons, etc.). We then specify the join conditions between these datasets; for example, geometries in column geo1 of dataset A should be within 100 meters of geometries in column geo1 of dataset B.
How would the join algorithm work in this case?
Reading the Data
There’s a core level of abstraction in Dataiku that allows recipes to easily read data from any underlying data storage, no matter where the data is (e.g., a local filesystem or a cloud-based database). When Dataiku reads a dataset, it produces a stream of data. Writing the output datasets is also done in a streaming fashion. This allows Dataiku to save memory resources when processing the data.
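The interfaces below are purely hypothetical stand-ins for Dataiku’s internal abstraction, but they illustrate the streaming pattern the recipe relies on: each row is read, processed, and written immediately, so memory usage does not grow with dataset size.

```java
import java.util.Iterator;

// Hypothetical interfaces standing in for Dataiku's internal dataset abstraction.
interface RowReader extends Iterator<String[]> {}
interface RowWriter { void write(String[] row); }

class StreamingProcessing {
    // Rows flow through one at a time; nothing is accumulated in memory.
    static void process(RowReader in, RowWriter out) {
        while (in.hasNext()) {
            String[] row = in.next();
            out.write(transform(row));
        }
    }

    static String[] transform(String[] row) {
        return row; // placeholder for per-row processing
    }
}
```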
Joining the Data
Considering that input data arrives as streams, there are two possible scenarios when joining it:
2 Inputs
In instances where there are only two datasets to join, the algorithm is as follows (a simplified code sketch appears after the steps):
1. Choose one of the dataset streams (B in the diagram above) and consume it while building two structures:
   - An in-memory spatial index (R-tree) based on the join keys of dataset B (more information about spatial indexes will follow)
   - On-disk file storage with the rest of the attributes of dataset B that are required by the recipe output schema
   Each leaf of the R-tree stores the position of the corresponding object in the attributes file.
2. The other dataset (A in our example) is streamed as is. For each row of this dataset, we query the R-tree index to find the matching geometries from dataset B.
3. Next, with the row of dataset A and a list of matching geometries of dataset B, we start creating output rows based on the output dataset schema. Since each geometry object in the index contains a link to the rest of its attributes on disk, we can easily access them. To improve performance, there’s also a cache of recently read entries associated with the attributes file, which saves time on repetitive disk reads.
4. Finally, with a full row of dataset A and all the necessary attributes of the matching dataset B row, we create and emit a resulting row to the output dataset.
5. Repeat steps 2-4 for the rest of the rows in dataset A.
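Here is a rough sketch of these steps. All the supporting types (RowA, RowB, AttributesFile, OutputWriter) are hypothetical placeholders for Dataiku internals; only the JTS classes (STRtree, Geometry) are real library classes.

```java
import java.util.List;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.strtree.STRtree;

class TwoInputGeoJoin {
    // An index entry keeps the join-key geometry of a B row plus the position
    // of its remaining attributes in the on-disk attributes file.
    record IndexedRow(Geometry geometry, long positionInAttributesFile) {}

    static void join(Iterable<RowA> streamA, Iterable<RowB> streamB,
                     AttributesFile attributesOfB, OutputWriter output) {
        // Step 1: consume stream B, building the R-tree and the attributes file.
        STRtree index = new STRtree();
        for (RowB b : streamB) {
            long position = attributesOfB.append(b.nonKeyAttributes());
            index.insert(b.geometry().getEnvelopeInternal(),
                         new IndexedRow(b.geometry(), position));
        }

        // Steps 2-4, repeated for every row of A: query the index, validate
        // candidates, read the matching attributes from disk, emit output rows.
        for (RowA a : streamA) {
            @SuppressWarnings("unchecked")
            List<IndexedRow> candidates = index.query(a.geometry().getEnvelopeInternal());
            for (IndexedRow candidate : candidates) {
                if (matches(a.geometry(), candidate.geometry())) {   // e.g., "within 100 meters"
                    Object[] attributesOfMatch = attributesOfB.read(candidate.positionInAttributesFile());
                    output.emit(a, attributesOfMatch);
                }
            }
        }
    }

    static boolean matches(Geometry left, Geometry right) { /* evaluate the join condition */ return true; }

    // Hypothetical supporting types, standing in for Dataiku internals.
    interface RowA { Geometry geometry(); }
    interface RowB { Geometry geometry(); Object[] nonKeyAttributes(); }
    interface AttributesFile { long append(Object[] attributes); Object[] read(long position); }
    interface OutputWriter { void emit(RowA left, Object[] rightAttributes); }
}
```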
2+ Inputs
When joining more than two input datasets, the algorithm is similar to the two-inputs case, but slightly more complex (a rough code sketch follows the steps below).
In the diagram above, there are five datasets to be joined using different matching conditions between their geospatial columns.
1. The process starts similarly to the two-inputs case: we choose two inputs and join them. The only difference is that now we’re not emitting results directly to the output dataset but to an intermediate memory table. For each matching pair of rows from datasets A and B, we keep a record that contains their row ids in the corresponding attribute files on disk. This intermediate result table also requires us to have an attributes file for the left dataset (A), whereas in the two-inputs case this extra file was only required for the right dataset.
2. After the first step, we have:
- Attribute files for datasets A and B, containing the columns of the corresponding datasets that are required for the geo join: those used in the matching conditions or in the output schema
- The R-tree index of dataset B
- A memory table of intermediate results: pairs of row ids pointing into the attribute files of datasets A and B
3. Now we repeat the join operation, this time between the memory table and another dataset stream (C in our example).
While performing this join, we iterate over the rows of the previous result stored in the memory table and read the geometries used for the join from the attribute files. For each row from the memory table, we find the matching rows of dataset C using its index. As a result, there’s now a triplet of row ids to be written to the new resulting memory table.
4. Step 3 is repeated until the last joined dataset is reached. By this time, there are as many attribute files as there are joined datasets, and the memory table contains as many columns as there are datasets.
5. The last step is to iterate over the resulting memory table and retrieve the actual values from the attribute files in order to create the output rows and emit them.
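To make the bookkeeping more concrete, here is a hypothetical sketch of how the intermediate memory table grows with each additional dataset, assuming for simplicity that each new dataset is matched against the previously joined one (GeometryLookup and SpatialLookup are invented names standing in for the attribute files and the R-tree):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.locationtech.jts.geom.Geometry;

// Illustrative only: the intermediate memory table keeps, per partial result,
// one row id per already-joined dataset, each pointing into that dataset's attributes file.
class MultiInputGeoJoinSketch {

    static List<long[]> joinNextDataset(List<long[]> intermediate,
                                        GeometryLookup previousDataset,  // reads geometries back from an attributes file
                                        SpatialLookup nextDataset) {     // wraps the R-tree built over the next dataset
        List<long[]> result = new ArrayList<>();
        for (long[] partialRow : intermediate) {
            long previousRowId = partialRow[partialRow.length - 1];
            Geometry geometry = previousDataset.geometry(previousRowId);
            // Every match extends the partial result by one row id:
            // a pair (A, B) becomes a triplet (A, B, C), and so on.
            for (long matchingRowId : nextDataset.matches(geometry)) {
                long[] extended = Arrays.copyOf(partialRow, partialRow.length + 1);
                extended[partialRow.length] = matchingRowId;
                result.add(extended);
            }
        }
        return result;
    }

    // Hypothetical helpers standing in for Dataiku internals.
    interface GeometryLookup { Geometry geometry(long rowId); }
    interface SpatialLookup { Iterable<Long> matches(Geometry geometry); }
}
```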
Distance-Based Operations Optimization
As mentioned earlier, Dataiku natively uses the EPSG:4326 spatial reference system (SRS). The unit of measurement of this reference system is degrees, which means that a spatial index storing objects from this SRS will also be built using degrees. Using this index for operations like contains is fine, but what do we do for a query like “find all objects within 100 meters of a given point”? We’re now mixing two units of measurement: degrees and meters. Meters can’t be translated directly to degrees because one degree represents a different number of meters at different places on Earth.
While it’s impossible to translate a linear distance directly into an angular one, we can come up with an upper limit. The maximum number of meters represented by one degree is obtained at a point on the equator when moving along it, which means we use Earth’s equatorial radius of 6378137 meters to make this conversion.
This upper limit lets us do a rough candidate elimination when querying an index (less accurate than if we could convert degrees to meters for any given point).
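In concrete terms, the conversion boils down to a few lines (a sketch based on the radius quoted above, not the actual Dataiku code):

```java
public class DegreeConversion {
    public static void main(String[] args) {
        // Upper-limit conversion from meters to degrees using the equatorial radius from the post.
        double equatorialRadiusMeters = 6378137.0;
        double metersPerDegree = 2 * Math.PI * equatorialRadiusMeters / 360.0; // ~111,319.49 m per degree
        System.out.println(100.0 / metersPerDegree);                           // 100 m is roughly 0.000898 degrees
    }
}
```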
Normally, after the index returns some matching objects, we still have to perform validation using a spatial operation on the objects themselves (not their bounding boxes). In the case of distance-based operations like “within/beyond X meters of”, validation means calculating the orthodromic distance between two points. This operation is pretty expensive due to the trigonometric functions involved. On the other hand, calculating angular distances between objects is fast because it’s just the Euclidean formula.
This is where we’ve implemented a second optimization. Not only do we compute the maximum possible angular distance for a given linear distance, we also find the minimum possible value, which is found in the direction of azimuth 0. We use the GeodeticCalculator class from GeoTools to calculate this value.
Having the minimum and maximum possible angular distances for a given linear distance helps us avoid some expensive orthodromic distance calculations. For example, when searching for objects within 100 meters of a given point, we say that (a code sketch follows the list):
- If the angular distance between an indexed object and the point is less than the minimum angular threshold, the object matches
- If the angular distance between an indexed object and the point is greater than the maximum angular threshold, the object doesn’t match
- Otherwise, we calculate the orthodromic distance between the point and the object and check whether it’s within our linear threshold
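Putting the two optimizations together, the check could look roughly like the sketch below. The class and method names are ours; only GeodeticCalculator (from GeoTools) and Coordinate (from JTS) are real library classes, and the angular thresholds are assumed to be precomputed as described above.

```java
import org.geotools.referencing.GeodeticCalculator;
import org.locationtech.jts.geom.Coordinate;

class DistanceFilterSketch {
    // Minimum angular distance for `meters`, obtained by moving `meters` due north
    // (azimuth 0) from the query point, as described in the post.
    static double minAngularDistance(Coordinate point, double meters) {
        GeodeticCalculator calc = new GeodeticCalculator();
        calc.setStartingGeographicPoint(point.x, point.y);      // (longitude, latitude)
        calc.setDirection(0.0, meters);
        return Math.abs(calc.getDestinationGeographicPoint().getY() - point.y);
    }

    // Three-way check for "within `meters` of `point`", using precomputed angular thresholds.
    static boolean withinDistance(Coordinate point, Coordinate candidate, double meters,
                                  double minAngular, double maxAngular) {
        double angular = point.distance(candidate);             // cheap: Euclidean distance in degrees
        if (angular < minAngular) {
            return true;                                        // certainly within the linear threshold
        }
        if (angular > maxAngular) {
            return false;                                       // certainly beyond the linear threshold
        }
        // Ambiguous zone: fall back to the expensive orthodromic distance.
        GeodeticCalculator calc = new GeodeticCalculator();
        calc.setStartingGeographicPoint(point.x, point.y);
        calc.setDestinationGeographicPoint(candidate.x, candidate.y);
        return calc.getOrthodromicDistance() <= meters;
    }
}
```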
Dataiku Built-In Engine Limitations
The algorithm above has one limitation that becomes visible when joining large datasets. There are two main objects stored in memory:
- The spatial index for the currently joined dataset
- The intermediate results memory table
One of the main strengths of Dataiku is its ability to run recipes in their native environments, which becomes extremely useful when the geo-joined datasets are stored in spatial databases. More precisely, when working with large datasets, in order to reduce the memory consumption of the Dataiku instance, it is recommended to move the datasets to one of the supported databases (PostGIS or Snowflake) and perform the geo join using the SQL engine.
In-Database Geo Join
A lot of people already use relational databases to store their geospatial data. A number of databases support it, for instance PostGIS (a PostgreSQL extension), Snowflake, MySQL, and Oracle. Typically, these databases also support the same set of spatial operations defined in the SQL/MM standard.
Dataiku supports geo join execution on PostGIS and Snowflake. If the input datasets are stored in one of these databases, it is possible to offload the join execution to the underlying database by translating the recipe parameters into a SQL query.
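As an illustration of what such a translation might produce (our guess at the general shape, not the exact SQL Dataiku generates), a “within 100 meters” condition typically maps to PostGIS’s ST_DWithin over geography values:

```java
class PostgisTranslationSketch {
    // Hypothetical example of a pushed-down geo join query; table and column names are made up.
    static final String QUERY =
        "SELECT a.*, b.* "
      + "FROM dataset_a AS a "
      + "JOIN dataset_b AS b "
      + "  ON ST_DWithin(a.geo1::geography, b.geo1::geography, 100)";
}
```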
However, geospatial indexes aren’t managed by Dataiku and need to be created separately by Dataiku users or database admins.
What's Next?
Implementing geo join for Dataiku 10 was an interesting challenge since it required understanding the specifics of geographic data. We managed to create a feature that shows competitive performance when compared to similar solutions (PostGIS).
Our biggest goal was to allow users to perform more geospatial analysis in Dataiku. We implemented a Dataiku-native way of joining multiple datasets, no matter what their source is. We also developed two SQL dialects that can be used to run geo join queries in PostGIS and Snowflake from Dataiku.
In the future, this work could pave the way to integrating other SQL/MM-compliant databases with Dataiku. There’s also an exciting project called Apache Sedona that allows users to run geospatial analysis on a Spark cluster and supports Spark SQL.