Elasticsearch est un serveur de recherche, conçu pour indexer des ressources et offrir un système de moteur de recherche. Construit sur Apache Lucene, Elasticsearch s’utilise principalement via une API REST. Depuis 2004, Elasticsearch est devenu aussi la société Elastic.

J’ai réalisé plusieurs projets et à chaque fois, Elasticsearch m’étonne et me surprend par ses capacités. On pense d’abord que l’outil permet de rechercher. On découvre ensuite qu’il peut servir l’ensemble des données d’un site, et on comprend enfin qu’il ne fait pas QUE de la recherche. Parlons de ce sujet.

Elasticsearch à la recherche de l’information

Quelles sont les étapes d’un développement avec Elasticsearch ?

En général, je commence par le modèle objet, directement venu de mon application Java ou Scala. Je prends sa structure et quelques exemples de données, le tout au format JSON. J’indexe ensuite ce premier objet pour débuter la configuration de l’index. Il est très fréquent au début de détruire et de re-créer ces indexes. Ici, pas de schémas de bases de données. Vous allez aider et améliorer l’indexation en définissant un mapping. Quels sont les types de champ ? Y-a-t-il des dates ? Est-ce que je souhaite ajouter des champs techniques comme un timestamp ?

Cela veut dire que la création des indexes fait partie intégrante du code de mon application. La définition du mapping est presque obligatoire. Certes, vous pouvez prendre un bout de JSON et l’indexer, cela fonctionnera. Cependant, dès qu’il s’agit de prendre le contrôle sur la recherche, la définition du mapping permet de dire à Elasticsearch : « voilà ce que je veux indexer ».

Pour illustrer cela, basé sur des données Open Data de la RATP, nous allons d’abord définir d’abord une classe simple en Scala pour représenter un horaire de RER dans une Gare.

Qu’est-ce que le RER ? 

Le RER est le réseau de train express qui couvre la région Ile-de-France et la ville de Paris. C’est un des réseaux les plus denses au monde. Le RER A transporte 1,14 millions de personnes en moyenne par jour. Lorsque vous habitez autour de Paris, et que vous devez vous rendre en ville, c’est le moyen le plus rapide. Le Métro est plus situé dans Paris Intra-muros, alors que les lignes du RER couvrent toute la région Ile-de-France. Une rame du RER A, type MI09, c’est 2 x 5 voitures, 2600 places assises, 575 tonnes (information Wikipedia). Il y a 66 trains aux heures de pointe en circulation (source). Bref un sujet intéressant.

Revenons à notre code. Nous allons définir une case class Scala pour représenter un horaire, pour une station donnée. L’heure de départ ou d’arrivée est optionnelle, pour représenter les départs et les terminus. De même, le numéro d’arrêt dans un parcours est optionnel. Pour simplifier ce modèle, nous avons retiré quelques éléments.

case class RERTimeTable(arrival_time: Option[LocalDateTime]
                          , departure_time: Option[LocalDateTime]
                          , stop_sequence: Option[Int]
                          , stop_name: String
                          , trip_headsign: String
                          , stop_id: String
                          , direction: Boolean)

Par exemple, le RER NAGA20 qui arrive le 12 octobre à 10h02 et qui repart à 10h05 à Vincennes, sera créé en Scala de cette façon :

