Ingénierie des données en Rust

Par : Scott Syms, Services partagés Canada

L'étendue de l'écosystème Python revêt une importance inestimable pour la communauté de la science des données. La sélection d'outils Python permet à ses utilisateurs d'accéder à des environnements expressifs pour étudier les données, former des modèles d'apprentissage automatique et afficher les résultats dans un format autodocumenté. Il a même été suggéré que les carnets Jupyter, un environnement d'exploration de données Python populaire, remplacent le traditionnel article scientifique.Note de bas de page 1

Cependant, Python a des lacunes. Les éléments mêmes qui le rendent accessible et utilisable, tels que le typage dynamique, le comptage des références et le verrouillage global de l'interpréteur, peuvent empêcher les programmes d'utiliser pleinement les ressources informatiques disponibles. Cela est particulièrement évident lors du traitement de grands ensembles de données ou de charges de travail à forte intensité de calcul.

En règle générale, il y a deux approches à adopter pour les charges de travail à forte intensité de calcul en Python.

Pour les scientifiques des données qui utilisent des mégadonnées ou des charges de travail à forte intensité de calcul, Python propose des solutions de rechange. Les accélérateurs de calcul, tels que Numba,Note de bas de page 2 PyPyNote de bas de page 3 et PystonNote de bas de page 4 utilisent des optimisations de codage pour accélérer les environnements de développement sur une seule machine. Combinées à la prise en charge du traitement simultané par Python,Note de bas de page 5 ces bibliothèques peuvent accroître la capacité de traitement d'une seule machine.

Une autre approche fait appel à des bibliothèques, telles que DaskFootnote 6 et PySparkNote de bas de page 7 pour répartir le traitement sur plusieurs machines. Rien ne vous empêche de faire les deux – optimiser sur place et répartir la charge.

En définitive, les outils que vous utilisez sont définis par des contraintes en matière de mise en œuvre. Pour les scientifiques des données qui ont accès à des environnements en nuage gérés, la possibilité de créer des centaines de machines pour analyser rapidement les données est un moyen évident de résoudre le problème du calcul. Pour les utilisateurs disposant de ressources plus modestes, cependant, les options d'évolutivité peuvent être limitées. L'intelligence artificielle à la périphérie du réseau peut ne pas disposer de la même capacité de calcul que dans les environnements en nuage.

Vers la fin de l'année 2020, la publication scientifique Nature a suggéré une solution de rechange à certaines des approches traditionnelles à l'égard du calcul des données scientifiques.Note de bas de page 8 L'auteur a proposé de développer des logiciels scientifiques en Rust, un nouveau langage émergent et très performant.

Le langage de programmation Rust a été lancé en 2009 en tant que projet parallèle du programmeur de Mozilla, Graydon Hoare. Il offre un rendement similaire à celui de C++, mais fournit de meilleures mesures de sécurité concernant l'utilisation de la mémoire et du traitement simultané. Comme C++ et Python, il peut être utilisé sur toute une série de plateformes, de la programmation de microcontrôleurs aux applications Web asynchrones à haute capacité. Les applications Rust peuvent être compilées en WebAssembly,Note de bas de page 9 ce qui leur permet de s'exécuter dans le navigateur à des vitesses quasi natives.

La combinaison de la vitesse, de la sécurité et de l'interopérabilité est un mélange idéal de caractéristiques lorsqu'il s'agit de traiter un problème d'ingénierie de mégadonnées provenant de l'analyse de données de position des navires à l'échelle mondiale.

Système d'identification automatique

Aux termes d'un accord international, les navires océaniques doivent transmettre des messages de données de voyage à l'aide du système d'identification automatique (AIS).Note de bas de page 10 Ces messages peuvent être collectés depuis l'espace, regroupés en une image générale des activités de navigation et vendus à des organismes commerciaux et gouvernementaux. L'article Building a Maritime Picture in the Era of Big Data : The Development of the Geospatial Communication Interface+ décrit les défis que pose la collecte de données sur la position des navires pour la surveillance mondiale.Note de bas de page 11

L'Agence spatiale canadienne (ASC) gère les marchés du gouvernement du Canada visant l'obtention de données de suivi maritime mondial depuis l'espace. Chaque jour, elle distribue des millions de messages de positionnement aux partenaires maritimes à l'échelle du gouvernement. Au cours de la dernière décennie, l'ASC a recueilli plus de 50 milliards de messages.

