From 71f9f62ce23d736b96f51cb785faa036a42a450d Mon Sep 17 00:00:00 2001 From: Gunter Niemeyer <gunter@caltech.edu> Date: Mon, 24 May 2021 11:24:48 -0700 Subject: [PATCH] Added skeleton planner code and updated the move node skeleton to V2 The planners: (i) AStarSkeleton.py comes from 133b HW#1, running directly on a grid. But updated for ROS and to run backwards (from goal to start), in case the start is moving. Please FIX the distance and other costs!! (ii) prm_mattress.py is straight from 133b HW#3. You'll have to remove the visualization and update for our map, etc. Also note it was originally written for python 3, though the differences are small (see Ed Discussion for a post). The moveskeletonv2.py adds two features: (a) it publishes the waypoints for RVIZ - nice to see and debug. You will need a MARKER display in RVIZ to see. And (b) it checks whether the TF library is properly set before starting. Else startup (could) trigger temporary TF errors, always annoying. --- shared_demo/scripts/AStarSkeleton.py | 114 ++++++++ shared_demo/scripts/moveskeletonv2.py | 342 ++++++++++++++++++++++ shared_demo/scripts/prm_mattress.py | 398 ++++++++++++++++++++++++++ 3 files changed, 854 insertions(+) create mode 100644 shared_demo/scripts/AStarSkeleton.py create mode 100644 shared_demo/scripts/moveskeletonv2.py create mode 100644 shared_demo/scripts/prm_mattress.py diff --git a/shared_demo/scripts/AStarSkeleton.py b/shared_demo/scripts/AStarSkeleton.py new file mode 100644 index 0000000..0a76ab9 --- /dev/null +++ b/shared_demo/scripts/AStarSkeleton.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python +# +# AStar.py +# +# Run the A* planner on a simple grid. It plans backwards from the +# goal to the start, in case the start is moving. +# +# THIS IS A SKELETON. PLESAE FIX THE COSTS, ETC!! +# +import numpy as np + + +# +# Constants +# +SQRT2 = np.sqrt(2.0) + + +# +# A* Planning Algorithm, on a Simple Grid +# +# map should be a numpy array of booleans (True if occupied) +# +# Returns a list of (u,v) tuples, leading from start to goal +# +def AStar(map, us, vs, ug, vg): + print("A* planning from (%d,%d) to (%d,%d)" % (us, vs, ug, vg)) + + # Transpose the map, so everything can be index by the tuple (u,v)!!! + wall = map.astype(np.bool).T + (w,h) = np.shape(wall) + start = (us, vs) + goal = (ug, vg) + + # Prepare the "nodes" - start the total cost as infinite (higher + # than anything to come) and the variables for each state. + seen = np.full((w,h), False) + done = np.full((w,h), False) + costTotal = np.full((w,h), np.inf) + costToGoal = np.full((w,h), 0.0) + successor = np.full((w,h,2), 0, dtype=np.int) + + # And clear the (unsorted!) on-deck queue. + onDeck = [] + + + # Function to check a point (relative to a successor point). + def CheckPoint(pt, suc, cGoal): + # Skip if the point is invalid (at edges) or already done. + if ((pt[0]<0) or (pt[0]>=w) or (pt[1]<0) or (pt[1]>=h) or done[pt]): + return + + # Check whether the point is blocked (occupied). + # IS THIS WHAT YOU WANT? + if wall[pt]: + return + + # Add to the on-deck queue (if not yet seen) + if not seen[pt]: + onDeck.append(pt) + seen[pt] = True + + # Estimate the cost from the start to here. + # IS THIS WHAT YOU WANT? + cFromStartEst = np.sqrt((pt[0]-start[0])**2 + (pt[1]-start[1])**2) + + # Compute/udpate the costs. + cTotal = cGoal + cFromStartEst + if cTotal < costTotal[pt]: + costToGoal[pt] = cGoal + costTotal[pt] = cTotal + successor[pt] = suc + + + # Begin placing the start state on-deck, the continually expand... + print("A* building the search tree...") + CheckPoint(tuple(goal), tuple(goal), 0.0) + while True: + # Make sure we still have something to look at! + if len(onDeck) == 0: + return [] + + # Pop the onDeck point with the lowest (estimated) total cost. + point = onDeck.pop(np.argmin(costTotal[tuple(zip(*onDeck))])) + done[point] = True + + # Check whether we have found the start. + if done[start]: + break + + # Beyond the pure distance cost, also consider a cost for being + # at this point. How "good" is the point? + # IS THIS WHAT YOU WANT? + cPoint = 0.0 + cGoal = costToGoal[point] + + # Check the surrounding points. Set the travel cost by the distance. + (u,v) = point + CheckPoint((u+1,v ), (u,v), 1.0 + cPoint + cGoal) + CheckPoint((u+1,v+1), (u,v), SQRT2 + cPoint + cGoal) + CheckPoint((u ,v+1), (u,v), 1.0 + cPoint + cGoal) + CheckPoint((u-1,v+1), (u,v), SQRT2 + cPoint + cGoal) + CheckPoint((u-1,v ), (u,v), 1.0 + cPoint + cGoal) + CheckPoint((u-1,v-1), (u,v), SQRT2 + cPoint + cGoal) + CheckPoint((u ,v-1), (u,v), 1.0 + cPoint + cGoal) + CheckPoint((u+1,v-1), (u,v), SQRT2 + cPoint + cGoal) + + + # Build and return the path. + path = [start] + while tuple(successor[path[-1]]) != path[-1]: + path.append(tuple(successor[path[-1]])) + print("A* path has %d points" % (len(path))) + return path diff --git a/shared_demo/scripts/moveskeletonv2.py b/shared_demo/scripts/moveskeletonv2.py new file mode 100644 index 0000000..e4311e1 --- /dev/null +++ b/shared_demo/scripts/moveskeletonv2.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python +# +# moveskeleton.py V2!! +# +# The is the skeleton for the move node. This execute movements, +# first simple movements, and ultimately via a planner. This will +# also use/update the map with obstacles, as appropriate/needed. +# +# Node: /move +# +# Params: none yet? +# +# Subscribe: /pose geometry_msgs/PoseStamped +# /move_base_simple/goal geometry_msgs/PoseStamped +# /scan sensor_msgs/LaserScan +# /map nav_msgs/OccupancyGrid +# +# Publish: /vel_cmd geometry_msgs/Twist +# /obstacles nav_msgs/OccupancyGrid +# +# TF Required: map -> laser +# +import math +import numpy as np +import rospy +import sys +import tf2_ros + +from geometry_msgs.msg import Point +from geometry_msgs.msg import PoseStamped +from geometry_msgs.msg import TransformStamped +from geometry_msgs.msg import Twist +from nav_msgs.msg import OccupancyGrid +from sensor_msgs.msg import LaserScan +from visualization_msgs.msg import Marker + +from PlanarTransform import PlanarTransform + + +# +# Constants +# +# Perhaps these should be parameters? +# +VMAX = 0.20 # Max forward speed (m/s) +WMAX = 1.00 # Max rotational speed (rad/sec) + +# Other constants? + +TTIMER = 1.0 # Time between publishing the obstacle map/waypoints + + +# +# Global Variables +# +# The obstancle map info. +map2grid = None # Map bottom/left corner (origin) in map frame +grid2map = None # Map frame relative to the bottom/left corner +w = 0 # Width +h = 0 # Height +res = 1.0 # Resolution (meters/pixel) +obstacles = None # The actual map + +# Desired - these will likely become a waypoint list... +waypoints = [] +xdes = 2.0 +ydes = 0.0 +thetades = 0.0 + +# Flag whether to start moving +drive = False + + +###################################################################### +# Driving +###################################################################### +# +# Goal Pose Callback +# +# This comes from the RVIZ "2D Nav Goal" button, telling us where we +# want the robot to go. +# +def callback_goal(posemsg): + # Make sure the goal pose is published w.r.t. the map frame. + if not (posemsg.header.frame_id == 'map'): + rospy.logerr("Goal Pose is not in 'map' frame!") + return + + # Extract the bot's goal pose w.r.t. the map frame and report. + map2goal = PlanarTransform.fromPose(posemsg.pose) + rospy.loginfo("New target: %s" % map2goal) + + # + # CALL A PLANNER HERE... For now, we simply drive directly to + # the goal position, without consideration of the map! + # + # Use the goal x/y/theta as the desired value for the current motion. + global xdes, ydes, thetades + xdes = map2goal.x() + ydes = map2goal.y() + thetades = map2goal.theta() + + # Set the waypoints... Here, I'm just setting one point. + waypoints = [(xdes, ydes)] + + # Start the motion by turning the drive flag on. + global drive + drive = True + + +# +# Actual Pose Callback +# +# When we receive an actual pose, update the velocity command to +# head to the next desired waypoint. +# +def callback_pose(posemsg): + # Use/change the global drive flag. + global drive + + # Make sure the actual pose is published w.r.t. the map frame. + if not (posemsg.header.frame_id == 'map'): + rospy.logerr("Actual pose is not in 'map' frame!") + return + + # Grab the actual x/y/theta. + map2pose = PlanarTransform.fromPose(posemsg.pose) + x = map2pose.x() + y = map2pose.y() + theta = map2pose.theta() + + # + # DETERMINE VELOCITY COMMANDS! You may want to turn off the + # drive flag when you want to stop? + # + # For fun, drive in a CCW circle... + vx = 0.08 + wz = 1.0 + + + # Make sure we are supposed to be driving! This means the bot + # doesn't drive until the first goal is received or after + # something turns if off. + if not drive: + wz = 0.0 + vx = 0.0 + + # Send the commands. + cmdmsg = Twist() + cmdmsg.linear.x = vx + cmdmsg.angular.z = wz + cmdpub.publish(cmdmsg) + + +###################################################################### +# Mapping +###################################################################### +# +# Grab and Pre-Process the Hand-Created Map +# +# We could do this repeatedly, i.e. set up a subscriber. But I'm +# assuming the map will remain constant? +# +def prepareMap(): + # Grab a single message of the map topic. Give it 10 seconds. + rospy.loginfo("Move: waiting for a map...") + mapmsg = rospy.wait_for_message('/map', OccupancyGrid, 10.0) + + # Extract the info. + global map2grid, grid2map, w, h, res + map2grid = PlanarTransform.fromPose(mapmsg.info.origin) + grid2map = map2grid.inv() + w = mapmsg.info.width + h = mapmsg.info.height + res = mapmsg.info.resolution + + # Report + rospy.loginfo("Basic map: %dx%d = %5.3fm x %5.3fm (res = %4.3fm)" + % (w, h, res * w, res * h, res)) + rospy.loginfo("Map grid bottom/left at %s" % map2grid) + + # Set up the initial obstacle map. From the map message doc: + # The is row-major order, starting with (0,0). Occupancy + # probabilities are in the range [0,100]. Unknown is -1. + global obstacles + obstacles = np.array(mapmsg.data, dtype=np.int8).reshape(h,w) + + # Leave 0 (presumably free) and 100 (unchangably obstacle). + # Change -1 to 50 (mid way between the two). + obstacles[np.where(obstacles < 0)] = 50 + + +# +# Publish the Obstacle Map and Waypoints. +# +def callback_timer(event): + # Create the obstacle map message. + obsmapmsg = OccupancyGrid() + obsmapmsg.header.stamp = rospy.Time.now() + obsmapmsg.header.frame_id = 'map' + obsmapmsg.info.resolution = res + obsmapmsg.info.width = w + obsmapmsg.info.height = h + obsmapmsg.info.origin = map2grid.toPose() + + # Add the data, row-wise. + obsmapmsg.data = obstacles.flatten().tolist() + + # Publish. + obspub.publish(obsmapmsg) + + # Create the marker message. + markermsg = Marker() + markermsg.header.frame_id = "map" + markermsg.header.stamp = rospy.Time.now() + markermsg.ns = "waypoints" + markermsg.id = 0 + markermsg.type = Marker.POINTS + markermsg.action = Marker.ADD + markermsg.scale.x = 0.02 + markermsg.scale.y = 0.02 + markermsg.scale.z = 0.02 + markermsg.color.a = 1.0 + markermsg.color.r = 0.0 + markermsg.color.g = 1.0 + markermsg.color.b = 0.0 + markermsg.pose.position.x = 0.0 + markermsg.pose.position.y = 0.0 + markermsg.pose.position.z = 0.0 + markermsg.pose.orientation.x = 0.0 + markermsg.pose.orientation.y = 0.0 + markermsg.pose.orientation.z = 0.0 + markermsg.pose.orientation.w = 1.0 + + # Add the waypoints. + markermsg.points = [] + for pt in waypoints: + markermsg.points.append(Point(pt[0], pt[1], 0.0)) + + # Publish. + markerpub.publish(markermsg) + + +# +# Process the Laser Scan +# +def callback_scan(scanmsg): + # For debugging report the message. + #rospy.loginfo("Scan #%4d at %10d secs" + # % (scanmsg.header.seq, scanmsg.header.stamp.secs)) + + # Extract the angles and ranges from the scan information. + ranges = np.array(scanmsg.ranges) + alphas = (scanmsg.angle_min + + scanmsg.angle_increment * np.arange(len(ranges))) + + # Also grab the transform from the map to the laser scan data's + # frame. In particular, ask for the transform at the time the + # scan data was captured - this makes things as self-consistent as + # possible. Give it up to 200ms, in case Linux has (temporarily) + # swapped out the other threads. + tfmsg = tfBuffer.lookup_transform('map', scanmsg.header.frame_id, + scanmsg.header.stamp, + rospy.Duration(0.300)) + map2laser = PlanarTransform.fromTransform(tfmsg.transform) + + # + # PROCESS THE MAP... For now we do nothing. + # + + # For the fun of it, make the pixel at the map frame (0,0) "grey". + # First map the (0,0) coordinates into the grid frame. + xg = 0 - map2grid.x() + yg = 0 - map2grid.y() + # or more generally, especially if the map is rotated. + (xg, yg) = grid2map.inParent(0, 0) + # And then convert to u/v coordinataes. + u = max(0, min(w-1, int(xg/res))) + v = max(0, min(h-1, int(yg/res))) + # And set the map value. + global obstacles + obstacles[v][u] = 50 + + +###################################################################### +# Main code +###################################################################### +if __name__ == "__main__": + # Initialize the ROS node. + rospy.init_node('move') + + # Any ROS parameters to grab? + # updatefrac = rospy.get_param('~update_fraction', 0.2) + + # Grab and prepare the map. + prepareMap() + + + # First create a TF2 listener. This implicitly fills a local + # buffer, so we can always retrive the transforms we want. + # Wait until we have at least one map -> base transform. + tfBuffer = tf2_ros.Buffer() + tflisten = tf2_ros.TransformListener(tfBuffer) + try: + tfBuffer.lookup_transform('map', 'base', rospy.Time(0), + rospy.Duration(30.0)) + except: + rospy.logerr("Unable to get a map->base transform in 30sec!") + sys.exit(-1) + + # Create a publisher to send the velocity command and obstacle map + # messages. We pick a queue size of 1 for the velocity command, + # as using older commands is pointless. We do the same for the + # obstacle map, though latch so you can also see the last + # published. + cmdpub = rospy.Publisher('/vel_cmd', Twist, queue_size=1) + obspub = rospy.Publisher('/obstacles', + OccupancyGrid, queue_size=1, latch=True) + markerpub = rospy.Publisher('/waypoints', Marker, queue_size=1, latch=True) + + # Then create subscribers to listen to the actual pose, scan, and + # goal pose. We'll grab the initial map once (not subscribe). + # For the scans (potentially incuring processing time) set the + # queue size to 1. This means the incoming queue will not keep + # more than the newest message, and drop/skip unanswered messages. + # Also the buffer size (defaulting to 64KB) must be big enough + # (>20 msgs) to accomodate any bursts that might arrive. Else + # messages might be buffered upstream, defeating the local queue + # limit and not getting dropped, causing things to lag behind. + rospy.Subscriber('/move_base_simple/goal', PoseStamped, callback_goal) + rospy.Subscriber('/pose', PoseStamped, callback_pose) + rospy.Subscriber('/scan', LaserScan, callback_scan, queue_size=1) + + # And finally, set up a timer to force publication of the obstacle + # map and waypoints, for us to view in RVIZ. + timer = rospy.Timer(rospy.Duration(TTIMER), callback_timer) + + + # Report and spin (waiting while callbacks do their work). + rospy.loginfo("Move node spinning...") + rospy.spin() + rospy.loginfo("Move node stopped.") diff --git a/shared_demo/scripts/prm_mattress.py b/shared_demo/scripts/prm_mattress.py new file mode 100644 index 0000000..fdd2c45 --- /dev/null +++ b/shared_demo/scripts/prm_mattress.py @@ -0,0 +1,398 @@ +#!/usr/bin/env python3 +# +# prm_mattress.py +# +import bisect +import math +import matplotlib.pyplot as plt +import numpy as np +import random + +from planarutils import PointInTriangle +from planarutils import SegmentCrossTriangle +from planarutils import SegmentNearSegment + + +# +# General World Definitions +# +# List of objects, start, and goal +# +(xmin, xmax) = (0, 30) +(ymin, ymax) = (0, 20) +xw1 = 12 +xw2 = 15 +xw3 = 18 +xw4 = 21 +yw1 = 10 +yw2 = 12 + +walls = (((xmin, ymin), (xmax, ymin)), + ((xmax, ymin), (xmax, ymax)), + ((xmax, ymax), (xmin, ymax)), + ((xmin, ymax), (xmin, ymin)), + ((xmin, yw1), ( xw2, yw1)), + (( xw3, yw1), (xmax, yw1)), + (( xw3, yw2), ( xw4, yw2)), + (( xw1, yw2), ( xw2, yw2)), + (( xw2, yw2), ( xw2, ymax))) + +(startx, starty, startt) = ( 5, 15, 0.5*math.pi) +(goalx, goaly, goalt) = (10, 5, 0.0*math.pi) + +L = 6 +D = 1/2 + +N = 300 +K = 15 +K2 = 3*K + + +# +# Visualization Tools +# +class Visual(): + def StartFigure(): + # Clear the current, or create a new figure. + plt.clf() + + # Create a new axes, enable the grid, and set axis limits. + plt.axes() + plt.grid(True) + plt.gca().axis('on') + plt.gca().set_xlim(xmin, xmax) + plt.gca().set_ylim(ymin, ymax) + plt.gca().set_xticks([xmin, xw1, xw2, xw3, xw4, xmax]) + plt.gca().set_yticks([ymin, yw1, yw2, ymax]) + plt.gca().set_aspect('equal') + + # Show the walls. + for wall in walls: + plt.plot([wall[0][0], wall[1][0]], + [wall[0][1], wall[1][1]], 'k', linewidth=2) + plt.plot([xw3, xw4], [yw2, yw2], 'b:', linewidth=3) + + def FlushFigure(): + # Show the plot. + plt.pause(0.001) + + def ShowFigure(): + # Flush the figure and wait. + Visual.FlushFigure() + input("Hit return to continue") + + + def DrawState(s, *args, **kwargs): + plt.plot([s.x1, s.x2], [s.y1, s.y2], *args, **kwargs) + + def DrawLocalPath(head, tail, *args, **kwargs): + n = math.ceil(head.Distance(tail) / D) + for i in range(n+1): + Visual.DrawState(head.Intermediate(tail, i/n), *args, **kwargs) + + +# +# Object State +# +def AngleDiff(t1, t2): + return (t1-t2) - math.pi * round((t1-t2)/math.pi) + +class State: + def __init__(self, x, y, t): + # Remember the centroid position and angle theta. + self.x = x + self.y = y + self.t = t + + # Pre-compute the endpoint positions. + self.x1 = x + L/2 * math.cos(t) + self.y1 = y + L/2 * math.sin(t) + self.x2 = x - L/2 * math.cos(t) + self.y2 = y - L/2 * math.sin(t) + + # Compute/create an intermediate state. This can be useful if you + # need to check the local planner by testing intermediate states. + def Intermediate(self, other, alpha): + return State(self.x + alpha * (other.x - self.x), + self.y + alpha * (other.y - self.y), + self.t + alpha * AngleDiff(other.t, self.t)) + + # In case we want to print the state. + def __repr__(self): + return ("<Line %5.2f,%5.2f @ %5.2f>" % + (self.x, self.y, self.t)) + + + #### These are necessary for the PRM: + # Check whether in free space. + def InFreespace(self): + for wall in walls: + if SegmentNearSegment(D, (self.x1, self.y1), (self.x2, self.y2), + wall[0], wall[1]): + return False + return True + + # Compute the relative distance to another state. + def Distance(self, other): + return math.sqrt((self.x - other.x)**2 + + (self.y - other.y)**2 + + (L*AngleDiff(self.t, other.t))**2) + + # Check the local planner - whether this connects to another state. + def ConnectsTo(self, other): + n = math.ceil(self.Distance(other) / D) + for i in range(1,n): + if not self.Intermediate(other, i/n).InFreespace(): + return False + return True + + +# +# PRM Graph and A* Search Tree +# +class Node(State): + def __init__(self, *args): + # Initialize the state + super().__init__(*args) + + # List of neighbors (for the graph) + self.neighbors = [] + + # Parent and costs (for A* search tree) + self.seen = False + self.done = False + self.parent = [] + self.costToReach = 0 + self.costToGoEst = 0 + self.cost = self.costToReach + self.costToGoEst + + # Define the "less-than" to enable sorting by cost in A*. + def __lt__(self, other): + return self.cost < other.cost + + # Estimate the cost to go to another node, for use in A*. + def CostToGoEst(self, other): + return self.Distance(other) + + +# +# A* Planning Algorithm +# +def AStar(nodeList, start, goal): + # Prepare the still empty *sorted* on-deck queue. + onDeck = [] + + # Clear the search tree (to keep track). + for n in nodeList: + n.seen = False + n.done = False + + # Begin with the start state on-deck. + start.done = False + start.seen = True + start.parent = None + start.costToReach = 0 + start.costToGoEst = start.CostToGoEst(goal) + start.cost = start.costToReach + start.costToGoEst + bisect.insort(onDeck, start) + + # Continually expand/build the search tree. + while True: + # Make sure we still have something to look at! + if not (len(onDeck) > 0): + return [] + + # Grab the next node (first on deck). + n = onDeck.pop(0) + + # Check whether we have found the goal. + if (n == goal): + break + + # Add the children to the on-deck queue. + for c in n.neighbors: + # Skip if already done. + if c.done: + continue + + # Check the cost for the new path to reach the child. + costToReach = n.costToReach + n.Distance(c) + costToGoEst = c.CostToGoEst(goal) + cost = costToReach + costToGoEst + + # Check whether it is already seen (hence on-deck) + if c.seen: + # Skip if the previous cost was better! + if c.cost <= cost: + continue + # Else remove from the on-deck list (to keep sorted). + onDeck.remove(c) + + # Add to the on-deck list in the correct order. + c.seen = True + c.parent = n + c.costToReach = costToReach + c.costToGoEst = costToGoEst + c.cost = cost + bisect.insort(onDeck, c) + + # Declare the node done. + n.done = True + + # Build the path. + path = [goal] + while path[0].parent is not None: + path.insert(0, path[0].parent) + + # Return the path. + return path + + +# +# Select the Nodes +# +def AddNodesToListUniform(nodeList, N): + # Add uniformly distributed samples + while (N > 0): + node = Node(random.uniform(xmin, xmax), + random.uniform(ymin, ymax), + random.uniform(0, math.pi)) + if node.InFreespace(): + nodeList.append(node) + N = N-1 + +def AddNodesToListAtEdges(nodeList, N): + # Add nodes near edges + while (N > 0): + node1 = Node(random.uniform(xmin+D/2, xmax-D/2), + random.uniform(ymin+D/2, ymax-D/2), + random.uniform(0, math.pi)) + node2 = Node(node1.x + random.uniform(-D/2, D/2), + node1.y + random.uniform(-D/2, D/2), + node1.t + random.uniform(-D/2/L, D/2/L)) + if node1.InFreespace() and not node2.InFreespace(): + nodeList.append(node1) + N = N-1 + elif node2.InFreespace() and not node1.InFreespace(): + nodeList.append(node2) + N = N-1 + +def AddNodesToList(nodeList, N): + # AddNodesToListUniform(nodeList, N) + AddNodesToListAtEdges(nodeList, N) + + +# +# Brute-Force Nearest Neighbor +# +def NearestNodes(node, nodeList, K): + # Create a sorted list of (distance, node) tuples. + list = [] + + # Process/add all nodes to the sorted list, except the original + # node itself. Continually cap the list at K elements. + for n in nodeList: + if n is not node: + bisect.insort(list, (node.Distance(n), n)) + list = list[0:K] + + # Return only the near nodes. + return [n for _,n in list] + +def ConnectNearestNeighbors(nodeList, K): + # Clear any and all existing neighbors. + for node in nodeList: + node.neighbors = [] + + # Process every node in the list for find (K) nearest neighbors. + # Get the (K2) nearest nodes, as some might not connection. Once + # a match in found, be sure to create the connnection from both + # sides, i.e. add each node to each other's neighbor list. + for node in nodeList: + for nearnode in NearestNodes(node, nodeList, K2): + if len(node.neighbors) >= K: + break + if node.ConnectsTo(nearnode): + if (nearnode not in node.neighbors): + node.neighbors.append(nearnode) + if (node not in nearnode.neighbors): + nearnode.neighbors.append(node) + + +# +# Post Process the Path +# +def PostProcess(path): + i = 0 + while (i < len(path)-2): + if path[i].ConnectsTo(path[i+2]): + path.pop(i+1) + else: + i = i+1 + + +# +# Main Code +# +def main(): + # Create the figure. + Visual.StartFigure() + Visual.FlushFigure() + + # Create the start/goal nodes. + startnode = Node(startx, starty, startt) + goalnode = Node(goalx, goaly, goalt) + + # Show the start/goal states. + Visual.DrawState(startnode, 'r', linewidth=2) + Visual.DrawState(goalnode, 'r', linewidth=2) + Visual.ShowFigure() + + + # Create the list of sample points. + nodeList = [] + AddNodesToList(nodeList, N) + + # Show the sample states. + for node in nodeList: + Visual.DrawState(node, 'k', linewidth=1) + Visual.ShowFigure() + + # Add the start/goal nodes. + nodeList.append(startnode) + nodeList.append(goalnode) + + + # Connect to the nearest neighbors. + ConnectNearestNeighbors(nodeList, K) + + # # Show the neighbor connections. + # for node in nodeList: + # for neighbor in node.neighbors: + # Visual.DrawLocalPath(node, neighbor, 'g', linewidth=0.5) + # Visual.ShowFigure() + + + # Run the A* planner. + path = AStar(nodeList, startnode, goalnode) + if not path: + print("UNABLE TO FIND A PATH") + return + + # Show the path. + for i in range(len(path)-1): + Visual.DrawLocalPath(path[i], path[i+1], 'r', linewidth=1) + Visual.FlushFigure() + + + # Post Process the path. + PostProcess(path) + + # Show the post-processed path. + for i in range(len(path)-1): + Visual.DrawLocalPath(path[i], path[i+1], 'b', linewidth=2) + Visual.ShowFigure() + + +if __name__== "__main__": + main() -- GitLab