val arrival=LocalDateTime.parse("2016-10-12T10:02",DateTimeFormatter.ISO_LOCAL_DATE_TIME)
val departure=LocalDateTime.parse("2016-10-12T10:05",DateTimeFormatter.ISO_LOCAL_DATE_TIME)
val aTimeTable01=RERTimeTable(Some(arrival),Some(departure),Some(14),"VINCENNES","NAGA20","
StopPoint:8775811:810:A",true)

Un export JSON de cet object, prêt à être indexé vers Elasticsearch :

{
  "stop_name" : "VINCENNES",
  "departure_time" : "2016-10-12T10:05:00",
  "reel_stop_seq" : 14,
  "reel_sens_circu" : true,
  "reel_trip_headsign" : "NAGA20",
  "reel_stop_id" : "StopPoint:8775811:810:A",
  "arrival_time" : "2016-10-12T10:02:00"
}

Une fois notre modèle en place, voyons maintenant un peu de code écrit avec Elastic4S. Cette librairie propose un DSL qui vous permet d’écrire votre code Elasticsearch avec un typage fort, et donc d’éviter de discuter directement avec l’API REST d’Elasticsearch (ou le client Java sur le port 9300, mais nous n’en parlerons pas ici).

Voici par exemple une fonction simple qui vérifie si l’index existe, le détruit et le re-créé, puis déclare le mapping pour notre objet RERTimeTable :

 def doRecreateCaptaindashIndex() = {
  val doesExist = indexExists("captaindash")
  val result = esClient.execute(doesExist).await
  if (result.isExists) {
    val deleteIndexDefinition = delete index "captaindash"
    esClient.execute(deleteIndexDefinition).await
  }
  val createIndexDefinition = create index "captaindash" shards 5 replicas 1 refreshInterval "30s" mappings(
    mapping("rer_time_table").fields(
      field("reel_arrival_time", DateType),
      field("reel_departure_time", DateType),
      field("reel_stop_seq", IntegerType),
      field("reel_stop_name", StringType).index("not_analyzed"),
      field("reel_trip_headsign", StringType).index("not_analyzed"),
      field("reel_stop_id", StringType).index("not_analyzed"),
      field("reel_direction", BooleanType)
    ).timestamp(
      timestamp enabled true
    )
  )

  esClient.execute(createIndexDefinition).await
}

Plusieurs remarques sur ce code : notez que chaque champ de notre objet JSON est déclaré avec son type. Les dates sont plutôt bien gérées sur Elasticsearch. Concernant les attributs de type texte comme « tt_stop_name », on remarque que le mapping demande de ne pas analyser le texte. En effet, il s’agit ici de texte « technique » comme le numéro de la mission, et nous ne souhaitons pas d’analyse sur ce texte, simplement son indexation telle quelle. A noter aussi l’attribut timestamp qui permet d’avoir automatiquement un champ « date de dernière modification » sur votre entité.

L’indexation d’un objet dans un service ultra-simple, dans une application Play 2.5 ressemble à ceci :

package services

import javax.inject.Inject

import com.sksamuel.elastic4s.ElasticDsl.{index, _}
import components.{ClusterSetup, PlayElasticFactory}
import models.RERTimetable

class ESIndexer @Inject()(cs: ClusterSetup, elasticFactory: PlayElasticFactory) {
  lazy val esClient = elasticFactory(cs)

  def doIndexREROffreReelle(offreReel: RERTimetable): Unit = {
    val indexQuery = index into "captaindash" -> "rer_reel" fields(
      "reel_arrival_time" -> offreReel.arrival_time.orNull,
      "reel_departure_time" -> offreReel.departure_time.orNull,
      "reel_stop_seq" -> offreReel.stop_sequence.orNull,
      "reel_stop_name" -> offreReel.stop_name,
      "reel_trip_headsign" -> offreReel.trip_headsign,
      "reel_stop_id" -> offreReel.stop_id,
      "reel_direction" -> offreReel.direction
    )
    esClient.execute(indexQuery)
  }

Les classes ClusterSetup et PlayElasticFactory sont 2 classes définies et adaptées du projet Play-Elastic4s.

Et voilà !

Nous avons défini un modèle, configuré un index et écrit de quoi indexer notre entité. Passons maintenant aux aggregations, le coeur de notre article.

Je veux chercher

Ecrire un article sur les possibilités de recherche reviendrait à ré-écrire la documentation d’Elasticsearch sur ce sujet. La recherche la plus simple en ligne de commande ou via Postman :

curl -XGET 'http://localhost:9200/captaindash/_search?q=*'

Si vous souhaitez restreindre la recherche sur les objets de type RERTimeTable

curl -XGET 'http://localhost:9200/captaindash/rer_reel/_search?q=*'

Il est aussi possible d’exécuter des requêtes plus complexes via POST (ici pour chercher l’ensemble des horaires de trains pour la Gare de Vincens)

curl -XPOST 'http://localhost:9200/captaindash/rer_reel/_search'
{ 
 "query" : {
   "term" : { "reel_stop_name" : "GARE DE VINCENNES" }
 }
}

La version Scala avec Elastic4S est aussi simple :

def allRERTimeTableForVincennes: Future[RichSearchResponse] = {
  val searchQuery = search in "captaindash" types "rer_theorique" query termQuery("reel_stop_name", "GARE DE VINCENNES")
  esClient.execute(searchQuery)
}

Si la notation infix utilisée ici vous dérange, vous pouvez aussi écrire :

def allRERTimeTableForVincennes: Future[RichSearchResponse] = {
  val searchQuery = search
                     .in("captaindash")
                     .types("rer_theorique")
                     .query(termQuery("reel_stop_name", "GARE DE VINCENNES"))
  esClient.execute(searchQuery)
}

Arrivé ici, nous savons lister l’ensemble des RERTimeTable pour la station de Vincennes. Cependant nous aimerions restreindre notre sélection à une plage horaire particulière. La documentation explique qu’il faut créer une requête composée d’une recherche part term et d’un filtre de type range.

Il est temps de découvrir un peu le Query DSL d’Elasticsearch.

Je veux filtrer par date

Je souhaite maintenant rechercher les horaires des trains pour la Gare de Vincennes, pour la journée du 30 avril entre 8h et 9h du matin. Notez que pour l’instant, nous n’utilisons pas les Aggregations, nous verrons pourquoi plus loin.

Elasticsearch explique que nous allons devoir créer une compound query, composée de 2 leaves queries. Une requête composée, et 2 requêtes feuilles. Nous utiliserons d’une part une requête term pour le nom de notre gare, et une requête range pour notre plage de date. Le tout sera regroupé dans une requête compound de type bool.

Si vous souhaitez écrire la requête avec le DSL d’Elasticsearch, le projet ES QueryBuilder est pas mal. Nous allons d’abord l’écrire de cette façon, et nous verrons ensuite la version Scala.

POST /captaindash/rer_reel/_search

 {
  "query" : {
    "bool" : {
      "must" : [ {
        "bool" : {
          "filter" : {
            "range" : {
              "reel_departure_time" : {
                "from" : "2016-04-30T08:00:00",
                "to" : "2016-04-30T09:00:00",
                "include_lower" : true,
                "include_upper" : true
              }
            }
          }
        }
      }, {
        "bool" : {
          "filter" : {
            "term" : {
              "reel_stop_name" : "GARE DE VINCENNES"
            }
          }
        }
      } ]
    }
  }
}

Bonjour l’indigestion d’accolades…
Et encore, je ne suis même pas certain que la combinaison de bool-filter-range et bool-filter-term soit la façon la plus simple de composer les 2 requêtes.

La version Scala de cette requête est plus lisible :

    val searchQuery1 = search.in("captaindash").types("rer_reel").query(
      bool(
        must(
          filter(
            rangeQuery("reel_departure_time").from("2016-04-30T08:00:00").to("2016-04-30T09:00:00")
          ),
          filter(
            termQuery("reel_stop_name","GARE DE VINCENNES")
          )
        )
      )
    )

Je veux l’ensemble des trains qui partent entre 8h et 9h de toutes les gares, regroupés par Gare.

Ce que nous avons vu jusqu’à maintenant va se heurter à la réalité. Si vous voulez l’ensemble des trains qui partent, une première approche serait d’utiliser des requêtes de type terms, puis de lister l’ensemble des stations. Je vous arrête tout de suite : ce n’est pas la bonne façon de procéder. Ceci vous forcera à tenir à jour cette liste, et surtout, il existe un moyen beaucoup plus simple avec Elasticsearch

Les Aggregations !

En remplacement des Facets, les Aggregations d’Elasticsearch sont géniales. Ne pas les connaître c’est passer à côté d’une des fonctionnalités les plus intéressantes de l’outil. Certains disent même que ne pas connaître les Aggregations accélèrerait la chute des cheveux… Mais bon… revenons à notre problème métier.

D’abord la solution, ensuite l’explication :

 {
  "size" : 0,
  "query" : {
    "bool" : {
      "must" : {
        "bool" : {
          "filter" : {
            "range" : {
              "reel_departure_time" : {
                "from" : "2016-04-30T07:20:00",
                "to" : "2016-04-30T08:20:00",
                "include_lower" : true,
                "include_upper" : true
              }
            }
          }
        }
      }
    }
  },
  "aggregations" : {
    "all_stops" : {
      "terms" : {
        "field" : "reel_stop_name"
      }
    }
  }
}

On demande ici l’ensemble des trains (quelque soit le sens de circulation) entre 7h20 et 8h20. Puis une fois le résultat connu, l’aggrégation regroupe les résultats.

Ci-dessous, voici lerésultat retourné par Elasticsearch. On retrouve bien notre liste de trains prévus au départ pour chaque station, entre 7h20 et 8h20 le samedi 30 avril 2016 :

{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
    },
    "hits": {
        "total": 9,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "all_stops": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {
                    "key": "GARE DE VINCENNES",
                    "doc_count": 5
                },
                {
                    "key": "CHARLES DE GAULLE ETOILE",
                    "doc_count": 1
                },
                {
                    "key": "GARE DE NOISY LE GRAND MONT D EST",
                    "doc_count": 1
                },
                {
                    "key": "GARE DE TORCY",
                    "doc_count": 1
                },
                {
                    "key": "GARE DE VAL DE FONTENAY RER A",
                    "doc_count": 1
                }
            ]
        }
    }
}