La National Marine Electronics Association (NMEA) met à jour la norme de l'AIS mondiale. Vous trouverez ci-dessous un échantillon de données de l'AIS.


1569890647\s:VENDOR,q:u,c:1569890555*5F\!AIVDM,1,1,,A,13KG9?10031jQUNRI72jM5?40>@<,0*5C
1569890647\s:VENDOR,q:u,c:1569890555*5F\!AIVDM,1,1,,B,13aEPIPP00PE33dMdJNaegw4R>@<,0*77
1569890647\g:1-2-6056,s:VENDOR,c:1569890555*3A\!AIVDM,2,1,6,A,56:GTg0!03408aHj221<QDr1UD4r3?F22222221A:`>966PW0:TBC`6R3mH8,0*0E
1569890647\g:2-2-6056*58\!AIVDM,2,2,6,A,88888888880,2*22

Chaque phrase ci-dessus contient des métadonnées sur le rapport de position. Elles comprennent les éléments suivants :

  • Heure à laquelle l'observation a été réalisée par le capteur
  • Source de la détection
  • Moment où le rapport a été transmis du satellite à une station au sol
  • Si la phrase constitue ou non une partie d'un groupe de messages

Bien qu'une partie du message soit lisible par l'homme, les données importantes concernant l'identité et le déplacement du navire sont enveloppées dans une charge utile ASCII à six bits vers la fin de la phrase. Le site Web d'Eric Raymond sur le décodage du protocole AIVDM/AIVDONote de bas de page 12 présente un guide détaillé sur la façon dont les données du navire sont placées dans la chaîne.

Décodage de l'AIS avec une application Rust

L'objectif fixé pour cette application Rust est de convertir une archive de données brutes de l'AIS en un équivalent JSON qui peut être utilisé pour l'analyse des données. Le résultat devrait permettre la préservation des données d'origine pour un nouveau traitement, si nécessaire. Le reformatage des données au format JSON est une étape importante dans le pipeline d'ingénierie des données, car il permet de charger les données dans un cadre de données ou une base de données ou encore de les convertir dans un format optimisé pour la lecture, comme Apache Parquet.

Voici le résultat souhaité lorsque l'on utilise JSON pour mettre en paquets les données et préserver les données d'origine parallèlement aux éléments dérivés.


{
"sentence":"1569888002\\s:VENDOR,q:u,c:1569884202*4F\\!AIVDM,1,1,,B,1:kJS6001UJgA`mV1sFrGHAP0@L;,0*56",
"landfall_time":"1569888002",
"group":"",
"satellite_acquisition_time":"1569884202",
"source":"VENDOR",
"channel":"B",
"raw_payload":"1:kJS6001UJgA`mV1sFrGHAP0@L;",
"message_type":1,
"message_class":"singleline",
"mmsi":"725000984",
"latitude":-45.385661666666664,
"longitude":-73.55857,
"call_sign":"CQ4F3",
"destination":"HALIFAX",
"name":"SS MINNOW",
"ship_type":"23",
"eta":"",
"draught":"",
"imo":"",
"course_over_ground":"86950448",
"position_accuracy":"0",
"speed_over_ground":"101",
"navigation_status":"0"
}

Présentation du programme RUST

Pour extraire des renseignements de données de l'AIS, chaque caractère de la charge utile doit être converti de l'ASCII à six bits en son équivalent binaire. La phrase entière est également fusionnée en une longue chaîne binaire. Les morceaux de chaîne sont reconvertis en chiffres et en texte lisibles par l'homme.

Figure 1 : L'extraction des données de la charge utile nécessite leur conversion en format binaire.

Figure 1 : L'extraction des données de la charge utile nécessite leur conversion en format binaire.
Figure 1 : L'extraction des données de la charge utile nécessite leur conversion en format binaire. La charge utile est convertie en format binaire avant que les données puissent être extraites. Le processus est décrit dans la section suivante. Image du texte : kJS6001UJgA'mV1sFrGHAP0@L; 11001011110110100100010111101010100101010100101010010100010101001010100101010101001010 SS MINNOW

