Skip to content

Instantly share code, notes, and snippets.

@suraj2596
Created November 30, 2019 11:49
Show Gist options
  • Select an option

  • Save suraj2596/d14fb28f7e92fc1fc50e7e2c07b60626 to your computer and use it in GitHub Desktop.

Select an option

Save suraj2596/d14fb28f7e92fc1fc50e7e2c07b60626 to your computer and use it in GitHub Desktop.

Revisions

  1. suraj2596 created this gist Nov 30, 2019.
    158 changes: 158 additions & 0 deletions test.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,158 @@
    #!/usr/bin/env python2

    import time
    import numpy as np

    import rospy
    from sensor_msgs.msg import Imu
    import mavros_msgs
    from mavros_msgs.msg import ActuatorControl, State, Thrust
    from mavros_msgs.srv import CommandBool, SetMode
    from geometry_msgs.msg import PoseStamped
    from tf.transformations import euler_from_quaternion

    class Drone():
    def __init__(self):
    rospy.init_node('test_node', anonymous=True)

    # Variables
    self.state = State()

    # ROS services
    service_timeout = 30
    rospy.loginfo("waiting for ROS services")
    try:
    rospy.wait_for_service('mavros/cmd/arming', service_timeout)
    rospy.wait_for_service('mavros/set_mode', service_timeout)
    rospy.loginfo("ROS services are up")
    except rospy.ROSException:
    self.fail("failed to connect to services")

    self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',CommandBool)
    self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)

    # ROS Subscribers
    rospy.Subscriber("/mavros/state", State, self.state_callback)
    # rospy.Subscriber("/mavros/imu/data", Imu, self.imu_callback)

    # ROS publishers
    # self.thrust_pub = rospy.Publisher("/mavros/setpoint_attitude/thrust", Thrust, queue_size=10)
    # self.actuator_pub = rospy.Publisher("/mavros/target_actuator_control", ActuatorControl, queue_size=10)

    self.rate = rospy.Rate(50)

    def set_arm(self, arm, timeout):
    """arm: True to arm or False to disarm, timeout(int): seconds"""
    rospy.loginfo("setting FCU arm: {}".format(arm))
    old_arm = self.state.armed
    loop_freq = 1 # Hz
    rate = rospy.Rate(loop_freq)
    arm_set = False
    for i in xrange(timeout * loop_freq):
    if self.state.armed == arm:
    arm_set = True
    rospy.loginfo("set arm success | seconds: {0} of {1}".format(
    i / loop_freq, timeout))
    break
    else:
    try:
    res = self.set_arming_srv(arm)
    if not res.success:
    rospy.logerr("failed to send arm command")
    except rospy.ServiceException as e:
    rospy.logerr(e)

    # try:
    rate.sleep()
    # except rospy.ROSException as e:
    # self.fail(e)

    # self.assertTrue(arm_set, (
    # "failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
    # format(arm, old_arm, timeout)))

    def set_mode(self, mode, timeout):
    """mode: PX4 mode string, timeout(int): seconds"""
    rospy.loginfo("setting FCU mode: {0}".format(mode))
    old_mode = self.state.mode
    loop_freq = 1 # Hz
    rate = rospy.Rate(loop_freq)
    mode_set = False
    for i in xrange(timeout * loop_freq):
    if self.state.mode == mode:
    mode_set = True
    rospy.loginfo("set mode success | seconds: {0} of {1}".format(
    i / loop_freq, timeout))
    break
    else:
    try:
    res = self.set_mode_srv(0, mode) # 0 is custom mode
    if not res.mode_sent:
    rospy.logerr("failed to send mode command")
    except rospy.ServiceException as e:
    rospy.logerr(e)

    # try:
    rate.sleep()
    # except rospy.ROSException as e:
    # self.fail(e)

    # self.assertTrue(mode_set, (
    # "failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
    # format(mode, old_mode, timeout)))

    # CALLBACKS
    def state_callback(self, data):
    self.state = data
    print(self.state.mode)

    def imu_callback(self, data):
    print(data.mode)
    # rospy.loginfo(rospy.get_caller_id() + "\nlinear acceleration:\nx: [{}]\ny: [{}]\nz: [{}]"
    # .format(data.linear_acceleration.x, data.linear_acceleration.y, data.linear_acceleration.z))

    # def command_actuator(self, a, inputs):
    # a.header.stamp = rospy.Time.now()
    # a.group_mix = 2 # ActuatorControl.PX4_MIX_PAYLOAD
    # a.controls = inputs

    if __name__ == '__main__':

    drone1 = Drone()
    # Wait for heartbeat

    # send junky setpoints

    junkmsg = Thrust()
    junkmsg.thrust = 0.5
    # print(junkmsg)
    thrust_pub = rospy.Publisher("mavros/setpoint_attitude/thrust", Thrust, queue_size=3)
    time.sleep(1)
    # thrust_pub.publish(junkmsg)
    for i in range(100):
    time.sleep(0.05)
    thrust_pub.publish(junkmsg)

    # set mode to OFFBOARD
    drone1.set_mode("OFFBOARD", 10)
    # time.sleep(2)

    # Arm the drone
    # drone1.set_arm(True,5)
    # time.sleep(2)
    # Initialize Actuator

    # drone1.actuator_control_message_1 = ActuatorControl()

    # # Set desired rpm
    # inputs = np.array((0.0, 0.3, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))

    # drone1.actuator_control_message_1.header.stamp = rospy.Time.now()
    # drone1.actuator_control_message_1.group_mix = 3 # ActuatorControl.PX4_MIX_PAYLOAD
    # drone1.actuator_control_message_1.controls = inputs

    # drone1.actuator_pub.publish(drone1.actuator_control_message_1)
    # # listener()


    rospy.spin()