Aggiornare o unire record in database SQL di Azure usando Funzioni di Azure

Attualmente, Analisi di flusso di Azure (ASA) supporta solo l'inserimento (accodamento) di righe agli output SQL (Azure SQL Database e Azure Synapse Analytics). Questo articolo illustra le soluzioni alternative per abilitare UPDATE, UPSERT o MERGE nei database SQL usando Funzioni di Azure come livello intermedio.

Le opzioni alternative per Funzioni di Azure vengono presentate alla fine.

Requisito

È possibile scrivere dati in una tabella usando una delle modalità seguenti:

Mode Istruzione T-SQL equivalente Requisiti
Accoda INSERT nessuno
Replace UNIRE (UPSERT) Chiave univoca
Accumulate UNIRE (UPSERT) con operatore di assegnazione composta (+=, -=...) Chiave univoca e identificatore

Per illustrare le differenze, considerare cosa accade quando si inseriscono i due record seguenti:

Ora di arrivo Device_Id Measure_Value
10:00 Una 1
10:05 Una 20

Nella modalità di accodamento si inseriscono due record. L'istruzione T-SQL equivalente è:

INSERT INTO [target] VALUES (...);

Risultato:

Modified_Time Device_Id Measure_Value
10:00 Una 1
10:05 Una 20

In modalità di sostituzione si ottiene solo l'ultimo valore per chiave. Qui si usa Device_Id come chiave. L'istruzione T-SQL equivalente è:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value = v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Risultato:

Modified_Time Device_Key Measure_Value
10:05 Una 20

Infine, in modalità accumula si somma Value con un operatore di assegnazione composta (+=). Qui si usano anche Device_Id come chiave:

MERGE INTO [target] t
USING (VALUES ...) AS v (Modified_Time,Device_Id,Measure_Value)
ON t.Device_Key = v.Device_Id
-- Replace and/or accumulate when the key exists
WHEN MATCHED THEN
    UPDATE SET
        t.Modified_Time = v.Modified_Time,
        t.Measure_Value += v.Measure_Value
-- Insert new keys
WHEN NOT MATCHED BY t THEN
    INSERT (Modified_Time,Device_Key,Measure_Value)
    VALUES (v.Modified_Time,v.Device_Id,v.Measure_Value)

Risultato:

Modified_Time Device_Key Measure_Value
10:05 Una 21

Per considerazioni sulle prestazioni, gli adattatori di output del database SQL ASA supportano attualmente solo la modalità di accodamento in modo nativo. Questi adattatori usano l'inserimento bulk per ottimizzare la velocità effettiva e limitare la pressione.

Questo articolo illustra come usare Funzioni di Azure per implementare le modalità Replace e Accumulate per ASA. Quando si usa una funzione come livello intermedio, le potenziali prestazioni di scrittura non influiscono sul processo di streaming. A questo proposito, l'uso di Funzioni di Azure funziona meglio con Azure SQL. Con Synapse SQL, il passaggio da istruzioni bulk a righe per riga potrebbe creare problemi di prestazioni maggiori.

Funzioni di Azure output

In questo processo si sostituisce l'output di ASA SQL con ASA Funzioni di Azure output. La funzione implementa le funzionalità UPDATE, UPSERT o MERGE.

Attualmente, è possibile accedere a un database SQL in una funzione usando due opzioni. La prima opzione è l'associazione di output Azure SQL. Attualmente è limitato a C# e offre solo la modalità di sostituzione. La seconda opzione consiste nel comporre una query SQL da inviare tramite il driver SQL (Microsoft. Data.SqlClient per .NET).

Entrambi gli esempi seguenti presuppongono lo schema di tabella seguente. L'opzione di associazione richiede l'impostazione di una chiave primaria nella tabella di destinazione. Non è necessario, ma consigliato, quando si usa un driver SQL.

CREATE TABLE [dbo].[device_updated](
	[DeviceId] [bigint] NOT NULL, -- bigint in ASA
	[Value] [decimal](18, 10) NULL, -- float in ASA
	[Timestamp] [datetime2](7) NULL, -- datetime in ASA
CONSTRAINT [PK_device_updated] PRIMARY KEY CLUSTERED
(
	[DeviceId] ASC
)
);

Per usare una funzione come output di ASA, la funzione deve soddisfare le aspettative seguenti:

  • Analisi di flusso di Azure richiede lo stato HTTP 200 dall'app delle Funzioni per i batch che elabora correttamente.
  • Quando Analisi di flusso di Azure riceve un'eccezione 413 ("http Request Entity Too Large") da una funzione Azure, riduce le dimensioni dei batch inviati a Azure Funzione.
  • Durante la connessione di prova, Stream Analytics invia una richiesta POST con un batch vuoto a Funzioni di Azure e si aspetta uno stato HTTP 20x come conferma del test.

Opzione 1: Aggiornare per chiave con l'associazione SQL della funzione di Azure

Questa opzione usa l'associazione di output SQL della funzione di Azure. Questa estensione può sostituire un oggetto in una tabella senza dover scrivere un'istruzione SQL. Al momento, non supporta operatori di assegnazione composti (accumuli).

