EtherCAT RX error counter diagnostic utility for real-time network quality monitoring.
This utility reads the mandatory EtherCAT RX Error Counters from all network nodes following the EtherCAT Technology Group diagnostic specifications. It monitors communication quality by tracking both Frame Errors (CRC failures at data link layer) and Physical Layer Errors (invalid symbols at hardware level) for each slave port.
import os
import platform
import sys
from typing import Dict
if platform.system() == "Windows":
os.add_dll_directory("c:\\Program Files (x86)\\INtime\\bin")
import RapidCodeHelpers as helpers
rapidcode_dir = helpers.find_rapid_code_directory()
sys.path.append(rapidcode_dir)
import RapidCodePython as RapidCode
# Addresses of the per-port error counters read by
# read_node_rx_error_counters(). Ports 0-3 occupy 0x300-0x307: each port has
# a frame-error counter at an even address followed by its RX-error counter
# at the next odd address.
PORT_0_FRAME_ERROR_COUNTER_REG = 0x300
PORT_0_RX_ERROR_COUNTER_REG = 0x301
PORT_1_FRAME_ERROR_COUNTER_REG = 0x302
PORT_1_RX_ERROR_COUNTER_REG = 0x303
PORT_2_FRAME_ERROR_COUNTER_REG = 0x304
PORT_2_RX_ERROR_COUNTER_REG = 0x305
PORT_3_FRAME_ERROR_COUNTER_REG = 0x306
PORT_3_RX_ERROR_COUNTER_REG = 0x307
# Passed unchanged as the 2nd and 3rd arguments of every
# node.ServiceChannelRead() call in read_node_rx_error_counters().
ERROR_COUNT_SUBINDEX = 0
# NOTE(review): presumably the read size in bytes (one byte per counter) --
# confirm against the ServiceChannelRead API documentation.
ERROR_COUNT_BYTES = 1
def read_node_rx_error_counters(node) -> Dict[str, int]:
    """Collect the frame-error and RX-error counters of ports 0-3 for one node.

    Every counter is fetched with ``node.ServiceChannelRead`` using the
    module-level register constants, ``ERROR_COUNT_SUBINDEX`` and
    ``ERROR_COUNT_BYTES``. If any read raises, the failure is printed and all
    eight counters in the result are set to 0.

    Args:
        node: Object exposing ``ServiceChannelRead`` and ``NumberGet``.

    Returns:
        Dict keyed ``Port<N>_FrameErrors`` / ``Port<N>_RxErrors`` for N in 0-3.
    """
    # (result key, register) pairs, listed in the order the reads are issued.
    counter_registers = (
        ('Port0_FrameErrors', PORT_0_FRAME_ERROR_COUNTER_REG),
        ('Port0_RxErrors', PORT_0_RX_ERROR_COUNTER_REG),
        ('Port1_FrameErrors', PORT_1_FRAME_ERROR_COUNTER_REG),
        ('Port1_RxErrors', PORT_1_RX_ERROR_COUNTER_REG),
        ('Port2_FrameErrors', PORT_2_FRAME_ERROR_COUNTER_REG),
        ('Port2_RxErrors', PORT_2_RX_ERROR_COUNTER_REG),
        ('Port3_FrameErrors', PORT_3_FRAME_ERROR_COUNTER_REG),
        ('Port3_RxErrors', PORT_3_RX_ERROR_COUNTER_REG),
    )
    readings = {}
    try:
        for key, register in counter_registers:
            readings[key] = node.ServiceChannelRead(register, ERROR_COUNT_SUBINDEX, ERROR_COUNT_BYTES)
    except Exception as ex:
        print(f"Error reading node {node.NumberGet()}: {ex}")
        # Values read before the failure are overwritten: every counter is 0.
        for port in range(4):
            readings[f'Port{port}_FrameErrors'] = 0
            readings[f'Port{port}_RxErrors'] = 0
    return readings
