Embedded Python API
Sage Class
The Sage class is used to interact with the SageMotion hardware by getting data and providing feedback.
- class sage.sage.Sage(sensor_pairings=None, feedback_pairings=None, user_fields=None, save_mode='csv', simulate_file=None, start_ui_thread=True)
A sage object which gathers data from wireless sensor nodes and delivers feedback.
At creation, the user must specify the hardware addresses of the nodes to connect with, as well as any custom data fields to be saved.
- Parameters:
sensor_pairings (dict) – A dictionary of sensor indexes and hardware addresses
feedback_pairings (dict) – A dictionary of feedback indexes and hardware addresses
user_fields (list) – A list of sage.UserField objects
save_mode (str) – The file format in which the data is saved. Available options: ‘csv’, ‘h5’ or ‘xlsx’.
simulate_file (str) – The location of the csv file used to generate simulated sensor data. This is used for debugging purposes.
Note
- Each hardware address can only be included once
- Indexes must be contiguous and start at zero
- Hardware addresses must be six hexadecimal numbers connected by : like so: A4:37:BF:C2:11:9E. Addresses are not case sensitive.
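The rules in the note above can be checked before a Sage object is created. The following is a minimal sketch, not part of the SageMotion API: the helper name validate_pairings is hypothetical, and it assumes that "included once" applies across the sensor and feedback dictionaries together.

```python
import re

# Six two-digit hexadecimal groups joined by ':' (upper or lower case).
_ADDRESS_RE = re.compile(r"^[0-9A-Fa-f]{2}(:[0-9A-Fa-f]{2}){5}$")


def validate_pairings(sensor_pairings, feedback_pairings):
    """Return a list of problems found; an empty list means the pairings look valid."""
    problems = []
    seen = set()  # upper-cased addresses seen so far, across both dicts (assumption)
    for label, pairings in (("sensor", sensor_pairings), ("feedback", feedback_pairings)):
        # Indexes must be contiguous and start at zero.
        if sorted(pairings) != list(range(len(pairings))):
            problems.append(f"{label} indexes must be contiguous and start at zero")
        for index, address in pairings.items():
            if not _ADDRESS_RE.match(address):
                problems.append(f"{label} {index}: malformed address {address!r}")
            # Addresses are not case sensitive, so compare them in upper case.
            key = address.upper()
            if key in seen:
                problems.append(f"{label} {index}: duplicate address {address!r}")
            seen.add(key)
    return problems
```

Calling validate_pairings(SENSORS, FEEDBACK) before constructing Sage gives a readable list of issues instead of a failure at connection time.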
- UserField Args:
name (str): The name of the custom field
data_type (type): The data type of the custom field
size (int): An int giving the size of the data
Example:
    USER_FIELDS = [Sage.UserField(name='iteration', data_type=int, size=1)]
    SENSORS = {0: 'A4:37:BF:C2:11:00', 1: 'A4:37:BF:C2:11:01'}
    FEEDBACK = {0: 'A4:37:BF:C2:11:02'}
    my_sage = Sage(SENSORS, FEEDBACK, USER_FIELDS, 'csv')
- clear_buffered_data()
Clear buffered data
This function clears the sensor data_queue. A possible use is discarding data that was buffered before a trial starts.
Example:
    input("Enter to start the trial")
    my_sage.clear_buffered_data()
    # starting trial
- control_data_flow(is_on, start_time=None)
Control the data flow of the sensor.
This method allows the user to start or stop the data flow from the sensor.
- Parameters:
is_on (bool) – A boolean value indicating whether to start (True) or stop (False) the data flow.
start_time (float, optional) – The time to start the data flow. If not specified, the data flow starts immediately.
Example:
    my_sage = Sage(...)
    # user time-consuming initialization process
    my_sage.control_data_flow(True)
    data = my_sage.get_next_data()
- feedback_off(feedback_node)
Stop delivering vibration to specified feedback node (Outdated API)
- Parameters:
feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()
- feedback_on(feedback_node, duration=5)
Deliver continuous vibration to specified feedback node (Outdated API)
- Parameters:
feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()
duration (float) – The duration of the vibration in seconds with precision of 50 ms.
- feedback_vibration_off(feedback_node)
Stop delivering vibration to specified feedback node
- Parameters:
feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()
- feedback_vibration_on(feedback_node, duration=5, strength=2)
Deliver continuous vibration to specified feedback node
- Parameters:
feedback_node (int) – The index of the feedback node. Indexes are set by the user using the function sage.Sage.set_pairings()
duration (float) – The duration of the vibration in seconds with precision of 50 ms.
strength (int) – The intensity of the vibration. The value should be an integer between 1 and 4, where 4 is full strength and 1 is low strength.
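Because duration has a precision of 50 ms and strength is limited to 1 through 4, it can help to normalize both values before calling feedback_vibration_on. The helper below is a hypothetical sketch, not part of the API. It rounds to the nearest 50 ms step, which is an assumption, since the documentation does not say whether the hardware rounds or truncates.

```python
def normalize_vibration_args(duration, strength):
    """Round duration (seconds) to the 50 ms grid and check that strength is 1..4."""
    if strength not in (1, 2, 3, 4):
        raise ValueError("strength must be an integer between 1 and 4")
    # Count whole 50 ms steps, then convert back to seconds.
    steps = round(duration * 1000 / 50)
    return steps * 50 / 1000, strength


duration, strength = normalize_vibration_args(0.12, 2)
# my_sage.feedback_vibration_on(0, duration, strength)  # requires SageMotion hardware
```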
- get_dip_state(dip_num)
Get the current state of a dip switch
- Parameters:
dip_num (int) – The hardware index of the dip switch (1-4)
- Returns:
True if the switch is up, False if the switch is down
- Return type:
bool
- get_newest_data(is_serious=False)
Get the newest sensor packet
Returns the newest message, which contains data for all sensors. If there are multiple sensor packets waiting in the buffer, the older ones are discarded and only the newest one is returned. If it is important for your application to read every sample, it is recommended that you use sage.get_next_data() instead. This function will block until new data is ready.
Data is extracted from a message by specifying the sensor index and then the data field.
Example:
    while True:
        raw_data = my_sage.get_newest_data()
        sensor_index = 0
        acc_x = raw_data[sensor_index]['AccelX']
        print(acc_x)
Available sensor data fields:
SensorIndex AccelX AccelY AccelZ GyroX GyroY GyroZ MagX MagY MagZ Quat1 Quat2 Quat3 Quat4 Sampletime Package
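As a sketch of the "sensor index, then data field" access pattern, the snippet below computes the magnitude of the acceleration vector for one sensor. The function accel_magnitude is a hypothetical helper, and fake_message is a hand-made stand-in with made-up values so the example runs without hardware; a real message comes from get_newest_data() or get_next_data().

```python
import math


def accel_magnitude(raw_data, sensor_index):
    """Euclidean norm of the accelerometer vector for one sensor."""
    sample = raw_data[sensor_index]
    return math.sqrt(sample['AccelX'] ** 2 + sample['AccelY'] ** 2 + sample['AccelZ'] ** 2)


# Stand-in for a message: one dict per sensor, keyed by field name (values are made up).
fake_message = [
    {'SensorIndex': 0, 'AccelX': 3.0, 'AccelY': 4.0, 'AccelZ': 0.0},
    {'SensorIndex': 1, 'AccelX': 0.0, 'AccelY': 0.0, 'AccelZ': 2.0},
]
print(accel_magnitude(fake_message, 0))  # prints 5.0
```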
- get_next_data()
Get the next sensor packet
Returns the next unread message, which contains data for all sensors. If this method is called at a lower frequency than the sampling rate, the age of each message will grow with each read until the internal buffer overflows and is cleared. If your algorithm executes more slowly than the sampling rate, it is recommended that you use sage.get_newest_data() instead. If this method is called before new data has arrived, it will block until new data is ready.
Data is extracted from a message by specifying the sensor index and then the data field.
Example:
    while True:
        raw_data = my_sage.get_next_data()
        sensor_index = 0
        acc_x = raw_data[sensor_index]['AccelX']
        print(acc_x)
Available sensor data fields:
SensorIndex AccelX AccelY AccelZ GyroX GyroY GyroZ MagX MagY MagZ Quat1 Quat2 Quat3 Quat4 Sampletime Package
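The difference between get_next_data() and get_newest_data() can be pictured with a toy queue. This is only an illustration of the behavior described above, not the library's implementation: ToyBuffer and its methods are hypothetical, and unlike the real methods they do not block when the buffer is empty.

```python
from collections import deque


class ToyBuffer:
    """Illustrative FIFO of messages; not how the Sage class is implemented."""

    def __init__(self):
        self._queue = deque()

    def push(self, message):
        self._queue.append(message)

    def get_next(self):
        # Oldest unread message first, so no sample is skipped.
        return self._queue.popleft()

    def get_newest(self):
        # Return the most recent message and discard everything older.
        newest = self._queue.pop()
        self._queue.clear()
        return newest


buffer = ToyBuffer()
for sample_number in (1, 2, 3):
    buffer.push(sample_number)
print(buffer.get_next())    # prints 1
print(buffer.get_newest())  # prints 3; sample 2 is dropped
```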
- get_sensor_sample_period_in_milliseconds()
Get the sensor sample period in milliseconds.
This method allows the user to get the current sensor sample period.
- Returns:
The current sensor sample period in milliseconds.
- Return type:
int
- register_dip_switch_callback(callback_func)
A decorator to set a callback function to execute when any dip switch changes state
- Parameters:
callback_func – The function to be executed when any dip switch changes state
The callback function must have three arguments: one for the Sage object, one for the hardware index of the switch that changed state, and one denoting the new state.
Example:
    @Sage.register_dip_switch_callback
    def dip_switch_callback(self, switch_num, is_on):
        if switch_num == 1 and is_on:
            print("Switch one was just flipped on")
Note
- Dip switch state can be observed at any time by using the function sage.Sage.get_dip_state().
- register_feedback_button_callback(callback_func)
A decorator to set a callback function to execute when the feedback button is pressed
- Parameters:
callback_func – The function to be executed when the feedback button is pressed
The callback function takes three arguments: your Sage object, the feedback index and the event.
Example:
    @my_sage.register_feedback_button_callback
    def feedback_button_callback(self, sensor_index, event):
        if event == Sage.BUTTON_DOWN:
            print("Feedback {} Button was pressed".format(sensor_index))
        if event == Sage.BUTTON_UP:
            print("Feedback {} Button was released".format(sensor_index))
- register_function_button_callback(callback_func)
A decorator to set a callback function to execute when the function button is pressed
- Parameters:
callback_func – The function to be executed when the function button is pressed
The callback function takes two arguments: your Sage object and the event.
Example:
    @my_sage.register_function_button_callback
    def button_callback(self, event):
        if event == Sage.BUTTON_DOWN:
            print("Function Button was pressed")
        if event == Sage.BUTTON_UP:
            print("Function Button was released")
- register_sensor_button_callback(callback_func)
A decorator to set a callback function to execute when the sensor button is pressed
- Parameters:
callback_func – The function to be executed when the sensor button is pressed
The callback function takes three arguments: your Sage object, the sensor index and the event.
Example:
    @my_sage.register_sensor_button_callback
    def sensor_button_callback(self, sensor_index, event):
        if event == Sage.BUTTON_DOWN:
            print("Sensor {} Button was pressed".format(sensor_index))
        if event == Sage.BUTTON_UP:
            print("Sensor {} Button was released".format(sensor_index))
- save_data(raw_data, user_data)
Store data for offline analysis
This function writes data to persistent long-term storage, from which it can be retrieved later for offline analysis. Data is stored in h5, csv or xlsx format in the directory ~/sage_code/rpi_embedded/experiment_data. raw_data is expected to be the same message object that was read into the user algorithm, while user_data has the structure that was specified when the sage.Sage object was created.
- Parameters:
raw_data (list) – The data package received from sage.get_next_data() or sage.get_newest_data()
user_data (dict) – Custom user data dict with the format specified on sage.Sage creation
Examples:
    iteration = 0
    raw_data = my_sage.get_next_data()
    my_data = {'iteration': [iteration]}
    my_sage.save_data(raw_data, my_data)
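Since user_data must match the user fields declared at creation, a small check before save_data can catch mismatches early. The sketch below is hypothetical: UserField here is a namedtuple stand-in for Sage.UserField so the code runs without the library, and it assumes each value is a list whose length equals the field's size, as in the 'iteration' example above.

```python
from collections import namedtuple

# Stand-in for Sage.UserField, with the same three arguments.
UserField = namedtuple('UserField', ['name', 'data_type', 'size'])


def check_user_data(user_fields, user_data):
    """Return True if user_data has one correctly sized and typed list per field."""
    if set(user_data) != {field.name for field in user_fields}:
        return False
    for field in user_fields:
        values = user_data[field.name]
        if len(values) != field.size:
            return False
        if not all(isinstance(value, field.data_type) for value in values):
            return False
    return True


fields = [UserField(name='iteration', data_type=int, size=1)]
print(check_user_data(fields, {'iteration': [0]}))     # prints True
print(check_user_data(fields, {'iteration': [0, 1]}))  # prints False
```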
- send_stream_data(raw_data=None, custom_data=None)
Export data for online analysis
This function exports data to a web socket.
- Parameters:
raw_data (list) – Raw data read from sensors, a list of dicts of length SENSOR_COUNT. Each dict should have identically named members.
custom_data (dict) – Computed values output by the algorithm. Custom data fields must be specified in the algorithm’s info.json.
Examples:
    iteration = 0
    raw_data = my_sage.get_next_data()
    my_data = {'iteration': [iteration]}
    my_sage.send_stream_data(raw_data, my_data)
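The requirement that each dict in raw_data has identically named members can be verified with a short check. The function name has_identical_members is hypothetical and is shown only as a sketch of the stated constraint.

```python
def has_identical_members(raw_data):
    """True if every per-sensor dict in raw_data has the same set of keys."""
    if not raw_data:
        return True
    expected = set(raw_data[0])
    return all(set(sample) == expected for sample in raw_data)


print(has_identical_members([{'AccelX': 0.1}, {'AccelX': 0.2}]))  # prints True
print(has_identical_members([{'AccelX': 0.1}, {'GyroX': 0.2}]))   # prints False
```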
- sensor_vibration_off(sensor_node)
Stop delivering vibration to specified sensor node
- Parameters:
sensor_node (int) – The index of the sensor node. Indexes are set by the user using the function sage.Sage.set_pairings()
- sensor_vibration_on(sensor_node, duration=5, strength=2)
Deliver continuous vibration to specified sensor node
- Parameters:
sensor_node (int) – The index of the sensor node. Indexes are set by the user using the function sage.Sage.set_pairings()
duration (float) – The duration of the vibration in seconds with precision of 50 ms.
strength (int) – The intensity of the vibration. The value should be an integer between 1 and 4, where 4 is full strength and 1 is low strength.
- set_feedback_led_off(node_index)
Turn off feedback LED.
This method allows the user to turn off the feedback LED of a feedback node.
- Parameters:
node_index (int) – The index of the feedback node whose LED should be turned off.
Example:
    # Turn off the feedback LED of the node at index 0
    my_sage.set_feedback_led_off(0)
- set_feedback_led_on(node_index)
Turn on feedback LED.
This method allows the user to turn on the feedback LED of a feedback node.
- Parameters:
node_index (int) – The index of the feedback node whose LED should be turned on.
Example:
    # Turn on the feedback LED of the node at index 0
    my_sage.set_feedback_led_on(0)
- set_hub_led_off()
Turn off hub LED.
This method allows the user to turn off the hub LED. If the LED is already off, the method does nothing.
Example:
    # Turn off the hub LED
    my_sage.set_hub_led_off()
- set_hub_led_on()
Turn on hub LED.
This method allows the user to turn on the hub LED. If the LED is already on, the method does nothing.
Example:
    # Turn on the hub LED
    my_sage.set_hub_led_on()
- set_sensor_led_off(node_index)
Turn off sensor LED.
This method allows the user to turn off the sensor LED of a sensor node.
- Parameters:
node_index (int) – The index of the sensor node whose LED should be turned off.
Example:
    # Turn off the sensor LED of the node at index 0
    my_sage.set_sensor_led_off(0)
- set_sensor_led_on(node_index)
Turn on sensor LED.
This method allows the user to turn on the sensor LED of a sensor node.
- Parameters:
node_index (int) – The index of the sensor node whose LED should be turned on.
Example:
    # Turn on the sensor LED of the node at index 0
    my_sage.set_sensor_led_on(0)
- set_sensor_sample_period_in_milliseconds(period)
Set the sensor sample period in milliseconds.
This method allows the user to set the sensor sample period. The period must be one of the following values: [10, 20, 50, 100].
- Parameters:
period (int) – The desired sensor sample period in milliseconds. Must be one of [10, 20, 50, 100].
Example:
    # Set the sensor sample period to 50 milliseconds
    my_sage.set_sensor_sample_period_in_milliseconds(50)
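When an application thinks in terms of a sampling rate rather than a period, the rate has to be mapped onto one of the four allowed periods. The helper below is a hypothetical sketch of one way to do that, choosing the allowed period closest to the requested one; it is not part of the API.

```python
ALLOWED_PERIODS_MS = (10, 20, 50, 100)


def period_for_rate(rate_hz):
    """Pick the allowed sample period (ms) closest to the period of rate_hz."""
    target_ms = 1000.0 / rate_hz
    return min(ALLOWED_PERIODS_MS, key=lambda period: abs(period - target_ms))


print(period_for_rate(100))  # prints 10
print(period_for_rate(30))   # prints 20
# my_sage.set_sensor_sample_period_in_milliseconds(period_for_rate(50))  # requires hardware
```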
- unregister_all_callback()
Unregister all button and dip switch callback functions
This function resets all the callback functions, so that when a UI event arrives, no action is performed.
BaseApp Class
The BaseApp class is used to create apps that are able to run on the SageMotion system.
- class sage.base_app.BaseApp(my_sage, app_file)
BaseApp Class
This is the base class for all apps for the SageMotion system. It provides the fundamental structure for the app to run in the system. To use it you will need to create a new class that inherits from this class and implement the run_in_loop method.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            # Your code here
            return True
- abstract run_in_loop()
Run the application in a loop. This must be implemented in every app. The method should return True to keep the application running, or False to stop it.
- Returns:
True to keep the app running, False to stop it.
- Return type:
bool
Example Implementation:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True
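To illustrate the return-value contract of run_in_loop, the sketch below stops an app after a fixed number of iterations. FakeBaseApp and drive are hypothetical stand-ins so the example runs without the SageMotion system; it assumes the system simply calls run_in_loop repeatedly until it returns False.

```python
class FakeBaseApp:
    """Hypothetical stand-in for sage.base_app.BaseApp, for illustration only."""

    def run_in_loop(self):
        raise NotImplementedError


class CountingApp(FakeBaseApp):
    def __init__(self, max_iterations):
        self.iteration = 0
        self.max_iterations = max_iterations

    def run_in_loop(self):
        self.iteration += 1
        # Returning False asks the caller to stop the app.
        return self.iteration < self.max_iterations


def drive(app):
    """Assumed driver: keep calling run_in_loop until it returns False."""
    while app.run_in_loop():
        pass


app = CountingApp(max_iterations=3)
drive(app)
print(app.iteration)  # prints 3
```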
- check_status()
System status check and initialization work implemented here.
This method checks whether the required number of sensors and feedback devices are connected. If the requirements are not satisfied, it returns False and an error message; otherwise, it returns True and a success message. You do not need to implement this method in your app unless you want to add additional checks, in which case you can override it to decide more generally whether the system is ready to start.
- Returns:
bool: True if the requirements are satisfied, False otherwise.
str: A success message if the requirements are satisfied, an error message otherwise.
- Return type:
tuple
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def check_status(self):
            sensors_count = self.get_sensors_count()
            if sensors_count < len(self.info['sensors']):
                return False, f"App requires {len(self.info['sensors'])} sensors but only {sensors_count} are connected"
            return True, "Now running the app"

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True
- on_start_event(start_time)
Handle the start event.
This method is called when the SageMotion system starts the app. It can be used to synchronize with other systems or to do work before the system starts. To use it, implement this method in your app; the default implementation does nothing.
- Parameters:
start_time (datetime) – The time that the SageMotion system will start the app.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True

        def on_start_event(self, start_time):
            # Do some work here or
            # sync with other systems
            pass
- on_stop_event(stop_time)
Handle the stop event.
This method is called when the SageMotion system stops the app. It can be used to synchronize with other systems or to do work after the system stops. To use it, implement this method in your app; the default implementation does nothing.
- Parameters:
stop_time (datetime) – The time that the SageMotion system will stop the app.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True

        def on_stop_event(self, stop_time):
            # Stop other system synced or
            # Run report on data
            pass
- get_trial_name(suffix='')
Standardized way to get the trial name.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True

        def on_stop_event(self, stop_time):
            trial_file_name = self.get_trial_name()
            print(f"Trial file name = {trial_file_name}")
- get_trial_data(include_raw=True, trial_file_name='')
Standardized way to get the trial data in a dictionary format. This currently works for csv and xlsx.
- Parameters:
include_raw (bool) – Include raw data in the trial data. Normally, it will only include the user fields since including all of the data can take a while to process.
trial_file_name (str) – The name of the trial file. If not provided, it will use the last saved file name.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True

        def on_stop_event(self, stop_time):
            trial_file_name, trial_data_dict, trial_info_dict = self.get_trial_data()
            print(f"Trial file name = {trial_file_name}")
            print(f"Trial data dict = {trial_data_dict}")
            print(f"Trial info dict = {trial_info_dict}")
- create_trial_name()
Choose or create a custom, programmatically generated trial name. By default, this method just retrieves the trial name from the config file, but it can be overridden to generate a custom name.
- Returns:
The trial name.
- Return type:
str
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data)
            self.my_sage.save_data(data, {})
            return True

        def create_trial_name(self):
            return "Hello World"

        def on_stop_event(self, stop_time):
            trial_file_name = self.get_trial_name()
            print(f"Trial file name = {trial_file_name}")
            # Output: Trial file name = Hello World
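A programmatically generated name often includes a timestamp so that repeated trials do not collide. The function make_trial_name below is a hypothetical helper showing one possible naming scheme; an overridden create_trial_name could return its result.

```python
from datetime import datetime


def make_trial_name(prefix, now=None):
    """Build a name such as 'gait_20240131_154500' from a prefix and a timestamp."""
    now = now or datetime.now()
    return "{}_{}".format(prefix, now.strftime("%Y%m%d_%H%M%S"))


print(make_trial_name("gait", datetime(2024, 1, 31, 15, 45, 0)))  # prints gait_20240131_154500
```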
- get_absolute_path_in_app_folder(file_name)
Get the absolute path of a file in the current application folder.
- Parameters:
file_name (str) – The name of the file.
- Returns:
The absolute path of the file in the current application folder.
- Return type:
str
Example:
config_file = self.get_absolute_path_in_app_folder('config.json')
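The result is roughly what the following standalone sketch computes. This is an assumption about the behavior, not the library's actual implementation: the function absolute_path_in_folder is hypothetical, and it assumes the application folder is the directory containing the file passed as app_file (the __file__ given to BaseApp.__init__).

```python
import os


def absolute_path_in_folder(app_file, file_name):
    """Join file_name onto the directory that contains app_file."""
    app_folder = os.path.dirname(os.path.abspath(app_file))
    return os.path.join(app_folder, file_name)


# 'demo_app/core.py' is a made-up path used only for illustration.
print(absolute_path_in_folder(os.path.join('demo_app', 'core.py'), 'config.json'))
```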
- get_feedback_count()
Get the count of paired feedback nodes.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)
            self.number_of_paired_feedback_nodes = self.get_feedback_count()

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data, {})
            self.my_sage.save_data(data, {})
            return True
- get_sensors_count()
Get the count of paired sensors.
Example:
    from sage.base_app import BaseApp

    class Core(BaseApp):
        def __init__(self, my_sage):
            BaseApp.__init__(self, my_sage, __file__)
            self.number_of_paired_sensors = self.get_sensors_count()

        def run_in_loop(self):
            data = self.my_sage.get_next_data()
            self.my_sage.send_stream_data(data, {})
            self.my_sage.save_data(data, {})
            return True