搭建基于Foxglove的时序数据播放环境
文章目录
简要介绍如何配置Foxglove可视化工具的前后端环境以便实现在项目中以多种方式播放采集的时序数据。
需求背景
在自动驾驶相关的项目中经常会对采集的时序数据进行可视化播放展示,常见展示方式包括纯文本
、图片
、表格
、图表
、地图
、3D
等展示形式,之前团队内部对于这种需求都是前后端配合自己开发相关的UI组件进行展示。但随着要展示数据的增多以及用户需求的多变,采用自行开发的方式已经力不从心,迫切的需要一款能同时支持多路数据以不同方式播放展示的工具来减轻研发压力,提升系统稳定性与可靠性。
在切换为新的可视化工具之前,内部确定了如下几个指标:
- 能同时支持多路数据播放,可方便的添加与配置
- 支持
纯文本
、表格
、图表
、地图
、3D
、ROS
、图片
、视频
等形式播放 - 相关实现方案不是很小众,在有使用问题时能有相关途径寻找解决方案
- 软件License许可能允许商用,且软件代码开源以便能进行二次开发
软件概览
经过多方对比以及参考业界其它公司相关的方案后,最终决定采用基于Foxglove
作为对应的可视化工具替代实现,其具有如下特性:
- 核心是基于
React
实现的浏览器播放,server
端只需根据规范返回对应格式的数据即可,不限制server
端的编程语言和实现方式,前后端通信主要基于WebSocket
实现 - 预先定义了几十种数据格式,基于这些数据格式和实际业务需求可组合成多种不同的数据播放源,每个数据播放源都对应为一个
panel
- 支持多种类型的播放形式,除了前述的几种类型,其还支持
滑块
、服务
,日志
、状态图
、主题图
等十几种样式,且可根据实际需求自行开发第三方的插件并很容易的整合到系统中,在Foxglove
实际播放时每种播放类型都对应为一个panel
- 支持多通道播放,可对同一类型的
panel
可基于多个不同的数据源创建多个不同的panel
来同时播放,也可同时配置多个不同类型的panel
进行同时播放 - 支持界面自定义布局与调整,可根据实际需求灵活调整不同的
panel
的展示位置以及宽高等信息,当关闭某个panel
时整个播放界面会自动调整已达到最佳显示效果 - 在创建
panel
并关联server
端相关的topic
或关闭配置好的panel
后都会给server
端发送对应的通知,方便进行server
端开启或关闭数据推送服务,减少不必要的压力。同时,重复配置的panel
只会建立一个连接,减少页面端的负载 - 软件许可基于Mozilla Public License 2.0,可用于商业化的产品中1
- 支持在线版本和本地版本,在线版本需要付费,本地版本可通过
Docker
安装
Foxglove
的使用流程如下图所示:
- 首先要创建对应的
topic
,每个topic
都代表某种类型的展示数据,除了纯文本和表格之外,对于地图、3D、图片的展示方式需要根据官方文档设置对应的schema
topic
创建完毕后要进行注册,只有注册完毕的topic
才能被对应的panel
使用- 创建
panel
,panel
是数据展示的基本单位,其中纯文本和表格类型可兼容所有的topic
,而对于地图、3D、图片类型则需要与特定schema
的topic
关联 - 创建完
panel
后,需要将其关联到对应的topic
,并根据实际情况进行针对性的设置,如3D场景下设置显示的物体,地图场景下设置使用的地图源 - 启动对应的server端,持续不停的给相关
topic
写入数据 - 若一切正常,在
Foxglvoe
前端界面上对应的panel
中会实时2的展示server端发送来的数据 - 若不需要某个
panel
,直接关闭即可,此时Foxglove
会自动关闭相关的WebSocket
连接
在个人项目使用中Server
端的数据来源是基于Kafka
实现的,相关的流程如下
前后端配置
本章节以UI端本地私有化安装为例,说明如何配置Foxglove
前后端通信的环境,server
端采用Java
实现。
服务端安装
- 根据
Foxglove
官方网站的说明文档以及相关的Java Demo编写对应的server
端项目,暴露WebSocket
端口,此处假设其端口为8765 - 启动
server
端程序,则对应的WebSocket
访问地址为http://127.0.0.1:8765
页面端安装
-
在
Foxglove
对应的GitHub地址上有其UI端的安装说明,主要采用Docker
安装,相关的指令如下3# 官方原始的说明 docker run --rm -p "8080:8080" ghcr.io/foxglove/studio:latest # 个人新保存的镜像 docker run --rm -p "8080:8080" lucumt/foxglove_studio:1.74.1
-
上述指令安装完成之后,在浏览器中输入
http://127.0.0.1:8080
会打开类似如下界面,在打开数据源
对话框中选择最下面的打开连接
-
在弹出的对话框中添加前面使用的
Websocket
地址,之后点击open
按钮关闭该对话框 -
若连接配置正确,在左侧会出现相关的topic列表,类似如下图所示,至此
Foxglove
前后端通信的环境初步搭建完毕
服务器端编写
由于Foxglove
官方已经提供了相应的Docker
镜像可直接运行,除非需要自定义开发插件,通常不涉及到对UI端的操作,我们使用Foxglove
时更多的工作还是集中在server
端。
本文相关的完整代码参见foxglove-websocket-java。
topic创建
-
Foxglove
前后端通信主要基于WebSocket
实现,本文采用Netty
提供的WebSocket
工具类来简化使用,对应的Maven
版本为<dependency> <groupId>org.yeauty</groupId> <artifactId>netty-websocket-spring-boot-starter</artifactId> <version>0.12.0</version> </dependency>
-
添加一个名为
FoxgloveServer
的类用于暴露WebSocket
端口,此时在程序启动后前后端已具备初步的通信能力@Slf4j @ServerEndpoint(port = "8765") public class FoxgloveServer { public static final String EMPTY_CHASSIS_CODE = "NaN"; @BeforeHandshake public void handshake(Session session) { log.info("----------session信息" + session.toString()); session.setSubprotocols("foxglove.websocket.v1"); } @OnOpen public void onOpen(Session session) { log.info("这是一次新的链接 ---》 new connection" + session.hashCode()); } @OnClose public void onClose(Session session) throws IOException { // 从对象集合中删除该连接对象 log.info("-------one connection closed"); session.close(); } @OnError public void onError(Session session, Throwable throwable) { throwable.printStackTrace(); } @OnMessage public void onMessage(Session session, String message) { JSONObject msg = JSON.parseObject(message); String op = msg.getString("op"); log.info("-------------on open msg:\t" + message); } @OnBinary public void onBinary(Session session, byte[] bytes) { // 这里接收到用户指令 String data = new String(Arrays.copyOfRange(bytes, 5, bytes.length)); JSONObject message = JSON.parseObject(data); log.info("--------binary message:\t" + message); } }
-
建立一个名为
ChannelInfo
的topic
类,代码如下所示,需要注意的是类名必须为ChannelInfo
,否则最后生成的JSON
格式错误会导致topic
无法注册4@Data public class ChannelInfo { /** * topic id,在注册时指定其值 */ private Integer id; /** * topic名称 */ private String topic; /** * topic编码,如json,protobuf等 */ private String encoding; /** * topic描述 */ private String schemaName; /** * topic对应的数据结构,可以是自定义的,也可以是foxglove官方定义好的 */ private String schema; /** * 对用结构的表示形式,如json时为jsonschema */ private String schemaEncoding; }
-
添加一个名为
ChannelUtil
的类,用于创建对应的topic
public class ChannelUtil { public static List<ChannelInfo> createChannels() { String schema; ChannelInfo channelChassis = new ChannelInfo(); channelChassis.setId(0); channelChassis.setTopic("/drive/chassis_code"); channelChassis.setEncoding("json"); channelChassis.setSchemaName("底盘编码(用于切换播放源)"); channelChassis.setSchema("{\"type\": \"object\", \"properties\": {\"chassis_code\": {\"type\": \"string\"}}}"); channelChassis.setSchemaEncoding("jsonschema"); ChannelInfo channelControl = createControlData(1); ChannelInfo channel3D = new ChannelInfo(); channel3D.setId(2); channel3D.setTopic("/drive/3D"); channel3D.setEncoding("json"); channel3D.setSchemaName("foxglove.SceneUpdate"); schema = DataUtil.loadJsonSchema("SceneUpdate.json"); channel3D.setSchema(schema); channel3D.setSchemaEncoding("jsonschema"); ChannelInfo channelGPS = new ChannelInfo(); channelGPS.setId(3); channelGPS.setTopic("/drive/map"); channelGPS.setEncoding("json"); channelGPS.setSchemaName("foxglove.LocationFix"); schema = DataUtil.loadJsonSchema("LocationFix.json"); channelGPS.setSchema(schema); channelGPS.setSchemaEncoding("jsonschema"); List<ChannelInfo> channelList = new ArrayList<>(); channelList.add(channelChassis); channelList.add(channelControl); channelList.add(channel3D); channelList.add(channelGPS); return channelList; } private static ChannelInfo createControlData(int index) { ChannelInfo channelControl = new ChannelInfo(); channelControl.setId(index); channelControl.setTopic("/drive/control_data"); channelControl.setEncoding("json"); channelControl.setSchemaName("控制数据信号"); StringBuffer sb = new StringBuffer(); sb.append("{\"type\": \"object\", \"properties\":{"); sb.append("\"底盘号\": {\"type\": \"string\"},"); sb.append("\"终端id\": {\"type\": \"string\"},"); sb.append("\"时间\": {\"type\": \"string\"},"); sb.append("\"纬度\": {\"type\": \"string\"},"); sb.append("\"经度\": {\"type\": \"string\"},"); sb.append("\"海拔\": {\"type\": \"string\"},"); sb.append("\"变速箱输出轴转速\": {\"type\": \"string\"},"); sb.append("\"制动系统准备可以释放\": {\"type\": \"string\"},"); sb.append("\"角速度(rad/s)\": {\"type\": \"string\"},"); sb.append("\"AX(m/s^2)\": {\"type\": \"string\"},"); sb.append("\"AY(m/s^2)\": {\"type\": \"string\"},"); sb.append("\"总驱动力\": {\"type\": \"string\"}"); sb.append("}}"); channelControl.setSchema(sb.toString()); channelControl.setSchemaEncoding("jsonschema"); return channelControl; } }
-
对前述的
FoxgloveServer
类中的onOpen
方法添加类似如下代码,用于真正的注册相关的topic
,可注册的topic
数目可根据实际需求添加任意多个@OnOpen public void onOpen(Session session) { log.info("这是一次新的链接 ---》 new connection" + session.hashCode()); this.session = session; ServerInfo serverInfo = new ServerInfo(); serverInfo.setOp("serverInfo"); serverInfo.setName("foxglove data render"); serverInfo.setCapabilities(Arrays.asList("clientPublish", "services")); serverInfo.setSupportedEncodings(Arrays.asList("json")); String severInfoString = JSON.toJSONString(serverInfo); session.sendText(severInfoString); Advertise advertise = new Advertise(); advertise.setOp("advertise"); advertise.setChannels(ChannelUtil.createChannels()); session.sendText(JSON.toJSONString(advertise)); }
-
若连接配置正确,在UI界面的左侧会出现相关的
topic
列表,如前述步骤所示,至此topic
的创建与注册完成
数据发送
本章节以发送纯文本
类型的消息为例说明如何在server
端编写代码实现数据发送
-
topic
注册完毕后,不能直接使用,需要创建panel
并选中对应的topic
,此时UI端会给server
端通过WebSocket
协议发送对应的事件消息,需修改FoxgloveServer
中的onMessage
方法@OnMessage public void onMessage(Session session, String message) { JSONObject msg = JSON.parseObject(message); String op = msg.getString("op"); log.info("-------------on open msg:\t" + message); switch (op) { case "subscribe": // 创建连接时会执行此处逻辑 this.createThread(msg); break; case "unsubscribe": break; } } private void createThread(JSONObject msg) { List<Subscription> subscribeList = msg.getObject("subscriptions", new TypeReference<List<Subscription>>() { }); log.info("============开始创建基于channel的数据发送线程==============" + subscribeList); for (Subscription sub : subscribeList) { Integer channelId = sub.getChannelId(); SendDataThread thread = getKafkaSendThread(sub.getId(), channelId, session); String threadName = "thread-" + getTopicName(channelId) + "-" + RandomStringUtils.randomAlphabetic(6).toLowerCase(); thread.setName(threadName); thread.setChassisCode(chassisCode == null ? EMPTY_CHASSIS_CODE : chassisCode); thread.start(); threadMap.put(channelId, thread); } }
其中
SendDataThread
类是一个继承自Thread
类,用于封装Kafka
的数据发送流程,具体的发送逻辑需要再次继承SendDataThread
类来实现。 -
在前述
topic
列表中纯文本
对应的名称为/drive/chassis_code
,对应的数据结构如下{ "type": "object", "properties": { "chassis_code": { "type": "string" } } }
则可继承
SendDataThread
编写相应的实现类@Slf4j public class SendChassisThread extends SendDataThread { private DataConfig dataConfig; public SendChassisThread(int index, Session session) { super(index, session); this.dataConfig = AppCtxUtil.getBean(DataConfig.class); this.frequency = dataConfig.getChassis().getFrequency(); } @Override public void run() { while (running) { try { ChassisInfo chassis = new ChassisInfo(); chassis.setTimestamp(DateUtil.createTimestamp()); chassis.setChassisCode(RandomStringUtils.randomAlphanumeric(6).toUpperCase()); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(chassis); byte[] bytes = jsonObject.toJSONString().getBytes(); this.session.sendBinary(bytes); log.info("---------------chassis info:\t" + chassis); sleep(frequency); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
对应的
ChassisInfo
类代码如下@Data public class ChassisInfo { @JsonProperty("底盘号") private String chassisCode; @JsonProperty("时间戳") private Timestamp timestamp; }
-
启动
server
端程序,在Foxglove
UI端按照下图所示点击创建按钮,在出现的panel
列表中选择原始消息(英文环境下为Raw Message),用于展示纯文本消息 -
在出现的纯文本
panel
的顶部选择对应的topic
,此处为/drive/chassis_code
,执行到这一步后理论上就能正常播放数据,但在浏览器中查看结果时,会呈现出类似下图所示的效果,文本消息并没有正常展示。 -
从上面报错的初步信息可知是UI端在解析二进制文件时出错,与官方Demo的
WebSocket
数据包进行对比,折腾一段时间后其原因是Foxglove
有自己特定的编码算法,不能直接返回生成的原始消息,需要对其基于byte
进行编码,之后UI端自行解码。相关的编码代码如下package com.visualization.foxglove.util; import com.alibaba.fastjson.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; import java.io.IOException; import java.io.InputStream; import java.util.Map; public class DataUtil { public static byte[] getFormattedBytes(byte[] data, int channel) { return getFormattedBytes(data, 0, channel); } public static byte[] getFormattedBytes(byte[] data, long ns, int channel) { byte constantInfo = 1; byte[] constantInfoByte = new byte[]{constantInfo}; byte[] dataType = getIntBytes(channel); byte[] nsTime = getLongBytes(ns); byte[] pack1 = byteConcat(constantInfoByte, dataType, nsTime); byte[] pack2 = byteConcat(pack1, data); return pack2; } public static byte[] loadGlbData(String glbFile) { try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); InputStream stream = classLoader.getResourceAsStream("glb/" + glbFile); byte[] bytes = IOUtils.toByteArray(stream); return bytes; } catch (IOException e) { e.printStackTrace(); } return null; } public static String loadJsonSchema(String schemaFile) { ObjectMapper objectMapper = new ObjectMapper(); Map map = null; try { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); InputStream stream = classLoader.getResourceAsStream("schema/" + schemaFile); map = objectMapper.readValue(stream, Map.class); } catch (IOException e) { e.printStackTrace(); } return JSON.toJSONString(map); } public static final long fx = 0xffL; /** * int 转 byte[] * 小端 * * @param data * @return */ public static byte[] getIntBytes(int data) { int length = 4; byte[] bytes = new byte[length]; for (int i = 0; i < length; i++) { bytes[i] = (byte) ((data >> (i * 8)) & fx); } return bytes; } /** * long 转 byte[] * 小端 * * @param data * @return */ public static byte[] getLongBytes(long data) { int length = 8; byte[] bytes = new byte[length]; for (int i = 0; i < length; i++) { bytes[i] = (byte) ((data >> (i * 8)) & fx); } return bytes; } public static byte[] byteConcat(byte[] bt1, byte[] bt2) { byte[] bt4 = new byte[bt1.length + bt2.length]; int len = 0; System.arraycopy(bt1, 0, bt4, 0, bt1.length); len += bt1.length; System.arraycopy(bt2, 0, bt4, len, bt2.length); return bt4; } public static byte[] byteConcat(byte[] bt1, byte[] bt2, byte[] bt3) { byte[] bt4 = new byte[bt1.length + bt2.length + bt3.length]; int len = 0; System.arraycopy(bt1, 0, bt4, 0, bt1.length); len += bt1.length; System.arraycopy(bt2, 0, bt4, len, bt2.length); len += bt2.length; System.arraycopy(bt3, 0, bt4, len, bt3.length); return bt4; } }
-
将数据发送的代码修改如下,并重新启动
server
端服务@Override public void run() { while (running) { try { ChassisInfo chassis = new ChassisInfo(); chassis.setTimestamp(DateUtil.createTimestamp()); chassis.setChassisCode(RandomStringUtils.randomAlphanumeric(6).toUpperCase()); JSONObject jsonObject = (JSONObject) JSONObject.toJSON(chassis); // 需要调用对应的编码函数 byte[] bytes = DataUtil.getFormattedBytes(jsonObject.toJSONString().getBytes(), index); this.session.sendBinary(bytes); log.info("---------------chassis info:\t" + chassis); sleep(frequency); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
-
在UI端查看,可发现此时能正常播放消息,至此
Foxglove
前后端交互的配置基本搭建完毕,后续就是根据不同的面板类型进行针对性的编码。