Stream Analytics: Best parameters for auto-pausing a day-wise TUMBLINGWINDOW stream job, and the best trigger time to set for the function

Context

I have a day-wise TUMBLINGWINDOW query (similar to the one shown below):

SELECT
    DATEADD(day, -1, System.Timestamp()) AS WindowStart,
    System.Timestamp() AS WindowEnd,
    TollId, 
    COUNT(*)
FROM Input TIMESTAMP BY EntryTime  
GROUP BY TumblingWindow(day, 1), TollId

I have been reading the auto-pause documentation and have started following its steps. I have set up a test stream job, along with a function app to host the auto-pause PowerShell code, so that I can play around without impacting the actual job. The PowerShell code has been left as is (no changes except parameter values). I have not yet started the test stream job; I plan to do so once I have a better idea of which parameters and trigger time to use for the auto-pause.

Here is a previous Stack Overflow post (which I created) that provides additional explanation, along with what I am trying to achieve:

Post explaining how start time works with specific examples as to how I'd want the job to pause

Summary of Background Scenario in other posts

The aim is to let the stream job run once a day, long enough for the day's TUMBLINGWINDOW output to be produced with the full day's worth of data. To allow enough time for this, I was thinking that the job can remain off for the majority of the day (from 00:30 UTC) and turn on at 23:30 UTC to "catch up with the backlog" for the day (00:00-23:30 UTC). The day-wise window then outputs at 00:00 UTC, and the job switches off at, say, 00:30 UTC (having had enough time to ensure output). This process would then repeat in a cycle.

My Question

Do the main parameters I have chosen (added below) fit my intentions (as described in the context above), and if so, how do I set the trigger time of the function app so that this code runs as intended with these parameters?

Would I set the trigger to run the script at 23:30 and 00:30 (the docs mention this is done using CRON expressions), since at each of those points it would need to either start or stop the job?

# This snippet is taken from the auto-pause doc linked above

# Set my own values in minutes based on above discussion
$restartThresholdMinute = 1380 # This is M (1380 min = 23 hours, i.e. time left off, 00:30-23:30 UTC)
$stopThresholdMinute = 60 # This is N (60 min = 1 hour, i.e. time left on, 23:30-00:30 UTC)

# Have left these as default due to present advice
$maxInputBacklog = 0 # The amount of backlog we tolerate when stopping the job (in event count, 0 is a good starting point)
$maxWatermark = 10 # The amount of watermark we tolerate when stopping the job (in seconds, 10 is a good starting point at low SUs)

Side point:

If my parameters are not a good choice to start with, what are some other suggestions? (Bearing in mind the primary constraints discussed in the context section.)

Edit: Update 2022-03-16

@Florian I have a few thoughts based on my understanding of what you mentioned in your post, but I am not sure of the best way to handle this. If you could add an adaptation of your code for this implementation to your answer, that would be good.

  • The overall structure of the PowerShell script can remain the same. In the end it is probably best to change the console outputs etc. too, but I haven't done that yet.
    • A primary difference could be the if-else logic which starts/stops the job: it would compare the current time to a predefined value rather than relying on M and N.
    • Perhaps the watermark and backlog checks can be kept just to output to the console for reference, but removed from all conditions.
    • I have kept -OutputStartMode LastOutputEventTime as the start option (I think it is basically "when last stopped") to ensure that we don't lose any data and have the full day's worth of data, as you mentioned in a previous post.
  • For initial concept purposes I have kept almost all the documentation code (even though it may not be needed), added a few variables, and changed the stop/start if-else conditions.
    • The changes I have made have a "nishcs edit" comment near them.
    • I am using the final function app code as a starting point, since I currently have a function app set up for hosting.
# Input bindings are passed in via param block.
Param($Timer)

# Stop on error
$ErrorActionPreference = 'stop'

# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
$currentUTCstringtime = Get-Date -Date $currentUTCtime -UFormat %R  # nishcs edit: Getting the 24hour UTC time format as a string
Write-Host "asaRobotPause - PowerShell timer trigger function is starting at time: $currentUTCtime"

# Set variables
[string]$restartTime = $env:restartTime # nishcs edit: Set this to '23:30'. This could in fact be hard-coded (perhaps best practice is to keep it in the function app settings, not sure).
[string]$stopTime = $env:stopTime # nishcs edit: Set this to '00:30'. This could in fact be hard-coded (perhaps best practice is to keep it in the function app settings, not sure).

$maxInputBacklog = $env:maxInputBacklog
$maxWatermark = $env:maxWatermark

$restartThresholdMinute = $env:restartThresholdMinute
$stopThresholdMinute = $env:stopThresholdMinute

$subscriptionId = $env:subscriptionId
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity for additional details.")
}

try
{
    # throw "This is an error."
    
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotPause - Job $($asaJobName) is $($currentJobState)."

    # Switch state
    if ($currentJobState -eq "Running")
    { 
        # Get-AzActivityLog issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
        # We check 1000 records of history, to make sure we're not missing what we're looking for. It may need adjustment for a job that has a lot of logging happening.
        # There is a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). We move it down.
        $startTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Start Job*"}
        $startTimeStamp = $startTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

        # Get-AzMetric issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
        $currentBacklog = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "InputEventsSourcesBacklogged" -DetailedOutput -WarningAction Ignore
        $currentWatermark = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "OutputWatermarkDelaySeconds" -DetailedOutput -WarningAction Ignore

        # Metrics always lag 1-3 minutes behind, so grabbing the last N minutes actually means checking N+3. This may be overly safe and can be fine-tuned down per job.
        $Backlog =  $currentBacklog.Data | `
                        Where-Object {$_.Maximum -ge 0} | `
                        Sort-Object -Property Timestamp -Descending | `
                        Where-Object {$_.Timestamp -ge $startTimeStamp} | `
                        Select-Object -First $stopThresholdMinute | 
                        Measure-Object -Sum Maximum
        $BacklogSum = $Backlog.Sum

        $Watermark = $currentWatermark.Data | `
                        Where-Object {$_.Maximum -ge 0} | `
                        Sort-Object -Property Timestamp -Descending | `
                        Where-Object {$_.Timestamp -ge $startTimeStamp} | `
                        Select-Object -First $stopThresholdMinute | `
                        Measure-Object -Average Maximum
        $WatermarkAvg = [int]$Watermark.Average

        Write-Output "asaRobotPause - Job $($asaJobName) is running since $($startTimeStamp) with a sum of $($BacklogSum) backlogged events, and an average watermark of $($WatermarkAvg) sec, for $($Watermark.Count) minutes."

        # nishcs edit: Conditions no longer reliant on the M and N minute. Just on the predefined start/ stop time that have been set.
        if (
            ($currentUTCstringtime -eq $stopTime)
            )
        {
            Write-Output "asaRobotPause - Job $($asaJobName) is stopping..."
            Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
        }
        else {
            Write-Output "asaRobotPause - Job $($asaJobName) is not stopping yet, it needs to have less than $($maxInputBacklog) backlogged events and under $($maxWatermark) sec watermark for at least $($stopThresholdMinute) minutes."
        }
    }

    elseif ($currentJobState -eq "Stopped")
    {
        # Get-AzActivityLog issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
        # We check 1000 records of history, to make sure we're not missing what we're looking for. It may need adjustment for a job that has a lot of logging happening.
        # There is a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). We move it down.
        $stopTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Stop Job*"}
        $stopTimeStamp = $stopTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

        # Get-Date returns a local time, we project it to the same time zone (universal) as the result of Get-AzActivityLog that we extracted above
        $minutesSinceStopped = ((Get-Date).ToUniversalTime()- $stopTimeStamp).TotalMinutes

        # nishcs edit: Conditions no longer reliant on the M and N minute. Just on the predefined start/ stop time that have been set. 
        if ($currentUTCstringtime -eq $restartTime)
        {
            Write-Output "asaRobotPause - Job $($asaJobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it is now starting..."
            Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        }
        else{
            Write-Output "asaRobotPause - Job $($asaJobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it will not be restarted yet."
        }
    }
    else {
        Write-Output "asaRobotPause - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }

    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotPause - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}


Solution 1:[1]

Posting here for completeness of the question. I have provided the modified script to handle stopping and starting at a particular time.

This follows from the suggestions by @Florian.

Method 1: Function App Method

If you plan on using a function app to host the code, you can create 2 separate functions within a single function app: one for stopping and one for restarting the stream job. Below is the PowerShell code for each function (run.ps1). The parameters can be added to the configuration section of the function app and pulled into the script using the environment variable syntax.

Function 1 (restart job): asa-autorestart

<# 
Function for restarting the stream job.
This uses the "when last stopped" start mode (-OutputStartMode LastOutputEventTime) to try to ensure no data is missed during the restart; this can be changed as necessary.
#>

# Input bindings are passed in via param block.
Param($Timer)

# Stop on error
$ErrorActionPreference = 'stop'

# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotRestart - PowerShell timer trigger function is starting at time: $currentUTCtime"

# Set variables
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

# Not being used in code but kept just in case
$subscriptionId = $env:subscriptionId
#$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity for additional details.")
}

try
{
    # throw "This is an error."
    
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) is $($currentJobState)."

    if ($currentJobState -eq "Stopped")
    {   
        # Conditions no longer reliant on the M and N minute. Just on the predefined restart trigger time that has been set.
        Write-Output "asaRobotRestart - Job $($asaJobName) is now starting from when last stopped..."
        Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        Write-Output "asaRobotRestart - Job $($asaJobName) has been started."
    }
    else {
        Write-Output "asaRobotRestart - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}

Function 2 (stopping job): asa-autostop

<# 
Function for stopping the stream job.
#>

# Input bindings are passed in via param block.
Param($Timer)

# Stop on error
$ErrorActionPreference = 'stop'

# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotStop - PowerShell timer trigger function is starting at time: $currentUTCtime"

# Set variables
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

# Not being used in code but kept just in case
$subscriptionId = $env:subscriptionId
#$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see https://docs.microsoft.com/en-us/azure/app-service/overview-managed-identity for additional details.")
}

try
{
    # throw "This is an error."
    
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) is $($currentJobState)."

    # Switch state
    if ($currentJobState -eq "Running")
    { 
        # Conditions no longer reliant on the M and N minute. Just on the predefined stop trigger time that has been set.
        Write-Output "asaRobotStop - Job $($asaJobName) is stopping..."
        Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
        Write-Output "asaRobotStop - Job $($asaJobName) has stopped."

    }
    else {
        Write-Output "asaRobotStop - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}
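
Each of the two functions above also needs a timer binding in its own function.json. The fragment below is a minimal sketch for asa-autorestart, assuming the six-field NCRONTAB format used by Azure Functions timer triggers ({second} {minute} {hour} {day} {month} {day-of-week}), which is evaluated in UTC by default. The binding name Timer is chosen to match Param($Timer) in run.ps1.

```json
{
  "bindings": [
    {
      "name": "Timer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 30 23 * * *"
    }
  ]
}
```

For asa-autostop the same fragment would apply with the schedule set to "0 30 0 * * *" (00:30 UTC), under the same assumptions.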

Method 2: Automation Job Method

If you plan on using an Automation account to host the code, you can create 2 separate runbooks within one Automation account: one for stopping and one for restarting the stream job. Below is the PowerShell code for each runbook. The parameter values can be supplied once the runbook has been published, when you schedule each runbook to run at a specific time. They are then pulled into the script using the standard Param block.

Runbook 1 (restarting job): asa-autorestart

#Re-starting job
Param(
    [string]$subscriptionId,
    [string]$resourceGroupName,
    [string]$asaJobName
)
# Stop on error
$ErrorActionPreference = 'stop'
# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotRestart - PowerShell runbook is starting at time: $currentUTCtime"
# Set variables
$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"
# Ensures you do not inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null
# Connect using a Managed Service Identity
try {
        $AzureContext = (Connect-AzAccount -Identity).context
    }
catch{
        Write-Output "There is no system-assigned user identity. Aborting.";
        exit
    }
try
{
    # throw "This is an error."
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) is $($currentJobState)."
    if ($currentJobState -eq "Stopped")
    {
        # Conditions no longer reliant on the M and N minute. Just on the predefined restart trigger time that has been set.
        Write-Output "asaRobotRestart - Job $($asaJobName) is now starting from when last stopped..."
        Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
        Write-Output "asaRobotRestart - Job $($asaJobName) has been started."
    }
    else {
        Write-Output "asaRobotRestart - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotRestart - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}

Runbook 2 (stopping job): asa-autostop

# Stopping job
Param(
    [string]$subscriptionId,
    [string]$resourceGroupName,
    [string]$asaJobName
)
# Stop on error
$ErrorActionPreference = 'stop'
# Write an information log with the current time.
$currentUTCtime = (Get-Date).ToUniversalTime()
Write-Host "asaRobotStop - PowerShell runbook is starting at time: $currentUTCtime"
# Set variables
$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName)/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"
# Ensures you do not inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null
# Connect using a Managed Service Identity
try {
        $AzureContext = (Connect-AzAccount -Identity).context
    }
catch{
        Write-Output "There is no system-assigned user identity. Aborting.";
        exit
    }
try
{
    # throw "This is an error."
    # Check current ASA job status
    $currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) is $($currentJobState)."
    # Switch state
    if ($currentJobState -eq "Running")
    {
        # Conditions no longer reliant on the M and N minute. Just on the predefined stop trigger time that has been set.
        Write-Output "asaRobotStop - Job $($asaJobName) is stopping..."
        Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
        Write-Output "asaRobotStop - Job $($asaJobName) has stopped."
    }
    else {
        Write-Output "asaRobotStop - Job $($asaJobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping if that doesn't go away!"
    }
    # Final ASA job status check
    $newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
    Write-Output "asaRobotStop - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."
}
catch
{
    throw $_.Exception.Message
}
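
The runbooks can be scheduled in the portal, but the schedules could also be scripted. The sketch below is one possible way to do that, assuming the Az.Automation cmdlets New-AzAutomationSchedule and Register-AzAutomationScheduledRunbook. The helper names Get-NextUtcOccurrence and Register-DailyRunbook, the "-daily" schedule naming, and the 10-minute lead time are illustrative assumptions, not part of the original answer.

```powershell
# Sketch only. Helper names and the "<runbook>-daily" schedule name are hypothetical.

function Get-NextUtcOccurrence {
    # Next UTC DateTime at which the HH:mm time of day occurs, at least
    # $LeadMinutes after $Now (a schedule's start time must lie in the future).
    param(
        [Parameter(Mandatory)][string]$TimeOfDay,
        [datetime]$Now = (Get-Date).ToUniversalTime(),
        [int]$LeadMinutes = 10
    )
    $offset = [timespan]::ParseExact($TimeOfDay, 'hh\:mm', [cultureinfo]::InvariantCulture)
    $candidate = $Now.Date.Add($offset)
    if ($candidate -lt $Now.AddMinutes($LeadMinutes)) {
        $candidate = $candidate.AddDays(1)
    }
    return $candidate
}

function Register-DailyRunbook {
    # Creates a daily UTC schedule and links it to an already published runbook.
    param(
        [Parameter(Mandatory)][string]$ResourceGroupName,
        [Parameter(Mandatory)][string]$AutomationAccountName,
        [Parameter(Mandatory)][string]$RunbookName,
        [Parameter(Mandatory)][string]$TimeOfDay,
        [hashtable]$RunbookParameters = @{}
    )
    $scheduleName = "$RunbookName-daily"

    New-AzAutomationSchedule -ResourceGroupName $ResourceGroupName `
        -AutomationAccountName $AutomationAccountName `
        -Name $scheduleName `
        -StartTime (Get-NextUtcOccurrence -TimeOfDay $TimeOfDay) `
        -DayInterval 1 `
        -TimeZone 'UTC' | Out-Null

    Register-AzAutomationScheduledRunbook -ResourceGroupName $ResourceGroupName `
        -AutomationAccountName $AutomationAccountName `
        -RunbookName $RunbookName `
        -ScheduleName $scheduleName `
        -Parameters $RunbookParameters | Out-Null
}

# Example usage (placeholders, not executed here):
# $jobParams = @{ subscriptionId = '<subscription-id>'; resourceGroupName = '<rg>'; asaJobName = '<job>' }
# Register-DailyRunbook -ResourceGroupName '<rg>' -AutomationAccountName '<account>' `
#     -RunbookName 'asa-autorestart' -TimeOfDay '23:30' -RunbookParameters $jobParams
# Register-DailyRunbook -ResourceGroupName '<rg>' -AutomationAccountName '<account>' `
#     -RunbookName 'asa-autostop' -TimeOfDay '00:30' -RunbookParameters $jobParams
```

Only the two function definitions run when the block is loaded; the Azure calls happen when Register-DailyRunbook is invoked in a session that is already signed in with access to the Automation account.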

Solution 2:[2]

I think you need a different scheduling logic than the one described in the article.

From the article:

  • A stopped job is restarted after M minutes
  • A running job is stopped anytime after N minutes, as soon as its backlog and watermark metrics are healthy

What I think you need:

  • A stopped job is restarted at 23:30
  • A running job is stopped at 00:30 (you can still check for watermark, but that may be unnecessary if you give it enough time)

The simplest way to implement your use case is to create 2 simple scheduled tasks, one for starting the job and one for stopping it. In terms of triggers:

  • Azure Function: you will need something like:
    • start at 0 30 23 * * *
    • stop at 0 30 0 * * *
  • Azure Automation: use a schedule trigger
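
If a single function on a frequent timer is kept instead (as in the adapted script in the question), comparing the current "HH:mm" string for equality with '23:30' only matches when the trigger happens to fire in that exact minute. A window check is more tolerant. The following is a minimal sketch of that idea, assuming the 23:30 to 00:30 UTC run window from the question; Test-InRunWindow and Get-AsaAction are hypothetical helper names, and the returned action would still have to be passed on to Start-AzStreamAnalyticsJob or Stop-AzStreamAnalyticsJob.

```powershell
function Test-InRunWindow {
    # True when $Now (time of day) falls in [Start, Stop), including windows
    # that wrap past midnight such as 23:30 -> 00:30.
    param([timespan]$Now, [timespan]$Start, [timespan]$Stop)
    if ($Start -le $Stop) {
        return ($Now -ge $Start -and $Now -lt $Stop)
    }
    return ($Now -ge $Start -or $Now -lt $Stop)
}

function Get-AsaAction {
    # Returns 'Start', 'Stop' or 'None' for the given UTC time and job state.
    param(
        [Parameter(Mandatory)][datetime]$NowUtc,
        [Parameter(Mandatory)][string]$JobState,
        [string]$RestartTime = '23:30',
        [string]$StopTime = '00:30'
    )
    $culture = [cultureinfo]::InvariantCulture
    $start = [timespan]::ParseExact($RestartTime, 'hh\:mm', $culture)
    $stop = [timespan]::ParseExact($StopTime, 'hh\:mm', $culture)
    $inWindow = Test-InRunWindow -Now $NowUtc.TimeOfDay -Start $start -Stop $stop

    if ($JobState -eq 'Stopped' -and $inWindow) { return 'Start' }
    if ($JobState -eq 'Running' -and -not $inWindow) { return 'Stop' }
    return 'None'
}
```

With this shape the timer can fire every few minutes: a stopped job is started on the first run inside the window, and a running job is stopped on the first run after it. The backlog and watermark checks from the article could be added to the 'Stop' branch if extra safety is wanted.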

Let me know if you need help adapting the code.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Curious
Solution 2 Florian Eiden