Getting Started with Kinova

In these tutorials we will be using the Kinova 7DoF Gen3 Robotic arm. If you are using a Kinova arm, this part of the tutorial will be beneficial to set up the basics of connecting to the robot and downloading the Kinova Kortex API. If you would like to lean more about the Kinova Gen 3, you can head to their website.

Note

For this tutorial, the authors use the python version of the Kinova Kortex Api. For C++ users, please refer to the quick start guide.

Connecting to the Robot

Kinova provides a great quick start guide that covers everything you need to know to configure your computer and establish a connection with the robot. If you never have used a Kinova Robot, this guide will help you setup your network and robot.

Downloading Kinova Kortex API

For this tutorial we will be using Python 3.8.10 and Kortex Kortex Python API. If you don’t have Python 3.8.10 installed, please refer to the Python website.

To begin, download the API .whl file using this link.

Creating a Project Folder

For these tutorials, we will setup a single project folder to organize all your scripts and example code. I’ll be using Visual Studio Code, but you’re welcome to use any editor you prefer.

Here are the steps to create your project folder and get started:

  1. Open your terminal

  2. Create a new folder for your project

    mkdir RobotSensorIntegration
    cd RobotSensorIntegration
    
  3. Open the folder in Visual Studio Code

    code .
    

This will open your project in vs Code. From here we will create a virtual environment to manage python packages. Since this project will involve several pip installations, using a virual environment will help prevent conflicts with other projects.

  1. Start by being inside your project folder (RobotSensorIntegration)

  2. In your terminal write:

    python -m venv myenv
    

    This creates a virtual environment named myenv using Python 3.8.10

  3. Activate your environment using:

    .\myenv\Scripts\activate
    

Now that we have the virtual environment setup, you will want to install the .whl file. Here is the code to install that:

python -m pip install <whl fullpath name>.whl

Now that we have the API installed, we can download the examples. The three examples that are important are API creation, gripper commands, and cartesian waypoint trajectories. You will eventually combine all of these into a single script when we bring in the sensor data, so its a good idea to understand how each one works on its own first. For now, copy these example scripts into your RobotSensorIntegration folder:

#API CREATION

#! /usr/bin/env python3

###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2018 Kinova inc. All rights reserved.
#
# This software may be modified and distributed
# under the under the terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###

import sys
import os

from kortex_api.TCPTransport import TCPTransport
from kortex_api.RouterClient import RouterClient
from kortex_api.SessionManager import SessionManager

from kortex_api.autogen.client_stubs.DeviceConfigClientRpc import DeviceConfigClient
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient

from kortex_api.autogen.messages import DeviceConfig_pb2, Session_pb2, Base_pb2

def example_api_creation(args):
    '''
    This function creates all required objects and connections to use the arm's services.
    It is easier to use the DeviceConnection utility class to create the router and then
    create the services you need (as done in the other examples).
    However, if you want to create all objects yourself, this function tells you how to do it.
    '''

    PORT = 10000

    # Setup API
    error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))
    transport = TCPTransport()
    router = RouterClient(transport, error_callback)
    transport.connect(args.ip, PORT)

    # Create session
    session_info = Session_pb2.CreateSessionInfo()
    session_info.username = args.username
    session_info.password = args.password
    session_info.session_inactivity_timeout = 60000   # (milliseconds)
    session_info.connection_inactivity_timeout = 2000 # (milliseconds)

    print("Creating session for communication")
    session_manager = SessionManager(router)
    session_manager.CreateSession(session_info)
    print("Session created")

    # Create required services
    device_config = DeviceConfigClient(router)
    base = BaseClient(router)

    print(device_config.GetDeviceType())
    print(base.GetArmState())

    # Close API session
    session_manager.CloseSession()

    # Disconnect from the transport object
    transport.disconnect()

def main():
    # Import the utilities helper module
    sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
    import utilities

    # Parse arguments
    args = utilities.parseConnectionArguments()

    # Example core
    example_api_creation(args)

if __name__ == "__main__":
    main()
# GRIPPER COMMANDS

#! /usr/bin/env python3

###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2019 Kinova inc. All rights reserved.
#
# This software may be modified and distributed under the
# terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###

import sys
import os
import time

from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.messages import Base_pb2