Pour aider à tirer tout le potentiel de calcul possible de la machine hôte, Rust assure un « traitement simultané audacieux ». La charge de travail peut être facilement répartie sur tous les cœurs d'ordinateur disponibles grâce à des canaux de passage de messages transmettant les données entre les fils d'exécution (ou simplement, entre les fils).

Le processus est divisé en trois groupes. Le premier est un processus à un seul fil qui lit un fichier source de données de l'AIS, en insérant chaque ligne dans un champ struct, et transmet ce dernier à un groupe de fils qui effectue l'analyse syntaxique initiale par le biais d'un canal.

Le fil de réception analyse les messages de position de ligne unique et transmet les résultats à un rédacteur de fichiers sous forme de paquet JSON. Les phrases multilignes sont transmises à un second groupe de fils qui met en cache et réassemble les parties de phrases. Là encore, ces résultats sont transmis au rédacteur de fichiers sous forme de chaîne JSON.

Étant donné que le traitement est confié à des fils concomitants, il n'y a aucune garantie d'ordre en sortie. En raison des délais d'attente et des différences quant au temps de traitement, il est possible que l'ordre de sortie de chaque rapport ne soit pas identique à l'ordre d'entrée.

Faits saillants du programme

Figure 2 : Processus du programme

Figure 2 : Processus du programme
Figure 2 : Processus du programme Ce processus du programme commence au niveau du fil principal et du fichier de sortie. La case Lire le fichier source y est adjacente. À partir de là, une flèche du canal source pointe vers l'étape suivante, à savoir : Le groupe de fils analyse les données à ligne unique, au moyen du canal des rapports multilignes. Ensuite, le processus peut soit revenir en boucle vers le fil principal au moyen du canal d'analyse à ligne unique, soit suivre la flèche vers la case Le groupe de fils met en cache et analyse les données multilignes. À partir de là, la boucle est bouclée et la flèche pointe vers le fil principal au moyen du canal d'analyse multilignes. Vous avez le choix de continuer dans le processus ou au moyen de la sérialisation JSON afin de créer le fichier de sortie.

Une expression ordinaire peut être utilisée pour extraire les données lisibles par l'homme dans la phrase de l'AIS, mais un travail supplémentaire doit être effectué pour convertir les données de la charge utile à six bits en une chaîne binaire, prendre des parties du résultat et reconvertir ces parties en texte, en nombres flottants et entiers.

Épinglage Cargo et épinglage de la version

Rust utilise un système de construction bien pensé où les dépendances des paquets et les directives de construction peuvent être indiquées dans un fichier de définition.


[package]
name = "rustaise"
version = "0.1.0"
edition = "2021"

[dependencies]
crossbeam-channel = "0.5.2"
threadpool = "1.8.1"
num_cpus  = "1.13.1"
hashbrown = "0.12.0"
clap = "3.0.7"
regex = "1.5.4"
bitvec = "1.0.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"

[profile.release]
lto = true

Les dépendances appelées « boîtes » peuvent être énumérées avec une version donnée. La mise à niveau vers une version différente de la boîte doit être réalisée explicitement, ce qui réduit les erreurs imputables au passage à des versions différentes de la bibliothèque.

Contrôle étroit de la portée des variables

L'une des méthodes utilisées par Rust pour assurer la sécurité de la mémoire consiste à contrôler étroitement la portée des variables.


// Attribuons 21 à X
let x = 21;
	{
	// Attribuons maintenant 12 à X
	let x =12;
	}
// Puisque le champ d'application a pris fin pour l'affectation précédente
// la valeur de X est toujours 21
println!("{}", x);

L'extrait de code ci-dessus imprimerait le nombre « 21 », car l'affectation « x = 12 » n'est valable qu'entre les accolades.

Bien qu'il s'agisse d'un moyen efficace de garder votre mémoire en ordre, il peut être contre-intuitif. Par exemple, ce qui suit ne fonctionne pas, car la variable -x est tronquée à la fin de la paire d'accolades de blocs jf.


if y == 1 {
    let x = 21;
} else {
    let x = 0;
}
println!("{}", x);

}

Une façon de résoudre ce problème consiste à créer une fonction anonyme dont la sortie est affectée à la variable. Des affectations de variables avec des évaluations imbriquées « if » sont utilisées dans l'ensemble du programme.


