Learn to use the RapidCodeRemote API to do streaming motion.
# Build the request that configures axis 0 and will carry the first streaming move.
axis_request = rapidcode.AxisRequest()
axis_request.index = 0
# Configuration written along with the request: user units of 1000 and an
# error-limit action of RSIActionNONE.
# NOTE(review): presumably this disables the position-error response -- confirm.
axis_request.config.user_units = 1000
axis_request.config.error_limit.action = rsienums.RSIAction.RSIActionNONE
# clear_faults has no field assigned here; SetInParent() is called on it instead.
# NOTE(review): this looks like the protobuf call that marks an empty
# sub-message as present -- confirm against the generated message API.
axis_request.action.clear_faults.SetInParent()
# Request that the axis position be set to 0.
axis_request.action.position_set.position = 0
# Tag the move with orig_motion_id, which is defined outside this excerpt.
axis_request.action.move.motion_id = orig_motion_id
def add_next_second(request, amplitude, final=False):
    """Append one sine period of streaming motion to ``request``.

    One full sine period, scaled by ``amplitude``, is sampled at 100 evenly
    spaced points. Each point gets a time slice of 1/100 (one second in total,
    going by the function name), and both lists are appended to the request's
    streaming move.

    Args:
        request: Axis request whose ``action.move.streaming`` is filled in place.
        amplitude: Multiplier applied to every sine sample.
        final: Value stored in ``streaming.final``.

    Returns:
        The same ``request`` object, so the call can be nested inside an RPC call.
    """
    sample_count = 100
    stream = request.action.move.streaming

    # Same arithmetic order as before so the generated samples are bit-identical.
    sine_period = [
        math.sin(step / sample_count * 2 * math.pi) * amplitude
        for step in range(sample_count)
    ]
    time_slice = 1 / sample_count

    stream.positions.extend(sine_period)
    stream.times.extend([time_slice] * sample_count)
    stream.empty_count = 50
    stream.retain = False
    stream.final = final

    print(f"Commanding streaming move. Final={final}")
    return request
# Hand the prepared request, extended with the first block of points
# (amplitude 1), to do_rpc together with rmp_stub.Axis.
do_rpc(rmp_stub.Axis, add_next_second(axis_request, 1))
# Build a second, otherwise empty request carrying the next block of points
# (amplitude 2); it is passed to do_rpc on a later line.
axis_request2 = rapidcode.AxisRequest()
add_next_second(axis_request2, 2)
def get_prev_id(response):
    """Return the motion id immediately before the one reported in ``response``.

    Args:
        response: RPC response exposing ``status.motion_id``.

    Returns:
        ``response.status.motion_id`` minus one.
    """
    reported_id = response.status.motion_id
    return reported_id - 1
# Pass the second request to do_rpc and keep the id one below the motion id
# reported in its response; the loop below polls until execution reaches it.
prev_motion_id = get_prev_id(do_rpc(rmp_stub.Axis, axis_request2))
def print_executing():
    """Query the axis status and return the id of the motion now executing.

    An empty ``AxisRequest`` is passed to ``do_rpc``. When the returned status
    reports ``RSIStateERROR``, the status ``source_name`` is printed.

    Returns:
        ``status.motion_id_executing`` taken from the response.
    """
    axis_status = do_rpc(rmp_stub.Axis, rapidcode.AxisRequest()).status
    executing_id = axis_status.motion_id_executing
    if axis_status.state == rsienums.RSIStateERROR:
        print(f"Error state: '{axis_status.source_name}'")
    return executing_id
# Stream amplitudes 3..10, one block of points per request. Before each new
# request is sent, poll until the executing motion id reaches prev_motion_id.
# NOTE(review): presumably this keeps new points from being queued too far
# ahead of execution -- confirm against the streaming-motion documentation.
for amplitude in range(3, 11):
    counter = 0
    print(f"Waiting for motion id executing to equal: {prev_motion_id}")
    while print_executing() < prev_motion_id:
        if counter >= 1001:
            timeout_message = f"Motion id executing did not increment beyond {prev_motion_id} in time!"
            # Prefer the enclosing test case's fail() hook when it exists.
            # `self` is expected to come from a scope outside this excerpt.
            if hasattr(self, 'fail'):
                self.fail(timeout_message)
            # Reached only when no fail() hook aborted the wait. Without this
            # raise the counter stops advancing and the loop would poll forever.
            raise TimeoutError(timeout_message)
        counter += 1
        time.sleep(0.001)
    # The last amplitude (10) is flagged as the final block of the stream.
    prev_motion_id = get_prev_id(do_rpc(rmp_stub.Axis, add_next_second(rapidcode.AxisRequest(), amplitude, amplitude == 10)))