完整工程可从以下地址签出:
https://gitcode.net/coloreaglestdio/pcaphub.git
在调试嵌入式物联设备时,尤其是在多个以太网物联设备交错通信的情况下,很难通过在捉襟见肘的嵌入式系统上进行数据记录与调试。如果设备连接的是一般的消费交换机,以及工业的架上交换机(一般位于车间的集控箱内),把调试笔记本插到空闲端口,是无法看到其他端口上设备的大部分数据的。这是因为交换机为了带宽和效率,会“记住”哪些MAC地址出现在哪些端口上,从而自动按照目的地址实现了点对点交换。也就是说,插在空闲口上的PC只能看到一些广播消息,大部分的UDP和TCP包是看不到的。
在古老的年代,有一种设备叫做集线器(Hub)。Hub是一种广播式的线路组合设备,可以达到这个要求。但由于所有端口共享一个带宽,导致通信效率很差,现在已经买不到了。当代通常在交换机上进行包测试,用的是镜像端口。如果恰好没有这种工业交换机,或者需要很多的镜像端口,又该如何调试呢?针对这种调试需求,可以使用PCAP,把一台插有多个以太网端口的PC变成Hub,观察所有接口上的数据流。
本软件主要功能如下:
通过上述功能,就能够组成一个交换网络。
本软件运行在调试工作站上,进程为“pcapHub”。调试的常见场景如下图所示:
当进行调试时,把待调试的工业设备从工业交换机断开,并临时接入在调试工作站的网卡上。同时,也可以接驳一些便携机(如果工作站没有显示器)。为了保持与原有工业网络的联通,还需要引一路线缆,把调试工作站和工业交换机连接起来。
经过上述连接,调试工作站和便携机上就能看到待调试工业设备的所有通信。调试工作站PC上的两个关键数据结构是实时队列和窗口知识。这两个结构保证了低延迟(1毫秒)、不重复(防止反复抓取冗余数据造成流量风暴)。
本软件的实时队列是一个环形的预分配队列,一行一个pcap包,超过最大长度后绕回。队列每行一个tag_packages数据结构:
struct tag_packages{int from_id;int len;QByteArray data;};QVector global_buffer;
QAtomicInteger pcap_recv_pos;
data 虽然为 QByteArray,但实际已经被预分配空间。因此,global_buffer 是一个静态的内存资源。主要的工作特点:
上述步骤,可以确保高效的进行数据流转。
窗口知识记录着每个MAC地址连着哪个网口,是一个动态字典。其最外层数据结构是一个QMap,变量名称pcap_ports。Key是MAC地址(64位整形),Value是tag_portAssign结构。
struct tag_portAssign{int curr_id;quint64 mac;QString portName;QDateTime dtmLastAck;};static QMap pcap_ports;static QMutex mtx_ports;
这个结构有个显著的特点,就是标记了来源MAC最后一次活跃的时刻。如果此时刻很旧(超过5秒),那么来源MAC对应的设备就可能已经移除了。下一次这个来源MAC出现在别的网口时,该网口的入队逻辑就能够更新来源MAC的绑定关系到本网口。
具体说,在抓取包时,一个网卡抓获一个包,并得到一个来源MAC,遵循下面的伪代码规则来对待这个包,以避免重复抓取到刚刚写入的内容。
网口N 捕获新包,得到来源MAC
若:字典pcap_ports内查不到来源MAC(说明来源MAC就连接在本网口上,它第一次出现)在字典中添加新的MAC知识;入队,更新pcap_recv_pos;
否则:若:pcap_ports[来源MAC].curr_id==本网口ID入队,更新pcap_recv_pos;pcap_ports[来源MAC].dtmLastAck=NOW();否则:若 pcap_ports[来源MAC].dtmLastAck 比现在时间早5秒以上(说明当前字典知识是老的,可能网线拔了,换了网口)pcap_ports[来源MAC].curr_id=本网口ID;pcap_ports[来源MAC].portName=本网口名字;pcap_ports[来源MAC].dtmLastAck=NOW();判断结束;判断结束;
判断结束
在处理队列准备向本网卡推送包时,仅推送 tag_packages.from_id != 本网口ID 的包。
在启动交换时,分别为各个端口创建线程:
for (int i=0;icap_thread * recv = new cap_thread(this);recv->setRunner(std::bind(recv_loop,strName,id));cap_thread * send = new cap_thread(this);send->setRunner(std::bind(send_loop,strName,id));++id;}
在线程内部,进行全局的队列读写:
//2. Run Cap Thread on interface.void recv_loop(QString itstr, int id){while (!pcap_stop){pcap_t *handle = NULL;char errbuf[PCAP_ERRBUF_SIZE];handle = pcap_open_live(itstr, 65535, 1, 10, errbuf);const u_char *packet;struct pcap_pkthdr header;while (!pcap_stop){packet = pcap_next(handle, &header);if(packet){//Src MACquint64 mac_src = 0;memcpy_s(&mac_src,8,packet+6,6);bool MyPack = false;QDateTime dtm = QDateTime::currentDateTime();mtx_ports.lock();const bool newClientMac = pcap_ports.contains(mac_src);if (!newClientMac){tag_portAssign & newmac = pcap_ports[mac_src];newmac.mac = mac_src;newmac.curr_id = id;newmac.dtmLastAck = dtm;MyPack = true;}else{tag_portAssign & curport = pcap_ports[mac_src];if (curport.curr_id==id){curport.dtmLastAck = dtm;MyPack = true;}else if (pcap_ports[mac_src].dtmLastAck.msecsTo(dtm) >5000 ){curport.curr_id = id;curport.dtmLastAck = dtm;MyPack = true;}}mtx_ports.unlock();//Only Enqueue packs for etheraddrs connected to this port id.if (MyPack){quint64 pos = pcap_recv_pos++;global_buffer[pos % PCAPIO_BUFCNT].from_id = id;global_buffer[pos % PCAPIO_BUFCNT].len = header.len;memcpy_s(global_buffer[pos % PCAPIO_BUFCNT].data.data(),PCAPIO_MAXPACK,packet,header.len);}}}pcap_close(handle);}}//3. Run Send Thread on interfacevoid send_loop(QString itstr,int id){quint64 send_pos = pcap_recv_pos;pcap_t *handle = NULL;char errbuf[PCAP_ERRBUF_SIZE];handle = pcap_open_live(strDev.c_str(), 65535, 1, 10, errbuf);while (!pcap_stop){if (send_pos > pcap_recv_pos)send_pos = pcap_recv_pos;if (send_pos == pcap_recv_pos){QThread::usleep(500);continue;}int pos = send_pos % PCAPIO_BUFCNT;++send_pos;if (global_buffer[pos].from_id!=id){/* Send down the packet */pcap_sendpacket(handle, // Adapter(const unsigned char *)global_buffer[pos].data.constData(), global_buffer[pos].len // size);}}pcap_close(handle);}
注意,上述代码经过大量简化,完整版本直接参考Git仓库。