refactor: use new async code for runner and simplify task class

This commit is contained in:
Folke Lemaitre 2024-06-26 17:06:56 +02:00
parent 768de1ebf6
commit 765773a176
No known key found for this signature in database
GPG Key ID: 41F8B1FBACAE2040
5 changed files with 133 additions and 205 deletions

View File

@ -1,23 +1,24 @@
local Async = require("lazy.async")
local Config = require("lazy.core.config") local Config = require("lazy.core.config")
local Task = require("lazy.manage.task") local Task = require("lazy.manage.task")
local Util = require("lazy.util")
---@class RunnerOpts ---@class RunnerOpts
---@field pipeline (string|{[1]:string, [string]:any})[] ---@field pipeline (string|{[1]:string, [string]:any})[]
---@field plugins? LazyPlugin[]|fun(plugin:LazyPlugin):any? ---@field plugins? LazyPlugin[]|fun(plugin:LazyPlugin):any?
---@field concurrency? number ---@field concurrency? number
---@alias PipelineStep {task:string, opts?:TaskOptions} ---@class RunnerTask
---@alias LazyRunnerTask {co:thread, status: {task?:LazyTask, waiting?:boolean}, plugin: string} ---@field task? LazyTask
---@field step number
---@alias PipelineStep {task:string, opts?:TaskOptions }
---@class Runner ---@class Runner
---@field _plugins table<string,LazyPlugin> ---@field _plugins table<string,LazyPlugin>
---@field _running LazyRunnerTask[]
---@field _pipeline PipelineStep[] ---@field _pipeline PipelineStep[]
---@field _sync PipelineStep[]
---@field _on_done fun()[] ---@field _on_done fun()[]
---@field _syncing boolean
---@field _opts RunnerOpts ---@field _opts RunnerOpts
---@field _running? Async
local Runner = {} local Runner = {}
---@param opts RunnerOpts ---@param opts RunnerOpts
@ -37,7 +38,6 @@ function Runner.new(opts)
for _, plugin in ipairs(pp) do for _, plugin in ipairs(pp) do
self._plugins[plugin.name] = plugin self._plugins[plugin.name] = plugin
end end
self._running = {}
self._on_done = {} self._on_done = {}
---@param step string|(TaskOptions|{[1]:string}) ---@param step string|(TaskOptions|{[1]:string})
@ -45,10 +45,6 @@ function Runner.new(opts)
return type(step) == "string" and { task = step } or { task = step[1], opts = step } return type(step) == "string" and { task = step } or { task = step[1], opts = step }
end, self._opts.pipeline) end, self._opts.pipeline)
self._sync = vim.tbl_filter(function(step)
return step.task == "wait"
end, self._pipeline)
return self return self
end end
@ -56,139 +52,107 @@ function Runner:plugin(name)
return Config.plugins[name] or self._plugins[name] return Config.plugins[name] or self._plugins[name]
end end
---@param entry LazyRunnerTask
function Runner:_resume(entry)
if entry.status.task and not entry.status.task:is_done() then
return true
end
local ok, status = coroutine.resume(entry.co)
if not ok then
Util.error("Could not resume a task\n" .. status)
end
entry.status = ok and status
return entry.status ~= nil
end
function Runner:resume(waiting)
if self._syncing then
return true
end
if waiting then
local sync = self._sync[1]
table.remove(self._sync, 1)
if sync then
self._syncing = true
vim.schedule(function()
if sync.opts and type(sync.opts.sync) == "function" then
sync.opts.sync(self)
end
for _, entry in ipairs(self._running) do
if entry.status then
if entry.status.waiting then
entry.status.waiting = false
local plugin = self:plugin(entry.plugin)
if plugin then
plugin._.working = true
end
end
end
end
self._syncing = false
end)
end
end
local running = 0
for _, entry in ipairs(self._running) do
if entry.status then
if not entry.status.waiting and self:_resume(entry) then
running = running + 1
if self._opts.concurrency and running >= self._opts.concurrency then
break
end
end
end
end
return self._syncing or running > 0 or (not waiting and self:resume(true))
end
function Runner:start() function Runner:start()
---@type string[] ---@async
local names = vim.tbl_keys(self._plugins) self._running = Async.run(function()
table.sort(names) self:_start()
for _, name in pairs(names) do end, {
local co = coroutine.create(self.run_pipeline) on_done = function()
local ok, err = coroutine.resume(co, self, name) for _, cb in ipairs(self._on_done) do
if ok then cb()
table.insert(self._running, { co = co, status = {}, plugin = name }) end
else end,
Util.error("Could not start tasks for " .. name .. "\n" .. err) })
end
end
local check = vim.uv.new_check()
check:start(function()
if self:resume() then
return
end
check:stop()
self._running = {}
for _, cb in ipairs(self._on_done) do
vim.schedule(cb)
end
self._on_done = {}
end)
end end
---@async ---@async
---@param name string function Runner:_start()
function Runner:run_pipeline(name) ---@type string[]
local plugin = self:plugin(name) local names = vim.tbl_keys(self._plugins)
plugin._.working = true table.sort(names)
coroutine.yield()
for _, step in ipairs(self._pipeline) do ---@type table<string,RunnerTask>
if step.task == "wait" then local state = {}
plugin._.working = false
coroutine.yield({ waiting = true }) local active = 1
plugin._.working = true local waiting = 0
else ---@type number?
plugin = self:plugin(name) local wait_step = nil
local task = self:queue(plugin, step.task, step.opts)
if task then ---@param resume? boolean
coroutine.yield({ task = task }) local function continue(resume)
assert(task:is_done()) active = #names
if task.error then waiting = 0
wait_step = nil
for _, name in ipairs(names) do
state[name] = state[name] or { step = 0 }
local s = state[name]
local running = s.task and s.task:is_running()
local step = self._pipeline[s.step]
if step and step.task == "wait" and not resume then
waiting = waiting + 1
active = active - 1
wait_step = s.step
elseif not running then
local plugin = self:plugin(name)
if s.task and s.task.error then
active = active - 1
elseif s.step == #self._pipeline then
s.task = nil
active = active - 1
plugin._.working = false plugin._.working = false
return elseif s.step < #self._pipeline then
s.step = s.step + 1
step = self._pipeline[s.step]
if step.task == "wait" then
plugin._.working = false
else
s.task = self:queue(plugin, step)
plugin._.working = not not s.task
end
end end
end end
end end
end end
plugin._.working = false
while active > 0 do
continue()
if active == 0 and waiting > 0 then
local sync = self._pipeline[wait_step]
if sync and sync.opts and type(sync.opts.sync) == "function" then
sync.opts.sync(self)
end
continue(true)
end
coroutine.yield()
end
end end
---@param plugin LazyPlugin ---@param plugin LazyPlugin
---@param task_name string ---@param step PipelineStep
---@param opts? TaskOptions
---@return LazyTask? ---@return LazyTask?
function Runner:queue(plugin, task_name, opts) function Runner:queue(plugin, step)
assert(self._running) assert(self._running and self._running:running(), "Runner is not running")
local def = vim.split(task_name, ".", { plain = true }) local def = vim.split(step.task, ".", { plain = true })
---@type LazyTaskDef ---@type LazyTaskDef
local task_def = require("lazy.manage.task." .. def[1])[def[2]] local task_def = require("lazy.manage.task." .. def[1])[def[2]]
assert(task_def) assert(task_def, "Task not found: " .. step.task)
opts = opts or {} local opts = step.opts or {}
if not (task_def.skip and task_def.skip(plugin, opts)) then if not (task_def.skip and task_def.skip(plugin, opts)) then
local task = Task.new(plugin, def[2], task_def.run, opts) return Task.new(plugin, def[2], task_def.run, opts)
task:start()
return task
end end
end end
function Runner:is_running()
return self._running and self._running:running()
end
-- Execute the callback async when done. -- Execute the callback async when done.
-- When no callback is specified, this will wait sync -- When no callback is specified, this will wait sync
---@param cb? fun() ---@param cb? fun()
function Runner:wait(cb) function Runner:wait(cb)
if #self._running == 0 then if not self:is_running() then
if cb then if cb then
cb() cb()
end end
@ -199,7 +163,7 @@ function Runner:wait(cb)
table.insert(self._on_done, cb) table.insert(self._on_done, cb)
else else
-- sync wait -- sync wait
while #self._running > 0 do while self:is_running() do
vim.wait(10) vim.wait(10)
end end
end end