def display_network_error_diagnostics(controller: RapidCode.MotionController):
    """Print a table of per-port frame/RX error counters for each existing node.

    Nodes are discovered by calling ``controller.IOGet`` for every index below
    ``controller.NetworkNodeCountGet()`` and keeping those whose
    ``NetworkNode.Exists()`` is true. One row per kept node is printed, using
    the counters returned by ``read_node_rx_error_counters``.

    Args:
        controller: The motion controller whose network nodes are inspected.
    """
    existing_nodes = []
    # The name column is at least 10 characters wide and grows to fit the
    # longest node name found.
    max_node_name_length = 10
    for slot in range(controller.NetworkNodeCountGet()):
        io = controller.IOGet(slot)
        helpers.check_errors(io)
        if not io.NetworkNode.Exists():
            continue
        existing_nodes.append(io.NetworkNode)
        max_node_name_length = max(max_node_name_length, len(io.NetworkNode.NameGet()))

    header_line = f"{'Node':^4} | {'Name':^{max_node_name_length}} | {'Port 0':^12} | {'Port 1':^12} | {'Port 2':^12} | {'Port 3':^12}"
    subheader_line = f"{'':^4} | {'':^{max_node_name_length}} | {'Frame RX':^12} | {'Frame RX':^12} | {'Frame RX':^12} | {'Frame RX':^12}"

    # Horizontal rules are sized to the header so the table edges line up.
    table_width = len(header_line)
    heavy_rule = "=" * table_width
    light_rule = "-" * table_width

    print("\n" + heavy_rule)
    print("ETHERCAT RX ERROR COUNTERS")
    print(f"Network: {controller.NetworkNodeCountGet()} Nodes")
    print(heavy_rule)
    print(header_line)
    print(subheader_line)
    print(light_rule)

    for node in existing_nodes:
        node_index = node.NumberGet()
        node_name = node.NameGet()[:max_node_name_length]
        error_counts = read_node_rx_error_counters(node)
        # One "frame rx" cell per port, in port order 0-3.
        ports = []
        for port_num in range(4):
            frame_errors = error_counts[f'Port{port_num}_FrameErrors']
            rx_errors = error_counts[f'Port{port_num}_RxErrors']
            ports.append(f"{frame_errors:5d} {rx_errors:5d}")
        print(f"{node_index:^4} | {node_name:<{max_node_name_length}} | {ports[0]:^12} | {ports[1]:^12} | {ports[2]:^12} | {ports[3]:^12}")

    print(heavy_rule)
def main():
    """Create a MotionController, print its identity, and show the RX error table.

    The error-counter table is displayed only when the network state equals
    ``RSINetworkState_RSINetworkStateOPERATIONAL``; otherwise the current
    state is printed instead.

    The controller is deleted before this function returns, including when
    ``helpers.check_errors`` or any later call raises, so a failure after
    creation does not leave the controller undeleted.
    """
    print("EtherCAT RX Error Counter Diagnostics")
    print("-"*40)
    creation_params: RapidCode.CreationParameters = helpers.get_creation_parameters()
    # Creation stays outside the try: if it raises there is nothing to delete.
    motion_controller: RapidCode.MotionController = RapidCode.MotionController.Create(creation_params)
    try:
        print(f"MotionController creation error count: {motion_controller.ErrorLogCountGet()}")
        helpers.check_errors(motion_controller)
        print(f"RapidCode Version: {motion_controller.VersionGet()}")
        print(f"Serial Number: {motion_controller.SerialNumberGet()}")
        if motion_controller.NetworkStateGet() == RapidCode.RSINetworkState_RSINetworkStateOPERATIONAL:
            display_network_error_diagnostics(motion_controller)
        else:
            print("\nNetwork is not in OPERATIONAL state. Current state:", motion_controller.NetworkStateGet())
            print("Cannot read node error counts.")
    finally:
        # Always release the controller, even if diagnostics failed part-way.
        motion_controller.Delete()