KQL
Def et Fonctionnement :
Kusto Query Language (donc KQL) est un langage de database qui set notamment dans nombreux services d'Azure par exemple pour créer des Analytics, des workbooks ou chercher des logs. Ce langage est essentiel dans Microsoft Sentinel.
KQL est un language en read-only
. Les requêtes KQL utilisent les shéma des tables de données qui sont organisées sur une hiérarchies (database -> tables -> colonnes).
Une query est constituée d'une séquence d'instructions.
Lab de test : https://aka.ms/lademo
Explications et Apprentissage :
On a donc bien 3 emplacements :
- À gauche les tables
- en haut l'éditeur de queries
- en bas le résultat des requêtes
Faire de requêtes :
Where :
Un peu comme dans le langage SQL le where permet de filtrer les résultats d'une recherche en fonction de certaines conditions :
Exemple :
SecurityEvent
| where TimeGenerated > ago(1d)
SecurityEvent
| where TimeGenerated > ago(1h) and EventID == "4624"
SecurityEvent
| where TimeGenerated > ago(1h)
| where EventID == 4624
| where AccountType =~ "user"
SecurityEvent | where EventID in (4624, 4625)
Ici on a beaucoup de résultats car nous sommes dans un lab dédié qui comporte énormément de données. On voit que l'on a plusieurs niveaux de granularité dans nos requêtes
Let :
Let permet de définir des variables qui seront réutilisées dans notre code (souvent en préambule).
let timeOffset = 7d;
let discardEventId = 4688;
SecurityEvent
| where TimeGenerated > ago(timeOffset*2) and TimeGenerated < ago(timeOffset)
| where EventID != discardEventId
Il est aussi possible de créer des listes dynamiques :
let LowActivityAccounts =
SecurityEvent
| summarize cnt = count() by Account
| where cnt < 1000;
LowActivityAccounts | where Account contains "SQL"
Extend :
Il est possible de rajouter des colonnes à notre requête, pour ajouter des données comme des sommes etc...
SecurityEvent
| where ProcessName != "" and Process != ""
| extend StartDir = substring(ProcessName,0, string_size(ProcessName)-string_size(Process))
By order :
By order permet de filtrer les résultats dans un ordre précis :
SecurityEvent
| where ProcessName != "" and Process != ""
| extend StartDir = substring(ProcessName,0, string_size(ProcessName)-string_size(Process))
| order by StartDir desc, Process asc
Project Operator :
Le but est de modifier des colones, en ajouter, enlever etc...
Operator | Description |
---|---|
project | Select the columns to include, rename or drop, and insert new computed columns. |
project-away | Select what columns from the input to exclude from the output. |
project-keep | Select what columns from the input to keep in the output. |
project-rename | Select the columns to rename in the resulting output. |
project-reorder | Set the column order in the resulting output. |
SecurityEvent | project Computer, Account
Cela permet par exemple de garder uniquement les colonnes Computer et account
Analyser des données :
Résumer des données:
Pour résumer des données plusieurs opérateurs sont disponibles, par exemple count()
.
Ce qui permet de rassembler des entrée en fonction de quelque chose :
SecurityEvent | summarize by Activity
SecurityEvent
| where EventID == "4688"
| summarize count() by Process, Computer
Voici une liste des opérateurs disponibles :
Function(s) | Description |
---|---|
count(), countif() | Returns a count of the records per summarization group |
dcount(), dcountif() | Returns an estimate for the number of distinct values taken by a scalar expression in the summary group. |
avg(), avgif() | Calculates the average of Expr across the group. |
max(), maxif() | Returns the maximum value across the group. |
min(), minif() | Returns the minimum value across the group. |
percentile() | Returns an estimate for the specified nearest-rank percentile of the population defined by Expr. The accuracy depends on the density of population in the region of the percentile. |
stdev(), stdevif() | Calculates the standard deviation of Expr across the group, considering the group as a sample. |
sum(), sumif() | Calculates the sum of Expr across the group. |
variance(), varianceif() | Calculates the variance of Expr across the group, considering the group as a sample. |
Filtrer des résultats :
On peut avoir besoins de maximum etc... On a donc :
arg_max()
arg_min()
SecurityEvent
| where Computer == "SQL12.na.contosohotels.com"
| summarize arg_max(TimeGenerated,*) by Computer
ou bien
SecurityEvent
| where Computer == "SQL12.na.contosohotels.com"
| summarize arg_min(TimeGenerated,*) by Computer
Préparer des données :
make_array
permet de retourner une liste en JSON :
SecurityEvent
| where EventID == "4624"
| summarize make_list(Account) by Computer
Dans ce cas par exemple pour chaque ordinateur on voit quels sont les comptes qui se sont connectés (4624 est un code de connexion)
On a aussi make_set
qui permet de faire la même chose avec un set.
Visualiser des données :
On peut créer des graphiques directement dans les requêtes de logs pour les analyser directement :
areachart
barchart
columnchart
piechart
scatterchart
timechart
SecurityEvent
| summarize count() by Account
| render barchart
Pour rendre des données numériques plus lisibles sur un graphe il est parfois nécessaire de les arrondir, on utilise pour ça la fonction bin
:
SecurityEvent
| summarize count() by Account
| render barchart
sans le bin :
Références multi-tables :
Cas concrets pour Sentinel :
- 500126 : Invalid username or password or Invalid on-premises username or password
- 50074 : failed mfa request
- 500121 : failed attempt to authenticate using a second factore (10 mfa ratés en moins de 10 min)
On recherche donc à lever des alertes dans ces cas : - x10 500126 d'une même IP dans la minute
- x10 50074 en 10 min
- x1 500121
On commence par formater le résultat pour obtenir des colonnes et des lignes utiles :
SigninLogs
| where ResultType != 0
| project ResultType,ResultDescription,AuthenticationRequirement,DeviceDetail,IPAddress,Location,ProcessingTimeInMilliseconds,TimeGenerated,Identity
Une première version d'amélioration de cette requête serait de regrouper ces requêtes échouée par IP, par exemple lister les comptes sur lesquels il y a une tentative en fonction de l'addresse IP
SigninLogs
| where ResultType != 0
| project ResultType,ResultDescription,AuthenticationRequirement,DeviceDetail,IPAddress,Location,ProcessingTimeInMilliseconds,TimeGenerated,Identity
| summarize make_set(Identity) by IPAddress
Une autre chose intéressante serait de compte le nombre de connexions sur un compte.
Compter les nombre de fois qu'une erreur se produit :
SigninLogs
| where ResultType != 0
| project ResultType,ResultDescription,AuthenticationRequirement,DeviceDetail,IPAddress,Location,ProcessingTimeInMilliseconds,TimeGenerated,Identity
| summarize count() by ResultType
Une autre chose est le nombre de connections par adresses IP (on pourrait ajouter un petit graph) :
SigninLogs
| where ResultType != 0
| project ResultType,ResultDescription,AuthenticationRequirement,DeviceDetail,IPAddress,Location,ProcessingTimeInMilliseconds,TimeGenerated,Identity
| summarize count() by IPAddress
Idée déclencher une alerte si il y a 10 connexions en une minute ou bien si il y a 100 connexions en 1h.
Voici une première version d'une requête qui fonctionne, elle récupère sur une la dernière période si il y a eu plus de x requete provenant d'une addresse IP qui ont déclenché une erreur d'authentification :
let timeRange = 1h;
let threshold = 10;
SigninLogs
| where ResultType != 0
| where TimeGenerated > ago(timeRange)
| summarize count() by IPAddress
| where count_ > threshold