View File

@ -5,7 +5,7 @@ local Process = require("lazy.manage.process")
---@field skip? fun(plugin:LazyPlugin, opts?:TaskOptions):any? ---@field skip? fun(plugin:LazyPlugin, opts?:TaskOptions):any?
---@field run fun(task:LazyTask, opts:TaskOptions) ---@field run fun(task:LazyTask, opts:TaskOptions)
---@alias LazyTaskState {task:LazyTask, thread:thread} ---@alias LazyTaskFn async fun(task:LazyTask, opts:TaskOptions)
---@class LazyTask ---@class LazyTask
---@field plugin LazyPlugin ---@field plugin LazyPlugin
@ -14,11 +14,10 @@ local Process = require("lazy.manage.process")
---@field status string ---@field status string
---@field error? string ---@field error? string
---@field warn? string ---@field warn? string
---@field private _task fun(task:LazyTask, opts:TaskOptions)
---@field private _started? number ---@field private _started? number
---@field private _ended? number ---@field private _ended? number
---@field private _opts TaskOptions ---@field private _opts TaskOptions
---@field private _running Async[] ---@field private _running Async
local Task = {} local Task = {}
---@class TaskOptions: {[string]:any} ---@class TaskOptions: {[string]:any}
@ -27,15 +26,10 @@ local Task = {}
---@param plugin LazyPlugin ---@param plugin LazyPlugin
---@param name string ---@param name string
---@param opts? TaskOptions ---@param opts? TaskOptions
---@param task fun(task:LazyTask) ---@param task LazyTaskFn
function Task.new(plugin, name, task, opts) function Task.new(plugin, name, task, opts)
local self = setmetatable({}, { local self = setmetatable({}, { __index = Task })
__index = Task,
})
self._opts = opts or {} self._opts = opts or {}
self._running = {}
self._task = task
self._started = nil
self.plugin = plugin self.plugin = plugin
self.name = name self.name = name
self.output = "" self.output = ""
@ -45,6 +39,7 @@ function Task.new(plugin, name, task, opts)
return other.name ~= name or other:is_running() return other.name ~= name or other:is_running()
end, plugin._.tasks or {}) end, plugin._.tasks or {})
table.insert(plugin._.tasks, self) table.insert(plugin._.tasks, self)
self:_start(task)
return self return self
end end
@ -56,22 +51,31 @@ function Task:has_ended()
return self._ended ~= nil return self._ended ~= nil
end end
function Task:is_done()
return self:has_started() and self:has_ended()
end
function Task:is_running() function Task:is_running()
return self:has_started() and not self:has_ended() return not self:has_ended()
end end
function Task:start() ---@private
---@param task LazyTaskFn
function Task:_start(task)
assert(not self:has_started(), "task already started") assert(not self:has_started(), "task already started")
assert(not self:has_ended(), "task already done") assert(not self:has_ended(), "task already done")
self._started = vim.uv.hrtime() self._started = vim.uv.hrtime()
self:async(function() ---@async
self._task(self, self._opts) self._running = Async.run(function()
end) task(self, self._opts)
end, {
on_done = function()
self:_done()
end,
on_error = function(err)
self:notify_error(err)
end,
on_yield = function(res)
self:notify(res)
end,
})
end end
---@param msg string|string[] ---@param msg string|string[]
@ -98,31 +102,13 @@ function Task:notify_warn(msg)
self:notify(msg, vim.diagnostic.severity.WARN) self:notify(msg, vim.diagnostic.severity.WARN)
end end
---@param fn async fun()
function Task:async(fn)
local async = Async.run(fn, {
on_done = function()
self:_done()
end,
on_error = function(err)
self:notify_error(err)
end,
on_yield = function(res)
self:notify(res)
end,
})
table.insert(self._running, async)
end
---@private ---@private
function Task:_done() function Task:_done()
assert(self:has_started(), "task not started") assert(self:has_started(), "task not started")
assert(not self:has_ended(), "task already done") assert(not self:has_ended(), "task already done")
for _, t in ipairs(self._running) do if self._running and self._running:running() then
if t:running() then return
return
end
end end
self._ended = vim.uv.hrtime() self._ended = vim.uv.hrtime()
@ -180,16 +166,6 @@ function Task:spawn(cmd, opts)
end end
end end
---@param tasks (LazyTask?)[]
function Task.all_done(tasks)
for _, task in ipairs(tasks) do
if task and not task:is_done() then
return false
end
end
return true
end
function Task:wait() function Task:wait()
while self:is_running() do while self:is_running() do
vim.wait(10) vim.wait(10)

