fix(runner): properly do concurrency

This commit is contained in:
Folke Lemaitre 2024-06-26 19:58:45 +02:00
parent 97f4df0824
commit 66a4170f0e
No known key found for this signature in database
GPG Key ID: 41F8B1FBACAE2040
2 changed files with 38 additions and 22 deletions

View File

@ -91,40 +91,56 @@ function Runner:_start()
active = 0 active = 0
waiting = 0 waiting = 0
wait_step = nil wait_step = nil
local next = {} ---@type string[]
-- check running tasks
for _, name in ipairs(names) do for _, name in ipairs(names) do
state[name] = state[name] or { step = 0 } state[name] = state[name] or { step = 0 }
local s = state[name] local s = state[name]
local running = s.task and s.task:is_running() local is_running = s.task and s.task:is_running()
local step = self._pipeline[s.step] local step = self._pipeline[s.step]
-- selene:allow(empty_if)
if s.task and s.task:has_errors() then if s.task and s.task:has_errors() then
local ignore = true -- don't continue tasks if there are errors
elseif step and step.task == "wait" and not resume then elseif step and step.task == "wait" and not resume then
-- waiting for sync
waiting = waiting + 1 waiting = waiting + 1
wait_step = s.step wait_step = s.step
elseif not running then elseif is_running then
if not self._opts.concurrency or active < self._opts.concurrency then -- still running
active = active + 1
else
next[#next + 1] = name
end
end
-- schedule next tasks
for _, name in ipairs(next) do
if self._opts.concurrency and active >= self._opts.concurrency then
break
end
local s = state[name]
local plugin = self:plugin(name) local plugin = self:plugin(name)
if s.step == #self._pipeline then if s.step == #self._pipeline then
-- done
s.task = nil s.task = nil
plugin._.working = false plugin._.working = false
elseif s.step < #self._pipeline then elseif s.step < #self._pipeline then
active = active + 1 -- next
s.step = s.step + 1 s.step = s.step + 1
step = self._pipeline[s.step] local step = self._pipeline[s.step]
if step.task == "wait" then if step.task == "wait" then
plugin._.working = false plugin._.working = false
waiting = waiting + 1
else else
s.task = self:queue(plugin, step) s.task = self:queue(plugin, step)
plugin._.working = not not s.task plugin._.working = not not s.task
end
end
end
else
active = active + 1 active = active + 1
end end
end end
end end
end
while active > 0 do while active > 0 do
continue() continue()

View File

@ -64,7 +64,7 @@ describe("runner", function()
local runner = Runner.new({ plugins = plugins, pipeline = { "test.test1", "test.skip", "test.test2" } }) local runner = Runner.new({ plugins = plugins, pipeline = { "test.test1", "test.skip", "test.test2" } })
runner:start() runner:start()
runner:wait() runner:wait()
assert.equal(4, #runs) assert.equal(4, #runs, runs)
end) end)
it("handles opts", function() it("handles opts", function()