# world_view_client.py
#
# Copyright (c) 2018, Xamla and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#!/usr/bin/env python3
import pathlib
from typing import Dict, List, Union
import moveit_msgs.msg as moveit_msgs
import rospy
from xamlamoveit_msgs.srv import (CreateFolderWorldView,
CreateFolderWorldViewRequest,
GetCartesianPathWorldView,
GetCartesianPathWorldViewRequest,
GetCollisionObjectWorldView,
GetCollisionObjectWorldViewRequest,
GetJointPostureWorldView,
GetJointPostureWorldViewRequest,
GetPoseWorldView, GetPoseWorldViewRequest,
QueryCartesianPathWorldView,
QueryCartesianPathWorldViewRequest,
QueryCollisionObjectWorldView,
QueryCollisionObjectWorldViewRequest,
QueryJointValuesWorldView,
QueryJointValuesWorldViewRequest,
QueryPosesWorldView,
QueryPosesWorldViewRequest,
RemoveElementWorldView,
RemoveElementWorldViewRequest,
SetCartesianPathWorldView,
SetCartesianPathWorldViewRequest,
SetCollisionObjectWorldView,
SetCollisionObjectWorldViewRequest,
SetJointPostureWorldView,
SetJointPostureWorldViewRequest,
SetPoseWorldView, SetPoseWorldViewRequest,
UpdateJointPostureWorldView,
UpdateJointPostureWorldViewRequest,
UpdatePoseWorldView,
UpdatePoseWorldViewRequest)
from ..data_types import CartesianPath, CollisionObject, JointValues, Pose
from ..xamla_motion_exceptions import ArgumentError, ServiceException
add_joint_values_srv_name = '/rosvita/world_view/add_joint_posture'
get_joint_values_srv_name = '/rosvita/world_view/get_joint_posture'
query_joint_values_srv_name = '/rosvita/world_view/query_joint_postures'
update_joint_values_srv_name = '/rosvita/world_view/update_joint_posture'
add_pose_srv_name = '/rosvita/world_view/add_pose'
get_pose_srv_name = '/rosvita/world_view/get_pose'
query_poses_srv_name = '/rosvita/world_view/query_poses'
update_pose_srv_name = '/rosvita/world_view/update_pose'
add_cartesian_path_srv_name = '/rosvita/world_view/set_cartesianpath'
get_cartesian_path_srv_name = '/rosvita/world_view/get_cartesianpath'
query_cartesian_paths_srv_name = '/rosvita/world_view/query_cartesianpath'
update_cartesian_path_srv_name = '/rosvita/world_view/update_cartesianpath'
add_collision_object_srv_name = '/rosvita/world_view/set_collisionobject'
get_collision_object_srv_name = '/rosvita/world_view/get_collisionobject'
query_collision_objects_srv_name = '/rosvita/world_view/query_collisionobject'
update_collision_object_srv_name = '/rosvita/world_view/update_collisionobject'
remove_element_srv_name = '/rosvita/world_view/remove_element'
add_folder_srv_name = '/rosvita/world_view/add_folder'
def _check_and_convert_element_path(element_path):
if not isinstance(element_path, pathlib.Path):
try:
element_path = pathlib.Path(element_path)
except TypeError as exc:
raise TypeError('element_path is not of expected type '
'pathlib.Path and can not beconverted'
' to it') from exc
return '/' / element_path
[docs]class WorldViewClient(object):
"""
Client to interact and manipulate the Rosvita WorldView
With help of this client class it is possible to add, get,
query and remove different kinds of elements to or from
the Rosvita world view.
Methods
-------
add_joint_values
Add / update joint values element in world view tree
get_joint_values
Get joint values element from world view tree
query_joint_values
Query all joint values under folder_path which start with prefix
add_pose
Add / update pose element in world view tree
get_pose
Get pose element from world view tree
query_poses
Query all existing poses under folder_path which start with prefix
add_cartesian_path
Add / update cartesian path element in world view tree
get_cartesian_path
Get cartesian path element from world view tree
query_cartesian_pahts
Query all existing castesian paths under folder_path which start with prefix
add_collision_object
Add / update collision object element in world view tree
get_collision_object
Get collision object element from world view tree
query_cartesian_pahts
Query all existing castesian paths under folder_path which start with prefix
"""
def __init__(self):
"""
Initialize WorldViewClient
Returns
-------
Instance of WorldViewClient
Raises
------
ServiceError
If connection to world view services
could not be established
"""
# initialize all service to handle joint values in world view
self.__add_joint_values_srv = rospy.ServiceProxy(
add_joint_values_srv_name,
SetJointPostureWorldView)
self.__get_joint_values_srv = rospy.ServiceProxy(
get_joint_values_srv_name,
GetJointPostureWorldView)
self.__query_joint_values_srv = rospy.ServiceProxy(
query_joint_values_srv_name,
QueryJointValuesWorldView)
self.__update_joint_values_srv = rospy.ServiceProxy(
update_joint_values_srv_name,
UpdateJointPostureWorldView)
# initialize all service to handle poses in world view
self.__add_pose_srv = rospy.ServiceProxy(
add_pose_srv_name,
SetPoseWorldView)
self.__get_pose_srv = rospy.ServiceProxy(
get_pose_srv_name,
GetPoseWorldView)
self.__query_poses_srv = rospy.ServiceProxy(
query_poses_srv_name,
QueryPosesWorldView)
self.__update_pose_srv = rospy.ServiceProxy(
update_pose_srv_name,
UpdatePoseWorldView)
# initialize all service to handle cartesian path in world view
self.__add_cartesian_path_srv = rospy.ServiceProxy(
add_cartesian_path_srv_name,
SetCartesianPathWorldView)
self.__get_cartesian_path_srv = rospy.ServiceProxy(
get_cartesian_path_srv_name,
GetCartesianPathWorldView)
self.__query_cartesian_paths_srv = rospy.ServiceProxy(
query_cartesian_paths_srv_name,
QueryCartesianPathWorldView)
self.__update_cartesian_path_srv = rospy.ServiceProxy(
update_cartesian_path_srv_name,
SetCartesianPathWorldView)
# initialize all service to handle collision objects in world view
self.__add_collision_object_srv = rospy.ServiceProxy(
add_collision_object_srv_name,
SetCollisionObjectWorldView)
self.__get_collision_object_srv = rospy.ServiceProxy(
get_collision_object_srv_name,
GetCollisionObjectWorldView)
self.__query_collision_objects_srv = rospy.ServiceProxy(
query_collision_objects_srv_name,
QueryCollisionObjectWorldView)
self.__update_collision_objects_srv = rospy.ServiceProxy(
update_collision_object_srv_name,
SetCollisionObjectWorldView)
# add folder service
self.__add_folder_srv = rospy.ServiceProxy(
add_folder_srv_name,
CreateFolderWorldView)
# remove element service
self.__remove_element_srv = rospy.ServiceProxy(
remove_element_srv_name,
RemoveElementWorldView)
def _call_update_joint_values(self,
element_path: pathlib.Path,
joint_values: JointValues,
transient: bool):
request = UpdateJointPostureWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.point = joint_values.to_joint_values_point_msg()
request.transient = bool(transient)
try:
response = self.__update_joint_values_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(update_joint_values_srv_name)
) from exc
return response
def _call_update_pose(self,
element_path: pathlib.Path,
pose: Pose,
transient: bool):
request = UpdatePoseWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.point = pose.to_posestamped_msg()
request.transient = bool(transient)
try:
response = self.__update_pose_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(update_pose_srv_name)
) from exc
return response
def _call_update_cartesian_path(self,
element_path: pathlib.Path,
cartesian_path: CartesianPath,
transient: bool):
request = SetCartesianPathWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.path = cartesian_path.to_cartesian_path_msg()
request.transient = bool(transient)
try:
response = self.__update_cartesian_path_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(update_cartesian_path_srv_name)
) from exc
return response
def _call_update_collision_object(self,
element_path: pathlib.Path,
collision_object: CollisionObject,
transient: bool):
request = SetCollisionObjectWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.collision_object = collision_object.to_collision_object_msg(
moveit_msgs.CollisionObject.ADD
)
request.transient = bool(transient)
try:
response = self.__update_collision_objects_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(update_collision_object_srv_name)
) from exc
return response
[docs] def add_joint_values(self, element_path: Union[str, pathlib.Path],
joint_values: JointValues,
transient: bool=False,
update_if_exists: bool=True):
"""
Add / update joint values element in world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Path in world view tree where the new joint values element
should be created or update a existing one at this location
joint_values : JointValues
Contains new joint values
transient : bool (default False)
If True the added joint values are only valid for this session
and will not be saved in a rosvita project context
update_if_exists: bool (default True)
If True update the values if exists instead
of raising an exception
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to add joint values
due to wrong path or element already exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
If joint_values is not of type JointValues
"""
element_path = _check_and_convert_element_path(element_path)
if not isinstance(joint_values, JointValues):
raise TypeError('joint_values is not of expected type JointValues')
response = None
if update_if_exists is True:
response = self._call_update_joint_values(
element_path,
joint_values,
transient
)
if response is not None and response.success is True:
return
request = SetJointPostureWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.point = joint_values.to_joint_values_point_msg()
request.transient = bool(transient)
try:
response = self.__add_joint_values_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(add_joint_values_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(add_joint_values_srv_name,
response.error))
[docs] def get_joint_values(self, element_path: Union[str, pathlib.Path]) -> JointValues:
"""
Get joint values element from world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Full path of requested element in worldview tree
Returns
-------
joint_values : JointValues
Instance of JointValues with
the values which are defined in the
requested joint values element
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to get joint values
due to wrong path or element not exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
"""
element_path = _check_and_convert_element_path(element_path)
request = GetJointPostureWorldViewRequest()
request.element_path = str(element_path)
try:
response = self.__get_joint_values_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(get_joint_values_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(get_joint_values_srv_name,
response.error))
return JointValues.from_joint_values_point_msg(response.point)
[docs] def query_joint_values(self, folder_path: str, prefix: str='',
recursive: bool=False) -> Dict[pathlib.PurePath, JointValues]:
"""
Query all existing joint values elements under folder_path which start with prefix
Parameters
----------
folder_path : str
Path to folder in which the search should be performed
prefix : str (default empty string)
Query elements which start with this prefix
recursive : bool (default False)
If True query from folder path recursively
Returns
-------
result : List[JointValues]
List of JointValues in alphanumeric order see world view
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to query joint values
due to not existing path
TypeError
If folder_path is not of type str
"""
if not isinstance(folder_path, str):
raise TypeError(
'folder_path is not of expected type str or is empty')
if not isinstance(prefix, str):
raise TypeError('path is not of expected type str')
if not isinstance(recursive, bool):
raise TypeError('recursive is not of expected type bool')
request = QueryJointValuesWorldViewRequest()
request.prefix = prefix
request.folder_path = folder_path
request.recursive = recursive
try:
response = self.__query_joint_values_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(query_joint_values_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(query_joint_values_srv_name,
response.error))
if response.points:
result = {}
for n, e, p in zip(response.names,
response.element_paths,
response.points):
path = pathlib.PurePath(e)
result[path] = JointValues.from_joint_values_point_msg(p)
return result
else:
return {}
[docs] def add_pose(self, element_path: Union[str, pathlib.Path],
pose: Pose, transient: bool=False,
update_if_exists: bool=True):
"""
Add / store pose element in world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Path in world view tree where the new pose element
should be created or update a existing one at this location
pose : Pose
Instance of pose which is added / stored
transient : bool
If True the added pose is only valid for this session
and will not be saved in a rosvita project context
update_if_exists: bool (default True)
If True update pose if it exists instead
of raising an exception
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to add pose
due to wrong path or element already exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
If pose is not of type Pose
"""
element_path = _check_and_convert_element_path(element_path)
if not isinstance(pose, Pose):
raise TypeError('pose is not of expected type Pose')
response = None
if update_if_exists is True:
response = self._call_update_pose(
element_path,
pose,
transient
)
if response is not None and response.success is True:
return
print(element_path)
request = SetPoseWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.point = pose.to_posestamped_msg()
request.transient = bool(transient)
try:
response = self.__add_pose_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(add_pose_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(add_pose_srv_name,
response.error))
[docs] def get_pose(self, element_path: Union[str, pathlib.Path]) -> Pose:
"""
Get pose element from world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Full path of requested element in worldview tree
Returns
-------
pose : Pose
Instance of Pose with
the values which are defined in the
requested pose element
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to get pose
due to wrong path or element not exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
"""
element_path = _check_and_convert_element_path(element_path)
request = GetPoseWorldViewRequest()
request.element_path = str(element_path)
try:
response = self.__get_pose_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(get_pose_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(get_pose_srv_name,
response.error))
return Pose.from_posestamped_msg(response.point)
[docs] def query_poses(self, folder_path: str, prefix: str='',
recursive: bool=False) -> List[Pose]:
"""
Query all existing pose elements under folder_path which start with prefix
Parameters
----------
folder_path : str
Path to folder in which the search should be performed
prefix : str (default empty string)
Query elements which start with this prefix
recursive : bool (default False)
If True query from folder path recursively
Returns
-------
result : List[Pose]
List of Pose in alphanumeric order see world view
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to query pose
due to not existing path
TypeError
If folder_path is not of type str
"""
if not isinstance(folder_path, str):
raise TypeError(
'folder_path is not of expected type str or is empty')
if not isinstance(prefix, str):
raise TypeError('path is not of expected type str')
if not isinstance(recursive, bool):
raise TypeError('recursive is not of expected type bool')
request = QueryPosesWorldViewRequest()
request.prefix = prefix
request.folder_path = folder_path
request.recursive = recursive
try:
response = self.__query_poses_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(query_poses_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(query_poses_srv_name,
response.error))
if response.points:
result = {}
for n, e, p in zip(response.names,
response.element_paths,
response.points):
path = pathlib.PurePath(e)
result[path] = Pose.from_posestamped_msg(p)
return result
else:
return {}
[docs] def add_cartesian_path(self, element_path: Union[str, pathlib.Path],
cartesian_path: CartesianPath, transient: bool=False,
update_if_exists: bool=True):
"""
Add / store cartesian path element in world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Path in world view tree where the new cartesian path element
should be created or update a existing one at this location
cartesian_path : CartesianPath
Instance of cartesian path which is added / stored
transient : bool
If True the added cartesian path is only valid for this session
and will not be saved in a rosvita project context
update_if_exists: bool (default True)
If True update cartesian path if it exists instead
of raising an exception
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to add cartesian path
due to wrong path or element already exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
If cartesian_path is not of type CartesianPath
"""
element_path = _check_and_convert_element_path(element_path)
if not isinstance(cartesian_path, CartesianPath):
raise TypeError(
'cartesian path is not of expected type CartesianPath')
if not isinstance(transient, bool):
raise TypeError('transient is not of expected type bool')
response = None
if update_if_exists is True:
response = self._call_update_cartesian_path(
element_path,
cartesian_path,
transient
)
if response is not None and response.success is True:
return
request = SetCartesianPathWorldViewRequest()
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.path = cartesian_path.to_cartesian_path_msg()
request.transient = bool(transient)
try:
response = self.__add_cartesian_path_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(add_cartesian_path_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(add_cartesian_path_srv_name,
response.error))
[docs] def get_cartesian_path(self, element_path: Union[str, pathlib.Path]) -> CartesianPath:
"""
Get cartesian path element from world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Full path of requested element in worldview tree
Returns
-------
cartesian_path : CartesianPath
Instance of CartesianPath with
the values which are defined in the
requested cartesian path element
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to get cartesian path
due to wrong path or element not exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
"""
element_path = _check_and_convert_element_path(element_path)
request = GetCartesianPathWorldViewRequest()
request.element_path = str(element_path)
try:
response = self.__get_cartesian_path_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(get_cartesian_path_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(get_cartesian_path_srv_name,
response.error))
return CartesianPath.from_cartesian_path_msg(response.path)
[docs] def query_cartesian_paths(self, folder_path: str, prefix: str='',
recursive: bool=False) -> List[CartesianPath]:
"""
Query all existing cartesian path elements under folder_path which start with prefix
Parameters
----------
folder_path : str
Path to folder in which the search should be performed
prefix : str (default empty string)
Query elements which start with this prefix
recursive : bool (default False)
If True query from folder path recursively
Returns
-------
result : List[CartesianPath]
List of CartesianPath in alphanumeric order see world view
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to query cartesian path
due to not existing path
TypeError
If folder_path is not of type str
"""
if not isinstance(folder_path, str):
raise TypeError(
'folder_path is not of expected type str or is empty')
if not isinstance(prefix, str):
raise TypeError('path is not of expected type str')
if not isinstance(recursive, bool):
raise TypeError('recursive is not of expected type bool')
request = QueryCartesianPathWorldViewRequest()
request.prefix = prefix
request.folder_path = folder_path
request.recursive = recursive
try:
response = self.__query_cartesian_paths_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(query_cartesian_paths_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(query_cartesian_paths_srv_name,
response.error))
if response.paths:
result = {}
for n, e, p in zip(response.names,
response.element_paths,
response.paths):
path = pathlib.PurePath(e)
result[path] = CartesianPath.from_cartesian_path_msg(p)
return result
else:
return {}
[docs] def add_collision_object(self, element_path: Union[str, pathlib.Path],
collision_object: CollisionObject, transient: bool=False,
update_if_exists: bool=True):
"""
Add / store collision object element in world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Path in world view tree where the new collision object element
should be created or update a existing one at this location
collision_object : CollisionObject
Instance of collision object which is added or used for update
transient : bool
If True the added collision object is only valid for this session
and will not be saved in a rosvita project context
update_if_exists: bool (default True)
If True update cartesian path if it exists instead
of raising an exception
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to add collision object
due to wrong path or element already exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
If collision_object is not of type CollisionObject
"""
element_path = _check_and_convert_element_path(element_path)
if not isinstance(collision_object, CollisionObject):
raise TypeError(
'collision object is not of expected type CollisionObject')
response = None
if update_if_exists is True:
response = self._call_update_collision_object(
element_path,
collision_object,
transient
)
if response is not None and response.success is True:
return
request = SetCollisionObjectWorldViewRequest()
request.collision_object = collision_object.to_collision_object_msg(
moveit_msgs.CollisionObject.ADD)
request.display_name = element_path.name
request.element_path = str(element_path.parent)
request.transient = bool(transient)
try:
response = self.__add_collision_object_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(add_collision_object_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(add_collision_object_srv_name,
response.error))
[docs] def get_collision_object(self, element_path: Union[str, pathlib.Path]) -> CollisionObject:
"""
Get collision object element from world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Full path of requested element in worldview tree
Returns
-------
collision_object : CollisionObject
Instance of CollisionObject with
the values which are defined in the
requested collision object element
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to get collision object
due to wrong path or element not exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
"""
element_path = _check_and_convert_element_path(element_path)
request = GetCollisionObjectWorldViewRequest()
request.element_path = str(element_path)
try:
response = self.__get_collision_object_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(get_collision_object_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(get_collision_object_srv_name,
response.error))
print(response.collision_object)
return CollisionObject.from_collision_object_msg(response.collision_object)
[docs] def query_collision_objects(self, folder_path: str, prefix: str='',
recursive: bool=False) -> List[CollisionObject]:
"""
Query all existing collision objet elements under folder_path which start with prefix
Parameters
----------
folder_path : str
Path to folder in which the search should be performed
prefix : str (default empty string)
Query elements which start with this prefix
recursive : bool (default False)
If True query from folder path recursively
Returns
-------
result : List[CollisionObject]
List of CollisionObject in alphanumeric order see world view
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to query collision object
due to not existing path
TypeError
If folder_path is not of type str
"""
if not isinstance(folder_path, str):
raise TypeError(
'folder_path is not of expected type str or is empty')
if not isinstance(prefix, str):
raise TypeError('path is not of expected type str')
if not isinstance(recursive, bool):
raise TypeError('recursive is not of expected type bool')
request = QueryCollisionObjectWorldViewRequest()
request.prefix = prefix
request.folder_path = folder_path
request.recursive = recursive
try:
response = self.__query_collision_objects_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(query_collision_objects_srv_name)
) from exc
if not response.success:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(query_collision_objects_srv_name,
response.error))
if response.collision_objects:
result = {}
for n, e, p in zip(response.names,
response.element_paths,
response.collision_objects):
path = pathlib.PurePath(e)
result[path] = CollisionObject.from_collision_object_msg(p)
return result
else:
return {}
[docs] def add_folder(self, folder_path: Union[str, pathlib.Path],
raise_exception_if_exists: bool=False):
"""
Add a folder to world view tree
Parameters
----------
folder_path : Union[str, pathlib.Path]
Folder path in worldview tree
raise_exception_if_exists: bool (default False)
If True raise ArgumentError is folder already
exists als no exception is raised and folder
existing folder presists.
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to add folder
input path not exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
"""
folder_path = _check_and_convert_element_path(folder_path)
request = CreateFolderWorldViewRequest()
request.folder_name = folder_path.name
request.folder_path = str(folder_path.parent)
try:
response = self.__add_folder_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(add_folder_srv_name)
) from exc
if not response.success and bool(raise_exception_if_exists) is True:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(add_folder_srv_name,
response.error))
[docs] def remove_element(self, element_path: Union[str, pathlib.Path],
raise_exception_if_not_exists: bool=False):
"""
Remove existing element from world view tree
Parameters
----------
element_path : Union[str, pathlib.Path]
Folder path in worldview tree
raise_exception_if_not_exists: bool default(False)
If True raise a ArgumentError if element which
should be removed not exists
Raises
------
ServiceError
If necessary service is not available
ArgumentError
If it was not possible to update collision object
due to element not exists
TypeError
If element_path is not of type pathlib.Path
and can not be converted to it
"""
element_path = _check_and_convert_element_path(element_path)
request = RemoveElementWorldViewRequest()
request.element_path = str(element_path)
try:
response = self.__remove_element_srv(request)
except rospy.ServiceException as exc:
raise ServiceException('service: {} is not available'
''.format(remove_element_srv_name)
) from exc
if not response.success and bool(raise_exception_if_not_exists) is True:
raise ArgumentError('service call of service: {}'
' was not successful,response with'
' error: {}'.format(remove_element_srv_name,
response.error))