Questo esempio è stato compilato in:

Per comprendere meglio l'approccio di associazione, seguire questa esercitazione.

Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Utilizza le informazioni seguenti:

  • Lingua: C#
  • Runtime: .NET 6 (in funzione/runtime v4)
  • Modello: HTTP trigger

Installare l'estensione di associazione eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

dotnet add package Microsoft.Azure.WebJobs.Extensions.Sql --prerelease

Aggiungere l'elemento SqlConnectionString nella sezione Values del local.settings.json, inserendo la stringa di connessione del server di destinazione:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiorna lo spazio dei nomi, il nome della classe e il nome della funzione con i tuoi:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run (
            // http trigger binding
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log,
            [Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices
            )
        {

            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            // Parse items and send to binding
            for (var i = 0; i < data.Count; i++)
            {
                var device = new Device();
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;

                await devices.AddAsync(device);
            }
            await devices.FlushAsync();

            return new OkResult(); // 200
        }
    }

    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }
    }
}

Aggiornare il nome della tabella di destinazione nella sezione di associazione:

[Sql("dbo.device_updated", ConnectionStringSetting = "SqlConnectionString")] IAsyncCollector<Device> devices

Aggiornare la sezione di classe e mapping Device in modo che corrisponda al proprio schema:

...
                device.DeviceId = data[i].DeviceId;
                device.Value = data[i].Value;
                device.Timestamp = data[i].Timestamp;
...
    public class Device{
        public int DeviceId { get; set; }
        public double Value { get; set; }
        public DateTime Timestamp { get; set; }

È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in Visual Studio Code). Il database SQL deve essere raggiungibile dal computer. È possibile usare SSMS per controllare la connettività. Inviare quindi richieste POST all'endpoint locale. Una richiesta con un corpo vuoto deve restituire HTTP 204. Una richiesta con un payload effettivo dovrebbe essere mantenuta nella tabella di destinazione (in modalità replace/update). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

È ora possibile pubblicare la funzione in Azure. Impostare un'impostazione dell'applicazione per SqlConnectionString. Il firewall di SQL Server di Azure dovrebbe consentire l'accesso dei servizi di Azure perché la funzione live possa raggiungerlo.

È quindi possibile definire la funzione come output nel processo ASA e usarla per sostituire i record anziché inserirli.

Opzione 2: eseguire il merge con l'assegnazione composta (accumulare) tramite una query SQL personalizzata

Note

Al riavvio e al ripristino, ASA potrebbe inviare nuovamente gli eventi di output già generati. Questo comportamento può causare l'esito negativo della logica di accumulo (raddoppiando i singoli valori). Per evitare questo problema, eseguire l'output degli stessi dati in una tabella utilizzando l'output SQL "ASA" nativo. È possibile usare questa tabella di controllo per rilevare i problemi e risincronizzare l'accumulo quando necessario.

Questa opzione usa Microsoft.Data.SqlClient. Questa libreria consente di inviare query SQL a un database SQL.

Questo esempio è stato compilato in:

Prima di tutto, creare un'app per le funzioni HttpTrigger predefinita seguendo questa esercitazione. Sono richieste le informazioni seguenti:

  • Lingua: C#
  • Runtime: .NET 6 (in funzione/runtime v4)
  • Modello: HTTP trigger

Installare la libreria SqlClient eseguendo il comando seguente in un terminale che si trova nella cartella del progetto:

dotnet add package Microsoft.Data.SqlClient --version 4.0.0

Aggiungere l'elemento SqlConnectionString nella sezione Values del local.settings.json, inserendo la stringa di connessione del server di destinazione:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "SqlConnectionString": "Your connection string"
    }
}

Sostituire l'intera funzione (.cs file nel progetto) con il frammento di codice seguente. Aggiornare lo spazio dei nomi, il nome della classe e il nome della funzione in base al proprio:

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.Data.SqlClient;

namespace Company.Function
{
    public static class HttpTrigger1{
        [FunctionName("HttpTrigger1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get","post", Route = null)] HttpRequest req,
            ILogger log)
        {
            // Extract the body from the request
            string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
            if (string.IsNullOrEmpty(requestBody)) {return new StatusCodeResult(204);} // 204, ASA connectivity check

            dynamic data = JsonConvert.DeserializeObject(requestBody);

            // Reject if too large, as per the doc
            if (data.ToString().Length > 262144) {return new StatusCodeResult(413);} //HttpStatusCode.RequestEntityTooLarge

            var SqlConnectionString = Environment.GetEnvironmentVariable("SqlConnectionString");
            using (SqlConnection conn = new SqlConnection(SqlConnectionString))
            {
                conn.Open();

                // Parse items and send to binding
                for (var i = 0; i < data.Count; i++)
                {
                    int DeviceId = data[i].DeviceId;
                    double Value = data[i].Value;
                    DateTime Timestamp = data[i].Timestamp;

                    var sqltext =
                    $"MERGE INTO [device_updated] AS old " +
                    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
                    $"ON new.DeviceId = old.DeviceId " +
                    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
                    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

                    //log.LogInformation($"Running {sqltext}");

                    using (SqlCommand cmd = new SqlCommand(sqltext, conn))
                    {
                        // Execute the command and log the # rows affected.
                        var rows = await cmd.ExecuteNonQueryAsync();
                        log.LogInformation($"{rows} rows updated");
                    }
                }
                conn.Close();
            }
            return new OkResult(); // 200
        }
    }
}

