Repository Summary
Checkout URI | https://github.com/uleroboticsgroup/yasmin.git |
VCS Type | git |
VCS Version | main |
Last Updated | 2024-12-06 |
Dev Status | DEVELOPED |
CI status | No Continuous Integration |
Released | UNRELEASED |
Tags | No category tags. |
Contributing |
Help Wanted (0)
Good First Issues (0) Pull Requests to Review (0) |
Packages
Name | Version |
---|---|
yasmin | 3.0.1 |
yasmin_demos | 3.0.1 |
yasmin_msgs | 3.0.1 |
yasmin_ros | 3.0.1 |
yasmin_viewer | 3.0.1 |
README
YASMIN (Yet Another State MachINe)
YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.
Table of Contents
Features
- Fully integrated into ROS 2.
- Available for Python and C++.
- Fast prototyping.
- Default states for ROS 2 action and service clients.
- Blackboards are used to share data between states and state machines.
- State machines can be canceled and stopped, which means stopping the current executing state.
- A web viewer is included, which allows monitoring of the execution of the state machines.
Installation
$ cd ~/ros2_ws/src
$ git clone https://github.com/uleroboticsgroup/yasmin.git
$ cd ~/ros2_ws
$ rosdep install --from-paths src --ignore-src -r -y
$ cd ~/ros2_ws
$ colcon build
If you are using a deprecated ROS 2 distro (like Foxy or Galactic) or the Rolling distro, you may need to install the example interfaces:
$ sudo apt install -y ros-$ROS_DISTRO-example-interfaces
Docker
If your operating system doesn’t support ROS 2, docker is a great alternative. You can use an image from Dockerhub or create your own images. First of all, to build the image you have to use the following command:
## Assuming you are in the YASMIN project directory
$ docker build -t yasmin .
To use a shortcut the docker build, you may use the following command:
## Assuming you are in the YASMIN project directory
$ make docker_build
After the image is created, run a docker container with the following command:
$ docker run -it --net=host --ipc=host --privileged --env="DISPLAY" --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority" --entrypoint /bin/bash yasmin
To use a shortcut the docker run, you may use following command:
## Assuming you are in the YASMIN project directory
$ make docker_run
Demos
There are some examples, for both Python and C++, that can be found in yasmin_demos.
Python
Vanilla Demo (FSM)
$ ros2 run yasmin_demos yasmin_demo.py
Click to expand
```python import time import rclpy import yasmin from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros.ros_logs import set_ros_loggers from yasmin_viewer import YasminViewerPub # define state Foo class FooState(State): def __init__(self) -> None: super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO("Executing state FOO") time.sleep(3) if self.counter < 3: self.counter += 1 blackboard["foo_str"] = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # define state Bar class BarState(State): def __init__(self) -> None: super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO("Executing state BAR") time.sleep(3) yasmin.YASMIN_LOG_INFO(blackboard["foo_str"]) return "outcome3" # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4", }, ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO", }, ) # pub FSM info YasminViewerPub("yasmin_demo", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Service Demo (FSM + ROS 2 Service Client)
$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run yasmin_demos service_client_demo.py
Click to expand
```python import rclpy from example_interfaces.srv import AddTwoInts import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): def __init__(self) -> None: super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler, # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: req = AddTwoInts.Request() req.a = blackboard["a"] req.b = blackboard["b"] return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: blackboard["sum"] = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: blackboard["a"] = 10 blackboard["b"] = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO(f"Sum: {blackboard['sum']}") return SUCCEED # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_service_client_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={SUCCEED: "ADD_TWO_INTS"}, ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Action Demo (FSM + ROS 2 Action)
$ ros2 run action_tutorials_cpp fibonacci_action_server
$ ros2 run yasmin_demos action_client_demo.py
Click to expand
```python import rclpy from example_interfaces.action import Fibonacci import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): def __init__(self) -> None: super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # cb to process the response self.print_feedback, # cb to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: goal = Fibonacci.Goal() goal.order = blackboard["n"] return goal def response_handler( self, blackboard: Blackboard, response: Fibonacci.Result ) -> str: blackboard["fibo_res"] = response.sequence return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.partial_sequence)}") def print_result(blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}") return SUCCEED # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_action_client_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # create an initial blackoard blackboard = Blackboard() blackboard["n"] = 10 # execute FSM try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Monitor Demo (FSM + ROS 2 Subscriber)
$ ros2 run yasmin_demos monitor_demo.py
Click to expand
```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry import yasmin from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import TIMEOUT from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): def __init__(self, times: int) -> None: super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic sbscription msg_queue=10, # queue of the monitor handler callback timeout=10, # timeout to wait for msgs in seconds # if not None, CANCEL outcome is added ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: yasmin.YASMIN_LOG_INFO(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", TIMEOUT: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Nav2 Demo (Hierarchical FSM + ROS 2 Action)
$ ros2 run yasmin_demos nav_demo.py
Click to expand
```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub HAS_NEXT = "has_next" END = "end" class Nav2State(ActionState): def __init__(self) -> None: super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) None, # cb to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: goal = NavigateToPose.Goal() goal.pose.pose = blackboard["pose"] goal.pose.header.frame_id = "map" return goal def create_waypoints(blackboard: Blackboard) -> str: blackboard["waypoints"] = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard) -> str: blackboard["random_waypoints"] = random.sample( list(blackboard["waypoints"].keys()), blackboard["waypoints_num"] ) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: if not blackboard["random_waypoints"]: return END wp_name = blackboard["random_waypoints"].pop(0) wp = blackboard["waypoints"][wp_name] pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard["pose"] = pose blackboard["text"] = f"I have reached waypoint {wp_name}" return HAS_NEXT # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # add states sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS", }, ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING", }, ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING", }, ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT, }, ) # pub FSM info YasminViewerPub("YASMIN_NAV_DEMO", sm) # execute FSM blackboard = Blackboard() blackboard["waypoints_num"] = 2 try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Cpp
Vanilla Demo
$ ros2 run yasmin_demos yasmin_demo
Click to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeCONTRIBUTING
Repository Summary
Checkout URI | https://github.com/uleroboticsgroup/yasmin.git |
VCS Type | git |
VCS Version | main |
Last Updated | 2024-12-06 |
Dev Status | DEVELOPED |
CI status | No Continuous Integration |
Released | UNRELEASED |
Tags | No category tags. |
Contributing |
Help Wanted (0)
Good First Issues (0) Pull Requests to Review (0) |
Packages
Name | Version |
---|---|
yasmin | 3.0.1 |
yasmin_demos | 3.0.1 |
yasmin_msgs | 3.0.1 |
yasmin_ros | 3.0.1 |
yasmin_viewer | 3.0.1 |
README
YASMIN (Yet Another State MachINe)
YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.
Table of Contents
Features
- Fully integrated into ROS 2.
- Available for Python and C++.
- Fast prototyping.
- Default states for ROS 2 action and service clients.
- Blackboards are used to share data between states and state machines.
- State machines can be canceled and stopped, which means stopping the current executing state.
- A web viewer is included, which allows monitoring of the execution of the state machines.
Installation
$ cd ~/ros2_ws/src
$ git clone https://github.com/uleroboticsgroup/yasmin.git
$ cd ~/ros2_ws
$ rosdep install --from-paths src --ignore-src -r -y
$ cd ~/ros2_ws
$ colcon build
If you are using a deprecated ROS 2 distro (like Foxy or Galactic) or the Rolling distro, you may need to install the example interfaces:
$ sudo apt install -y ros-$ROS_DISTRO-example-interfaces
Docker
If your operating system doesn’t support ROS 2, docker is a great alternative. You can use an image from Dockerhub or create your own images. First of all, to build the image you have to use the following command:
## Assuming you are in the YASMIN project directory
$ docker build -t yasmin .
To use a shortcut the docker build, you may use the following command:
## Assuming you are in the YASMIN project directory
$ make docker_build
After the image is created, run a docker container with the following command:
$ docker run -it --net=host --ipc=host --privileged --env="DISPLAY" --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority" --entrypoint /bin/bash yasmin
To use a shortcut the docker run, you may use following command:
## Assuming you are in the YASMIN project directory
$ make docker_run
Demos
There are some examples, for both Python and C++, that can be found in yasmin_demos.
Python
Vanilla Demo (FSM)
$ ros2 run yasmin_demos yasmin_demo.py
Click to expand
```python import time import rclpy import yasmin from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros.ros_logs import set_ros_loggers from yasmin_viewer import YasminViewerPub # define state Foo class FooState(State): def __init__(self) -> None: super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO("Executing state FOO") time.sleep(3) if self.counter < 3: self.counter += 1 blackboard["foo_str"] = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # define state Bar class BarState(State): def __init__(self) -> None: super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO("Executing state BAR") time.sleep(3) yasmin.YASMIN_LOG_INFO(blackboard["foo_str"]) return "outcome3" # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4", }, ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO", }, ) # pub FSM info YasminViewerPub("yasmin_demo", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Service Demo (FSM + ROS 2 Service Client)
$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run yasmin_demos service_client_demo.py
Click to expand
```python import rclpy from example_interfaces.srv import AddTwoInts import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): def __init__(self) -> None: super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler, # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: req = AddTwoInts.Request() req.a = blackboard["a"] req.b = blackboard["b"] return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: blackboard["sum"] = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: blackboard["a"] = 10 blackboard["b"] = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO(f"Sum: {blackboard['sum']}") return SUCCEED # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_service_client_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={SUCCEED: "ADD_TWO_INTS"}, ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Action Demo (FSM + ROS 2 Action)
$ ros2 run action_tutorials_cpp fibonacci_action_server
$ ros2 run yasmin_demos action_client_demo.py
Click to expand
```python import rclpy from example_interfaces.action import Fibonacci import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): def __init__(self) -> None: super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # cb to process the response self.print_feedback, # cb to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: goal = Fibonacci.Goal() goal.order = blackboard["n"] return goal def response_handler( self, blackboard: Blackboard, response: Fibonacci.Result ) -> str: blackboard["fibo_res"] = response.sequence return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.partial_sequence)}") def print_result(blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}") return SUCCEED # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_action_client_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # create an initial blackoard blackboard = Blackboard() blackboard["n"] = 10 # execute FSM try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Monitor Demo (FSM + ROS 2 Subscriber)
$ ros2 run yasmin_demos monitor_demo.py
Click to expand
```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry import yasmin from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import TIMEOUT from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): def __init__(self, times: int) -> None: super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic sbscription msg_queue=10, # queue of the monitor handler callback timeout=10, # timeout to wait for msgs in seconds # if not None, CANCEL outcome is added ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: yasmin.YASMIN_LOG_INFO(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", TIMEOUT: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Nav2 Demo (Hierarchical FSM + ROS 2 Action)
$ ros2 run yasmin_demos nav_demo.py
Click to expand
```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub HAS_NEXT = "has_next" END = "end" class Nav2State(ActionState): def __init__(self) -> None: super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) None, # cb to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: goal = NavigateToPose.Goal() goal.pose.pose = blackboard["pose"] goal.pose.header.frame_id = "map" return goal def create_waypoints(blackboard: Blackboard) -> str: blackboard["waypoints"] = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard) -> str: blackboard["random_waypoints"] = random.sample( list(blackboard["waypoints"].keys()), blackboard["waypoints_num"] ) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: if not blackboard["random_waypoints"]: return END wp_name = blackboard["random_waypoints"].pop(0) wp = blackboard["waypoints"][wp_name] pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard["pose"] = pose blackboard["text"] = f"I have reached waypoint {wp_name}" return HAS_NEXT # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # add states sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS", }, ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING", }, ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING", }, ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT, }, ) # pub FSM info YasminViewerPub("YASMIN_NAV_DEMO", sm) # execute FSM blackboard = Blackboard() blackboard["waypoints_num"] = 2 try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Cpp
Vanilla Demo
$ ros2 run yasmin_demos yasmin_demo
Click to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeClick to expand
```cpp #includeCONTRIBUTING
Repository Summary
Checkout URI | https://github.com/uleroboticsgroup/yasmin.git |
VCS Type | git |
VCS Version | main |
Last Updated | 2024-12-06 |
Dev Status | DEVELOPED |
CI status | No Continuous Integration |
Released | UNRELEASED |
Tags | No category tags. |
Contributing |
Help Wanted (0)
Good First Issues (0) Pull Requests to Review (0) |
Packages
Name | Version |
---|---|
yasmin | 3.0.1 |
yasmin_demos | 3.0.1 |
yasmin_msgs | 3.0.1 |
yasmin_ros | 3.0.1 |
yasmin_viewer | 3.0.1 |
README
YASMIN (Yet Another State MachINe)
YASMIN is a project focused on implementing robot behaviors using Finite State Machines (FSM). It is available for ROS 2, Python and C++.
Table of Contents
Features
- Fully integrated into ROS 2.
- Available for Python and C++.
- Fast prototyping.
- Default states for ROS 2 action and service clients.
- Blackboards are used to share data between states and state machines.
- State machines can be canceled and stopped, which means stopping the current executing state.
- A web viewer is included, which allows monitoring of the execution of the state machines.
Installation
$ cd ~/ros2_ws/src
$ git clone https://github.com/uleroboticsgroup/yasmin.git
$ cd ~/ros2_ws
$ rosdep install --from-paths src --ignore-src -r -y
$ cd ~/ros2_ws
$ colcon build
If you are using a deprecated ROS 2 distro (like Foxy or Galactic) or the Rolling distro, you may need to install the example interfaces:
$ sudo apt install -y ros-$ROS_DISTRO-example-interfaces
Docker
If your operating system doesn’t support ROS 2, docker is a great alternative. You can use an image from Dockerhub or create your own images. First of all, to build the image you have to use the following command:
## Assuming you are in the YASMIN project directory
$ docker build -t yasmin .
To use a shortcut the docker build, you may use the following command:
## Assuming you are in the YASMIN project directory
$ make docker_build
After the image is created, run a docker container with the following command:
$ docker run -it --net=host --ipc=host --privileged --env="DISPLAY" --env="QT_X11_NO_MITSHM=1" --volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" --volume="${XAUTHORITY}:/root/.Xauthority" --entrypoint /bin/bash yasmin
To use a shortcut the docker run, you may use following command:
## Assuming you are in the YASMIN project directory
$ make docker_run
Demos
There are some examples, for both Python and C++, that can be found in yasmin_demos.
Python
Vanilla Demo (FSM)
$ ros2 run yasmin_demos yasmin_demo.py
Click to expand
```python import time import rclpy import yasmin from yasmin import State from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros.ros_logs import set_ros_loggers from yasmin_viewer import YasminViewerPub # define state Foo class FooState(State): def __init__(self) -> None: super().__init__(["outcome1", "outcome2"]) self.counter = 0 def execute(self, blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO("Executing state FOO") time.sleep(3) if self.counter < 3: self.counter += 1 blackboard["foo_str"] = f"Counter: {self.counter}" return "outcome1" else: return "outcome2" # define state Bar class BarState(State): def __init__(self) -> None: super().__init__(outcomes=["outcome3"]) def execute(self, blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO("Executing state BAR") time.sleep(3) yasmin.YASMIN_LOG_INFO(blackboard["foo_str"]) return "outcome3" # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "FOO", FooState(), transitions={ "outcome1": "BAR", "outcome2": "outcome4", }, ) sm.add_state( "BAR", BarState(), transitions={ "outcome3": "FOO", }, ) # pub FSM info YasminViewerPub("yasmin_demo", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Service Demo (FSM + ROS 2 Service Client)
$ ros2 run demo_nodes_py add_two_ints_server
$ ros2 run yasmin_demos service_client_demo.py
Click to expand
```python import rclpy from example_interfaces.srv import AddTwoInts import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ServiceState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT from yasmin_viewer import YasminViewerPub class AddTwoIntsState(ServiceState): def __init__(self) -> None: super().__init__( AddTwoInts, # srv type "/add_two_ints", # service name self.create_request_handler, # cb to create the request ["outcome1"], # outcomes. Includes (SUCCEED, ABORT) self.response_handler, # cb to process the response ) def create_request_handler(self, blackboard: Blackboard) -> AddTwoInts.Request: req = AddTwoInts.Request() req.a = blackboard["a"] req.b = blackboard["b"] return req def response_handler( self, blackboard: Blackboard, response: AddTwoInts.Response ) -> str: blackboard["sum"] = response.sum return "outcome1" def set_ints(blackboard: Blackboard) -> str: blackboard["a"] = 10 blackboard["b"] = 5 return SUCCEED def print_sum(blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO(f"Sum: {blackboard['sum']}") return SUCCEED # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_service_client_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "SETTING_INTS", CbState([SUCCEED], set_ints), transitions={SUCCEED: "ADD_TWO_INTS"}, ) sm.add_state( "ADD_TWO_INTS", AddTwoIntsState(), transitions={ "outcome1": "PRINTING_SUM", SUCCEED: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_SUM", CbState([SUCCEED], print_sum), transitions={ SUCCEED: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_SERVICE_CLIENT_DEMO", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Action Demo (FSM + ROS 2 Action)
$ ros2 run action_tutorials_cpp fibonacci_action_server
$ ros2 run yasmin_demos action_client_demo.py
Click to expand
```python import rclpy from example_interfaces.action import Fibonacci import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub class FibonacciState(ActionState): def __init__(self) -> None: super().__init__( Fibonacci, # action type "/fibonacci", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) self.response_handler, # cb to process the response self.print_feedback, # cb to process the feedback ) def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal: goal = Fibonacci.Goal() goal.order = blackboard["n"] return goal def response_handler( self, blackboard: Blackboard, response: Fibonacci.Result ) -> str: blackboard["fibo_res"] = response.sequence return SUCCEED def print_feedback( self, blackboard: Blackboard, feedback: Fibonacci.Feedback ) -> None: yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.partial_sequence)}") def print_result(blackboard: Blackboard) -> str: yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}") return SUCCEED # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_action_client_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "CALLING_FIBONACCI", FibonacciState(), transitions={ SUCCEED: "PRINTING_RESULT", CANCEL: "outcome4", ABORT: "outcome4", }, ) sm.add_state( "PRINTING_RESULT", CbState([SUCCEED], print_result), transitions={ SUCCEED: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_ACTION_CLIENT_DEMO", sm) # create an initial blackoard blackboard = Blackboard() blackboard["n"] = 10 # execute FSM try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Monitor Demo (FSM + ROS 2 Subscriber)
$ ros2 run yasmin_demos monitor_demo.py
Click to expand
```python import rclpy from rclpy.qos import qos_profile_sensor_data from nav_msgs.msg import Odometry import yasmin from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import MonitorState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import TIMEOUT from yasmin_viewer import YasminViewerPub class PrintOdometryState(MonitorState): def __init__(self, times: int) -> None: super().__init__( Odometry, # msg type "odom", # topic name ["outcome1", "outcome2"], # outcomes self.monitor_handler, # monitor handler callback qos=qos_profile_sensor_data, # qos for the topic sbscription msg_queue=10, # queue of the monitor handler callback timeout=10, # timeout to wait for msgs in seconds # if not None, CANCEL outcome is added ) self.times = times def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str: yasmin.YASMIN_LOG_INFO(msg) self.times -= 1 if self.times <= 0: return "outcome2" return "outcome1" # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create a FSM sm = StateMachine(outcomes=["outcome4"]) # add states sm.add_state( "PRINTING_ODOM", PrintOdometryState(5), transitions={ "outcome1": "PRINTING_ODOM", "outcome2": "outcome4", TIMEOUT: "outcome4", }, ) # pub FSM info YasminViewerPub("YASMIN_MONITOR_DEMO", sm) # execute FSM try: outcome = sm() yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Nav2 Demo (Hierarchical FSM + ROS 2 Action)
$ ros2 run yasmin_demos nav_demo.py
Click to expand
```python import random import rclpy from geometry_msgs.msg import Pose from nav2_msgs.action import NavigateToPose import yasmin from yasmin import CbState from yasmin import Blackboard from yasmin import StateMachine from yasmin_ros import ActionState from yasmin_ros.ros_logs import set_ros_loggers from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL from yasmin_viewer import YasminViewerPub HAS_NEXT = "has_next" END = "end" class Nav2State(ActionState): def __init__(self) -> None: super().__init__( NavigateToPose, # action type "/navigate_to_pose", # action name self.create_goal_handler, # cb to create the goal None, # outcomes. Includes (SUCCEED, ABORT, CANCEL) None, # cb to process the response ) def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal: goal = NavigateToPose.Goal() goal.pose.pose = blackboard["pose"] goal.pose.header.frame_id = "map" return goal def create_waypoints(blackboard: Blackboard) -> str: blackboard["waypoints"] = { "entrance": [1.25, 6.30, -0.78, 0.67], "bathroom": [4.89, 1.64, 0.0, 1.0], "livingroom": [1.55, 4.03, -0.69, 0.72], "kitchen": [3.79, 6.77, 0.99, 0.12], "bedroom": [7.50, 4.89, 0.76, 0.65], } return SUCCEED def take_random_waypoint(blackboard) -> str: blackboard["random_waypoints"] = random.sample( list(blackboard["waypoints"].keys()), blackboard["waypoints_num"] ) return SUCCEED def get_next_waypoint(blackboard: Blackboard) -> str: if not blackboard["random_waypoints"]: return END wp_name = blackboard["random_waypoints"].pop(0) wp = blackboard["waypoints"][wp_name] pose = Pose() pose.position.x = wp[0] pose.position.y = wp[1] pose.orientation.z = wp[2] pose.orientation.w = wp[3] blackboard["pose"] = pose blackboard["text"] = f"I have reached waypoint {wp_name}" return HAS_NEXT # main def main(): yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo") # init ROS 2 rclpy.init() # set ROS 2 logs set_ros_loggers() # create state machines sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL]) # add states sm.add_state( "CREATING_WAYPOINTS", CbState([SUCCEED], create_waypoints), transitions={ SUCCEED: "TAKING_RANDOM_WAYPOINTS", }, ) sm.add_state( "TAKING_RANDOM_WAYPOINTS", CbState([SUCCEED], take_random_waypoint), transitions={ SUCCEED: "NAVIGATING", }, ) nav_sm.add_state( "GETTING_NEXT_WAYPOINT", CbState([END, HAS_NEXT], get_next_waypoint), transitions={ END: SUCCEED, HAS_NEXT: "NAVIGATING", }, ) nav_sm.add_state( "NAVIGATING", Nav2State(), transitions={ SUCCEED: "GETTING_NEXT_WAYPOINT", CANCEL: CANCEL, ABORT: ABORT, }, ) # pub FSM info YasminViewerPub("YASMIN_NAV_DEMO", sm) # execute FSM blackboard = Blackboard() blackboard["waypoints_num"] = 2 try: outcome = sm(blackboard) yasmin.YASMIN_LOG_INFO(outcome) except KeyboardInterrupt: sm.cancel_state() # shutdown ROS 2 if rclpy.ok(): rclpy.shutdown() if __name__ == "__main__": main() ```Cpp
Vanilla Demo
$ ros2 run yasmin_demos yasmin_demo