Created
November 30, 2019 11:49
-
-
Save suraj2596/d14fb28f7e92fc1fc50e7e2c07b60626 to your computer and use it in GitHub Desktop.
Revisions
-
suraj2596 created this gist
Nov 30, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,158 @@ #!/usr/bin/env python2 import time import numpy as np import rospy from sensor_msgs.msg import Imu import mavros_msgs from mavros_msgs.msg import ActuatorControl, State, Thrust from mavros_msgs.srv import CommandBool, SetMode from geometry_msgs.msg import PoseStamped from tf.transformations import euler_from_quaternion class Drone(): def __init__(self): rospy.init_node('test_node', anonymous=True) # Variables self.state = State() # ROS services service_timeout = 30 rospy.loginfo("waiting for ROS services") try: rospy.wait_for_service('mavros/cmd/arming', service_timeout) rospy.wait_for_service('mavros/set_mode', service_timeout) rospy.loginfo("ROS services are up") except rospy.ROSException: self.fail("failed to connect to services") self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',CommandBool) self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode) # ROS Subscribers rospy.Subscriber("/mavros/state", State, self.state_callback) # rospy.Subscriber("/mavros/imu/data", Imu, self.imu_callback) # ROS publishers # self.thrust_pub = rospy.Publisher("/mavros/setpoint_attitude/thrust", Thrust, queue_size=10) # self.actuator_pub = rospy.Publisher("/mavros/target_actuator_control", ActuatorControl, queue_size=10) self.rate = rospy.Rate(50) def set_arm(self, arm, timeout): """arm: True to arm or False to disarm, timeout(int): seconds""" rospy.loginfo("setting FCU arm: {}".format(arm)) old_arm = self.state.armed loop_freq = 1 # Hz rate = rospy.Rate(loop_freq) arm_set = False for i in xrange(timeout * loop_freq): if self.state.armed == arm: arm_set = True rospy.loginfo("set arm success | seconds: {0} of {1}".format( i / loop_freq, timeout)) break else: try: res = self.set_arming_srv(arm) if not res.success: rospy.logerr("failed to send arm command") except rospy.ServiceException as e: rospy.logerr(e) # try: rate.sleep() # except rospy.ROSException as e: # self.fail(e) # self.assertTrue(arm_set, ( # "failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}". # format(arm, old_arm, timeout))) def set_mode(self, mode, timeout): """mode: PX4 mode string, timeout(int): seconds""" rospy.loginfo("setting FCU mode: {0}".format(mode)) old_mode = self.state.mode loop_freq = 1 # Hz rate = rospy.Rate(loop_freq) mode_set = False for i in xrange(timeout * loop_freq): if self.state.mode == mode: mode_set = True rospy.loginfo("set mode success | seconds: {0} of {1}".format( i / loop_freq, timeout)) break else: try: res = self.set_mode_srv(0, mode) # 0 is custom mode if not res.mode_sent: rospy.logerr("failed to send mode command") except rospy.ServiceException as e: rospy.logerr(e) # try: rate.sleep() # except rospy.ROSException as e: # self.fail(e) # self.assertTrue(mode_set, ( # "failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}". # format(mode, old_mode, timeout))) # CALLBACKS def state_callback(self, data): self.state = data print(self.state.mode) def imu_callback(self, data): print(data.mode) # rospy.loginfo(rospy.get_caller_id() + "\nlinear acceleration:\nx: [{}]\ny: [{}]\nz: [{}]" # .format(data.linear_acceleration.x, data.linear_acceleration.y, data.linear_acceleration.z)) # def command_actuator(self, a, inputs): # a.header.stamp = rospy.Time.now() # a.group_mix = 2 # ActuatorControl.PX4_MIX_PAYLOAD # a.controls = inputs if __name__ == '__main__': drone1 = Drone() # Wait for heartbeat # send junky setpoints junkmsg = Thrust() junkmsg.thrust = 0.5 # print(junkmsg) thrust_pub = rospy.Publisher("mavros/setpoint_attitude/thrust", Thrust, queue_size=3) time.sleep(1) # thrust_pub.publish(junkmsg) for i in range(100): time.sleep(0.05) thrust_pub.publish(junkmsg) # set mode to OFFBOARD drone1.set_mode("OFFBOARD", 10) # time.sleep(2) # Arm the drone # drone1.set_arm(True,5) # time.sleep(2) # Initialize Actuator # drone1.actuator_control_message_1 = ActuatorControl() # # Set desired rpm # inputs = np.array((0.0, 0.3, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)) # drone1.actuator_control_message_1.header.stamp = rospy.Time.now() # drone1.actuator_control_message_1.group_mix = 3 # ActuatorControl.PX4_MIX_PAYLOAD # drone1.actuator_control_message_1.controls = inputs # drone1.actuator_pub.publish(drone1.actuator_control_message_1) # # listener() rospy.spin()