Pourquoi mettre size(0) dans la requête ? Depuis la 2.4 c’est le moyen d’indiquer que vous ne souhaitez pas effectuer une recherche, mais que vous vous intéressez uniquement à l’aggrégation (voir ici la documentation). On peut voir ces requêtes comme des SELECT / GROUP BY. A noter que les Aggregations permettent d’effectuer des opérations simples comme la somme, la moyenne, la recherche de la valeur max/min… le tout très rapidement.

Si vous souhaitez aussi récupérer les entités indexées et associées à la recherche, il suffit de retirer size(0) et de laisser faire la pagination, qui retourne 10 résultats par défaut. Voir la documentation de from/size pour aussi bien comprendre le moteur de pagination des résultats d’Elasticsearch.

Top 10

Dernier exemple, afin de vous montrer comment effectuer une sélection de type top 10. La story sera la suivante : je souhaite les 10 trains les plus en retard circulant dans les 5 dernières minutes.

Voyons d’abord la version Scala, plus facile à lire que la requête HTTP sur l’API ES :

 def getTop10TrainsRunningLate(dateTime: LocalDateTime): Future[List[(String, Double)]] = {

    import scala.collection.JavaConverters._

    val searchQuery = search.in("captaindash").types("index_train_delay").query(
      bool(
        must(
          filter(
            rangeQuery("train_event_time").from(dateTime.minusMinutes(5)).to(dateTime).includeUpper(false)
          )
        )
      )
    ).aggregations(
      aggregation terms "all_missions" field "train_headsign" size 10
      aggregations(
        aggregation max  "max_delays" field "train_trip_delay"
      )order Terms.Order.aggregation("max_delays",false)
    ).size(0)

    esClient.execute(searchQuery).map {
      hits =>
       val terms = hits.aggregations.get("all_missions").asInstanceOf[Terms]

        val missionsByAverateFullRate = terms.getBuckets.asScala.map {
          bucket =>
            val maxDelay = bucket.getAggregations.get("max_delays").asInstanceOf[InternalMax]
            (bucket.getKeyAsString, maxDelay.getValue)
        }
        missionsByAverateFullRate.toList
    }
  }

