Embedded Python API

Sage Class

The Sage class is used to interact with the SageMotion hardware by getting data and providing feedback.

class sage.sage.Sage(sensor_pairings=None, feedback_pairings=None, user_fields=None, save_mode='csv', simulate_file=None, start_ui_thread=True)

A sage object which gathers data from wireless sensor nodes and delivers feedback.

At creation, user must specify the hardware addresses of nodes to connect with as well as custom data fields to be saved.

Parameters:
  • sensor_pairings (dict) – A dictionary of sensor indexes and hardware addresses

  • feedback_pairings (dict) – A dictionary of feedback indexes and hardware addresses

  • user_fields (list) – A list of sage.UserField Objects

  • save_mode (str) – The file format how the data would be saved. Available options: ‘csv’, ‘h5’ or ‘xlsx’.

  • simulate_file (str) – The csv file location used to generate simulated sensor data. This is used for debugging purpose.

Note

-Each hardware address can only be included once

-Indexes must be contiguous and start at zero

-Hardware addresses must be six hexadecimal numbers connected by : like so: A4:37:BF:C2:11:9E. Addresses are not case sensitive.

UserField Args:

name (str): The name of the custom field data_type (type): The data type of the custom field size (int): A int delineating the size of data

Example:

USER_FIELDS = [Sage.UserField(name='iteration', data_type=int, size=1)]
SENSORS = {0: 'A4:37:BF:C2:11:00', 1:'A4:37:BF:C2:11:01'}
FEEDBACK = {0: 'A4:37:BF:C2:11:02'}
my_sage = Sage(SENSORS, FEEDBACK, USER_FIELDS, 'csv')
clear_buffered_data()

Clear buffered data

This function clears the sensor data_queue. Possible usage including:

Examples:

input("Enter to start the trial")
my_sage.clear_buffered_data()
# starting trial
control_data_flow(is_on, start_time=None)

Control the data flow of the sensor.

This method allows the user to start or stop the data flow from the sensor.

Parameters:
  • is_on (bool) – A boolean value indicating whether to start (True) or stop (False) the data flow.

  • start_time (float, optional) – The time to start the data flow. If not specified, the data flow starts immediately.

Example:

my_sage = Sage(...)
# user time-consuming initialization process
my_sage.control_data_flow(True)
data = my_sage.get_next_data()
feedback_off(feedback_node)

Stop delivering vibration to specified feedback node (Outdated API)

Parameters:

feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()

feedback_on(feedback_node, duration=5)

Deliver continuous vibration to specified feedback node (Outdated API)

Parameters:
  • feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()

  • duration (float) – The duration of the vibration in seconds with precision of 50 ms.

feedback_vibration_off(feedback_node)

Stop delivering vibration to specified feedback node

Parameters:

feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()

feedback_vibration_on(feedback_node, duration=5, strength=2)

Deliver continuous vibration to specified feedback node

Parameters:
  • feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()

  • duration (float) – The duration of the vibration in seconds with precision of 50 ms.

  • strength (int) – The intensity of the vibration. The value should be integers between 1 and 4 where 4 is full strength and 1 is low strength.

get_dip_state(dip_num)

Get the current state of a dip switch

Parameters:

dip_num (int) – The hardware index of the dip switch (1-4)

Returns:

True if the switch is up, False if the switch is down

Return type:

bool

get_newest_data(is_serious=False)

Get the newest sensor packet

Returns a message containing the newest message containing data for all sensors. If there are multiple sensor packets waiting in the buffer, they will be discarded and only the newest one returned. If it is important for your application to read every sample, it is recommended that you use :meth”sage.get_next_data instead This function will block until new data is ready.

Data is extracted from a message by specifying the sensor index and then the data field.

Example:

while True:
    raw_data = my_sage.get_newest_data()
    sensor_index = 0
    acc_x = raw_data[sensor_index]['AccelX']
    print(acc_x)

Available sensor data data fields:

SensorIndex

AccelX
AccelY
AccelZ