View File

@ -65,9 +65,7 @@ M.build = {
---@cast builders (string|fun(LazyPlugin))[] ---@cast builders (string|fun(LazyPlugin))[]
for _, build in ipairs(builders) do for _, build in ipairs(builders) do
if type(build) == "function" then if type(build) == "function" then
self:async(function() build(self.plugin)
build(self.plugin)
end)
elseif build == "rockspec" then elseif build == "rockspec" then
Rocks.build(self) Rocks.build(self)
elseif build:sub(1, 1) == ":" then elseif build:sub(1, 1) == ":" then
@ -78,7 +76,7 @@ M.build = {
if not chunk or err then if not chunk or err then
error(err) error(err)
end end
self:async(chunk) chunk()
else else
B.shell(self, build) B.shell(self, build)
end end

View File

@ -30,12 +30,11 @@ describe("runner", function()
end, end,
} }
package.loaded["lazy.manage.task.test"]["async" .. i] = { package.loaded["lazy.manage.task.test"]["async" .. i] = {
---@async
---@param task LazyTask ---@param task LazyTask
run = function(task) run = function(task)
task:async(function() coroutine.yield()
coroutine.yield() table.insert(runs, { plugin = task.plugin.name, task = task.name })
table.insert(runs, { plugin = task.plugin.name, task = task.name })
end)
end, end,
} }
end end

View File

@ -1,3 +1,4 @@
---@module 'luassert'
--# selene:allow(incorrect_standard_library_use) --# selene:allow(incorrect_standard_library_use)
local Task = require("lazy.manage.task") local Task = require("lazy.manage.task")
@ -20,12 +21,10 @@ describe("task", function()
it("simple function", function() it("simple function", function()
local task = Task.new(plugin, "test", function() end, opts) local task = Task.new(plugin, "test", function() end, opts)
assert(not task:has_started()) assert(task:has_started())
assert(not task:is_running()) assert(task:is_running())
task:start()
task:wait() task:wait()
assert(not task:is_running()) assert(not task:is_running())
assert(task:is_done())
assert(task_result.done) assert(task_result.done)
end) end)
@ -33,11 +32,9 @@ describe("task", function()
local task = Task.new(plugin, "test", function() local task = Task.new(plugin, "test", function()
error("test") error("test")
end, opts) end, opts)
assert(not task:has_started()) assert(task:has_started())
assert(not task:is_running()) assert(task:is_running())
task:start()
task:wait() task:wait()
assert(task:is_done())
assert(not task:is_running()) assert(not task:is_running())
assert(task_result.done) assert(task_result.done)
assert(task_result.error) assert(task_result.error)
@ -46,21 +43,17 @@ describe("task", function()
it("async", function() it("async", function()
local running = true local running = true
local task = Task.new(plugin, "test", function(task) ---@async
task:async(function() local task = Task.new(plugin, "test", function()
coroutine.yield() coroutine.yield()
running = false running = false
end)
end, opts) end, opts)
assert(not task:is_running()) assert(task:has_started())
assert(not task:has_started()) assert(task:is_running())
task:start()
assert(running) assert(running)
assert(task:is_running()) assert(task:is_running())
assert(not task:is_done())
task:wait() task:wait()
assert(not running) assert(not running)
assert(task:is_done())
assert(not task:is_running()) assert(not task:is_running())
assert(task_result.done) assert(task_result.done)
assert(not task.error) assert(not task.error)
@ -70,8 +63,8 @@ describe("task", function()
local task = Task.new(plugin, "spawn_errors", function(task) local task = Task.new(plugin, "spawn_errors", function(task)
task:spawn("foobar") task:spawn("foobar")
end, opts) end, opts)
assert(not task:is_running()) assert(task:has_started())
task:start() assert(task:is_running())
task:wait() task:wait()
assert(not task:is_running()) assert(not task:is_running())
assert(task_result.done) assert(task_result.done)
@ -82,13 +75,11 @@ describe("task", function()
local task = Task.new(plugin, "test", function(task) local task = Task.new(plugin, "test", function(task)
task:spawn("echo", { args = { "foo" } }) task:spawn("echo", { args = { "foo" } })
end, opts) end, opts)
assert(not task:is_running()) assert(task:has_started())
assert(not task:has_started()) assert(task:is_running())
task:start()
assert(task:has_started()) assert(task:has_started())
assert(task:is_running()) assert(task:is_running())
task:wait() task:wait()
assert(task:is_done())
assert.same(task.output, "foo\n") assert.same(task.output, "foo\n")
assert(task_result.done) assert(task_result.done)
assert(not task.error) assert(not task.error)
@ -99,8 +90,8 @@ describe("task", function()
task:spawn("echo", { args = { "foo" } }) task:spawn("echo", { args = { "foo" } })
task:spawn("echo", { args = { "bar" } }) task:spawn("echo", { args = { "bar" } })
end, opts) end, opts)
assert(not task:is_running()) assert(task:has_started())
task:start() assert(task:is_running())
assert(task:is_running()) assert(task:is_running())
task:wait() task:wait()
assert(task.output == "foo\nbar\n" or task.output == "bar\nfoo\n", task.output) assert(task.output == "foo\nbar\n" or task.output == "bar\nfoo\n", task.output)