perf: automatically suspend the scheduler when all threads are waiting

This commit is contained in:
Folke Lemaitre 2024-06-29 22:44:51 +02:00
parent 5e3c112cb3
commit 021de31682
No known key found for this signature in database
GPG Key ID: 41F8B1FBACAE2040
3 changed files with 78 additions and 47 deletions

View File

@ -1,11 +1,14 @@
local Util = require("lazy.core.util")
local M = {} local M = {}
---@type Async[] ---@type Async[]
M._queue = {} M._active = {}
M._executor = assert(vim.loop.new_timer()) ---@type Async[]
M._suspended = {}
M._executor = assert(vim.loop.new_check())
M.TIMER = 10 M.BUDGET = 10
M.BUDGET = 100
---@type table<thread, Async> ---@type table<thread, Async>
M._threads = setmetatable({}, { __mode = "k" }) M._threads = setmetatable({}, { __mode = "k" })
@ -42,11 +45,6 @@ function Async:init(fn)
return M.add(self) return M.add(self)
end end
function Async:restart()
assert(not self:running(), "Cannot restart a running async")
self:init(self._fn)
end
---@param event AsyncEvent ---@param event AsyncEvent
---@param cb async fun(res:any, async:Async) ---@param cb async fun(res:any, async:Async)
function Async:on(event, cb) function Async:on(event, cb)
@ -77,27 +75,41 @@ function Async:sleep(ms)
end end
---@async ---@async
function Async:suspend() ---@param yield? boolean
function Async:suspend(yield)
self._suspended = true self._suspended = true
if coroutine.running() == self._co then if coroutine.running() == self._co and yield ~= false then
coroutine.yield() coroutine.yield()
end end
end end
function Async:resume() function Async:resume()
self._suspended = false self._suspended = false
M._run()
end end
function Async:wait() ---@async
---@param yield? boolean
function Async:wake(yield)
local async = M.running() local async = M.running()
assert(async, "Not in an async context")
self:on("done", function()
async:resume()
end)
async:suspend(yield)
end
---@async
function Async:wait()
if coroutine.running() == self._co then if coroutine.running() == self._co then
error("Cannot wait on self") error("Cannot wait on self")
end end
while self:running() do local async = M.running()
if async then if async then
coroutine.yield() self:wake()
else else
while self:running() do
vim.wait(10) vim.wait(10)
end end
end end
@ -121,35 +133,44 @@ function Async:step()
end end
function M.step() function M.step()
local budget = M.BUDGET * 1e6
local start = vim.uv.hrtime() local start = vim.uv.hrtime()
local count = #M._queue for _ = 1, #M._active do
local i = 0 if vim.uv.hrtime() - start > M.BUDGET * 1e6 then
while #M._queue > 0 and vim.uv.hrtime() - start < budget do
---@type Async
local state = table.remove(M._queue, 1)
if state:step() then
table.insert(M._queue, state)
end
i = i + 1
if i >= count then
break break
end end
local state = table.remove(M._active, 1)
if state:step() then
if state._suspended then
table.insert(M._suspended, state)
else
table.insert(M._active, state)
end end
if #M._queue == 0 then end
end
for _ = 1, #M._suspended do
local state = table.remove(M._suspended, 1)
table.insert(state._suspended and M._suspended or M._active, state)
end
-- print("step", #M._active, #M._suspended)
if #M._active == 0 then
return M._executor:stop() return M._executor:stop()
end end
end end
---@param async Async ---@param async Async
function M.add(async) function M.add(async)
table.insert(M._queue, async) table.insert(M._active, async)
if not M._executor:is_active() then M._run()
M._executor:start(1, M.TIMER, vim.schedule_wrap(M.step))
end
return async return async
end end
function M._run()
if not M._executor:is_active() then
M._executor:start(vim.schedule_wrap(M.step))
end
end
function M.running() function M.running()
local co = coroutine.running() local co = coroutine.running()
if co then if co then

View File

@ -78,6 +78,7 @@ function Runner:_start()
---@type number? ---@type number?
local wait_step = nil local wait_step = nil
---@async
---@param resume? boolean ---@param resume? boolean
local function continue(resume) local function continue(resume)
active = 0 active = 0
@ -114,10 +115,12 @@ function Runner:_start()
end end
local s = state[name] local s = state[name]
local plugin = self:plugin(name) local plugin = self:plugin(name)
while s.step <= #self._pipeline do
if s.step == #self._pipeline then if s.step == #self._pipeline then
-- done -- done
s.task = nil s.task = nil
plugin._.working = false plugin._.working = false
break
elseif s.step < #self._pipeline then elseif s.step < #self._pipeline then
-- next -- next
s.step = s.step + 1 s.step = s.step + 1
@ -126,10 +129,16 @@ function Runner:_start()
plugin._.working = false plugin._.working = false
waiting = waiting + 1 waiting = waiting + 1
wait_step = s.step wait_step = s.step
break
else else
s.task = self:queue(plugin, step) s.task = self:queue(plugin, step)
plugin._.working = true plugin._.working = true
if s.task then
active = active + 1 active = active + 1
s.task:wake(false)
break
end
end
end end
end end
end end

View File

@ -21,6 +21,7 @@ M.log = {
---@async ---@async
---@param opts {args?: string[], updated?:boolean, check?:boolean} ---@param opts {args?: string[], updated?:boolean, check?:boolean}
run = function(self, opts) run = function(self, opts)
-- self:spawn({ "sleep", "5" })
local args = { local args = {
"log", "log",
"--pretty=format:%h %s (%cr)", "--pretty=format:%h %s (%cr)",