Cette fonction retourne un Future de List, avec un tuple. Le premier terme est le code mission (train_headsign) et le deuxième, sera le retard (train_trip_delay).
Ligne 20, j’ai laissé aussi le code de lecture du résultat, pour vous montrer comment extraire les résultats.

Cette requête est construite d’abord avec une fenêtre de temps de 5 minutes. Une fois la sélection effectuée, nous regroupons par mission puis par retard, en triant par retard. Et nous ne gardons que 10 résultats.

La requête Elastic Search correspondante est la suivante :

{
  "size" : 0,
  "query" : {
    "bool" : {
      "must" : {
        "bool" : {
          "filter" : {
            "range" : {
              "train_event_time" : {
                "from" : "2016-05-02T07:45",
                "to" : "2016-05-02T07:50",
                "include_lower" : true,
                "include_upper" : false
              }
            }
          }
        }
      }
    }
  },
  "aggregations" : {
    "all_missions" : {
      "terms" : {
        "field" : "train_headsign",
        "size" : 10,
        "order" : {
          "max_delays" : "desc"
        }
      },
      "aggregations" : {
        "max_delays" : {
          "max" : {
            "field" : "train_trip_delay"
          }
        }
      }
    }
  }
}

Conclusion

J’ai (re)découvert Elasticsearch et les aggregations, plus pratique et plus simple que les Facets de la version antérieure à la 2.4. Pensez à Elasticsearch comme base principale pour une application de reporting, où l’essentiel de votre code lit les données. C’est un système simple et robuste. Scala apporte en plus ici un typage fort sur les requêtes. Le DSL d’Elastic4S permet de faire mieux que du SQL à la mano. Il permet aussi d’apprendre rapidement la syntaxe d’Elasticsearch. Comptez quelques jours pour être à l’aise, et pouvoir écrire vos premières requêtes.

Update 3 novembre 2016
Si cet article vous a intéressé, sachez que chez Captain Dash, nous recrutons des développeurs (h/f) Scala, des devs full-stack, des personnes qui veulent développer sur des projets en Scala, avec Akka, Play2, Elasticsearch, Spark, Hadoop, etc.

Contactez-moi directement par email avec un résumé de votre parcours. Poste à Paris, CDI uniquement, télé-travail une ou deux fois par semaine possible.