let y = 1;
let x = 21;
let x: i8 ={
    if y == 1 {
     67
    }
    else {
    0
    }
};

println!("{}", x);

Le contrôle rigide des durées de vie et de la propriété des variables modifie la façon dont vous structurez votre programme.

Il est inutile de résister

Le compilateur est très bavard et refusera de compiler du code qui transgresse les garde-fous de Rust.

error[E0308]: mismatched types
 --> src/main.rs:8:10
  |
8 |  67.0
  |  ^^^^ expected `i8`, found floating-point number

For more information about this error, try `rustc --explain E0308`.
error: could not compile `playground` due to previous error

Bien que parfois frustrants, les messages du compilateur sont utiles pour déterminer la cause de l'erreur. On voit souvent le mantra « travaillez en collaboration avec le compilateur » dans les commentaires en ligne.

Le logiciel commence par définir une structure qui contiendra les phrases brutes et les données extraites lors des phases de traitement.


#[derive(Serialize, Default, Clone, Debug)]
struct PositionReport {
    pub sentence: String,
    pub landfall_time: String,
    pub group: String,
    pub satellite_acquisition_time: String,
    pub source: String,
    pub channel: String,
    pub raw_payload: String,
    pub message_type: u64,
    pub message_class: String,
    pub mmsi: String,
    pub latitude: f64,
    pub longitude: f64,
    pub call_sign: String,
    pub destination: String,
    pub name: String,
    pub ship_type: String,
    pub eta: String,
    pub draught: String,
    pub imo: String,
    pub course_over_ground: String,
    pub position_accuracy: String,
    pub speed_over_ground: String,
    pub navigation_status: String,
} // fin de struct PositionReport

Notez le mot-clé #Derive qui précède la définition de la structure. Bien que Rust ne soit pas un langage orienté objet comme Java, il permet d'échanger des méthodes entre les structures à l'aide d'une fonctionnalité appelée Traits d'une manière qui « émule » l'héritage.

Dans la déclaration ci-dessus, les traits Serialize, Default, Clone et Debug sont ajoutés à la structure.

Groupes de fils

La définition des groupes de fils est assez simple en Rust. Le programme trouve le nombre de cœurs disponibles et déclare le nombre de processeurs au traval pour chaque fil.