class GripperCommandExample:
    def __init__(self, router, proportional_gain = 2.0):

        self.proportional_gain = proportional_gain
        self.router = router

        # Create base client using TCP router
        self.base = BaseClient(self.router)

    def ExampleSendGripperCommands(self):

        # Create the GripperCommand we will send
        gripper_command = Base_pb2.GripperCommand()
        finger = gripper_command.gripper.finger.add()

        # Close the gripper with position increments
        print("Performing gripper test in position...")
        gripper_command.mode = Base_pb2.GRIPPER_POSITION
        position = 0.00
        finger.finger_identifier = 1
        while position < 1.0:
            finger.value = position
            print("Going to position {:0.2f}...".format(finger.value))
            self.base.SendGripperCommand(gripper_command)
            position += 0.1
            time.sleep(1)

        # Set speed to open gripper
        print ("Opening gripper using speed command...")
        gripper_command.mode = Base_pb2.GRIPPER_SPEED
        finger.value = 0.1
        self.base.SendGripperCommand(gripper_command)
        gripper_request = Base_pb2.GripperRequest()

        # Wait for reported position to be opened
        gripper_request.mode = Base_pb2.GRIPPER_POSITION
        while True:
            gripper_measure = self.base.GetMeasuredGripperMovement(gripper_request)
            if len (gripper_measure.finger):
                print("Current position is : {0}".format(gripper_measure.finger[0].value))
                if gripper_measure.finger[0].value < 0.01:
                    break
            else: # Else, no finger present in answer, end loop
                break

        # Set speed to close gripper
        print ("Closing gripper using speed command...")
        gripper_command.mode = Base_pb2.GRIPPER_SPEED
        finger.value = -0.1
        self.base.SendGripperCommand(gripper_command)

        # Wait for reported speed to be 0
        gripper_request.mode = Base_pb2.GRIPPER_SPEED
        while True:
            gripper_measure = self.base.GetMeasuredGripperMovement(gripper_request)
            if len (gripper_measure.finger):
                print("Current speed is : {0}".format(gripper_measure.finger[0].value))
                if gripper_measure.finger[0].value == 0.0:
                    break
            else: # Else, no finger present in answer, end loop
                break

def main():
    # Import the utilities helper module
    import argparse
    sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
    import utilities

    # Parse arguments
    parser = argparse.ArgumentParser()
    args = utilities.parseConnectionArguments(parser)

    # Create connection to the device and get the router
    with utilities.DeviceConnection.createTcpConnection(args) as router:

        example = GripperCommandExample(router)
        example.ExampleSendGripperCommands()

if __name__ == "__main__":
    main()
# CARTESIAN WAYPOINTS TRAJECTORY

#! /usr/bin/env python3

###
# KINOVA (R) KORTEX (TM)
#
# Copyright (c) 2021 Kinova inc. All rights reserved.
#
# This software may be modified and distributed
# under the terms of the BSD 3-Clause license.
#
# Refer to the LICENSE file for details.
#
###

import sys
import os
import time
import threading

from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient


from kortex_api.autogen.messages import Base_pb2, BaseCyclic_pb2, Common_pb2

# Maximum allowed waiting time during actions (in seconds)
TIMEOUT_DURATION = 30

# Create closure to set an event after an END or an ABORT
def check_for_end_or_abort(e):
    """Return a closure checking for END or ABORT notifications

    Arguments:
    e -- event to signal when the action is completed
        (will be set when an END or ABORT occurs)
    """
    def check(notification, e = e):
        print("EVENT : " + \
              Base_pb2.ActionEvent.Name(notification.action_event))
        if notification.action_event == Base_pb2.ACTION_END \
        or notification.action_event == Base_pb2.ACTION_ABORT:
            e.set()
    return check

def example_move_to_home_position(base):
    # Make sure the arm is in Single Level Servoing mode
    base_servo_mode = Base_pb2.ServoingModeInformation()
    base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
    base.SetServoingMode(base_servo_mode)

    # Move arm to ready position
    print("Moving the arm to a safe position")
    action_type = Base_pb2.RequestedActionType()
    action_type.action_type = Base_pb2.REACH_JOINT_ANGLES
    action_list = base.ReadAllActions(action_type)
    action_handle = None
    for action in action_list.action_list:
        if action.name == "Home":
            action_handle = action.handle

    if action_handle == None:
        print("Can't reach safe position. Exiting")
        return False

    e = threading.Event()
    notification_handle = base.OnNotificationActionTopic(
        check_for_end_or_abort(e),
        Base_pb2.NotificationOptions()
    )

    base.ExecuteActionFromReference(action_handle)
    finished = e.wait(TIMEOUT_DURATION)
    base.Unsubscribe(notification_handle)

    if finished:
        print("Safe position reached")
    else:
        print("Timeout on action notification wait")
    return finished
def populateCartesianCoordinate(waypointInformation):

    waypoint = Base_pb2.CartesianWaypoint()
    waypoint.pose.x = waypointInformation[0]
    waypoint.pose.y = waypointInformation[1]
    waypoint.pose.z = waypointInformation[2]
    waypoint.blending_radius = waypointInformation[3]
    waypoint.pose.theta_x = waypointInformation[4]
    waypoint.pose.theta_y = waypointInformation[5]
    waypoint.pose.theta_z = waypointInformation[6]
    waypoint.reference_frame = Base_pb2.CARTESIAN_REFERENCE_FRAME_BASE

    return waypoint

