Le Touilleur ExpressLe Touilleur ExpressLe Touilleur ExpressLe Touilleur Express
  • Accueil
  • A propos de l’auteur
  • A propos du Touilleur Express

Elasticsearch, le pouvoir des Aggregations

    Home Big Data Elasticsearch, le pouvoir des Aggregations

    Elasticsearch, le pouvoir des Aggregations

    Par Nicolas Martignole | Big Data, Scala | Commentaires fermés | 3 novembre, 2016 | 0 | 13 134 affichages
         

    Elasticsearch est un serveur de recherche, conçu pour indexer des ressources et offrir un système de moteur de recherche. Construit sur Apache Lucene, Elasticsearch s’utilise principalement via une API REST. Depuis 2004, Elasticsearch est devenu aussi la société Elastic.

    J’ai réalisé plusieurs projets et à chaque fois, Elasticsearch m’étonne et me surprend par ses capacités. On pense d’abord que l’outil permet de rechercher. On découvre ensuite qu’il peut servir l’ensemble des données d’un site, et on comprend enfin qu’il ne fait pas QUE de la recherche. Parlons de ce sujet.

    Elasticsearch à la recherche de l’information

    Quelles sont les étapes d’un développement avec Elasticsearch ?

    En général, je commence par le modèle objet, directement venu de mon application Java ou Scala. Je prends sa structure et quelques exemples de données, le tout au format JSON. J’indexe ensuite ce premier objet pour débuter la configuration de l’index. Il est très fréquent au début de détruire et de re-créer ces indexes. Ici, pas de schémas de bases de données. Vous allez aider et améliorer l’indexation en définissant un mapping. Quels sont les types de champ ? Y-a-t-il des dates ? Est-ce que je souhaite ajouter des champs techniques comme un timestamp ?

    Cela veut dire que la création des indexes fait partie intégrante du code de mon application. La définition du mapping est presque obligatoire. Certes, vous pouvez prendre un bout de JSON et l’indexer, cela fonctionnera. Cependant, dès qu’il s’agit de prendre le contrôle sur la recherche, la définition du mapping permet de dire à Elasticsearch : « voilà ce que je veux indexer ».

    Pour illustrer cela, basé sur des données Open Data de la RATP, nous allons d’abord définir d’abord une classe simple en Scala pour représenter un horaire de RER dans une Gare.

    Qu’est-ce que le RER ? 

    Le RER est le réseau de train express qui couvre la région Ile-de-France et la ville de Paris. C’est un des réseaux les plus denses au monde. Le RER A transporte 1,14 millions de personnes en moyenne par jour. Lorsque vous habitez autour de Paris, et que vous devez vous rendre en ville, c’est le moyen le plus rapide. Le Métro est plus situé dans Paris Intra-muros, alors que les lignes du RER couvrent toute la région Ile-de-France. Une rame du RER A, type MI09, c’est 2 x 5 voitures, 2600 places assises, 575 tonnes (information Wikipedia). Il y a 66 trains aux heures de pointe en circulation (source). Bref un sujet intéressant.

    Revenons à notre code. Nous allons définir une case class Scala pour représenter un horaire, pour une station donnée. L’heure de départ ou d’arrivée est optionnelle, pour représenter les départs et les terminus. De même, le numéro d’arrêt dans un parcours est optionnel. Pour simplifier ce modèle, nous avons retiré quelques éléments.

    case class RERTimeTable(arrival_time: Option[LocalDateTime]
                              , departure_time: Option[LocalDateTime]
                              , stop_sequence: Option[Int]
                              , stop_name: String
                              , trip_headsign: String
                              , stop_id: String
                              , direction: Boolean)
    

    Par exemple, le RER NAGA20 qui arrive le 12 octobre à 10h02 et qui repart à 10h05 à Vincennes, sera créé en Scala de cette façon :

    val arrival=LocalDateTime.parse("2016-10-12T10:02",DateTimeFormatter.ISO_LOCAL_DATE_TIME)
    val departure=LocalDateTime.parse("2016-10-12T10:05",DateTimeFormatter.ISO_LOCAL_DATE_TIME)
    val aTimeTable01=RERTimeTable(Some(arrival),Some(departure),Some(14),"VINCENNES","NAGA20","
    StopPoint:8775811:810:A",true)

    Un export JSON de cet object, prêt à être indexé vers Elasticsearch :

    {
      "stop_name" : "VINCENNES",
      "departure_time" : "2016-10-12T10:05:00",
      "reel_stop_seq" : 14,
      "reel_sens_circu" : true,
      "reel_trip_headsign" : "NAGA20",
      "reel_stop_id" : "StopPoint:8775811:810:A",
      "arrival_time" : "2016-10-12T10:02:00"
    }

    Une fois notre modèle en place, voyons maintenant un peu de code écrit avec Elastic4S. Cette librairie propose un DSL qui vous permet d’écrire votre code Elasticsearch avec un typage fort, et donc d’éviter de discuter directement avec l’API REST d’Elasticsearch (ou le client Java sur le port 9300, mais nous n’en parlerons pas ici).

    Voici par exemple une fonction simple qui vérifie si l’index existe, le détruit et le re-créé, puis déclare le mapping pour notre objet RERTimeTable :

     def doRecreateCaptaindashIndex() = {
      val doesExist = indexExists("captaindash")
      val result = esClient.execute(doesExist).await
      if (result.isExists) {
        val deleteIndexDefinition = delete index "captaindash"
        esClient.execute(deleteIndexDefinition).await
      }
      val createIndexDefinition = create index "captaindash" shards 5 replicas 1 refreshInterval "30s" mappings(
        mapping("rer_time_table").fields(
          field("reel_arrival_time", DateType),
          field("reel_departure_time", DateType),
          field("reel_stop_seq", IntegerType),
          field("reel_stop_name", StringType).index("not_analyzed"),
          field("reel_trip_headsign", StringType).index("not_analyzed"),
          field("reel_stop_id", StringType).index("not_analyzed"),
          field("reel_direction", BooleanType)
        ).timestamp(
          timestamp enabled true
        )
      )
    
      esClient.execute(createIndexDefinition).await
    }

    Plusieurs remarques sur ce code : notez que chaque champ de notre objet JSON est déclaré avec son type. Les dates sont plutôt bien gérées sur Elasticsearch. Concernant les attributs de type texte comme « tt_stop_name », on remarque que le mapping demande de ne pas analyser le texte. En effet, il s’agit ici de texte « technique » comme le numéro de la mission, et nous ne souhaitons pas d’analyse sur ce texte, simplement son indexation telle quelle. A noter aussi l’attribut timestamp qui permet d’avoir automatiquement un champ « date de dernière modification » sur votre entité.

    L’indexation d’un objet dans un service ultra-simple, dans une application Play 2.5 ressemble à ceci :

    package services
    
    import javax.inject.Inject
    
    import com.sksamuel.elastic4s.ElasticDsl.{index, _}
    import components.{ClusterSetup, PlayElasticFactory}
    import models.RERTimetable
    
    class ESIndexer @Inject()(cs: ClusterSetup, elasticFactory: PlayElasticFactory) {
      lazy val esClient = elasticFactory(cs)
    
      def doIndexREROffreReelle(offreReel: RERTimetable): Unit = {
        val indexQuery = index into "captaindash" -> "rer_reel" fields(
          "reel_arrival_time" -> offreReel.arrival_time.orNull,
          "reel_departure_time" -> offreReel.departure_time.orNull,
          "reel_stop_seq" -> offreReel.stop_sequence.orNull,
          "reel_stop_name" -> offreReel.stop_name,
          "reel_trip_headsign" -> offreReel.trip_headsign,
          "reel_stop_id" -> offreReel.stop_id,
          "reel_direction" -> offreReel.direction
        )
        esClient.execute(indexQuery)
      }

    Les classes ClusterSetup et PlayElasticFactory sont 2 classes définies et adaptées du projet Play-Elastic4s.

    Et voilà !

    Nous avons défini un modèle, configuré un index et écrit de quoi indexer notre entité. Passons maintenant aux aggregations, le coeur de notre article.

    Je veux chercher

    Ecrire un article sur les possibilités de recherche reviendrait à ré-écrire la documentation d’Elasticsearch sur ce sujet. La recherche la plus simple en ligne de commande ou via Postman :

    curl -XGET 'http://localhost:9200/captaindash/_search?q=*'

    Si vous souhaitez restreindre la recherche sur les objets de type RERTimeTable

    curl -XGET 'http://localhost:9200/captaindash/rer_reel/_search?q=*'

    Il est aussi possible d’exécuter des requêtes plus complexes via POST (ici pour chercher l’ensemble des horaires de trains pour la Gare de Vincens)

    curl -XPOST 'http://localhost:9200/captaindash/rer_reel/_search'
    { 
     "query" : {
       "term" : { "reel_stop_name" : "GARE DE VINCENNES" }
     }
    }
    
    

    La version Scala avec Elastic4S est aussi simple :

    def allRERTimeTableForVincennes: Future[RichSearchResponse] = {
      val searchQuery = search in "captaindash" types "rer_theorique" query termQuery("reel_stop_name", "GARE DE VINCENNES")
      esClient.execute(searchQuery)
    }

    Si la notation infix utilisée ici vous dérange, vous pouvez aussi écrire :

    def allRERTimeTableForVincennes: Future[RichSearchResponse] = {
      val searchQuery = search
                         .in("captaindash")
                         .types("rer_theorique")
                         .query(termQuery("reel_stop_name", "GARE DE VINCENNES"))
      esClient.execute(searchQuery)
    }

    Arrivé ici, nous savons lister l’ensemble des RERTimeTable pour la station de Vincennes. Cependant nous aimerions restreindre notre sélection à une plage horaire particulière. La documentation explique qu’il faut créer une requête composée d’une recherche part term et d’un filtre de type range.

    Il est temps de découvrir un peu le Query DSL d’Elasticsearch.

    Je veux filtrer par date

    Je souhaite maintenant rechercher les horaires des trains pour la Gare de Vincennes, pour la journée du 30 avril entre 8h et 9h du matin. Notez que pour l’instant, nous n’utilisons pas les Aggregations, nous verrons pourquoi plus loin.

    Elasticsearch explique que nous allons devoir créer une compound query, composée de 2 leaves queries. Une requête composée, et 2 requêtes feuilles. Nous utiliserons d’une part une requête term pour le nom de notre gare, et une requête range pour notre plage de date. Le tout sera regroupé dans une requête compound de type bool.

    Si vous souhaitez écrire la requête avec le DSL d’Elasticsearch, le projet ES QueryBuilder est pas mal. Nous allons d’abord l’écrire de cette façon, et nous verrons ensuite la version Scala.

    POST /captaindash/rer_reel/_search
    
     {
      "query" : {
        "bool" : {
          "must" : [ {
            "bool" : {
              "filter" : {
                "range" : {
                  "reel_departure_time" : {
                    "from" : "2016-04-30T08:00:00",
                    "to" : "2016-04-30T09:00:00",
                    "include_lower" : true,
                    "include_upper" : true
                  }
                }
              }
            }
          }, {
            "bool" : {
              "filter" : {
                "term" : {
                  "reel_stop_name" : "GARE DE VINCENNES"
                }
              }
            }
          } ]
        }
      }
    }
    
    

    Bonjour l’indigestion d’accolades…
    Et encore, je ne suis même pas certain que la combinaison de bool-filter-range et bool-filter-term soit la façon la plus simple de composer les 2 requêtes.

    La version Scala de cette requête est plus lisible :

        val searchQuery1 = search.in("captaindash").types("rer_reel").query(
          bool(
            must(
              filter(
                rangeQuery("reel_departure_time").from("2016-04-30T08:00:00").to("2016-04-30T09:00:00")
              ),
              filter(
                termQuery("reel_stop_name","GARE DE VINCENNES")
              )
            )
          )
        )
    

    Je veux l’ensemble des trains qui partent entre 8h et 9h de toutes les gares, regroupés par Gare.

    Ce que nous avons vu jusqu’à maintenant va se heurter à la réalité. Si vous voulez l’ensemble des trains qui partent, une première approche serait d’utiliser des requêtes de type terms, puis de lister l’ensemble des stations. Je vous arrête tout de suite : ce n’est pas la bonne façon de procéder. Ceci vous forcera à tenir à jour cette liste, et surtout, il existe un moyen beaucoup plus simple avec Elasticsearch

    Les Aggregations !

    En remplacement des Facets, les Aggregations d’Elasticsearch sont géniales. Ne pas les connaître c’est passer à côté d’une des fonctionnalités les plus intéressantes de l’outil. Certains disent même que ne pas connaître les Aggregations accélèrerait la chute des cheveux… Mais bon… revenons à notre problème métier.

    D’abord la solution, ensuite l’explication :

     {
      "size" : 0,
      "query" : {
        "bool" : {
          "must" : {
            "bool" : {
              "filter" : {
                "range" : {
                  "reel_departure_time" : {
                    "from" : "2016-04-30T07:20:00",
                    "to" : "2016-04-30T08:20:00",
                    "include_lower" : true,
                    "include_upper" : true
                  }
                }
              }
            }
          }
        }
      },
      "aggregations" : {
        "all_stops" : {
          "terms" : {
            "field" : "reel_stop_name"
          }
        }
      }
    }
    
    

    On demande ici l’ensemble des trains (quelque soit le sens de circulation) entre 7h20 et 8h20. Puis une fois le résultat connu, l’aggrégation regroupe les résultats.

    Ci-dessous, voici lerésultat retourné par Elasticsearch. On retrouve bien notre liste de trains prévus au départ pour chaque station, entre 7h20 et 8h20 le samedi 30 avril 2016 :

    {
        "took": 2,
        "timed_out": false,
        "_shards": {
            "total": 5,
            "successful": 5,
            "failed": 0
        },
        "hits": {
            "total": 9,
            "max_score": 0,
            "hits": []
        },
        "aggregations": {
            "all_stops": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [
                    {
                        "key": "GARE DE VINCENNES",
                        "doc_count": 5
                    },
                    {
                        "key": "CHARLES DE GAULLE ETOILE",
                        "doc_count": 1
                    },
                    {
                        "key": "GARE DE NOISY LE GRAND MONT D EST",
                        "doc_count": 1
                    },
                    {
                        "key": "GARE DE TORCY",
                        "doc_count": 1
                    },
                    {
                        "key": "GARE DE VAL DE FONTENAY RER A",
                        "doc_count": 1
                    }
                ]
            }
        }
    }
    
    

    Pourquoi mettre size(0) dans la requête ? Depuis la 2.4 c’est le moyen d’indiquer que vous ne souhaitez pas effectuer une recherche, mais que vous vous intéressez uniquement à l’aggrégation (voir ici la documentation). On peut voir ces requêtes comme des SELECT / GROUP BY. A noter que les Aggregations permettent d’effectuer des opérations simples comme la somme, la moyenne, la recherche de la valeur max/min… le tout très rapidement.

    Si vous souhaitez aussi récupérer les entités indexées et associées à la recherche, il suffit de retirer size(0) et de laisser faire la pagination, qui retourne 10 résultats par défaut. Voir la documentation de from/size pour aussi bien comprendre le moteur de pagination des résultats d’Elasticsearch.

    Top 10

    Dernier exemple, afin de vous montrer comment effectuer une sélection de type top 10. La story sera la suivante : je souhaite les 10 trains les plus en retard circulant dans les 5 dernières minutes.

    Voyons d’abord la version Scala, plus facile à lire que la requête HTTP sur l’API ES :

     def getTop10TrainsRunningLate(dateTime: LocalDateTime): Future[List[(String, Double)]] = {
    
        import scala.collection.JavaConverters._
    
        val searchQuery = search.in("captaindash").types("index_train_delay").query(
          bool(
            must(
              filter(
                rangeQuery("train_event_time").from(dateTime.minusMinutes(5)).to(dateTime).includeUpper(false)
              )
            )
          )
        ).aggregations(
          aggregation terms "all_missions" field "train_headsign" size 10
          aggregations(
            aggregation max  "max_delays" field "train_trip_delay"
          )order Terms.Order.aggregation("max_delays",false)
        ).size(0)
    
        esClient.execute(searchQuery).map {
          hits =>
           val terms = hits.aggregations.get("all_missions").asInstanceOf[Terms]
    
            val missionsByAverateFullRate = terms.getBuckets.asScala.map {
              bucket =>
                val maxDelay = bucket.getAggregations.get("max_delays").asInstanceOf[InternalMax]
                (bucket.getKeyAsString, maxDelay.getValue)
            }
            missionsByAverateFullRate.toList
        }
      }
    
    

    Cette fonction retourne un Future de List, avec un tuple. Le premier terme est le code mission (train_headsign) et le deuxième, sera le retard (train_trip_delay).
    Ligne 20, j’ai laissé aussi le code de lecture du résultat, pour vous montrer comment extraire les résultats.

    Cette requête est construite d’abord avec une fenêtre de temps de 5 minutes. Une fois la sélection effectuée, nous regroupons par mission puis par retard, en triant par retard. Et nous ne gardons que 10 résultats.

    La requête Elastic Search correspondante est la suivante :

    {
      "size" : 0,
      "query" : {
        "bool" : {
          "must" : {
            "bool" : {
              "filter" : {
                "range" : {
                  "train_event_time" : {
                    "from" : "2016-05-02T07:45",
                    "to" : "2016-05-02T07:50",
                    "include_lower" : true,
                    "include_upper" : false
                  }
                }
              }
            }
          }
        }
      },
      "aggregations" : {
        "all_missions" : {
          "terms" : {
            "field" : "train_headsign",
            "size" : 10,
            "order" : {
              "max_delays" : "desc"
            }
          },
          "aggregations" : {
            "max_delays" : {
              "max" : {
                "field" : "train_trip_delay"
              }
            }
          }
        }
      }
    }
    

    Conclusion

    J’ai (re)découvert Elasticsearch et les aggregations, plus pratique et plus simple que les Facets de la version antérieure à la 2.4. Pensez à Elasticsearch comme base principale pour une application de reporting, où l’essentiel de votre code lit les données. C’est un système simple et robuste. Scala apporte en plus ici un typage fort sur les requêtes. Le DSL d’Elastic4S permet de faire mieux que du SQL à la mano. Il permet aussi d’apprendre rapidement la syntaxe d’Elasticsearch. Comptez quelques jours pour être à l’aise, et pouvoir écrire vos premières requêtes.

    Articles similaires:

    Environnement de développement ElasticSearch 2.4 avec Docker Default ThumbnailTrojan et sécurisation avec ElasticSearch Hadoop et Elasticsearch, comment indexer vers ES
    No tags.

    Chercher

    Derniers articles

    • GitHub Actions : le tueur de Jenkins ?
    • Comment recréer du lien social dans l’Entreprise avec des outils numériques en 2021
    • FizzBuzz en Java et Scala (surtout Scala)
    • Scala Kata – 01
    • Podcast à (re)découvrir

    Commentaires récents

    • Frédéric Camblor dans Comment recréer du lien social dans l’Entreprise avec des outils numériques en 2021
    • b dans FizzBuzz en Java et Scala (surtout Scala)
    • Fabien Lamarque dans Scala Kata – 01
    • Cédric Clavier dans Podcast à (re)découvrir
    • Cédric Clavier dans Podcast à (re)découvrir

    Les plus lus

    • Les revenus d’un informaticien indépendant en EURL - 85 461 affichage(s)
    • Changer la batterie d’un MacBook Pro de 2011 - 61 294 affichage(s)
    • Optional en Java 8 - 52 010 affichage(s)
    • Quelle est la différence entre volatile et synchronized ? - 50 220 affichage(s)
    • Un modèle de Product Backlog et de Sprint Backlog avec Excel - 48 923 affichage(s)
    • Redis, découverte d’un moteur clé-valeur simple et puissant - 46 408 affichage(s)
    • Comment simuler le navigateur de l'iphone avec Firefox ou Safari ? - 41 505 affichage(s)
    • serialVersionUID mythes et légendes - 36 127 affichage(s)
    • Faut-il gérer une équipe de développeurs ? - 34 649 affichage(s)
    • Développeur après 31 ans ? Ridé et chauve tu seras - 33 938 affichage(s)

    Mots clés

    agile ajax Apple architecture barcamp BarCampJavaParis ddd devoxx esb exo flex geek google grails groovy humeur humour independant iphone Java javascript jazoon jboss jboss seam jsf jug Linux mac mule parisjug paris jug pjug play playframework portlet recrutement ria Scala scrum spring Startup usi usi2010 web xebia

    Recent Posts

    • GitHub Actions : le tueur de Jenkins ?

      Avouez-le : ce titre de blog est super racoleur. J’avais aussi pensé

      15 février, 2021
    • Comment recréer du lien social dans l’Entreprise avec des outils numériques en 2021

      Nous sommes en février 2021 pendant le 3ème confinement lié à la

      10 février, 2021
    • FizzBuzz en Java et Scala (surtout Scala)

      L’exercice FizzBuzz est un petit exercice très simple, à tester par exemple

      9 février, 2021

    Recent Tweets

    •  @steeve  Agree. Those conversations remind me the old Paris Java User Group beer talks we had after each event cc  @mfiguiere   @DidierGirard 

      1 day ago
    • RT  @_yom_ : Allez Twitter sois sympa et passe le mot autour de toi : on cherche toujours la perle rare qui voudra bien donner de l'amour aux…

      1 day ago
    •  @dorianmariefr  Ben ce sont les chiffres officiels (je fais ma DA cette semaine) 😉

      2 days ago
    • Doctolib c’est 1580 personnes dont 350 personnes « Produits/Tech » (chiffre officiel jan 2021)

      2 days ago
    •  @rophilogene  Par exemple Doctolib Médecin c’est un outil SaaS très simple qui remplace les vieux logiciels installé… https://t.co/selHtXAi8s

      2 days ago

    Mots clés

    agile (18) ajax (11) Apple (11) architecture (6) barcamp (5) BarCampJavaParis (5) ddd (5) devoxx (33) esb (6) exo (6) flex (9) geek (5) google (11) grails (5) groovy (10) humeur (12) humour (7) independant (6) iphone (12) Java (77) javascript (7) jazoon (28) jboss (22) jboss seam (12) jsf (9) jug (16) Linux (11) mac (6) mule (5) parisjug (7) paris jug (22) pjug (6) play (8) playframework (6) portlet (5) recrutement (6) ria (8) Scala (21) scrum (44) spring (23) Startup (11) usi (21) usi2010 (9) web (16) xebia (7)

    Le Touilleur Express

    Contactez-moi : nicolas@touilleur-express.fr

    Suivez-moi sur Twitter : @nmartignole

    Copyright© 2008 - 2020 Nicolas Martignole | Tous droits réservés
    • A propos de l’auteur
    • A propos du Touilleur Express
    Le Touilleur Express