Path Planning Algorithm with Threaded/Non Threaded Scripts Bug
Posted: 25 Jul 2024, 16:52
I am having an issue with my program, i intend to run a path finding algorithm Dijkstra, now, i created a slider with xml, that changes the obstacles available, the issue now is whenever this obstacle is changed, the Dijkstra has to run again, i sense this is too much work for the main thread cause it crashes, i have read on threaded scripts, but i have an issue figuring out how to implement, like do I change this from a non-threaded to threaded script and if i do that, I notice certain callbacks don't work anymore like sysCall_cleanup(), which is important for my program, or do i create a separate script and make it threaded, move the Dijkstra, but if i do so, how do i get the path returned by Dijkstra and still use it in the sysCall_actuation() which will be in the main script, cause there will be two separate scripts now, honestly, i just need advice on way forward, any input is appreciated. this is the code:
Please Pardon the roughness of the code and multiple comments, it's still in development.
Please Pardon the roughness of the code and multiple comments, it's still in development.
Code: Select all
#python
import math
import numpy as np
def sysCall_init():
sim = require('sim')
np.random.seed(18)
self.x_scale = 3
self.y_scale = 2
self.node_radius = 0.05
self.dummy_spacing_factor = 0.97
self.random_percent = 0.7
self.robot_handle = sim.getObject('/robot_front_based_group')
self.leftArm = sim.getObject('/left_arm_joint')
self.rightArm = sim.getObject('/right_arm_joint')
self.leftWheel = sim.getObject('/left_joint')
self.rightWheel = sim.getObject('/right_joint')
self.proximitySensor = sim.getObject('/Proximity_sensor')
self.velocity = 14.0 # Set the desired velocity
#getting floor handle and dimensions
self.floor = sim.getObject('/infiniteFloor')
result, pureType, dimensions = sim.getShapeGeomInfo(self.floor)
#print(f'result: {result}, pureType: {pureType}, dimensions: {dimensions}')
parameter = sim.setObjectInt32Param(self.floor, sim.objintparam_visibility_layer, 1)
sim.scaleObject(self.floor, self.x_scale, self.y_scale, 1,0)
#getting object handle and setting position
self.robot_handle = sim.getObject('/robot_front_based_group')
self.pos = sim.getObjectPosition(self.robot_handle, sim.handle_parent)
#print(pos)
sim.setObjectPosition(self.robot_handle, [self.pos[0],self.pos[1],self.pos[2]], sim.handle_parent)
# do some initialization here
self.line_container = sim.addDrawingObject(sim.drawing_lines, 1, 0, self.floor, 99999, [0.0, 1.0, 0.0])
self.point_container = sim.addDrawingObject(sim.drawing_points, self.node_radius, 0, self.floor, 99999, [1.0, 0.0, 0.0])
self.arrived = 0
self.dummies = dict()
x_offset = (self.x_scale * 5) // 2
y_offset = (self.y_scale * 5) // 2
for i in range(-x_offset, x_offset + 1):
for j in range(-y_offset, y_offset + 1):
x_pos = i * self.dummy_spacing_factor
y_pos = j * self.dummy_spacing_factor
self.dummies[f'{i+x_offset}-{j+y_offset}'] = create_dummy(f'{i+x_offset}-{j+y_offset}', [x_pos,y_pos])
self.path = dijkstra_graph('0-0','5-5')
xml = '<ui title="'+sim.getObjectAlias(self.robot_handle,1)+' speed" closeable="false" resizeable="false"
activate="false">'+ \
'<hslider minimum="0" maximum="100" on-change="onObstacleChange" id="1"/>'+ \
'<label text="" style="* {margin-left: 300px;}"/>'+ \
'</ui>'
ui = simUI.create(xml)
speed = 0.25*speedRange[0]+ 0.75*speedRange[1]
speed_scale100 = 100*(speed-speedRange[0])/(-speedRange[0]+speedRange[1])
simUI.setSliderValue(ui,1,speed_scale100);
def onObstacleChange(uiHandle, id, speed_scale100):
self.path = None
self.random_percent = speed_scale100/100
print(self.random_percent)
self.path = dijkstra_graph('0-0','5-5', self.random_percent)
print(f'path: {self.path}')
def create_dummy(name,position):
self.name = sim.createDummy(0.1)
sim.setObjectParent(self.name, self.floor)
sim.setObjectPosition(self.name, [position[0],position[1],self.pos[2]],sim.handle_parent)
sim.setObjectInt32Param(self.name, sim.objintparam_visibility_layer, 0)
#To turn off the collidable property:
sim.setObjectSpecialProperty(self.name, 0)
return self.name
def dijkstra_graph(start, end):
class Graph:
def __init__(self, length, width, dummies, random_percent):
self.dummies=dummies
self.graph = {}
self.obstacles = {}
self.random_percent = random_percent
self.length = length
self.width = width
self.generate_graph()
self.show_obstacles()
def generate_graph(self):
#generate random probability
#loop over length and width
for i in range(self.length):
for j in range(self.width):
random_choice = np.random.random()
if random_choice>self.random_percent:
self.obstacles[f'{i}-{j}'] = f'obstacle {i}-{j}'
print(f'obstacles:{self.obstacles}')
#if random choice is greater than 0.9
#node is an obstacle
for i in range(self.length):
for j in range(self.width):
if self.obstacles.get((i,j)) == None:
nodes = {
"north": [i, j - 1],
"northEast": [i + 1, j - 1],
"east": [i + 1, j],
"southEast": [i + 1, j + 1],
"south": [i, j + 1],
"southWest": [i - 1, j + 1],
"west": [i - 1, j],
"northWest": [i - 1, j - 1],
}
#print(f"node:{nodes}")
for node, coordinates in nodes.items():
x, y = coordinates
horizontal_condition = 0 <= x < self.length
vertical_condition = 0 <= y < self.width
if horizontal_condition and vertical_condition and (self.obstacles.get(f'{x}-{y}') == None):
if node in ["north", "south"]:
self.add_edge(f"{i}-{j}", f"{x}-{y}", 10)
elif node in ["west", "east"]:
self.add_edge(f"{i}-{j}", f"{x}-{y}", 10)
else:
self.add_edge(f"{i}-{j}", f"{x}-{y}", 14)
def show_obstacles(self):
for key in self.obstacles.keys():
sim.setObjectInt32Param(self.dummies[key], sim.objintparam_visibility_layer, 1)
sim.setObjectSpecialProperty(self.dummies[key], 0)
sim.setObjectSpecialProperty(self.dummies[key], sim.objectspecialproperty_collidable)
def does_node_exist(self, node):
return node in self.graph
def add_edge(self, src, dst, weight):
if not self.does_node_exist(src):
self.graph[src] = {dst: weight}
else:
self.graph[src][dst] = weight
if not self.does_node_exist(dst):
self.graph[dst] = {}
def get_distance(self, src, dst):
return self.graph[src][dst]
def get_direction(self, src, dst):
x0, y0 = map(int, src.split("-"))
x1, y1 = map(int, dst.split("-"))
dx = x1 - x0
dy = y1 - y0
direction = None
if dx == 0 and dy == 1:
direction = 0
elif dx == 1 and dy == 1:
direction = 45
elif dx == 1 and dy == 0:
direction = 90
elif dx == 1 and dy == -1:
direction = 135
elif dx == 0 and dy == -1:
direction = 180
elif dx == -1 and dy == -1:
direction = 225
elif dx == -1 and dy == 0:
direction = 270
elif dx == -1 and dy == 1:
direction = 315
return direction
def dijkstra(self, start, end):
def find_lowest_cost_node(costs, processed):
lowest_cost = float('inf')
lowest_cost_node = None
for node in costs:
cost = costs[node]
if cost < lowest_cost and node not in processed:
lowest_cost = cost
lowest_cost_node = node
return lowest_cost_node
if self.obstacles.get(start)==None and self.obstacles.get(end)==None:
print('entered if')
nodes_except_start = [node for node in self.graph if node != start]
costs = {node: float('inf') for node in nodes_except_start}
parents = {node: None for node in nodes_except_start}
processed = []
start_neighbors = self.graph[start].keys()
for node in start_neighbors:
parents[node] = start
costs[node] = self.graph[start][node]
current_node = find_lowest_cost_node(costs, processed)
while current_node is not None:
cost = costs[current_node]
neighbors = self.graph[current_node]
for n in neighbors:
new_cost = cost + neighbors[n]
if costs.get(n, float('inf')) > new_cost:
costs[n] = new_cost
parents[n] = current_node
processed.append(current_node)
current_node = find_lowest_cost_node(costs, processed)
path = [end]
point = parents[end]
while point != start:
path.insert(0, point)
point = parents[point]
path.insert(0, start)
return path
else:
print("Start or end node cannot be obstacle")
return None
def dfs(self, start):
stack = [start]
visited = set([start])
while stack:
current_node = stack.pop()
neighbors = self.graph[current_node]
for neighbor in neighbors:
if neighbor not in visited:
stack.append(neighbor)
visited.add(neighbor)
def bfs(self, start):
queue = [start]
visited = set([start])
while queue:
current_node = queue.pop(0)
neighbors = self.graph[current_node]
for neighbor in neighbors:
if neighbor not in visited:
queue.append(neighbor)
visited.add(neighbor)
graph = Graph(10,10,self.dummies, self.random_percent)
return graph.dijkstra(start,end)
def move_towards_path(path):
robot_pos = sim.getObjectPosition(self.robot_handle, sim.handle_parent)
self.dummy_handle = self.dummies[path]
target_pos = sim.getObjectPosition(self.dummy_handle, sim.handle_parent)
print(target_pos)
dx = target_pos[0] - robot_pos[0]
dy = target_pos[1] - robot_pos[1]
distance = math.sqrt(dx**2 + dy**2)
sim.addDrawingObjectItem(self.line_container, [robot_pos[0], robot_pos[1], 0.0, target_pos[0], target_pos[1], 0.0])
sim.addDrawingObjectItem(self.point_container, [robot_pos[0], robot_pos[1], 0.0])
if distance < 0.1: # If close to the target, stop moving
sim.setJointTargetVelocity(self.leftWheel, 0)
sim.setJointTargetVelocity(self.rightWheel, 0)
self.arrived+=1
return True
angle_to_target = math.atan2(dy, dx)
# Get the current orientation of the robot
_, _, current_angle = sim.getObjectOrientation(self.robot_handle, sim.handle_parent)
angle_diff = angle_to_target - current_angle
angle_diff = math.atan2(math.sin(angle_diff), math.cos(angle_diff)) # Normalize the angle
# Simple proportional controller for the wheels
if angle_diff > 0.1:
left_velocity = self.velocity * (1 - angle_diff)
right_velocity = self.velocity
elif angle_diff < -0.1:
left_velocity = self.velocity
right_velocity = self.velocity * (1 + angle_diff)
else:
left_velocity = self.velocity
right_velocity = self.velocity
sim.setJointTargetVelocity(self.leftWheel, left_velocity)
sim.setJointTargetVelocity(self.rightWheel, right_velocity)
def sysCall_actuation():
# put your actuation code here
if self.path:
point = self.arrived
#print(point)
if self.arrived<=len(self.path)-1:
move_towards_path(self.path[point])
def sysCall_sensing():
# put your sensing code here
pass
def sysCall_cleanup():
# do some clean-up here
sim.scaleObject(self.floor, 1/self.x_scale, 1/self.y_scale, 1,0)
# See the user manual or the available code snippets for additional callback functions and details