# motion_service.py
#
# Copyright (c) 2018, Xamla and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#!/usr/bin/env python3
import rospy
import actionlib
import numpy as np
from datetime import timedelta
import enum
from xamlamoveit_msgs.srv import *
from xamlamoveit_msgs.msg import *
from control_msgs.msg import GripperCommandAction, GripperCommandGoal
from moveit_msgs.msg import MoveItErrorCodes
from std_srvs.srv import SetBool
from .xamla_motion_exceptions import ServiceException, ArgumentError
from .data_types import *
from .utility import ROSNodeSteward, LeaseBaseLock
from collections import Iterable
from actionlib_msgs.msg import GoalID
from functools import partial
from threading import Lock
from copy import deepcopy
import asyncio
@enum.unique
class ActionLibGoalStatus(enum.Enum):
PENDING = 0
ACTIVE = 1
PREEMPTED = 2
SUCCEEDED = 3
ABORTED = 4
REJECTED = 5
PREEMPTING = 6
RECALLING = 7
RECALLED = 8
LOST = 9
def generate_action_executor(action):
async def run_action(goal):
loop = asyncio.get_event_loop()
action_done = loop.create_future()
def done_callback(goal_status, result):
status = ActionLibGoalStatus(goal_status)
if not loop.is_closed():
if status != ActionLibGoalStatus.SUCCEEDED:
try:
reason = ErrorCodes(result.result)
exc = ServiceException('action end unsuccessfully with'
' state: {}, reason: {}'.format(status,
reason),
error_code=reason)
except (AttributeError, ValueError):
exc = ServiceException('action end unsuccessfully with'
' state: {}'.format(status))
loop.call_soon_threadsafe(action_done.set_exception,
exc)
else:
loop.call_soon_threadsafe(action_done.set_result,
result)
try:
action.send_goal(goal, done_cb=done_callback)
await action_done
except asyncio.CancelledError as exc:
action.cancel_goal()
raise exc
finally:
action.wait_for_result()
return action_done
return run_action
class SteppedMotionClient(object):
"""
Client to perform supervised trajectory execution
Methods
-------
moveJ_supervised
Run supervised trajectory execution
"""
__step_topic = '/xamlaMoveActions/step'
__next_topic = '/xamlaMoveActions/next'
__previous_topic = '/xamlaMoveActions/prev'
__feedback_topic = '/xamlaMoveActions/feedback'
__movej_action_name = 'moveJ_step_action'
class __ShutdownManager(object):
def __init__(self):
self.instances = {}
self.mutex = Lock()
rospy.on_shutdown(self.on_shutdown)
def on_shutdown(self):
# print('stepped shutdown')
for key, cancel in self.instances.items():
# print('cancel goal_id: {}'.format(key))
cancel()
def register_instance(self, goal_id, cancel_func):
with self.mutex:
self.instances[goal_id] = cancel_func
# keys = [key for key in self.instances]
# print('instances after add: {}'.format(keys))
def unregister_instance(self, goal_id):
with self.mutex:
try:
self.instances.pop(goal_id)
except KeyError as exc:
pass
# keys = [key for key in self.instances]
# print('instances after pop: {}'.format(keys))
__shutdown_manager = None
def __init__(self, trajectory: JointTrajectory,
velocity_scaling: float, check_collision: bool=True):
"""
Run supervised trajectory execution
Parameters
----------
trajectory : JointTrajectory
Trajectory which should be executed supervised
velocity_scaling : float
Scaling factor to reduce or increase the trajectory
velocities range [0.0-1.0]
check_collision : bool (default True)
If True collision check is performed
Raises
------
TypeError
If trajectory is not of expected type JointTrajectory
ValueError
If velocity_scaling is not in range between 0.0 an 1.0
ServiceException
If action goal handle is not available
"""
self.__mutex = Lock()
if type(self).__shutdown_manager is None:
type(self).__shutdown_manager = type(self).__ShutdownManager()
self.__ros_node_steward = ROSNodeSteward()
self.__m_action = actionlib.SimpleActionClient(self.__movej_action_name,
StepwiseMoveJAction)
if not self.__m_action.wait_for_server(rospy.Duration(5)):
raise ServiceException('connection to stepped motion action'
' server could not be established')
self.__goal_id = None
self.__progress = None
self.__state = None
if not isinstance(trajectory, JointTrajectory):
raise TypeError('trajectory is not of expected type'
' JointTrajectory')
goal = StepwiseMoveJGoal()
velocity_scaling = float(velocity_scaling)
if velocity_scaling > 1.0 or velocity_scaling < 0.0:
raise ValueError('velocity scaling is not between 0.0 and 1.0')
goal.veloctiy_scaling = velocity_scaling
goal.check_collision = bool(check_collision)
goal.trajectory = trajectory.to_joint_trajectory_msg()
loop = asyncio.get_event_loop()
self.__action_done = loop.create_future()
self.__action_done.add_done_callback(self._done_callback)
def done_callback(goal_status, result):
status = ActionLibGoalStatus(goal_status)
type(self).__shutdown_manager.unregister_instance(self.__goal_id.id)
if not loop.is_closed():
if status != ActionLibGoalStatus.SUCCEEDED:
try:
reason = ErrorCodes(result.result)
exc = ServiceException('action end unsuccessfully with'
' state: {}, reason: {}'.format(status,
reason),
error_code=reason)
except ValueError:
exc = ServiceException('action end unsuccessfully with'
' state: {}'.format(status))
loop.call_soon_threadsafe(self.__action_done.set_exception,
exc)
else:
loop.call_soon_threadsafe(self.__action_done.set_result,
result)
self.__m_action.send_goal(goal, done_cb=done_callback)
self.__goal_id = self.__m_action.gh.comm_state_machine.action_goal.goal_id
if not self.__goal_id:
self.__m_action.cancel_goal()
raise ServiceException('action goal handle is not available')
type(self).__shutdown_manager.register_instance(self.__goal_id.id,
self.__m_action.cancel_goal)
self.__state = SteppedMotionState(self.__goal_id.id, '', 1, 0.0)
self.__step_pub = rospy.Publisher(self.__step_topic,
Step,
queue_size=1)
self.__next_pub = rospy.Publisher(self.__next_topic,
GoalID,
queue_size=1)
self.__previous_pub = rospy.Publisher(self.__previous_topic,
GoalID,
queue_size=1)
self.__feedback_sub = rospy.Subscriber(self.__feedback_topic,
TrajectoryProgress,
callback=self._feedback_callback,
queue_size=10)
def __del__(self):
type(self).__shutdown_manager.unregister_instance(self.__goal_id.id)
def cancel(self):
type(self).__shutdown_manager.unregister_instance(self.__goal_id.id)
try:
self.__m_action.cancel_goal()
except ServiceException:
pass
finally:
self.__m_action.wait_for_result()
@property
def state(self):
"""
state : SteppedMotionState or None
Current state of the supervised trajectory execution
"""
with self.__mutex:
return self.__state
@property
def goal_id(self):
"""
goal_id : GoalId or None
Current ros action goal id
"""
return self.__goal_id
@property
def action_done_future(self):
"""
action_done_future : asyncio.future
future which is done when supervised move is done of cancelled
"""
return self.__action_done
def step(self, velocity_scaling=0.5):
"""
Requests the supervised executor to perform a step
The stepping direction is determined by the sign of the velocity_scaling
Parameters
----------
velocity_scaling : float
Specifies the desired velocity and direction of the supervised motion.
Positive values mean forward and negative values backward stepping.
To stop fast as possible set velocity scaling to 0.0.
Valid range: [-1.0, 1.0]
Raises
------
ValueError
If velocity_scaling is not between -1.0 and 1.0
"""
if velocity_scaling < -1.00001 or velocity_scaling > 1.00001:
raise ValueError('velocity scaling is not in range [-1.0, 1.0]')
if self.__goal_id:
msg = Step()
msg.id = self.goal_id.id
msg.velocity_scaling = velocity_scaling
self.__step_pub.publish(msg)
def next(self):
"""
Request at supervised executor to perform next step
"""
if self.__goal_id:
self.__next_pub.publish(self.__goal_id)
def previous(self):
"""
Request at supervised executor to perform previous step
"""
if self.__goal_id:
self.__previous_pub.publish(self.__goal_id)
def _feedback_callback(self, trajectory_progress):
if self.__progress != trajectory_progress.progress:
with self.__mutex:
self.__progress = trajectory_progress.progress
self.__state = SteppedMotionState(self.__goal_id.id,
trajectory_progress.error_msg,
trajectory_progress.error_code,
trajectory_progress.progress)
def _done_callback(self, future):
# print('done callback')
# type(self).__shutdown_manager.unregister_instance(self.__goal_id.id)
self.__feedback_sub.unregister()
self.__step_pub.unregister()
self.__next_pub.unregister()
self.__previous_pub.unregister()
[docs]class MotionService(object):
__movej_action = 'moveJ_action'
__query_inverse_kinematics_service = "xamlaMoveGroupServices/query_ik2"
def __init__(self):
self.__ros_node_steward = ROSNodeSteward()
try:
self.__ik_service = rospy.ServiceProxy(
self.__query_inverse_kinematics_service,
GetIKSolution2)
except rospy.ServiceException as exc:
raise ServiceException('init service for query'
' inverse kinematics failed,'
' abort ') from exc
self.__m_action = actionlib.SimpleActionClient(self.__movej_action,
moveJAction)
if not self.__m_action.wait_for_server(rospy.Duration(5)):
raise ServiceException('connection to moveJ action'
' server could not be established')
[docs] @classmethod
def query_available_move_groups(cls):
"""
Query all currently available move groups
To query the move groups the ros service with the string
defined in query_move_group_service is called
Returns
-------
groups : List[MoveGroupDescription]
Returns a list with instances of MoveGroupDescription.
For further details please take a look into the documentation
of MoveGroupDescription
Raises
------
xamla_motion.ServiceNotAvailableException
If Service not exist or is not callable
TypeError
If the service response joint_names are not
of an iterable type
"""
query_move_group_service = ('xamlaMoveGroupServices/'
'query_move_group_interface')
try:
service = rospy.ServiceProxy(
query_move_group_service,
QueryMoveGroupInterfaces)
response = service()
except rospy.ServiceException as exc:
raise ServiceException('service call for query'
' available move groups failed,'
' abort ') from exc
groups = list()
for g in response.move_group_interfaces:
if len(g.joint_names) == 0:
joint_set = JointSet.empty()
else:
joint_set = JointSet(g.joint_names)
groups.append(MoveGroupDescription(g.name, g.sub_move_group_ids,
joint_set, g.end_effector_names,
g.end_effector_link_names))
return groups
[docs] @classmethod
def query_available_end_effectors(cls):
"""
Query all currently available end effectors
To query the available end effectors the ros service with the string
defined in _query_move_group_service is called an only the relevant
information about the endeffector is filtered out
Returns
-------
end_effectors : Dict[str, EndEffectorDescription]
Returns a dictionary key is the endeffector name,
value is an instance of EndEffectorDescription
Raises
------
xamla_motion.ServiceExeption
If Service not exist or is not callable
TypeError
If the service response joint_names are not
of an iterable type
"""
try:
move_groups = cls.query_available_move_groups()
except (ServiceException, TypeError) as exc:
raise exc
end_effectors = {}
for group in move_groups:
for i, end_effector in enumerate(group.end_effector_names):
if end_effector not in end_effectors:
d = EndEffectorDescription(end_effector,
group.sub_move_group_ids,
group.joint_set, group.name,
group.end_effector_link_names[i])
end_effectors[end_effector] = d
return end_effectors
[docs] @classmethod
def query_endeffector_limits(cls, name):
"""
Query end effector limits from ros param
To query the end effector limits the ros param definied in
end_effector_limits_param is read out
Parameters
----------
name : str convertable
Name of the end effector for which the
limits are queried
Returns
-------
EndEffectorLimits
Returns a instance of EndEffectorLimits
Raises
------
RuntimeError
If the the ros parameter or
the requested end effector name
no exists
KeyError
If not all necessary limits exists
"""
end_effector_limits_param = ('xamlaJointJogging/'
'end_effector_list')
try:
eel_param = rospy.get_param(end_effector_limits_param)
except KeyError as exc:
raise RuntimeError('end effector limit ros param: '
+ end_effector_limits_param +
' not exists') from exc
name = str(name)
for limits in eel_param:
try:
end_effector_name = limits['name']
except KeyError as exc:
continue
if name == end_effector_name:
max_xyz_vel = limits['taskspace_xyz_max_vel']
max_xyz_acc = limits['taskspace_xyz_max_acc']
max_angular_vel = limits['taskspace_angular_max_vel']
max_angular_acc = limits['taskspace_angular_max_acc']
return EndEffectorLimits(max_xyz_vel,
max_xyz_acc,
max_angular_vel,
max_angular_acc)
else:
continue
raise RuntimeError('Requested end effector name not exists')
[docs] @classmethod
def query_joint_limits(cls, joint_set):
"""
Query end joint limits from ros param
To query the joint limits the ros param definied in
joint_limits_param + joint name + limit name is read out
Parameters
----------
join_set : JointSet
Set of joints for which the
limits are queried
Returns
-------
JointLimits
Returns a instance of JointLimits
invalid limits are of type numpy.nan
Raises
------
TypeError : type mismatch
If joint_set is not of expected type JointSet
KeyError
If ros params not exists
"""
joint_limits_param = ('robot_description_planning/'
'joint_limits')
if not isinstance(joint_set, JointSet):
raise TypeError('joint_set is not of expected type JointSet')
maxVel = [None] * len(joint_set)
maxAcc = [None] * len(joint_set)
minPos = [None] * len(joint_set)
maxPos = [None] * len(joint_set)
for i, name in enumerate(joint_set):
prefix = joint_limits_param + '/' + name + '/'
if rospy.get_param(prefix+'has_velocity_limits'):
maxVel[i] = rospy.get_param(prefix+'max_velocity')
if rospy.get_param(prefix+'has_acceleration_limits'):
maxAcc[i] = rospy.get_param(prefix+'max_acceleration')
if rospy.get_param(prefix+'has_position_limits'):
minPos[i] = rospy.get_param(prefix+'min_position')
maxPos[i] = rospy.get_param(prefix+'max_position')
return JointLimits(joint_set, maxVel, maxAcc, minPos, maxPos)
[docs] @classmethod
def query_joint_states(cls, joint_set):
"""
Query joint states by calling the providing ros service
To query the joint states the ros service with the name
defined in quary_joint_states_service is called
Parameters
----------
joint_set : JointSet
Set of joints for which the joint states are queried
Returns
-------
JointStates
Returns a instance of JointStates which contains the joint
states of all joints defined in joint_set
Raises
------
TypeError : type mismatch
If joint_set is not of expected type JointSet
xamla_motion.ServiceException
If ros service not exists or is not callable
"""
query_joint_states_service = ('xamlaMoveGroupServices/'
'query_move_group'
'_current_position')
if not isinstance(joint_set, JointSet):
raise TypeError('joint_set is not of expected type JointSet')
try:
service = rospy.ServiceProxy(
query_joint_states_service,
GetCurrentJointState)
response = service(joint_set.names).current_joint_position
except rospy.ServiceException as exc:
print('service call for query current'
'joint states failed, abort ')
raise ServiceException('service call for query'
' current joint states '
'failed, abort') from exc
r_joint_set = JointSet(response.name)
positions = JointValues(r_joint_set,
response.position)
if not response.velocity:
velocities = None
else:
velocities = JointValues(r_joint_set,
response.velocity)
if not response.effort:
efforts = None
else:
efforts = JointValues(r_joint_set,
response.effort)
return JointStates(positions, velocities, efforts)
[docs] @classmethod
def query_pose(cls, move_group_name,
joint_positions, end_effector_link=''):
"""
Computes the pose by applying forward kinematics
Parameters
----------
move_group_name : str convertable
name of the move group from which the pose
is queried
joint_positions : JointValues
joint values from which the pose is calculated
end_effector_link : str convertable (optional)
end effector link is necessary if end effector
is not part of the move group but pose should
be computed for the end effector
Returns
-------
Pose
Pose of the kinematic chain defined by the input parameter
Raises
------
TypeError
If joint_positions is not of type JointValues
"""
if not isinstance(joint_positions, JointValues):
raise TypeError('joint_positions is not of'
' expected type JointValues')
joint_path = JointPath.from_one_point(joint_positions)
return cls.query_pose_many(move_group_name, joint_path,
end_effector_link)[0]
[docs] @classmethod
def query_pose_many(cls, move_group_name,
joint_path, end_effector_link=''):
"""
Query the poses from joint path points by applying forward kinematics
Parameters
----------
move_group_name : str convertable
name of the move group from which the poses are queried
joint_path : JointPath
joint path from which the poses are calculated
end_effector_link : str convertable (optional)
end effector link is necessary if end effector
is not part of the move group but pose should
be computed for the end effector
Returns
-------
List[Pose]
List of Poses
Raises
------
TypeError
If joint_path is not of type JointPath
"""
query_forward_kinematics_service = ('xamlaMoveGroupServices/'
'query_fk')
move_group_name = str(move_group_name)
end_effector_link = str(end_effector_link)
if not isinstance(joint_path, JointPath):
raise TypeError('joint_path is not of expected type JointPath')
try:
service = rospy.ServiceProxy(
query_forward_kinematics_service,
GetFKSolution)
response = service(move_group_name,
end_effector_link,
joint_path.joint_set,
[p.to_joint_path_point_msg()
for p in joint_path])
except rospy.ServiceException as exc:
print('service call for query forward kinematics'
' failed, abort ')
raise ServiceException('service call for query'
' forward kinematics'
' failed, abort') from exc
if (response.error_codes is None or response.error_msgs is None
or len(response.error_codes) != len(response.error_msgs)):
raise ServiceException('service call for query forward '
'kinematics returns with '
'invalid response')
for i, error in enumerate(response.error_codes):
if error.val != MoveItErrorCodes.SUCCESS:
raise ServiceException('service call for query forward'
' kinematics was not'
' successful for point: ' + str(i) +
' service name: ' +
query_forward_kinematics_service +
' error code: ' +
str(error.val) +
', error message: ' +
response.error_msgs[i])
return list(map(lambda x: Pose.from_posestamped_msg(x),
response.solutions))
@classmethod
def _query_moveit_joint_path(cls, move_group_name, joint_path):
query_joint_path_service = ('xamlaPlanningServices/'
'query_joint_path')
if not isinstance(joint_path, JointPath):
raise TypeError('joint_path is not of expected type JointPath')
try:
service = rospy.ServiceProxy(
query_joint_path_service,
GetMoveItJointPath)
response = service(move_group_name,
joint_path.joint_set,
[p.to_joint_path_point_msg()
for p in joint_path])
except rospy.ServiceException as exc:
print('service call for query joint path'
' failed, abort ')
raise ServiceException('service call for query'
' joint path'
' failed, abort') from exc
return response
[docs] @classmethod
def query_collision_free_joint_path(cls, move_group_name, joint_path):
"""
Query a collision free joint path from a user defined joint path
Parameters
----------
move_group_name : str convertable
name of the move group for which the collision free joint
is required
joint_path : JointPath
joint path which may contains collisions
Results
-------
JointPath
New planned JointPath without collisions
Raises
------
TypeError : type mismatch
If joint_path is not of expectef type JointPath
or it move_group_name is not convertable to str
ServiceException
If query joint path service is not callable or
is not successful
"""
query_joint_path_service = ('xamlaPlanningServices/'
'query_joint_path')
if not isinstance(joint_path, JointPath):
raise TypeError('joint_path is not of expected type JointPath')
move_group_name = str(move_group_name)
response = cls._query_moveit_joint_path(move_group_name,
joint_path)
if response.error_code.val != MoveItErrorCodes.SUCCESS:
raise ServiceException('service call for query collision free'
' joint path was not successful. '
'service name:' +
query_joint_path_service +
' error code: ' +
str(response.error_code.val))
return JointPath(joint_path.joint_set,
[JointValues(joint_path.joint_set, p.positions)
for p in response.path])
[docs] @classmethod
def query_joint_trajectory(cls, joint_path, max_velocity, max_acceleration,
max_deviation, delta_t):
"""
Query a joint trajectory from joint path / joint positions
Parameters
----------
joint_path : JointPath
Defines the key joint positions the trajectory must reach
max_velocity : Iterable[float convertable]
Defines the maximal velocity for every joint
max_acceleration : Iterable[float convertable]
Defines the maximal acceleration for every joint
max_deviation : float convertable
Defines the maximal deviation of the joints to the
defined key points while executing the trajectory
delta_t : float convertable
Sampling points frequency or time
if value is create as 1.0 the value is interpreted
as a value in seconds else the value is interpreted
as a value in Hz
Returns
-------
JointTrajectory
An instance of JointTrajectory with dense JointTrajectoryPoints
Raises
------
TypeError
If joint_path is not of expected type JointPath or
max_velocity or max_acceleration is not iterable or
values of max_velocity, max_acceleration or delta_t
are not convertable to float
ValueError
If max_velocity or max_acceleration have not the same
number of values has joints are defined in joint_path
"""
query_joint_trajectory_service = ('xamlaPlanningServices/'
'query_joint_trajectory')
if not isinstance(joint_path, JointPath):
raise TypeError('joint_path is not of expected type JointPath')
max_velocity = [float(v) for v in max_velocity]
if len(max_velocity) != len(joint_path.joint_set):
raise ValueError('max_velocity has not the same number of values'
' as joints are defined in joint_path')
max_acceleration = [float(v) for v in max_acceleration]
if len(max_acceleration) != len(joint_path.joint_set):
raise ValueError('max_acceleration has not the same number of'
' values as joints are defined in joint_path')
max_deviation = float(max_deviation)
delta_t = float(delta_t)
delta_t = 1 / delta_t if delta_t > 1.0 else delta_t
try:
service = rospy.ServiceProxy(
query_joint_trajectory_service,
GetOptimJointTrajectory)
response = service(joint_path.joint_set,
[p.to_joint_path_point_msg()
for p in joint_path],
max_velocity,
max_acceleration,
max_deviation,
delta_t)
except rospy.ServiceException as exc:
print('service call for query joint trajectory'
' failed, abort ')
raise ServiceException('service call for query'
' joint trajectory'
' failed, abort') from exc
if response.error_code.val != MoveItErrorCodes.SUCCESS:
raise ServiceException('service call for query joint'
' trajectory was not successful. '
'service name:' +
query_joint_trajectory_service +
' error code: ' +
str(response.error_code.val))
j = JointSet(response.solution.joint_names)
p = [JointTrajectoryPoint.from_joint_trajectory_point_msg(j, p)
for p in response.solution.points]
return JointTrajectory(j, p)
[docs] @classmethod
def query_task_space_trajectory(cls, end_effector_name, cartesian_path, seed,
max_xyz_velocity, max_xyz_acceleration,
max_angular_velocity, max_angular_acceleration,
ik_jump_threshold, max_deviation, collision_check,
delta_t):
"""
Query a joint trajectory from task space poses
Parameters
----------
end_effector_name : str convertable
Name of the end effector for a trajectory should be quried
cartesian_path : CartesianPath
Define the key poses the trajectory must reach
seed : JointValues
numerical seed to control configuration of the robot
max_xyz_velocity : float convertable
max velocity for translation in m/s
max_xyz_acceleration : float convertable
max acceleration for translation in m/s^2
max_angular_velocity : float convertable
max angular velocity in rad/s
max_angular_acceleration : float convertable
max angular acceleration in rad/s^2
ik_jump_threshold : float convertable
maximal inverse kinematic jump
max_deviation : float convertable
Defines the maximal deviation of the joints to the
defined key points while executing the trajectory
collision_check : bool convertable
If true check the trajectory is collision free
delta_t : float convertable
Sampling points frequency or time
if value is create as 1.0 the value is interpreted
as a value in seconds else the value is interpreted
as a value in Hz
Returns
-------
JointTrajectory
An instance of JointTrajectory with dense JointTrajectoryPoints
Raises
------
TypeError
If cartesian_path is not of expected type CartesianPath or
if seed is not of expected type JointValues
max_* is not iterable or the values are not convertable
as expected
ValueError
If max_velocity or max_acceleration have not the same
number of values has joints are defined in joint_path
"""
query_cartesian_trajectory_service = ('xamlaPlanningServices/'
'query_cartesian_trajectory')
if not isinstance(cartesian_path, CartesianPath):
raise TypeError(
'cartesian_path is not of expected type CartesianPath')
if not isinstance(seed, JointValues):
raise TypeError('seed is not of expected type JointValues')
max_xyz_velocity = float(max_xyz_velocity)
max_xyz_acceleration = float(max_xyz_acceleration)
max_angular_velocity = float(max_angular_velocity)
max_angular_acceleration = float(max_angular_acceleration)
end_effector_name = str(end_effector_name)
ik_jump_threshold = float(ik_jump_threshold)
max_deviation = float(max_deviation)
collision_check = bool(collision_check)
delta_t = float(delta_t)
delta_t = 1 / delta_t if delta_t > 1.0 else delta_t
try:
service = rospy.ServiceProxy(
query_cartesian_trajectory_service,
GetLinearCartesianTrajectory)
response = service(end_effector_name,
[p.to_posestamped_msg()
for p in cartesian_path],
max_xyz_velocity,
max_xyz_acceleration,
max_angular_velocity,
max_angular_acceleration,
delta_t,
ik_jump_threshold,
max_deviation,
seed.joint_set,
seed.to_joint_path_point_msg(),
collision_check)
except rospy.ServiceException as exc:
print('service call for query cartesian trajectory'
' failed, abort ')
raise ServiceException('service call for query'
' cartesian trajectory'
' failed, abort') from exc
if response.error_code.val != MoveItErrorCodes.SUCCESS:
raise ServiceException('service call for query cartesian'
' trajectory was not successful. '
'service name:' +
query_cartesian_trajectory_service +
' error code: ' +
str(response.error_code.val))
j = JointSet(response.solution.joint_names)
p = [JointTrajectoryPoint.from_joint_trajectory_point_msg(j, p)
for p in response.solution.points]
return JointTrajectory(j, p)
[docs] @classmethod
def query_joint_path_collisions(cls, move_group_name, joint_path):
"""
Query collisions in joint path
Parameters
----------
move_group_name : str convertable
Name of the move group for which a path should be check
for collisions
joint_path : JointPath
The path that should be check for collisions
Returns
-------
List[JointValuesCollision] or None
If collisions exists returns a list of
JointValuesCollisions else returns None
Raises
------
TypeError
If move_group_name ist not str convertable or
if joint_path is not of type JointPath
"""
query_joint_path_collisions = ('xamlaMoveGroupServices/'
'query_joint_position_collision_check')
move_group_name = str(move_group_name)
if not isinstance(joint_path, JointPath):
raise TypeError('joint_path is not of expected type JointPath')
try:
service = rospy.ServiceProxy(
query_joint_path_collisions,
QueryJointStateCollisions)
response = service(move_group_name,
joint_path.joint_set,
[p.to_joint_path_point_msg()
for p in joint_path])
except rospy.ServiceException as exc:
print('service call for query joint collisions'
' failed, abort ')
raise ServiceException('service call for query'
' joint collisions'
' failed, abort') from exc
response.in_collision = list(response.in_collision)
response.error_codes = list(response.error_codes)
if (len(response.in_collision) != len(joint_path) or
len(response.error_codes) != len(joint_path) or
len(response.messages) != len(joint_path)):
raise ServiceException('service call for query joint'
' collisions was not successful. '
'service name:' +
query_joint_path_collisions)
result = [JointValuesCollisions(i, response.error_codes[i],
response.messages[i])
for i in range(0, len(joint_path))
if response.in_collision[i]]
return result
[docs] @classmethod
def create_plan_parameters(cls, move_group_name=None, joint_set=None,
max_velocity=None, max_acceleration=None,
**kwargs):
"""
Create PlanParameters from user defined and/or quried inputs
Parameters
----------
move_group_name : str (optional default query first)
move group for which plan parameters should be created
joint_set : JointSet (optional default query similar)
joint set for which plan parameters should be created
max_velocity : Iterable[float convertable] (optional default max query)
Defines the maximal velocity for every joint
max_acceleration : Iterable[float convertable] (op default max query)
Defines the maximal acceleration for every joint
kwargs : dict
sample_resolution : float convertable (optional default 0.008/125Hz)
sample points frequency
if value is create as 1.0 the value is interpreted
as a value in seconds else the value is interpreted
as a value in Hz
collision_check : bool convertable (optional default True)
check for collision if True
max_deviation : float convertable (optional default 0.2)
max deviation from fly by points
velocity_scaling : float convertable (optional default 1.0)
scale query or user defined max velocity
values between 0.0 and 1.0
acceleration_scaling : float convertable (optional default 1.0)
scale query or user defined max acceleration
values between 0.0 and 1.0
Returns
-------
PlanParameters
Instance of plan parameters with automatically
queried and/or user defined values
Raises
------
ServiceException
If query services are not reachable or
not finish successful
TypeError
If move group name is not of type None or str convertable
If joint_set is not of type None or JointSet
If an other parameter is not convertable
ValueError
If scaling parameters are not between 0.0 and 1.0
ArgumentError
If a argument is not set and also could not be
quired automatically
"""
if not move_group_name:
groups = cls.query_available_move_groups()
if groups:
joint_set = joint_set if joint_set else groups[0].joint_set
for g in groups:
if g.joint_set.is_similar(joint_set):
move_group_name = g.name
break
else:
raise ArgumentError('no move group name was provided and'
' also no move group could'
' automatically be quired')
elif not joint_set:
groups = cls.query_available_move_groups()
for g in groups:
if g.name == move_group_name:
joint_set = g.joint_set
break
if not joint_set:
raise ArgumentError('no joint_set was provided and'
' also no joint_set could'
' automatically be quired')
if not max_velocity or not max_acceleration:
limits = cls.query_joint_limits(joint_set)
if not max_velocity:
max_velocity = limits.max_velocity
if not max_acceleration:
max_acceleration = limits.max_acceleration
joint_limits = JointLimits(joint_set, max_velocity,
max_acceleration, None, None)
return PlanParameters(move_group_name, joint_limits, **kwargs)
[docs] @classmethod
def create_task_space_plan_parameters(cls, end_effector_name=None,
max_xyz_velocity=None,
max_xyz_acceleration=None,
max_angular_velocity=None,
max_angular_acceleration=None,
**kwargs):
"""
Creates TakesSpacePlanParameters from user defined or queried inputs
Parameters
----------
move_group_name : str (optional default query first)
move group for which plan parameters should be created
max_xyz_velocity : float convertable or None
Defines the maximal xyz velocity [m/s]
max_xyz_acceleration : float convertable
Defines the maximal xyz acceleration [m/s^2]
max_angular_velocity : float convertable or None
Defines the maximal angular velocity [rad/s]
max_angular_acceleration : float convertable or None
Defines the maximal angular acceleration [rad/s^2]
kwargs : dict
sample_resolution : float convertable (optional default 0.008/125Hz)
sample points frequency
if value is create as 1.0 the value is interpreted
as a value in seconds else the value is interpreted
as a value in Hz
collision_check : bool convertable (optional default True)
check for collision if True
ik_jump_threshold : float convertable (optional default 1.2)
maximal inverse kinematic jump
max_deviation : float convertable (optional default 0.2)
max deviation from fly by points
velocity_scaling : float convertable (optional default 1.0)
scale query or user defined max velocity
values between 0.0 and 1.0
acceleration_scaling : float convertable (optional default 1.0)
scale query or user defined max acceleration
values between 0.0 and 1.0
Returns
-------
TaskSpacePlanParameters
Instance of TaskSpacePlanParameters with automatically
queried and/or user defined values
Raises
------
ServiceException
If query services are not reachable or
not finish successful
TypeError
If end_effector_name is not of type None or str convertable
If an other parameter is not convertable
ValueError
If scaling parameters are not between 0.0 and 1.0
ArgumentError
If a argument is not set and also could not be
quired automatically
"""
if not end_effector_name:
groups = cls.query_available_move_groups()
if groups:
for g in groups:
if g.end_effector_link_names:
end_effector_name = g.end_effector_names[0]
break
if not end_effector_name:
raise ArgumentError('no end effector name was provided and'
' also end effector name could'
' automatically be quired')
if (not max_xyz_velocity or not max_xyz_acceleration or
not max_angular_velocity or not max_angular_acceleration):
limits = cls.query_endeffector_limits(end_effector_name)
if not max_xyz_velocity:
max_xyz_velocity = limits.max_xyz_velocity
if not max_xyz_acceleration:
max_xyz_acceleration = limits.max_xyz_acceleration
if not max_angular_velocity:
max_angular_velocity = limits.max_angular_velocity
if not max_angular_acceleration:
max_angular_acceleration = limits.max_angular_acceleration
end_effector_limits = EndEffectorLimits(max_xyz_velocity,
max_xyz_acceleration,
max_angular_velocity,
max_angular_acceleration)
return TaskSpacePlanParameters(end_effector_name,
end_effector_limits,
**kwargs)
[docs] @classmethod
def plan_collision_free_joint_path(cls, path, parameters):
"""
Plans a collision free joint path by query it
Parameters
----------
path : JointPath
joint path which should be replanned to be
collision free
parameters : PlanParameters
plan parameters which defines the limits and
move group
Returns
-------
JointPath
the replanned collision free joint path
Raises
------
TypeError
If path is not of type JointPath or
if parameters is not of type PlanParameters
ValueError
If parameters joint set is not equal or sub
set of the path joint set and therefore
reordering was not possible
ServiceException
If query service is not available or finish
unsuccessfully
"""
if not isinstance(path, JointPath):
raise TypeError('path is not of expected'
' type JointPath')
if not isinstance(parameters, PlanParameters):
raise TypeError('parameters is not of expected'
' type PlanParameters')
# reorder path in respect to plan parameter joint_set
path = JointPath(parameters.joint_set, path.points)
return cls.query_collision_free_joint_path(parameters.move_group_name,
path)
[docs] @classmethod
def plan_move_pose_linear(cls, path, seed, parameters):
"""
Plans trajectory with linear movements from a cartesian path
Parameters
----------
path : CartesianPath
cartesian path with poses the trajectory must reach
seed : JointValues
numerical seed to control configuration
parameters : TaskSpacePlanParameters
plan parameters which defines the limits, settings
and end effector name
Returns
-------
JointTrajectory
Planned joint trajectory which reach the poses
defined in path under the constraints of
parameters
Raises
------
TypeError
If path is not of type CartesianPath or
if parameters is not of type TaskSpacePlanParameters or
if seed is not of type JointValues
ServiceException
If query service is not available or finish
unsuccessfully
"""
if not isinstance(path, CartesianPath):
raise TypeError('path is not of expected type CartesianPath')
if not isinstance(seed, JointValues):
raise TypeError('seed is not of expected type JointValues')
if not isinstance(parameters, TaskSpacePlanParameters):
raise TypeError('parameters is not of expected type'
' TaskSpacePlanParameters')
p = parameters
return cls.query_task_space_trajectory(p.end_effector_name,
path,
seed,
p.max_xyz_velocity,
p.max_xyz_acceleration,
p.max_angular_velocity,
p.max_angular_acceleration,
p.ik_jump_threshold,
p.max_deviation,
p.collision_check,
p.sample_resolution)
[docs] @classmethod
def plan_move_joints(cls, path, parameters):
"""
Plans trajectory from a joint path
Parameters
----------
path : JointPath
joint path with positions the trajectory must reach
parameters : PlanParameters
plan parameters which defines the limits, settings
and move group name
Returns
-------
JointTrajectory
Planned joint trajectory which reach the positions
defined in path under the constraints of
parameters
Raises
------
TypeError
If path is not of type JointPath or
if parameters is not of type PlanParameters or
ValueError
If parameters joint set is not equal or sub
set of the path joint set and therefore
reordering was not possible
ServiceException
If query service is not available or finish
unsuccessfully
"""
if not isinstance(path, JointPath):
raise TypeError('path is not of expected type JointPath')
if not isinstance(parameters, PlanParameters):
raise TypeError('parameters is not of expected type'
' PlanParameters')
path = JointPath(parameters.joint_set, path.points)
max_velocity = parameters.max_velocity
max_acc = parameters.max_acceleration
vel_isnan = np.isnan(parameters.max_velocity)
acc_isnan = np.isnan(parameters.max_acceleration)
if any(vel_isnan) or any(acc_isnan):
limits = cls.query_joint_limits(path.joint_set)
if any(vel_isnan):
max_velocity = [limits.max_velocity[i] *
parameters.velocity_scaling
if vel_isnan[i] else l
for i, l in enumerate(parameters.max_velocity)]
if any(acc_isnan):
max_acc = [limits.max_acceleration[i] *
parameters.acceleration_scaling
if acc_isnan[i] else l
for i, l in enumerate(parameters.max_acceleration)]
return cls.query_joint_trajectory(path,
max_velocity,
max_acc,
parameters.max_deviation,
parameters.sample_resolution)
[docs] async def execute_joint_trajectory(self, trajectory, collision_check):
"""
Executes a joint trajectory
Parameters
----------
trajectory : JointTrajectory
Joint trajectory which should be executed
collision_check : bool convertable
If True check for collision while executing
Raises
------
TypeError
If trajectory is not of type JointTrajectory
or if collision_check is not convertable to bool
ServiceException
If execution ends not successful
"""
if not isinstance(trajectory, JointTrajectory):
raise TypeError('trajectory is not of expected type'
' joint trajectory')
collision_check = bool(collision_check)
goal = moveJGoal(trajectory=trajectory.to_joint_trajectory_msg(),
check_collision=collision_check)
run_action = generate_action_executor(self.__m_action)
# the server itself lock resources
# with LeaseBaseLock(trajectory.joint_set.names) as lock_resources:
await run_action(goal)
[docs] def execute_joint_trajectory_supervised(self, trajectory: JointTrajectory,
velocity_scaling: float,
collision_check: bool) -> SteppedMotionClient:
"""
Creates a instance of SteppedMotionClient for supervised trajectory execution
Parameters
----------
trajectory : JointTrajectory
Joint trajectory which should be executed
velocity_scaling : float
scaling of velocity range 0.0 to 1.0
collision_check : bool convertable
If True check for collision while executing
Returns
-------
result : int
Result of the trajectory execution
Raises
------
TypeError
If trajectory is not of type JointTrajectory
or if collision_check is not convertable to bool
"""
return SteppedMotionClient(trajectory, velocity_scaling, collision_check)
[docs] def query_inverse_kinematics(self, pose, parameters,
seed=[],
end_effector_link='',
timeout=None,
attempts=1,
const_seed=False):
"""
Query inverse kinematic solutions one pose
Parameters
----------
pose : pose
Pose to transform to joint space
parameters : PlanParameters
Plan parameters which defines the limits, settings
and move group name
seed : JointValues (optional)
Numerical seed to control joint configuration
end_effector_link : str convertable (optinal)
necessary if poses are defined for end effector link
timeout : datatime.timedelta (optional)
timeout
attempts : int convertable (optional default 1)
Attempts to find a solution or each pose
const_seed : bool convertable (optional default False)
use constant seed instead of consecutive seeds
Returns
-------
JointValues
Instance of JointValues which is the joint space
equivalent of the task space pose
Raises
------
TypeError
If pose is not of type Pose or
parameters is not of type PlanParameters
or seed is not empty list or type JointValues
or the other parameters are not convertable
to defined types
ValueError
If parameters joint set is not equal or sub
set of the seed joint set if defined and therefore
reordering was not possible
ServiceException
If query service is not available
"""
if not isinstance(pose, Pose):
raise TypeError('pose is not of expected type Pose')
end_effector_pose = EndEffectorPose(pose, end_effector_link)
result = self.query_inverse_kinematics_many([end_effector_pose],
parameters,
seed,
timeout,
attempts,
const_seed)
if not result.succeeded:
raise ServiceException('ik service call failed with error'
' code: {}'.format(result.error_codes[0]))
return result.path[0]
[docs] def query_inverse_kinematics_many(self, poses, parameters,
seed=[],
timeout=None,
attempts=1,
const_seed=False):
"""
Query inverse kinematic solutions for a Iterable of Poses
Parameters
----------
poses : Iterable[EndEffectorPose]
Iterable with EndEffectorPoses to transform to joint space
parameters : PlanParameters
Plan parameters which defines the limits, settings
and move group name
seed : JointValues (optional)
Numerical seed to control joint configuration
timeout : datatime.timedelta
timeout
attempts : int convertable (optional default 1)
attempts to find a solution for each pose
const_seed : bool convertable (optional default False)
use constant seed instead of consecutive seeds
Returns
-------
IkResult
Instance of IkResult with all found solutions as
a JointPath and error codes
Raises
------
TypeError
If poses is not of type EndEffectorPose or
parameters is not of type PlanParameters
or seed is not empty list or type JointValues
or the other parameters are not convertable
to defined types
ValueError
If parameters joint set is not equal or sub
set of the seed joint set if defined and therefore
reordering was not possible
ServiceException
If query service is not available
"""
if (not isinstance(poses, Iterable) and
any(not isinstance(p, EndEffectorPose)
for p in poses)):
raise TypeError(
'poses is not of expected type Iterable[EndEffectorPose]')
if not isinstance(parameters, PlanParameters):
raise TypeError('parameters is not of expected'
' type PlanParameters')
if seed:
if not isinstance(seed, JointValues):
raise TypeError('seed is not of expected'
' type JointValues')
elif seed.joint_set == parameters.joint_set:
pass
elif parameters.joint_set.is_subset(seed.joint_set):
pass
else:
raise ValueError('joint set of parameters and seed do not'
' match and reording is not possible')
if timeout and not isinstance(timeout, timedelta):
raise TypeError('timeout is not of expected type timedelta')
else:
timeout = timedelta(milliseconds=200)
attempts = int(attempts)
const_seed = bool(const_seed)
duration = self._ros_duration_from_timedelta(timeout)
poses_msgs = []
for p in poses:
poses_msgs.append(p.to_end_effector_pose_msg())
req = GetIKSolution2Request()
req.group_name = parameters.move_group_name
req.joint_names = seed.joint_set.names
req.seed = seed.to_joint_values_point_msg()
req.const_seed = const_seed
req.points = poses_msgs
req.collision_check = parameters.collision_check
req.attempts = attempts
req.timeout = duration
try:
response = self.__ik_service(req)
except rospy.ServiceException as exc:
print('service call for query inverse kinematics'
' failed, abort ')
raise ServiceException('service call for query'
' inverse kinematics'
' failed, abort') from exc
f = JointValues.from_joint_path_point_msg
joint_path = JointPath(parameters.joint_set,
[f(seed.joint_set, p).select(parameters.joint_set)
for p in response.solutions])
return IkResults(joint_path, response.error_codes)
@staticmethod
def _ros_duration_from_timedelta(timedelta):
secs = timedelta.days*24*3600+timedelta.seconds
nsecs = timedelta.microseconds*1000
return rospy.Duration(secs, nsecs)
[docs] def plan_cartesian_path(self, path, parameters):
"""
Plan a joint trajectory from a cartesian path and plan parameters
Parameters
----------
path : CartesianPath
Poses the planned trajectory must reach
parameters : PlanParameters
Plan parameters which defines the limits, settings
and move group name
Returns
-------
JointTrajectory
A joint trajectory which reach defined poses of path
under the constrains in parameters
Raises
------
TypeError
If path is not of type CartesianPath or
parameters is not of type PlanParameters
ValueError
If parameters joint set is not equal or sub
set of the seed joint set and therefore
reordering was not possible
ServiceException
If query services are not available or
finish unsuccessfully or inverse kinematics
ends with error
"""
seed = self.query_joint_states(parameters.joint_set).positions
poses = [EndEffectorPose(p, '') for p in path]
result = self.query_inverse_kinematics_many(poses,
parameters,
seed)
if not result.succeeded:
raise ServiceException('service call for query inverse'
' kinematics was not successful')
return self.plan_collision_free_joint_path(result.path,
parameters)
[docs] @classmethod
def emergency_stop(cls, enable=True):
"""
Sets and resets emergency stop
Parameters
----------
enable : bool convertable (default True)
If True activate emergency stop
If False resets the emergency stop
Returns
-------
success : bool
True if the service call was successful
message : str
Response or error message
Raises
------
TypeError
If enable is not convertable to bool
ServiceException
If query service is not available
"""
query_emergency_stop = ('EmergencySTOP/'
'query_emergency_stop')
enable = bool(enable)
try:
service = rospy.ServiceProxy(
query_emergency_stop,
SetBool)
response = service(enable)
except rospy.ServiceException as exc:
print('service set emergency stop failed')
raise ServiceException('service call for set emergency'
' stop failed, abort') from exc
return response.success, response.message
[docs] @classmethod
def get_current_joint_values(cls, joint_set):
"""
Query the current joint positions of all joints in joint_set
Parameters
----------
joint_set : JointSet
Defines for which joint the positions are requested
Returns
------
positions : JointValues
Returns a instance of JointValues with the positions
of the requested joint set
Raises
------
TypeError
If joint_set is not of type JointSet
"""
print(rospy.get_name())
return cls.query_joint_states(joint_set).positions
# async def move_joints(self, target, parameters):
# """
# Moves joints to target joint positions
# Parameters
# ----------
# target : JointValues
# target joint positions
# parameters : PlanParameters
# Plan parameters which defines the limits, settings
# and move group name
# Returns
# -------
# None
# Raises
# ------
# TypeError
# If target is not of type JointValues or
# if parameters is not of type PlanParameters
# ServiceException
# If query services are not available or finish
# with a fail state
# """
# if not isinstance(target, JointValues):
# raise TypeError('target is not of expected type JointValues')
# if not isinstance(parameters, PlanParameters):
# raise TypeError('parameters is not of expected'
# ' type PlanParameters')
# source = self.get_current_joint_values(parameters.joint_set)
# path = JointPath.from_start_stop_point(source, target)
# trajectory = self.plan_move_joints(path, parameters)
# await self.execute_joint_trajectory(trajectory, parameters.collision_check)
# async def move_pose(self, target, end_effector_link, parameters, seed=None):
# """
# Moves to target pose
# Parameters
# ----------
# target : Pose
# target pose
# end_effector_link : str convertable
# specifies for which link the pose is defined
# parameters : PlanParameters
# Plan parameters which defines the limits, settings
# and move group name
# seed : JointValues or None
# numerical seed to control the robot configuration
# if none the current joint position is used
# Raises
# ------
# TypeError
# If target is not of type Pose
# If parameters is not of type PlanParameters
# If end_effector_link is not convertable to str
# If seed is not of type JointValues
# ServiceException
# If query services are not available or finish
# with a fail state
# """
# if not isinstance(target, Pose):
# raise TypeError('target is not of expected type Pose')
# if not seed:
# seed = self.get_current_joint_values(parameters.joint_set)
# joint_values = self.query_inverse_kinematics(target, parameters,
# seed,
# end_effector_link)
# await self.move_joints(joint_values, parameters)
# async def move_pose_linear(self, target, end_effector_link, parameters, seed=None):
# """
# Moves to target pose linear
# Parameters
# ----------
# target : Pose
# target pose
# end_effector_link : str convertable
# specifies for which link the pose is defined
# parameters : TaskPlanParameters
# Task space plan parameters which defines the limits, settings
# and end effector name
# seed : JointValues
# numerical seed to control the robot configuration
# Raises
# ------
# TypeError
# If target is not of type Pose
# If parameters is not of type TaskSpacePlanParameters
# If end_effector_link is not convertable to str
# If seed is not of type JointValues or None
# ServiceException
# If query services are not available or finish
# with a fail state
# """
# groups = self.query_available_move_groups()
# group = next(g for g in groups
# if any([parameters.end_effector_name in
# g.end_effector_names]))
# if not group:
# raise RuntimeError('no move group is available with'
# ' requested end effector name: ' +
# parameters.end_effector_name)
# if not seed:
# seed = self.get_current_joint_values(group.joint_set)
# source = self.query_pose(group.name, seed, end_effector_link)
# path = CartesianPath.from_start_stop_point(source, target)
# trajectory = self.plan_move_cartesian_linear(path, seed, parameters)
# await self.execute_joint_trajectory(trajectory,
# parameters.collision_check)
[docs] async def move_gripper(self, action_name, position, max_effort):
"""
Moves a gripper via the general ros interface GripperCommandAction
Parameters
----------
action_name : str convertable
Name of the action to control a specific gripper
position : float convertable
Requested position of the gripper in meter
max_effort : float convertable
Force which should be applied
Results
-------
action_response : MoveGripperResult
Result of the action execution
Raises
------
TypeError
If inputs are not convertable to specified types
ServiceException
If action server is not available
RuntimeError
If action returns unexpected result
"""
action_name = str(action_name)
action_client = actionlib.SimpleActionClient(action_name,
GripperCommandAction)
g = GripperCommandGoal()
g.command.position = float(position)
g.command.max_effort = float(max_effort)
if not action_client.wait_for_server(rospy.Duration(5)):
raise ServiceException('connection to grippercommand action'
' server with name: ' + action_name +
' could not be established')
run_action = generate_action_executor(action_client)
with LeaseBaseLock([action_name]):
response = await run_action(g)
if not response:
raise RuntimeError('Unexpected result received by'
' gripper command action: ' +
action_name)
return MoveGripperResult.from_gripper_command_action_result(response.result())
[docs] async def wsg_gripper_command(self, action_name, command, width, speed,
max_effort, stop_on_block=True):
"""
Controls a WeissWsg gripper via the the specific action WsgCommandAction
Parameters
----------
action_name : str convertable
Name of the action to control a specific gripper
command : WsgCommand
Specifies which kind of action should be performed
width : float convertable
Requested position of the gripper in meter
speed : float convetable
Requested speed in m/s
max_effort : float convertable
Force which should be applied
Results
-------
action_response : MoveGripperResult
Result of the action execution
Raises
------
TypeError
If inputs are not convertable to specified types
or if command is no of type WsgCommand
ServiceException
If action server is not available
RuntimeError
If action returns unexpected result
"""
from wsg_50_common.msg import CommandAction, CommandGoal
action_name = str(action_name)
if not isinstance(command, WsgCommand):
raise TypeError('command is not of expected type WsgCommand')
action_client = actionlib.SimpleActionClient(action_name,
CommandAction)
if not action_client.wait_for_server(rospy.Duration(5)):
raise ServiceException('connection to wsg gripper action'
' server with name: ' + action_name +
' could not be established')
g = CommandGoal()
g.command.command_id = command.value
g.command.width = float(width)
g.command.speed = float(speed)
g.command.force = float(max_effort)
g.command.stop_on_block = bool(stop_on_block)
run_action = generate_action_executor(action_client)
with LeaseBaseLock([action_name]):
response = await run_action(g)
if not response:
raise RuntimeError('Unexpected result received by'
' wsg gripper command action: ' +
action_name)
return WsgResult.from_wsg_command_action_result(response.result())