Getting Started with Kinova
In these tutorials we will be using the Kinova 7DoF Gen3 Robotic arm. If you are using a Kinova arm, this part of the tutorial will be beneficial to set up the basics of connecting to the robot and downloading the Kinova Kortex API. If you would like to lean more about the Kinova Gen 3, you can head to their website.
Note
For this tutorial, the authors use the python version of the Kinova Kortex Api. For C++ users, please refer to the quick start guide.
Connecting to the Robot
Kinova provides a great quick start guide that covers everything you need to know to configure your computer and establish a connection with the robot. If you never have used a Kinova Robot, this guide will help you setup your network and robot.
Downloading Kinova Kortex API
For this tutorial we will be using Python 3.8.10 and Kortex Kortex Python API. If you don’t have Python 3.8.10 installed, please refer to the Python website.
To begin, download the API .whl file using this link.
Creating a Project Folder
For these tutorials, we will setup a single project folder to organize all your scripts and example code. I’ll be using Visual Studio Code, but you’re welcome to use any editor you prefer.
Here are the steps to create your project folder and get started:
Open your terminal
Create a new folder for your project
mkdir RobotSensorIntegration cd RobotSensorIntegration
Open the folder in Visual Studio Code
code .
This will open your project in vs Code. From here we will create a virtual environment to manage python packages. Since this project will involve several pip installations, using a virual environment will help prevent conflicts with other projects.
Start by being inside your project folder (RobotSensorIntegration)
In your terminal write:
python -m venv myenv
This creates a virtual environment named myenv using Python 3.8.10
Activate your environment using:
.\myenv\Scripts\activate
Now that we have the virtual environment setup, you will want to install the .whl file. Here is the code to install that:
python -m pip install <whl fullpath name>.whl
Now that we have the API installed, we can download the examples. The three examples that are important are API creation, gripper commands, and cartesian waypoint trajectories. You will eventually combine all of these into a single script when we bring in the sensor data, so its a good idea to understand how each one works on its own first. For now, copy these example scripts into your RobotSensorIntegration folder:
#API CREATION
#! /usr/bin/env python3
###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2018 Kinova inc. All rights reserved.
#
# This software may be modified and distributed
# under the under the terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###
import sys
import os
from kortex_api.TCPTransport import TCPTransport
from kortex_api.RouterClient import RouterClient
from kortex_api.SessionManager import SessionManager
from kortex_api.autogen.client_stubs.DeviceConfigClientRpc import DeviceConfigClient
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.messages import DeviceConfig_pb2, Session_pb2, Base_pb2
def example_api_creation(args):
'''
This function creates all required objects and connections to use the arm's services.
It is easier to use the DeviceConnection utility class to create the router and then
create the services you need (as done in the other examples).
However, if you want to create all objects yourself, this function tells you how to do it.
'''
PORT = 10000
# Setup API
error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))
transport = TCPTransport()
router = RouterClient(transport, error_callback)
transport.connect(args.ip, PORT)
# Create session
session_info = Session_pb2.CreateSessionInfo()
session_info.username = args.username
session_info.password = args.password
session_info.session_inactivity_timeout = 60000 # (milliseconds)
session_info.connection_inactivity_timeout = 2000 # (milliseconds)
print("Creating session for communication")
session_manager = SessionManager(router)
session_manager.CreateSession(session_info)
print("Session created")
# Create required services
device_config = DeviceConfigClient(router)
base = BaseClient(router)
print(device_config.GetDeviceType())
print(base.GetArmState())
# Close API session
session_manager.CloseSession()
# Disconnect from the transport object
transport.disconnect()
def main():
# Import the utilities helper module
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import utilities
# Parse arguments
args = utilities.parseConnectionArguments()
# Example core
example_api_creation(args)
if __name__ == "__main__":
main()
# GRIPPER COMMANDS
#! /usr/bin/env python3
###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2019 Kinova inc. All rights reserved.
#
# This software may be modified and distributed under the
# terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###
import sys
import os
import time
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.messages import Base_pb2
class GripperCommandExample:
def __init__(self, router, proportional_gain = 2.0):
self.proportional_gain = proportional_gain
self.router = router
# Create base client using TCP router
self.base = BaseClient(self.router)
def ExampleSendGripperCommands(self):
# Create the GripperCommand we will send
gripper_command = Base_pb2.GripperCommand()
finger = gripper_command.gripper.finger.add()
# Close the gripper with position increments
print("Performing gripper test in position...")
gripper_command.mode = Base_pb2.GRIPPER_POSITION
position = 0.00
finger.finger_identifier = 1
while position < 1.0:
finger.value = position
print("Going to position {:0.2f}...".format(finger.value))
self.base.SendGripperCommand(gripper_command)
position += 0.1
time.sleep(1)
# Set speed to open gripper
print ("Opening gripper using speed command...")
gripper_command.mode = Base_pb2.GRIPPER_SPEED
finger.value = 0.1
self.base.SendGripperCommand(gripper_command)
gripper_request = Base_pb2.GripperRequest()
# Wait for reported position to be opened
gripper_request.mode = Base_pb2.GRIPPER_POSITION
while True:
gripper_measure = self.base.GetMeasuredGripperMovement(gripper_request)
if len (gripper_measure.finger):
print("Current position is : {0}".format(gripper_measure.finger[0].value))
if gripper_measure.finger[0].value < 0.01:
break
else: # Else, no finger present in answer, end loop
break
# Set speed to close gripper
print ("Closing gripper using speed command...")
gripper_command.mode = Base_pb2.GRIPPER_SPEED
finger.value = -0.1
self.base.SendGripperCommand(gripper_command)
# Wait for reported speed to be 0
gripper_request.mode = Base_pb2.GRIPPER_SPEED
while True:
gripper_measure = self.base.GetMeasuredGripperMovement(gripper_request)
if len (gripper_measure.finger):
print("Current speed is : {0}".format(gripper_measure.finger[0].value))
if gripper_measure.finger[0].value == 0.0:
break
else: # Else, no finger present in answer, end loop
break
def main():
# Import the utilities helper module
import argparse
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import utilities
# Parse arguments
parser = argparse.ArgumentParser()
args = utilities.parseConnectionArguments(parser)
# Create connection to the device and get the router
with utilities.DeviceConnection.createTcpConnection(args) as router:
example = GripperCommandExample(router)
example.ExampleSendGripperCommands()
if __name__ == "__main__":
main()
# CARTESIAN WAYPOINTS TRAJECTORY
#! /usr/bin/env python3
###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2021 Kinova inc. All rights reserved.
#
# This software may be modified and distributed
# under the terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###
import sys
import os
import time
import threading
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient
from kortex_api.autogen.messages import Base_pb2, BaseCyclic_pb2, Common_pb2
# Maximum allowed waiting time during actions (in seconds)
TIMEOUT_DURATION = 30
# Create closure to set an event after an END or an ABORT
def check_for_end_or_abort(e):
"""Return a closure checking for END or ABORT notifications
Arguments:
e -- event to signal when the action is completed
(will be set when an END or ABORT occurs)
"""
def check(notification, e = e):
print("EVENT : " + \
Base_pb2.ActionEvent.Name(notification.action_event))
if notification.action_event == Base_pb2.ACTION_END \
or notification.action_event == Base_pb2.ACTION_ABORT:
e.set()
return check
def example_move_to_home_position(base):
# Make sure the arm is in Single Level Servoing mode
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
base.SetServoingMode(base_servo_mode)
# Move arm to ready position
print("Moving the arm to a safe position")
action_type = Base_pb2.RequestedActionType()
action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
action_list = base.ReadAllActions(action_type)
action_handle = None
for action in action_list.action_list:
if action.name == "Home":
action_handle = action.handle
if action_handle == None:
print("Can't reach safe position. Exiting")
return False
e = threading.Event()
notification_handle = base.OnNotificationActionTopic(
check_for_end_or_abort(e),
Base_pb2.NotificationOptions()
)
base.ExecuteActionFromReference(action_handle)
finished = e.wait(TIMEOUT_DURATION)
base.Unsubscribe(notification_handle)
if finished:
print("Safe position reached")
else:
print("Timeout on action notification wait")
return finished
def populateCartesianCoordinate(waypointInformation):
waypoint = Base_pb2.CartesianWaypoint()
waypoint.pose.x = waypointInformation[0]
waypoint.pose.y = waypointInformation[1]
waypoint.pose.z = waypointInformation[2]
waypoint.blending_radius = waypointInformation[3]
waypoint.pose.theta_x = waypointInformation[4]
waypoint.pose.theta_y = waypointInformation[5]
waypoint.pose.theta_z = waypointInformation[6]
waypoint.reference_frame = Base_pb2.CARTESIAN_REFERENCE_FRAME_BASE
return waypoint
def example_trajectory(base, base_cyclic):
base_servo_mode = Base_pb2.ServoingModeInformation()
base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
base.SetServoingMode(base_servo_mode)
product = base.GetProductConfiguration()
waypointsDefinition = tuple(tuple())
if( product.model == Base_pb2.ProductConfiguration__pb2.MODEL_ID_L53
or product.model == Base_pb2.ProductConfiguration__pb2.MODEL_ID_L31):
if(product.model == Base_pb2.ProductConfiguration__pb2.MODEL_ID_L31):
kTheta_x = 90.6
kTheta_y = -1.0
kTheta_z = 150.0
waypointsDefinition = ( (0.439, 0.194, 0.448, 0.0, kTheta_x, kTheta_y, kTheta_z),
(0.200, 0.150, 0.400, 0.0, kTheta_x, kTheta_y, kTheta_z),
(0.350, 0.050, 0.300, 0.0, kTheta_x, kTheta_y, kTheta_z))
else:
kTheta_x = 90.0
kTheta_y = 0.0
kTheta_z = 90.0
waypointsDefinition = ( (0.7, 0.0, 0.5, 0.0, kTheta_x, kTheta_y, kTheta_z),
(0.7, 0.0, 0.33, 0.1, kTheta_x, kTheta_y, kTheta_z),
(0.7, 0.48, 0.33, 0.1, kTheta_x, kTheta_y, kTheta_z),
(0.61, 0.22, 0.4, 0.1, kTheta_x, kTheta_y, kTheta_z),
(0.7, 0.48, 0.33, 0.1, kTheta_x, kTheta_y, kTheta_z),
(0.63, -0.22, 0.45, 0.1, kTheta_x, kTheta_y, kTheta_z),
(0.65, 0.05, 0.33, 0.0, kTheta_x, kTheta_y, kTheta_z))
else:
print("Product is not compatible to run this example please contact support with KIN number bellow")
print("Product KIN is : " + product.kin())
waypoints = Base_pb2.WaypointList()
waypoints.duration = 0.0
waypoints.use_optimal_blending = False
index = 0
for waypointDefinition in waypointsDefinition:
waypoint = waypoints.waypoints.add()
waypoint.name = "waypoint_" + str(index)
waypoint.cartesian_waypoint.CopyFrom(populateCartesianCoordinate(waypointDefinition))
index = index + 1
# Verify validity of waypoints
result = base.ValidateWaypointList(waypoints);
if(len(result.trajectory_error_report.trajectory_error_elements) == 0):
e = threading.Event()
notification_handle = base.OnNotificationActionTopic( check_for_end_or_abort(e),
Base_pb2.NotificationOptions())
print("Moving cartesian trajectory...")
base.ExecuteWaypointTrajectory(waypoints)
print("Waiting for trajectory to finish ...")
finished = e.wait(TIMEOUT_DURATION)
base.Unsubscribe(notification_handle)
if finished:
print("Cartesian trajectory with no optimization completed ")
e_opt = threading.Event()
notification_handle_opt = base.OnNotificationActionTopic( check_for_end_or_abort(e_opt),
Base_pb2.NotificationOptions())
waypoints.use_optimal_blending = True
base.ExecuteWaypointTrajectory(waypoints)
print("Waiting for trajectory to finish ...")
finished_opt = e_opt.wait(TIMEOUT_DURATION)
base.Unsubscribe(notification_handle_opt)
if(finished_opt):
print("Cartesian trajectory with optimization completed ")
else:
print("Timeout on action notification wait for optimized trajectory")
return finished_opt
else:
print("Timeout on action notification wait for non-optimized trajectory")
return finished
else:
print("Error found in trajectory")
result.trajectory_error_report.PrintDebugString();
def main():
# Import the utilities helper module
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import utilities
# Parse arguments
args = utilities.parseConnectionArguments()
# Create connection to the device and get the router
with utilities.DeviceConnection.createTcpConnection(args) as router:
# Create required services
base = BaseClient(router)
base_cyclic = BaseCyclicClient(router)
# Example core
success = True
success &= example_move_to_home_position(base)
success &= example_trajectory(base, base_cyclic)
return 0 if success else 1
if __name__ == "__main__":
exit(main())