ros2 callback during loop

Typically: "How do I... ", "How can I... " questions
Post Reply
stevenh18
Posts: 2
Joined: 20 Apr 2022, 11:20

ros2 callback during loop

Post by stevenh18 »

I am working with a physical robot and attempting to do something along the lines of

Code: Select all

func step(action):
	do_action(action)
	while (not check_complete()):
		await()
	
where check_complete would check the value of a ros topic that it is subscribed with a callback.

the issue I'm running into is that the equivalent of ros:spinonce only get called outside the main loop, so I get stuck in an infinite loop where I am waiting for a callback result that will only be processed after the main loop has... looped.

i know that I theoretically could build the process to exit the loop with some sort of mechanism to say "this is the end, but on the next actuation run check_complete() and then do some sort of follow up, but the codebase is already pretty complex and there would be a lot of edge cases to address where this might be problematic, so I was hoping there was another way to force a ros/ros2 spinonce in the thread.
fferri
Posts: 1334
Joined: 09 Sep 2013, 19:28

Re: ros2 callback during loop

Post by fferri »

Can you share a minimal reproducible testcase?

It is not clear whether you are using Lua or Python, ROS or ROS2.

You can call spin_once only with native libraries in Python.

With Lua, best is probably to use a threaded script.
stevenh18
Posts: 2
Joined: 20 Apr 2022, 11:20

Re: ros2 callback during loop

Post by stevenh18 »

ok this will get a little silly, but its a minimal working example of my setup (I had to dump some stuff in the global space or this snippet would be a few thousand lines line with all my state management, but if it works, it works)

--important: i am using coppelia 4.3.0, its old, but the breaking API changes would take too long to update to current, so you might not be able to run this--


rundown:
RosManager: wrapper for simROS2 which manages setup and cleanup of ros objects
RosContainer: wrapper around a subscription callback so that I can cache the last result/ handle the result outside of when it was received.

await_ functions:
await_non_blocking: allows the sysCall_actuate method to continue and checks condition_func until it's true, then executes some resume_func afterward.

await_blocking: blocks the sysCall_actuate method from continuing until condition_func is met.

the issue:
as there is no way to spin_once in the lua implementation, await_blocking will never return true as new messages are never processed.

await_non_blocking is a viable alternative, but it would require me to completely redesign my control logic, and we're only using ros in a subset of use cases, so it doesn't really make sense to do this.

Code: Select all

RosContainer = {}
RosContainer.__index = RosContainer
setmetatable(RosContainer, {
    __call = function (cls, ...)
      local self = setmetatable({}, cls)
      self:_init(...)
      return self
    end,
  }
)

function RosContainer:_init(manager,topic, type, buff)
    if buff == nil then buff = 1 end
    if hints == nil then hints = {} end
    _G[topic.."callback"] = function(res) manager.subscriptions[topic].result = res end
    self.topic = topic
    self.result = nil
    self.sub = manager:subscribe(topic, type, topic.."callback", buff)
end


RosManager = {}
RosManager.__index = RosManager
setmetatable(RosManager, {
    __call = function (cls, ...)
      local self = setmetatable({}, cls)
      self:_init(...)
      return self
    end,
  }
)

function RosManager:_init(operationState)
    self.publishers = {}
    self.subscriptions = {}
    self.services = {}
    self.opstate = operationState
    if self.opstate > 1 then
        self.cleanable = {self.publishers, self.services}
        self.shutdowns = {simROS2.shutdownPublisher, simROS2.shutdownClient}
        table.insert(cleanables, self)
    end
end

function RosManager:advertise(topic, type, buff, latch)
    if self.opstate <= 1 then return nil end
    if buff == nil then buff = 1 end
    if latch == nil then latch = false end
    local pub = simROS2.createPublisher(topic, type, buff, latch)
    table.insert(self.publishers, pub)
    return pub
end

function RosManager:subscribeContainer(topic, type, buff, hints)
    if self.opstate <= 1 then return nil end
    if buff == nil then buff = 1 end
    if hints == nil then hints = {} end
    local container = RosContainer(self, topic, type, buff, hints)
    self.subscriptions[topic] = container
    return container
end

function RosManager:subscribe(topic, type, callback, buff, hints)
    if self.opstate <= 1 then return nil end
    if buff == nil then buff = 1 end
    if hints == nil then hints = {} end
    local sub = simROS2.createSubscription(topic, type, callback, buff)
    return sub
end

function RosManager:buildJointMessage(joints)
    if self.opstate <= 1 then return nil end
    local t = simROS2.getTime()
    local msg = {
        header = {
            stamp=t,
            frame_id='world'
        },
        position=joints
    }
    return msg
end

function RosManager:cleanup()
    if self.opstate <= 1 then return nil end
    for k,v in ipairs(self.cleanable) do
        while #v > 0 do
            self.shutdowns[k](table.remove(v))
        end
    end
end

function sysCall_init()
    -- do some initialization here
    cleanables = {}
    rosManager = RosManager(3)
    local id = 1
    jointPublisher = rosManager:advertise("/ARM"..id.."/move_jp", "sensor_msgs/msg/JointState")
    goalReachedSub = rosManager:subscribeContainer("/ARM"..id.."/goal_reached", "std_msgs/msg/Bool")
    iter = 0
    
    await_state = nil
    resume_func = nil
    condition_func = nil
    
    await_start(function() return goalReachedSub.result ~= nil end, function() return goalReachedSub.result end)
end

function await_start(condition, resume)
    await_state = true
    resume_func = resume
    condition_func = condition
end

function await_non_blocking()
    if await_state == nil then return end
    print(iter)
    iter = iter + 1
    await_check()
end

function await_check()
    if condition_func() then 
        await_state = nil
        print(resume_func())
    end
end

function await_inline(condition_func)
    local c = 0
    while condition_func() do
        c = c + 1
        if c % 20000 == 0 then
            print(c)
        end
    end
    -- this is where we would want to continue on.
    print("done awaiting after:", c, goalReachedSub.result)
end

function await_blocking()
    await_inline(function() return goalReachedSub.result == nil end)
end

function sysCall_actuation()
    -- allows the main loop to continue without block, executing resume_func once the condition has been met.
    await_non_blocking()
    
    -- blocks the thread, stopping the main loop from continuing execution until condition is met.
    -- causes issues if the condition only updates in the main loop.
    
    --UNCOMMENT TO TEST
    --await_blocking()
end

function sysCall_sensing()
    -- put your sensing code here
end

function sysCall_cleanup()
    -- do some clean-up here
    for k,v in ipairs(cleanables) do
        v:cleanup()
    end
end

-- See the user manual or the available code snippets for additional callback functions and details
fferri
Posts: 1334
Joined: 09 Sep 2013, 19:28

Re: ros2 callback during loop

Post by fferri »

You can't put blocking code in sysCall_actuation() or the simulation will freeze, and you can also cause deadlock like you described.

This has nothing to do with ROS, but it is a general rule of CoppeliaSim.

If you don't want to redesign your code to be non-blocking, you may want to have a look at threaded scripts.
Post Reply