APIs, concepts, guides, and more
Axis Streaming Motion

Learn to use the RapidCodeRemote API to do streaming motion.

This is a sample program to assist in the integration of the RMP motion controller with your application. It may not contain all of the logic and safety features that your application requires. We recommend that you wire an external hardware emergency stop (e-stop) button for safety when using our code sample apps. Doing so will help ensure the safety of you and those around you and will prevent potential injury or damage.

The sample apps assume that the system (network, axes, I/O) are configured prior to running the code featured in the sample app. See the Configuration page for more information.
The following helper function is used to check each response message for errors:
def do_rpc(rpc, request):
response = rpc(request)
num_errors = len(response.header.errors)
if (num_errors > 0):
raise Exception(f"Last RPC had {num_errors} errors! {response.header.errors}")
return response

📜 Stream a set of points to the axis
Pack a list of positions to command the axis to move to using MovePT().

axis_request = rapidcode.AxisRequest()
axis_request.index = 0
# Configure the axis
axis_request.config.user_units = 1000
# When working with a phantom axis, the axis' actual position will not change, so we tell the axis
# to not do anything when a position error occurs.
axis_request.config.error_limit.action = rsienums.RSIAction.RSIActionNONE
# Make sure the axis is ready for motion
axis_request.action.clear_faults.SetInParent() # Tell the server to clear the axis' faults.
axis_request.action.position_set.position = 0 # Tell the server to set the axis' current position to 0.
# Reset the motion ID to 0
axis_request.action.move.motion_id = orig_motion_id
# Helper function that adds the points to move to for the next second
def add_next_second(request, amplitude, final=False):
num_points = 100
streaming = request.action.move.streaming
# Only specifying positions and times tells the server to use `Axis::MovePT()`
streaming.positions.extend([math.sin(pos / num_points * 2 * math.pi) * amplitude for pos in range(num_points)])
streaming.times.extend([ 1 / num_points for _ in range(num_points) ])
streaming.empty_count = 50
streaming.retain = False
streaming.final = final
print(f"Commanding streaming move. Final={final}")
return request
# Send the first second's worth of points to the server
do_rpc(rmp_stub.Axis, add_next_second(axis_request, 1))
# Add the next second's points
axis_request2 = rapidcode.AxisRequest() # The default index value is 0
add_next_second(axis_request2, 2)
# We subtract 1 from the motion ID because the axis returns the ID of the next move.
def get_prev_id(response):
return response.status.motion_id - 1
prev_motion_id = get_prev_id(do_rpc(rmp_stub.Axis, axis_request2))
# TODO: remove, just for debug
def print_executing():
status = do_rpc(rmp_stub.Axis, rapidcode.AxisRequest()).status
mid_exec = status.motion_id_executing
# print(f"motion id exec: {mid_exec}, ele_exec: {status.element_id_executing}")
state = status.state
if state == rsienums.RSIStateERROR:
print(f"Error state: '{status.source_name}'")
return mid_exec
# Wait until the previous second has started (by checking the motion ID), then add more points.
# The added points will be appended to the points from the previous move.
for amplitude in range(3, 11):
counter = 0
# TODO: remove, just for debug
print(f"Waiting for motion id executing to equal: {prev_motion_id}")
# Send an empty AxisRequest (with default axis index 0).
# Retrieve the motion ID from the status message within the AxisResponse
while print_executing() < prev_motion_id:
# poll up to 11 times
if counter >= 1001:
if hasattr(self, 'fail'): self.fail(f"Motion id executing did not increment beyond {prev_motion_id} in time!")
counter += 1
# Send a new AxisRequest with the positions for the next second.
# Also use the returned AxisResponse to update the `previous_motion_id` variable.
# prev_motion_id = do_rpc(rmp_stub.Axis, add_next_second(rapidcode.AxisRequest(), amplitude, amplitude==10)).status.motion_id
prev_motion_id = get_prev_id(do_rpc(rmp_stub.Axis, add_next_second(rapidcode.AxisRequest(), amplitude, amplitude==10)))