GyroX
GyroY
GyroZ

MagX
MagY
MagZ

Quat1
Quat2
Quat3
Quat4

Sampletime
Package
get_next_data()

Get the next sensor packet

Returns a message containing the newest message containing data for all sensors. If this method is called at a slower frequency than the sampling rate, the age of each message will slowly grow with each read until internal buffer overflows and is cleared. If your algorithm executes in time slower than the sampling rate, it is recommended that you use sage.get_newest_data() instead. If this method is called before new data has arrived, it will block until new data is ready.

Data is extracted from a message by specifying the sensor index and then the data field.

Example:

while True:
    raw_data = my_sage.get_next_data()
    sensor_index = 0
    acc_x = raw_data[sensor_index]['AccelX']
    print(acc_x)

Available sensor data data fields:

SensorIndex

AccelX
AccelY
AccelZ

GyroX
GyroY
GyroZ

MagX
MagY
MagZ

Quat1
Quat2
Quat3
Quat4

Sampletime
Package
get_sensor_sample_period_in_milliseconds()

Get the sensor sample period in milliseconds.

This method allows the user to get the current sensor sample period.

Returns:

The current sensor sample period in milliseconds.

Return type:

int

register_dip_switch_callback(callback_func)

A decorator to set a callback function to execute when any dip switch changes state

Parameters:

callback_func – The function to be executed when any dip switch changes state

The callback function must have three arguments, one for the Sage object, one for the switch hardware index that changed state, and one denoting the new state. Example:

@Sage.register_dip_switch_callback
def dip_switch_callback(self, switch_num, is_on):
    if (switch_num is 1 and is_on):
        print("Switch one was just flipped on")

Note

Dip switch state can be observed at any time by using the function

sage.Sage.get_dip_state().

register_feedback_button_callback(callback_func)

A decorator to set a callback function to execute when the feedback button is pressed

Parameters:

callback_func – The function to be executed when the sensor button is pressed

The callback function takes three arguments which are your Sage object, feedback index and event. Example:

