Simple FSM
  • 10 Minutes to read

Simple FSM


Article summary

1. Overview

Finite States Machines (FSM from now on) allow you to easily control the flow of an application. Ra-Ya Python library includes a module to easily create FSMs.

The module embeds everything inside the FSM class, that can be imported as:

from raya.tools.fsm import FSM

This document will use the pyRa-Ya example called fsm_simple to explain the working of the module (example available here).

2. FSM to implement

Consider the FSM below

FSM Example.drawio 1.svg

We want the application to start its logic from a specific location on the map, so we create a state to get localized (LOCALIZING) and another one to navigate to the desired starting point in the map (NAV_TO_HOME). After that, the robot gets aware of receiving new commands in the state IDLE.

When the robot gets the IDLE state, shows a menu on the screen and waits for commands from the user. There are four behaviors to perform from this state:

2.1. High-temperature alarm

If during the IDLE state, the environment temperature increases above a defined value, an alarm is shown on the screen and it’s not possible to perform other actions (HOT state).

When the temperature comes back to the normal range, a message is shown on the screen for a while (FRESH state), and then the robot gets the IDLE state again.

2.2. Take a photograph in the kitchen

If during the IDLE state, the KITCHEN option is selected on the screen, the robot will navigate to the kitchen (NAV_TO_KITCHEN), take a photograph and show it on the robot’s screen for a while (TAKE_PHOTO, and navigate to the home location again (NAV_TO_HOME).

2.3. Wave in the bedroom

If during the IDLE state, the BEDROOM option is selected on the screen, the robot will navigate to the bedroom (NAV_TO_BEDROOM), wave with one of its arms (WAVE), and navigate to the home location again (NAV_TO_HOME).

2.4. Look for some clothes

If during the IDLE state, the LAUNDRY option is selected on the screen, the robot will navigate to the laundry (NAV_TO_LAUNDRY), “think about what to wear” (wait for a while) (LOOK_FOR_PANTS), and navigate to the home location again (NAV_TO_HOME).

3. Implementation of the FSM

3.1. Files organization

For the use of the FSM module, it’s mandatory to follow the following folders structure inside the application:

<app_id>
├── ...
├── src
│   ├── ...
│   ├── FMSs
│   │   ├── <fsm1_name>
│   │   │   ├── __init__.py
│   │   │   ├── actions.py
│   │   │   ├── helpers.py
│   │   │   ├── states.py
│   │   │   └── trantitions.py
│   │   ├── <fsm2_name>
│   │   │   ├── __init__.py
│   │   │   ├── actions.py
│   │   │   ├── helpers.py
│   │   │   ├── states.py
│   │   │   └── trantitions.py
│   │   └── ...
│   ├── ...
│   ├── app.py
│   └── ...
├── __main__.py
└── ...

There must be a folder called FSMs inside src, and one folder inside it for each FSM to implement in our application. The name of that folder is the identifier of that FSM from the application.

Each FSM folder contains the files that describe the desired behavior:

  • __init__.py needed to recognize the folder as a Python module.
  • states.py
  • actions.py
  • trantitions.py
  • helpers.py (optional)

For our example, we’re only going to implement one FSM called task1, so the folder would look like this:

<app_id>
├── ...
├── src
│   ├── ...
│   ├── FMSs
│   │   └── task1
│   │       ├── __init__.py
│   │       ├── actions.py
│   │       ├── helpers.py
│   │       ├── states.py
│   │       └── trantitions.py
│   ├── ...
│   ├── app.py
│   └── ...
├── __main__.py
└── ...

3.2. States

The states.py file includes the description of the states that make up the FSM. First, create the list STATES that includes the list of states (represented as strings):

STATES = [
        'LOCALIZING',
        'NAV_TO_HOME',
        'IDLE',
        'HOT',
        'FRESH',
        'NAV_TO_KITCHEN',
        'TAKE_PHOTO',
        'NAV_TO_BEDROOM',
        'WAVE',
        'NAV_TO_LAUNDRY',
        'LOOK_FOR_PANTS',
        'END',
    ]

Now, define the initial state:

INITIAL_STATE = 'LOCALIZING'

If you don’t define the INITIAL_STATE variable, the initial state will be the first element in the STATES list.

An FSM can have multiple end states, so you must define the END_STATES list as:

END_STATES = [
    'END',
]

In addition, you can set timeout to states. That implies that if the FSM keeps for more than a specific period of time in the same state, it’ll finish with a specific error.

In this case, we’lll define a timeout of 10 seconds for the LOCALIZING state. If it takes more than 10 seconds to localize, the FSM will finish with error. To define states timeouts, add this to the states.py file:

STATES_TIMEOUTS = {
    'LOCALIZING' : (10.0, APPERR_COULD_NOT_LOCALIZE),
}

It’s not mandatory to define the variable STATES_TIMEOUTS.

The APPERR_COULD_NOT_LOCALIZE is a constant defined in the file src/static/app_errors.py like:

APPERR_COULD_NOT_LOCALIZE = (1, 'Robot could not get localized')

3.3. Actions

The actions are the commands or tasks to perform during the different stages of the FSM.

The file actions.py must have at least the following content:

from raya.tools.fsm import BaseActions

from src.app import RayaApplication

class Actions(BaseActions):

    def __init__(self, app: RayaApplication, helpers: Helpers):
        self.app = app
        self.helpers = helpers

The Actions class includes all the actions as methods. There are three types of actions:

  • Transition from STATE: It’s executed when the FSM leaves a state, regardless of which state it goes to. Method name format: leave_STATE(self):
  • Transition to STATE: It’s executed when the FSM gets a specific state, regardless of which state it comes from. Method name format: enter_STATE(self):
  • Transition from STATE1 to STATE2: It’s executed when the FSM leaves a specific state 1 to go to another specific state 2. Method name format: SATE1_to_STATE2(self):

Each transition must be implemented as an ASYNC method in the Action class.

For example, consider that getting into the state LOCALIZING implies showing a message on the screen that indicates the robot is localizing, and setting the map to start the localization process. So that action can be implemented by adding the method below to the Actions class:

	async def enter_LOCALIZING(self):
        await self.app.ui.display_screen(**UI_SCREEN_LOCALIZING)
        await self.app.nav.set_map(NAV_MAP_NAME)

As you can see, you can directly access the app using the self.app variable.

It’s important to remark that the methods and functions called inside the actions must be NOT BLOCKING. The FSM must always keep free to continue executing.

You can see other action implementations in the file src/FSMs/task1/actions.py file in the fsm_simple pyRa-Ya example.

3.4. Transitions

The transitions.py describes the flow of the FSM, and how to move through the states.

This file must have at least the following content:

from raya.tools.fsm import BaseTransitions

from src.app import RayaApplication

class Transitions(BaseTransitions):

    def __init__(self, app: RayaApplication, helpers: Helpers):
        super().__init__()
        self.app = app
        self.helpers = helpers

All the states MUST have their transition ASYNC method in the Transition class. The name of the method is the same name as the state.

The transition methods are basically if…else statements that control which state to go to depending on the conditions. For example, when the FSM is in the state LOCALIZING, we know that we previously stated the localization process (in the enter_LOCALIZING action), so now we can evaluate if it’s already localized. If yes, just change the state using the self.set_state method.

    async def LOCALIZING(self):
        if await self.app.nav.is_localized():
            self.set_state('NAV_TO_HOME')

As you can see, you can directly access the app using the self.app variable.

You can see other transition implementations in the file src/FSMs/task1/transitions.py file in the fsm_simple pyRa-Ya example.

If the FSM keeps in a specific state, its transition method will be periodically called each time the FSM is ticked (discussed later).

3.5. Helpers

The actions.py and transitions.py must be as clean as possible, to make them clear, readable, and easy to compare with the FSM graph. The helpers.py file includes the functions, methods or other stuff that can be called from an action or transition, and can take too much lines of code.

The helper.py file must have at least the following content:

from raya.exceptions import *

from src.app import RayaApplication

class Helpers:

    def __init__(self, app: RayaApplication):        
        self.app = app

Consider the waving with the arm when entering the state WAVE. It implies two movements, so instead of creating two states, we can just put both commands in a helper method like:

# Helpers class

    async def arm_wave(self):
        await self.app.arms.execute_joint_values_array(
                **ARMS_WAVE_SEQUENCE, 
                wait=True
            )
        await self.app.arms.set_predefined_pose(
                arm=ARMS_WAVE_ARM,
                predefined_pose='home', 
                wait=True
            )

As you can see, you can directly access the app using the self.app variable.

Then, in the enter_WAVE action, just create a task:

# Actions class

	async def enter_WAVE(self):
        await self.app.ui.display_screen(**UI_SCREEN_WAVING)
        self.app.create_task(
                name='WAVE', 
                afunc=self.helpers.arm_wave,
            )

And in the WAVE transition, just evaluate if the task finished:

# Transitions class

    async def WAVE(self):
        if self.app.is_task_done('WAVE'):
            self.set_state('NAV_TO_HOME')

3.6. Errors

In any of the Transitions' methods (transitions.py file), you can call the self.abort(code, msg) method. That method stops the FSM execution and set a tuple of error code and error message to be notified to the FSM caller.

Consider the transition method of the state NAV_TO_BEDROOM in the example:

# File transitions.py
# Class Transitions

	async def NAV_TO_BEDROOM(self):
      if not self.app.nav.is_navigating():
          nav_error = self.app.nav.get_last_navigation_error()
          if nav_error[0] == 0:
              self.set_state('WAVE')
          else:
              self.abort(*APPERR_COULD_NOT_NAV_TO_HOME)

If the navigation goes wrong, the FSM is aborted and the error code and message contained in APPERR_COULD_NOT_NAV_TO_HOME is set.

In the Actions class (actions.py file) you can define the async method aborted to execute a specific task when the FSM is aborted. In our example, the error is shown in the robot’s screen and the error sound is played:

# File actions.py
# Class Actions

		async def aborted(self, error, msg):
        await self.app.ui.display_screen(
                **UI_SCREEN_FAILED,
                subtitle=f'ERROR {error}: {msg}'
            )
        await self.app.sound.play_sound(name='error')

4. FSM Class

In your application, you can create the FSM object like:

self.fsm_task1 = FSM(app=self, name='task1')

It’ll take the name task1 as the identifier, and it’ll look inside the src/FSMs/task1 folder to create the FSM.

You can create the FSM objects in any section of the app.py file, and even in other modules of the application.

4.1. Tick

Each time the FSM is ticked, these steps are internally performed:

  • Check if the current status reached timeout
  • Execute the current status transition method
  • Execute the corresponding actions, based on the previous and new state

The faster the FSM is ticked, the faster the states will change, but the more resources you use. A good tick period is 100ms (10 times a second).

4.2. Execution FSM

You can manually tick the FSM like:

while await self.fsm_task1.tick():
    self.sleep(0.1) # Async sleep

The FSM.tick() method will return False when the FSM execution finishes.

However, manually ticking is not recommended, instead, you should use one of the async methods run_and_await (awaits until the execution finishes) or run_in_background (runs the FSM in an async task in the background).

You can then create the FSM and immediately execute in the background, wait for its execution, and then check it final status:

class RayaApplication(RayaApplicationBase):

    async def setup(self):
        ...
        # FSMs
        self.fsm_task1 = FSM(app=self, name='task1', log_transitions=True)
        self.fsm_task1.run_in_background()

    async def loop(self):
        # Do other non blocking stuff...
        await self.sleep(1.0)
        # Check if the FSM has finished
        if self.fsm_task1.has_finished():
            self.finish_app()

    async def finish(self):
        # Has the FSM finished without error?
        if self.fsm_task1.was_successfull():
            self.log.info('App correctly finished')
        else:
            # fsm_error[0]: error code, fsm_error[1]: error message
            fsm_error_code, fsm_error_msg = self.fsm_task1.get_error()
            self.log.error(
                f'App finished with error [{fsm_error_code}]: {fsm_error_msg}'
            )

Check the example fsm_simple for full implementation.

5. Others

5.1. Sending arguments to the FSM execution

If you’re using run_and_await or run_in_background, you can send the arguments you want, like:

self.fsm_important_task.run_in_background(temperature=34.2, person_id='martin')

If you’re manually ticking the FSM (not recommended), you can set the arguments just before start ticking:

self.fsm_important_task.set_args(temperature=34.2, person_id='martin')
while await self.fsm_important_task.tick():
    self.sleep(0.1) # Async sleepg

Inside the Actions, Transitions or Helper classes, you can get those arguments just using the args attribute:

# Actions, Transitions or Helpers class

    def any_method(self):
				self.args.temperature
				self.args.person_id

Optionally, you can add a arguments.py file inside the FSM folder with the following structure:

OPTIONAL_ARGS = {
        'temperature': 34.2,
        'person_id': 'martin',
    }

REQUIRED_ARGS = [
        'predictor'
    ]

pyRa-Ya automatically detects that file and sets the arguments in OPTIONAL_ARGS with their default values if they’re not defined when running the FSM.

In addition, it throws the exception RayaFSMMissingRequiredArgument if one of the arguments in REQUIRED_ARGS is not defined when running the FSM.


Was this article helpful?

What's Next