From 950da0ea024fe1e292603aa84a96f4bc5fea0417 Mon Sep 17 00:00:00 2001 From: Ian Brown Date: Sun, 31 Dec 2023 09:27:03 -0800 Subject: [PATCH 1/2] Limit jobs to 5 and wrap in exception handler to release Signed-off-by: Ian Brown --- src/invidious/jobs.cr | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/invidious/jobs.cr b/src/invidious/jobs.cr index b6b673f7..b415f62b 100644 --- a/src/invidious/jobs.cr +++ b/src/invidious/jobs.cr @@ -1,5 +1,6 @@ module Invidious::Jobs - JOBS = [] of BaseJob + JOBS = [] of BaseJob + SEMAPHORE = ::Channel(Nil).new(5) # Max 5 jobs running at once # Automatically generate a structure that wraps the various # jobs' configs, so that the following YAML config can be used: @@ -31,10 +32,19 @@ module Invidious::Jobs def self.start_all JOBS.each do |job| - # Don't run the main rountine if the job is disabled by config - next if job.disabled? + SEMAPHORE.send(nil) # Acquire a "slot" + spawn do + begin + # Don't run the main routine if the job is disabled by config + next if job.disabled? - spawn { job.begin } + job.begin + rescue ex + Log.error { "Job failed: #{ex.message}" } + ensure + SEMAPHORE.receive # Release the "slot" + end + end end end end From cce734d70ebecbc7dc1a3aabc3cb3f00690f85fe Mon Sep 17 00:00:00 2001 From: Ian Brown Date: Sun, 7 Jan 2024 23:49:12 -0800 Subject: [PATCH 2/2] Added inline comments --- src/invidious/jobs.cr | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/src/invidious/jobs.cr b/src/invidious/jobs.cr index b415f62b..57d62816 100644 --- a/src/invidious/jobs.cr +++ b/src/invidious/jobs.cr @@ -1,48 +1,60 @@ module Invidious::Jobs - JOBS = [] of BaseJob - SEMAPHORE = ::Channel(Nil).new(5) # Max 5 jobs running at once + # This line defines an empty array named JOBS to hold objects of type BaseJob. + JOBS = [] of BaseJob - # Automatically generate a structure that wraps the various - # jobs' configs, so that the following YAML config can be used: - # - # jobs: - # job_name: - # enabled: true - # some_property: "value" - # + # SEMAPHORE is a Channel that allows up to 5 items (jobs) to be processed concurrently. + # This is a way to limit the maximum number of jobs running at the same time to 5. + SEMAPHORE = ::Channel(Nil).new(5) + + # The `macro finished` block is executed once the module definition is complete. macro finished + # Define a struct named JobsConfig inside the module. + # This struct is for storing configuration for different jobs. struct JobsConfig - include YAML::Serializable + include YAML::Serializable # Allows serialization and deserialization from YAML. + # This loop iterates over all subclasses of BaseJob. {% for sc in BaseJob.subclasses %} - # Voodoo macro to transform `Some::Module::CustomJob` to `custom` + # Generate a getter method for each job. The job's class name is transformed + # from something like `Some::Module::CustomJob` to a simpler form `custom`. {% class_name = sc.id.split("::").last.id.gsub(/Job$/, "").underscore %} getter {{ class_name }} = {{ sc.name }}::Config.new {% end %} + # Define an empty initializer. def initialize end end end + # This class method allows registration of a job to the JOBS array. def self.register(job : BaseJob) JOBS << job end + # This class method starts all registered jobs. def self.start_all + # Iterate over each job in the JOBS array. JOBS.each do |job| - SEMAPHORE.send(nil) # Acquire a "slot" + # Send a nil value to the SEMAPHORE channel. This is like acquiring a "slot". + # If the SEMAPHORE is full (5 jobs running), this line will block until a slot is free. + SEMAPHORE.send(nil) + + # Start a new concurrent fiber (lightweight thread) for the job. spawn do begin - # Don't run the main routine if the job is disabled by config + # If the job is disabled in its configuration, skip the execution. next if job.disabled? + # Start the job by calling its 'begin' method. job.begin rescue ex + # If an exception occurs, log the error. Log.error { "Job failed: #{ex.message}" } ensure - SEMAPHORE.receive # Release the "slot" + # After the job is finished or if an error occurred, release the "slot" in the semaphore. + SEMAPHORE.receive end end end