@my_sage.register_feedback_button_callback
def feedback_button_callback(self, sensor_index, event):
    if (event==Sage.BUTTON_DOWN):
        print("Feedback {} Button was pressed".format(sensor_index)
    if (event==Sage.BUTTON_UP):
        print("Feedback {} Button was released".format(sensor_index)
register_function_button_callback(callback_func)

A decorator to set a callback function to execute when the function button is pressed

Parameters:

callback_func – The function to be executed when the function button is pressed

The callback function only takes one argument which is your Sage object Example:

@my_sage.register_function_button_callback
def button_callback(self, event):
    if (event==Sage.BUTTON_DOWN):
        print("Function Button was pressed")
    if (event==Sage.BUTTON_UP):
        print("Function Button was released")
register_sensor_button_callback(callback_func)

A decorator to set a callback function to execute when the sensor button is pressed

Parameters:

callback_func – The function to be executed when the sensor button is pressed

The callback function takes three arguments which are your Sage object, sensor index and event. Example:

@my_sage.register_sensor_button_callback
def sensor_button_callback(self, sensor_index, event):
    if (event==Sage.BUTTON_DOWN):
        print("Sensor {} Button was pressed".format(sensor_index)
    if (event==Sage.BUTTON_UP):
        print("Sensor {} Button was released".format(sensor_index)
save_data(raw_data, user_data)

Store data for offline analysis

This function converts data to persistent long term storage which can be retreived later for offline analysis. Data is stored in h5, csv or xlsx format and located in directory ~/sage_code/rpi_embedded/experiment_data. Raw_data is expected to be the same message object which was read into user algorithm, while user_data has an expected structure which was specified when sage.Sage object was created.

Parameters:
  • raw_data (list) – The data package received from sage.get_next_data() or sage.get_newest_data()

  • user_data (dict) – Custom user data dict with format as specified on sage.Sage creation

Examples:

iteration = 0
raw_data = my_sage.get_next_data()
my_data = {'iteration': [iteration]}
my_sage.save_data(raw_data, my_data)
send_stream_data(raw_data=None, custom_data=None)

Export data for online analysis

This function exports data to web socket

Parameters:
  • raw_data (list) – Raw data read from sensors, a list of dicts of length SENSOR_COUNT. Each dict should have identically named members.

  • custom_data (dict) – Computed values output by the algorithm. Custom data fields must be specified in the algorithm’s info.json.

Examples:

iteration = 0
my_data = {'iteration': [iteration]}
my_sage.stream_data(raw_data, my_data)
sensor_vibration_off(sensor_node)

Stop delivering vibrate to specified sensor node

Parameters:

sensor_node (int) – The index of the sensor node. Indexes are set by the user using the function sage.Sage.set_pairings()

sensor_vibration_on(sensor_node, duration=5, strength=2)

Deliver continuous vibration to specified sensor node

Parameters:
  • sensor_node (int) – The index of the sensor node. Indexes are set by the user using the function sage.Sage.set_pairings()

  • duration (float) – The duration of the vibration in seconds with precision of 50 ms.

  • strength (int) – The intensity of the vibration. The value should be integers between 1 and 4 where 4 is full strength and 1 is low strength.

set_feedback_led_off(node_index)

Turn off feedback LED.

This method allows the user to turn off the feedback LED of a feedback node.

Parameters:

node_index (int) – The index of the feedback node that should have the LED turn off.

Example:

# Turn off the feedback LED of the node at index 0
my_sage.set_feedback_led_off(0)
set_feedback_led_on(node_index)

Turn on feedback LED.

This method allows the user to turn on the feedback LED of a feedback node.

Parameters:

node_index (int) – The index of the feedback node that should have the LED turn on.

Example:

# Turn on the feedback LED of the node at index 0
my_sage.set_feedback_led_on(0)
set_hub_led_off()

Turn off hub LED.

This method allows the user to turn off the hub LED. If the LED is already off, the method does nothing.

Example:

# Turn off the hub LED
my_sage.set_hub_led_off()
set_hub_led_on()

Turn on hub LED.

This method allows the user to turn on the hub LED. If the LED is already on, the method does nothing.

Example:

# Turn on the hub LED
my_sage.set_hub_led_on()
set_sensor_led_off(node_index)

Turn off sensor LED.

This method allows the user to turn off the sensor LED of a sensor node.

Parameters:

node_index (int) – The index of the sensor node that should have the LED turn off.

Example:

# Turn off the sensor LED of the node at index 0
my_sage.set_sensor_led_off(0)
set_sensor_led_on(node_index)

Turn on sensor LED.

This method allows the user to turn on the sensor LED of a sensor node.

Parameters:

node_index (int) – The index of the sensor node that should have the LED turn on.

Example:

# Turn on the sensor LED of the node at index 0
my_sage.set_sensor_led_on(0)
set_sensor_sample_period_in_milliseconds(period)

Set the sensor sample period in milliseconds.

This method allows the user to set the sensor sample period. The period must be one of the following values: [10, 20, 50, 100].

Parameters:

period (int) – The desired sensor sample period in milliseconds. Must be one of [10, 20, 50, 100].

Example:

# Set the sensor sample period to 50 milliseconds
my_sage.set_sensor_sample_period_in_milliseconds(50)
unregister_all_callback()

Unregister all button or dip switch callback function

This function reset all the callback functions, in which case there is an UI event comes, no action will be performed.

BaseApp Class

The BaseApp class is used to create apps that are able to run on the SageMotion system.

class sage.base_app.BaseApp(my_sage, app_file)

BaseApp Class

This is the base class for all apps for the SageMotion system. It provides the fundamental structure for the app to run in the system. To use it you will need to create a new class that inherits from this class and implement the run_in_loop method.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        # Your code here
        return True
abstract run_in_loop()

Run the application in a loop. This must be implemented in every app. The method should return True to keep the application running, or False to stop it.

Returns:

True to keep the app running, False to stop it.

Return type:

bool

Example Implementation:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True
check_status()

System status check and initialization work implemented here.

This method checks if the required number of sensors and feedback devices are connected. This can also be used to generally determine if the system is ready to start if it is overwritten. If the requirements are not satisfied, it returns False and an error message. Otherwise, it returns True and a success message. As it is currently implemented, it checks if the number of sensors and feedback devices are connected and returns an error message if they are not. You will not need to implement this method in your app unless you want to add additional checks.

Returns:

  • bool: True if the requirements are satisfied, False otherwise.

  • str: A success message if the requirements are satisfied, an error message otherwise.

Return type:

tuple

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def check_status(self):
        sensors_count = self.get_sensors_count()
        if sensors_count < len(self.info['sensors']):
            return False, "App requires {len(self.info['sensors'])} sensors but only {sensors_count} are connected"
        return True, "Now running the app"

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True
on_start_event(start_time)

Handle the start event.

This method is called when the SageMotion system starts the app. It can be used to synchronize with other systems or to do work before the system starts. You will just need to implement this method in your app. As it is currently implemented, it does nothing.

Parameters:

start_time (datetime) – The time that the SageMotion system will start the app.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True

    def on_start_event(self, start_time):
        # Do some work here or
        # sync with other systems
on_stop_event(stop_time)

Handle the stop event.

This method is called when the SageMotion system stops the app. It can be used to synchronize with other systems or to do work after the system stops. You will just need to implement this method in your app. As it is currently implemented, it does nothing.

Parameters:

stop_time (datetime) – The time that the SageMotion system will stop the app.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True

    def on_stop_event(self, stop_time):
        # Stop other system synced or
        # Run report on data
get_trial_name(suffix='')

Standardized way to get the trail name.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True

    def on_stop_event(self, stop_time):
        trial_file_name = self.get_trial_name()
        print(f"Trial file name = {trial_file_name}")
get_trial_data(include_raw=True, trial_file_name='')

Standardized way to get the trail data into a dictionary format. This currently works for csv, and xlsx.

Parameters:
  • include_raw (bool) – Include raw data in the trial data. Normally, it will only include the user fields since including all of the data can take a while to process.

  • trial_file_name (str) – The name of the trial file. If not provided, it will use the last saved file name.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True

    def on_stop_event(self, stop_time):
        trial_file_name, trial_data_dict, trial_info_dict = self.get_trial_data()
        print(f"Trial file name = {trial_file_name}")
        print(f"Trial data dict = {trial_data_dict}")
        print(f"Trial info dict = {trial_info_dict}")
create_trial_name()

Choose or create a custom, programatically generated trial name. This method originally just retrieves the trial name from the config file, but can be overwritten to generate a custom name.

Returns:

The trial name.

Return type:

str

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data)
        self.my_sage.save_data(data,{})
        return True

    def create_trial_name(self):
        return "Hello World"

    def on_stop_event(self, stop_time):
        trial_file_name = self.get_trial_name()
        print(f"Trial file name = {trial_file_name}")
        # Output: Trial file name = Hello World
get_absolute_path_in_app_folder(file_name)

Get the absolute path of a file in the current application folder.

Parameters:

file_name (str) – The name of the file.

Returns:

The absolute path of the file in the current application folder.

Return type:

str

Example:

config_file = self.get_absolute_path_in_app_folder('config.json')
get_feedback_count()

Get the count of paired feedback nodes.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)
        self.number_of_paired_feedback_nodes = self.get_feedback_count()

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data, {})
        self.my_sage.save_data(data,{})
        return True
get_sensors_count()

Get the count of paired sensors.

Example:

from sage.base_app import BaseApp

class Core(BaseApp):
    def __init__(self, my_sage):
        BaseApp.__init__(self, my_sage, __file__)
        self.number_of_paired_sensors = self.get_sensors_count()

    def run_in_loop(self):
        data = self.my_sage.get_next_data()
        self.my_sage.send_stream_data(data, {})
        self.my_sage.save_data(data,{})
        return True