Aggiornare la sezione di compilazione dei comandi sqltext in modo che corrisponda al proprio schema (si noti come l'accumulo viene ottenuto tramite l'operatore += all'aggiornamento):

    var sqltext =
    $"MERGE INTO [device_updated] AS old " +
    $"USING (VALUES ({DeviceId},{Value},'{Timestamp}')) AS new (DeviceId, Value, Timestamp) " +
    $"ON new.DeviceId = old.DeviceId " +
    $"WHEN MATCHED THEN UPDATE SET old.Value += new.Value, old.Timestamp = new.Timestamp " +
    $"WHEN NOT MATCHED BY TARGET THEN INSERT (DeviceId, Value, TimeStamp) VALUES (DeviceId, Value, Timestamp);";

È ora possibile testare il collegamento tra la funzione locale e il database eseguendo il debug (F5 in VS Code). Il database SQL deve essere raggiungibile dal computer. È possibile usare SSMS per controllare la connettività. Inviare quindi richieste POST all'endpoint locale. Una richiesta con un corpo vuoto deve restituire HTTP 204. Una richiesta con un payload effettivo dovrebbe essere salvata in modo permanente nella tabella di destinazione (in modalità di accumulo/unione). Ecco un payload di esempio corrispondente allo schema usato in questo esempio:

[{"DeviceId":3,"Value":13.4,"Timestamp":"2021-11-30T03:22:12.991Z"},{"DeviceId":4,"Value":41.4,"Timestamp":"2021-11-30T03:22:12.991Z"}]

È ora possibile pubblicare la funzione in Azure. È necessario specificare un'impostazione dell'applicazione per SqlConnectionString. Il firewall di SQL Server di Azure dovrebbe consentire l'accesso dei servizi di Azure perché la funzione live possa raggiungerlo.

È quindi possibile definire la funzione come output nel processo ASA e usarla per sostituire i record anziché inserirli.

Alternativi

Al di fuori di Funzioni di Azure, più metodi possono ottenere il risultato previsto. In questa sezione vengono descritti alcuni di questi metodi.

Post-elaborazione nel database SQL di destinazione

Un'attività in background viene eseguita dopo l'inserimento dei dati nel database tramite gli output STANDARD di ASA SQL.

Per Azure SQL, usare i trigger INSTEAD OFDML per intercettare i comandi INSERT generati da AsA.

CREATE TRIGGER tr_devices_updated_upsert ON device_updated INSTEAD OF INSERT
AS
BEGIN
	MERGE device_updated AS old
	
	-- In case of duplicates on the key below, use a subquery to make the key unique via aggregation or ranking functions
	USING inserted AS new
		ON new.DeviceId = old.DeviceId

	WHEN MATCHED THEN 
		UPDATE SET
			old.Value += new.Value, 
			old.Timestamp = new.Timestamp

	WHEN NOT MATCHED THEN
		INSERT (DeviceId, Value, Timestamp)
		VALUES (new.DeviceId, new.Value, new.Timestamp);  
END;

Per Synapse SQL, ASA è in grado di inserire in una tabella di staging. Un'attività ricorrente può quindi trasformare i dati in base alle esigenze in una tabella intermedia. Infine, i dati vengono spostati nella tabella di produzione.

Pre-elaborazione in Azure Cosmos DB

Azure Cosmos DB supporta UPSERT in modo nativo. In questo caso, è possibile solo aggiungere o sostituire. È necessario gestire gli accumuli sul lato client in Azure Cosmos DB.

Se i requisiti corrispondono, è possibile sostituire il database SQL di destinazione con un'istanza di Azure Cosmos DB. Questa modifica richiede una modifica importante nell'architettura complessiva della soluzione.

Per Synapse SQL, è possibile usare Azure Cosmos DB come livello intermedio tramite Azure Collegamento a Synapse per Azure Cosmos DB. Usare Azure Collegamento a Synapse per creare un archivio analitico. È quindi possibile eseguire query su questo archivio dati direttamente in Synapse SQL.

Confronto delle alternative

Ogni approccio offre diverse proposte di valore e funzionalità:

Type Opzione Modalità database SQL di Azure Azure Synapse Analytics
Post-elaborazione
Trigger Sostituisci, Accumula + N/D, i trigger non sono disponibili in Synapse SQL
Staging Sostituisci, Accumula + +
Pre-elaborazione
Funzioni di Azure Sostituisci, Accumula + - (prestazioni riga per riga)
Sostituzione di Azure Cosmos DB Replace N/D N/D
Azure Cosmos DB azure Collegamento a Synapse Replace N/D +

Supporto

Per ulteriore assistenza, provare la pagina delle domande di Microsoft Q&A per Analisi di flusso di Azure.

Passaggi successivi