def example_trajectory(base, base_cyclic):

    base_servo_mode = Base_pb2.ServoingModeInformation()
    base_servo_mode.servoing_mode = Base_pb2.SINGLE_LEVEL_SERVOING
    base.SetServoingMode(base_servo_mode)
    product = base.GetProductConfiguration()
    waypointsDefinition = tuple(tuple())
    if(   product.model == Base_pb2.ProductConfiguration__pb2.MODEL_ID_L53
       or product.model == Base_pb2.ProductConfiguration__pb2.MODEL_ID_L31):
        if(product.model == Base_pb2.ProductConfiguration__pb2.MODEL_ID_L31):
            kTheta_x = 90.6
            kTheta_y = -1.0
            kTheta_z = 150.0
            waypointsDefinition = ( (0.439,  0.194, 0.448, 0.0, kTheta_x, kTheta_y, kTheta_z),
                                    (0.200,  0.150, 0.400, 0.0, kTheta_x, kTheta_y, kTheta_z),
                                    (0.350,  0.050, 0.300, 0.0, kTheta_x, kTheta_y, kTheta_z))
        else:
            kTheta_x = 90.0
            kTheta_y = 0.0
            kTheta_z = 90.0
            waypointsDefinition = ( (0.7,   0.0,  0.5,  0.0, kTheta_x, kTheta_y, kTheta_z),
                                    (0.7,   0.0,  0.33, 0.1, kTheta_x, kTheta_y, kTheta_z),
                                    (0.7,   0.48, 0.33, 0.1, kTheta_x, kTheta_y, kTheta_z),
                                    (0.61,  0.22, 0.4,  0.1, kTheta_x, kTheta_y, kTheta_z),
                                    (0.7,   0.48, 0.33, 0.1, kTheta_x, kTheta_y, kTheta_z),
                                    (0.63, -0.22, 0.45, 0.1, kTheta_x, kTheta_y, kTheta_z),
                                    (0.65,  0.05, 0.33, 0.0, kTheta_x, kTheta_y, kTheta_z))
    else:
        print("Product is not compatible to run this example please contact support with KIN number bellow")
        print("Product KIN is : " + product.kin())


    waypoints = Base_pb2.WaypointList()

    waypoints.duration = 0.0
    waypoints.use_optimal_blending = False

    index = 0
    for waypointDefinition in waypointsDefinition:
        waypoint = waypoints.waypoints.add()
        waypoint.name = "waypoint_" + str(index)
        waypoint.cartesian_waypoint.CopyFrom(populateCartesianCoordinate(waypointDefinition))
        index = index + 1

    # Verify validity of waypoints
    result = base.ValidateWaypointList(waypoints);
    if(len(result.trajectory_error_report.trajectory_error_elements) == 0):
        e = threading.Event()
        notification_handle = base.OnNotificationActionTopic(   check_for_end_or_abort(e),
                                                                Base_pb2.NotificationOptions())

        print("Moving cartesian trajectory...")

        base.ExecuteWaypointTrajectory(waypoints)

        print("Waiting for trajectory to finish ...")
        finished = e.wait(TIMEOUT_DURATION)
        base.Unsubscribe(notification_handle)

        if finished:
            print("Cartesian trajectory with no optimization completed ")
            e_opt = threading.Event()
            notification_handle_opt = base.OnNotificationActionTopic(   check_for_end_or_abort(e_opt),
                                                                Base_pb2.NotificationOptions())

            waypoints.use_optimal_blending = True
            base.ExecuteWaypointTrajectory(waypoints)

            print("Waiting for trajectory to finish ...")
            finished_opt = e_opt.wait(TIMEOUT_DURATION)
            base.Unsubscribe(notification_handle_opt)

            if(finished_opt):
                print("Cartesian trajectory with optimization completed ")
            else:
                print("Timeout on action notification wait for optimized trajectory")

            return finished_opt
        else:
            print("Timeout on action notification wait for non-optimized trajectory")

        return finished

    else:
        print("Error found in trajectory")
        result.trajectory_error_report.PrintDebugString();


def main():

    # Import the utilities helper module
    sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
    import utilities

    # Parse arguments
    args = utilities.parseConnectionArguments()

    # Create connection to the device and get the router
    with utilities.DeviceConnection.createTcpConnection(args) as router:

        # Create required services
        base = BaseClient(router)
        base_cyclic = BaseCyclicClient(router)


        # Example core
        success = True

        success &= example_move_to_home_position(base)
        success &= example_trajectory(base, base_cyclic)

        return 0 if success else 1

if __name__ == "__main__":
    exit(main())