Les boucles for sont utilisées pour lancer des fils individuels.


    // Les travailleurs sont les nombres de processeurs
    let n_workers = num_cpus::get();

    let reading_thread = ThreadPool::new(1);
    let extraction_pool = ThreadPool::new(n_workers);
    let multiline_assembly_thread = ThreadPool::new(n_workers);

    for _a..n_workers: {
    multiline_assembly.execute(move || {
    // Faire des choses
    }
    }

    for _b..n_workers: {
    extraction_pool.execute(move || {
    // Faire des choses
    }
    }

    reading_thread.execute(move || {
    // Faire des choses
    }

Les boucles for contrôlent le nombre de fils lancés, tandis que le mot-clé move transmet les variables actuelles au fil.

Définitions des canaux et contrôle du flux

Les canaux de relais entre les fils sont définis avec une limite pour éviter que les fils producteurs ne surchargent le canal et n'épuisent la mémoire. Par défaut, le programme fixe la borne supérieure à 500 000 éléments, mais elle peut être modifiée depuis la ligne de commande pour s'adapter au mieux à la mémoire disponible.

Chaque déclaration définit un canal d'envoi et de réception, de même que les types de données qui circuleront sur le bus de messages.


let (raw_file_tx, raw_file_rx): (Sender<PositionReport>, Receiver<PositionReport>) = bounded(flow_limit);
let (multiline_handling_tx, multiline_handling_rx): ( Sender<PositionReport>, Receiver<PositionReport>) = bounded(flow_limit);
let (ready_for_output_tx, ready_for_output_rx): (Sender<String>, Receiver<String>) =
bounded(flow_limit);

En raison des règles de Rust sur la réutilisation des variables, le type de données du canal doit être cloné dans chaque fil, mais chaque clone fait en réalité référence à l'instance d'origine du bus de messages.


 extraction_pool.execute(move || {
    let raw_file_rx = raw_file_rx.clone().clone();
    let extract_ready_for_output_tx = extract_ready_for_output_tx.clone();
    let multiline_handling_tx = multiline_handling_tx.clone();
 }

Correspondance des messages

Les types de messages de l'AIS déterminent la manière dont les renseignements relatifs au navire sont stockés dans la charge utile à six bits, de sorte que toute tâche d'analyse syntaxique doit commencer par déterminer le type de la phrase actuelle et l'introduire comme un INT non signé dans le champ struct approprié.


line.message_type = pick_u64(&payload, 0, 6);

À partir de là, le type de message peut être comparé aux modèles d'analyse syntaxique et aux autres champs dans la structure remplie.


match line.message_type {
1 | 2 | 3 => {
// Si le message est de classe A cinétique
line.mmsi = format!("{}", pick_u64(&payload, 8, 30));
line.latitude = pick_i64(&payload, 89, 27) as f64 / 600_000.0;
line.longitude = pick_i64(&payload, 61, 28) as f64 / 600_000.0;
...
}
5 => {
// Si le message est de classe A statique
line.mmsi = format!("{}", pick_u64(&payload, 8, 30));
line.call_sign = pick_string(&payload, 70, 42);
line.name = pick_string(&payload, 112, 120);
...
}

Arc, Mutex et cartes de hachage

L'assemblage de messages multilignes dans des fils multiples nécessite la mise en cache de parties de phrases pour pouvoir les transmettre. Ce programme utilise une carte de hachage commune enveloppée dans un mutex pour contenir des parties de phrases.


// Initialiser les cartes de hachage pour les messages AIS multi-phrases
// Ceux-ci sont développés par des ARC et des Mutex pour une utilisation en filetage multiple
let mut payload_cache: Arc<Mutex<HashMap<String, String>>> =
Arc::new(Mutex::new(HashMap::new()));
let mut source_cache: Arc<Mutex<HashMap<String, String>>> =
Arc::new(Mutex::new(HashMap::new()));
let mut sat_time_cache: Arc<Mutex<HashMap<String, String>>> =
Arc::new(Mutex::new(HashMap::new()));

À l'instar des canaux interprocessus, les cartes de hachage doivent être clonées dans chaque instance de fil.


// Initialiser les cartes de hachage pour les messages AIS multi-phrases
let payload_cache = Arc::clone(&mut payload_cache);
let source_cache = Arc::clone(&mut source_cache);
let sat_time_cache = Arc::clone(&mut sat_time_cache);

Chaque carte de hachage doit disposer d'un verrou défini dans chaque fil pour harmoniser les lectures et effectuer des suppressions à partir de plusieurs fils.


    let mut payload_lock = payload_cache.lock().unwrap();
    let mut source_lock = source_cache.lock().unwrap();
    let mut sat_time_lock = sat_time_cache.lock().unwrap();

    // insérer dans le cache temporel si le champ struct n'est pas vide
    if line.satellite_acquisition_time.len() > 0 {
    sat_time_lock.insert(line.group.clone(), line.satellite_acquisition_time);
    }

Sérialisation JSON

Le cadre SERDE permet de sérialiser une structure en une chaîne JSON. À la fin du cycle d'analyse syntaxique, chaque fil convertit la structure remplie en une chaîne JSON pour l'écrire dans un fichier.


ready_for_output_tx.send(serde_json::to_string(&line).unwrap());

Le résultat du programme peut être chargé dans le logiciel Pandas avec la commande suivante :


import pandas as pd
df=pd.read_json("output.json", lines=True)

Il peut également être converti en un fichier Parquet compressé en utilisant le programme json2parquet de Dominik Moritz.


json2parquet -c brotli norway.json norway.parquet

Exécution du programme

L'exécution du programme sans paramètres produira le résultat suivant :


error: The following required arguments were not provided:
		<INPUT>
		<OUTPUT>

    USAGE:
		rustaise <INPUT> <OUTPUT> [FLOW_LIMIT]
	For more information try --help
	With the --help flag.

    AIS parsing program 1.0
	Scott Syms <ezrapound1967@gmail.com>
	Does selective parsing of a raw AIS stream

	USAGE:
	rustaise <INPUT> <OUTPUT> [ARGS]
	ARGS:
		<INPUT>    Sets the input file to use
		<OUTPUT>   Sets a custom output file
		<FLOW_LIMIT>   Sets a limit on the number of objects in memory at one time (default: 500000)
		<PARSE_THREADS>    Sets the number of threads to use for parsing (default: number of CPUs)
		<MULTILINE_THREADS>    Sets the number of threads to use for multiline parsing (default: number of CPUs)
  	OPTIONS:
		-h, --help   Print help information
		-V, --version    Print version information

En décompressant le programme norway.7z et en exécutant ce qui suit, on obtient un fichier JSON dont le contenu est analysé syntaxiquement.


rustaise norway.nmea norway.json

Le paramètre FLOW LIMIT vous permet de limiter les données contenues dans les canaux de messages. Dans certains systèmes soumis à des contraintes de mémoire, le plafonnement des messages en cours permet d'éviter les problèmes de mémoire insuffisante. Les paramètres PARSE_THREADS et MULTILINE_THREADS sont des paramètres facultatifs qui permettent de contrôler le nombre de fils créés pour les fils d'analyse syntaxique à une ou plusieurs lignes.

Vitesse d'exécution

Rust est à la hauteur de sa réputation en tant que langage incroyablement rapide.

Les résultats d'exécution dans le tableau de temps ci-dessous sont obtenus à partir d'un MacBook doté d'un processeur 8 cœurs Intel Core i9 à 2,3 GHz avec 32 Go de mémoire. La rangée du tableau indique le nombre de lignes du fichier d'entrée. La première colonne indique le temps nécessaire pour traiter l'échantillon, et la dernière colonne annonce le volume de données pouvant être traitées en une journée au taux d'échantillonnage.

Figure 3 : Tableau de temps – Taille de l'échantillon, temps de traitement et volume prévu par jour
Taille de l'échantillon Temps de traitement Volume prévu par jour
1 million 7 s 12 342 857 142
25 millions 65 s 33 230 769 230
174 millions 435 s 34 560 000 000

D'après ces chiffres, le logiciel serait capable de traiter une archive de l'AIS de 50 milliards de lignes en un peu moins de deux jours, sur un seul ordinateur portatif.

Dernières réflexions

Il s'agit de ma première tentative de programmation sérieuse en Rust et, même du point de vue d'un novice, il est possible de faire mieux.

  • D'abord, comme j'ai une connaissance imparfaite du langage, la solution peut ne pas être idiomatique (c.-à-d. faire un usage optimum de ce que Rust offre pour résoudre le problème).
  • Le programme ignore largement le cadre de gestion des erreurs de Rust. L'initialisation de la structure avec des valeurs par défaut peut consommer inutilement de la mémoire. L'utilisation des méthodes struct et des traits personnalisés peut présenter certains avantages.
  • La refactorisation du code dans le format de bibliothèque et de module de Rust permettrait de rendre le code plus lisible.
  • Enfin, Rust offre un cadre de test unitaire qui simplifierait la mise à jour du code.

Le « jeu en vaut généralement la chandelle » ici. Même avec les inconvénients du code, Rust fonctionne comme annoncé, et il vaut la peine d'apprendre le langage dans les situations où le code sera réutilisé ou lorsque le temps d'exécution posera problème.

Par ailleurs, les développeurs qui souhaitent tirer parti du rendement de Rust tout en demeurant dans l'écosystème Python pourraient utiliser le projet PyO3Note de bas de page 13 pour créer des extensions Python natives en Rust.

Obtenir le code

Tout le code est disponible à Github - ScottSyms/RustAISe (le contenu de cette page est en anglais).

N'hésitez pas à utiliser ce site Web. Merci de bien vouloir me faire part de vos commentaires.

Licences

Le logiciel est distribué sous une licence Apache 2.0.

L'archive comprend un échantillon compressé en sept fichiers .zip de données de l'AIS provenant du gouvernement norvégien. Il est distribué sous la licence Norwegian License for Open Government Data (NLOD) 2.0.Note de bas de page 14

Une partie du code de manipulation bitvec est tirée du paquet nmea-parser de Timo Saarinen,Note de bas de page 15 qui est fourni sous une licence Apache 2.0.

Scott Syms est conseiller technique en matière de science des données et d'intelligence artificielle, et relève de la Direction générale du dirigeant principal de la technologie de SPC.

Date de modification :