-
RabbitMQ 入门 (Go) - 6. 数据持久化(上)
从本节开始,我介绍一下如何将相关数据持久化到数据库,也就是上图中蓝色的部分。
目前的问题
我先运行 6 个传感器和2 个协调器,这里我使用了批处理文件:
运行后,看一下 RabbitMQ 的管理控制台:
注意上面前面几个 Queue,这些 Queue 就是我们让传感器和协调器监听那两个 Fanout Exchange 时创建的,因为这两个 Exchange 不使用路由 Key 来决定接收者,我使用了空字符串“”作为这些 Queue 的名称,而RabbitMQ 就会为它们赋予一个唯一的名字。
因为目前创建的 Queue 都是临时的,如果我重新启动系统,RabbitMQ会创建另一套不同的 Queue 来完成工作,这样的话系统资源就会被慢慢的耗尽,所以这个问题需要解决。
调整 autoDelete 参数
首先修改 tools 包下的 queuetools.go 里面的GetQueue 函数,添加一个 autoDelete 参数:
GetQueue 函数会确保创建一个Queue 从而能接收到消息。刚创建它的时候,我的意图是让它作用于 Direct Exchange 和命名的 Queue。后来我对它进行了扩展使用,也可以应用于匿名的 Queue。
再说一下 autoDelete 参数的作用是:若值为 true,那么如果一个 Queue 它没有被注册任何的使用者,这个 Queue 就会被删除。针对上述问题中的临时 Queue,这就是我想要的效果。但是针对传感器的数据 Queue,我还是希望在系统重启后,这些 Queue 能够保留。
所以,我为该函数添加了一个 autoDelete 参数,在创建 Queue 的时候,可以对 autoDelete 进行设置。
有三个调用该函数的地方需要调整代码,先打开 sensors.go:
-
针对传感器传送数据的 Queue,我要让它能够保留下来,所以 autoDelete 就是 false
-
而 discoveryQueue 是用来监听协调器的“发现”请求的,我想让每个传感器每次上线都会得到一个新的 Queue,这里 autoDelete 就设置为 true,这样的话 RabbitMQ 就会把旧的 Queue 自动清理掉。
调整 queuelistener.go 里面的调用:
这里得到的临时 Queue 是用来监听传感器上线时或响应协调器发现请求时来发布数据 Queue 名称的。
这里函数调用的 autoDelete 参数也设置为 true,从而让它们可以自动被清除掉。
测试运行
把之前的 Queue 都删掉:
然后再运行 5 个传感器和2 个协调器:
现在又是很多的 Queue。
然后我们再停掉所有的传感器和协调器:
可以看到传感器传送数据的 Queue 被保留了,而其它的临时 Queue 都自动删除掉了,这就是我们想要的效果。
泛化事件数据
到目前为止,系统中只发布了一种类型的事件(接收到传感器数据时的事件),而且目前还没有任何使用者监听这个事件。接下来我们就要完善事件这部分功能了,但首先必须做出一些优化修改,以便能真正满足需求。
目前 eventaggregator.go 里面包含了所有添加监听者以及向监听者发布事件的方法。
但现在的情况是事件的使用者也知道如何自行发布事件,这点不太好,因为它们不需要这样做。代码修改如下:
-
为了尽量少的暴露功能,我为事件的使用者创建了 EventRaiser 这个接口,它里面只有一个 AddListener 方法,与已经实现的 AddListener 方法相几乎完全匹配。
-
但是我把接口里 AddListener 的第二个参数,也就是回调函数里面的参数类型改为了 interface{},从而可以接收多种类型的数据。
-
相应的,后边所有涉及事件数据参数的地方都改为 interface{}
现在 EventAggregator 被泛化了,我也可以发布其它类型的事件了。
来到 queuelistener.go,我想在协调器发现数据源之后,发布一个事件:
这个事件的名称叫做 DataSourceDiscovered,事件数据就是 Queue 的名称,由于这个参数的类型是 interface{},所以它可以正常的传递进去。
创建数据的使用者
目前,我们整个系统的设计一共有三层,而数据源和数据的使用者是通过协调器分开的。这样做的好处是,关于如何处理消息的业务逻辑都集中在协调器这一层上面了,而数据源和数据使用者层仅关注它们自身的任务即可。
为了达到这个目的,需要在 coordinator 目录下创建一个 databaseconsumer.go 文件:
这个文件的作用是监听整个系统发出的事件,并决定哪些事件可以转发到数据管理包(我一会要建立的)。
dataconsumer.go
首先看一下 dataconsumer.go 文件的内容:
-
第 15 行建立 DatabaseConsumer struct,它有5个字段:
-
第一个字段类型是 EventRaiser 接口,该接口只能用于监听,而不能发布事件,这就是该接口的目的。
-
接下来三个字段都是与 RabbitMQ 相关的。
-
最后一个字段是注册的监听器的 Queue 名称的集合。
-
-
第 23 行,为 DatabaseConsumer 创建一个构造函数。它接收 EventRaiser 作为参数,并创建 RabbitMQ 相关的连接、Channel、Queue 为 DatabaseConsumer 各字段赋值。
-
第 29 行创建 Queue 时用到了一个 Queue 的名称,这个 Queue 是用来做持久化的,它是众所周知的,它的名称存放在 queuetools.go 文件里:
-
第 31 行就是监听数据源被发现的事件,回调函数的参数类型是空接口(其实就是事件的名称)。在回调函数内,调用我随后要建立的 SubscribeToDataEvent 方法,把 eventData 转化为字符串传递进去。
下面看一看 SubscribeToDataEvent 方法:
-
该方法的参数是事件的名称。
-
第 39 行,对已注册的监听器进行遍历,如果传进来的事件名称已注册,return 即可。
-
否则的话,需要注册这个数据源,这个事件的名称是 MessageReceived_+Queue 的名称。
-
第 45 行的回调函数,我将传入一个立即执行的匿名函数,它会返回我们真正需要使用的回调函数,也就是闭包。这种做法的好处就是返回的函数可以捕获其被定义的作用域内的变量,这样的话真正的回调函数就可以拥有一些可持续的“状态”(也就是 prevTime 和 buf)。这里我的需求是至少要间隔 5 秒钟以上,才记录一次(到数据库)。
-
回调函数内其它的逻辑都很简单,就不逐行介绍了。
-
第 67 行,发布消息使用的是 Default Exchange,并路由到持久化的那个 Queue。
修改 queuelistener.go 里面的构造函数
让其传入 EventAggregator 作为参数并赋值给 QueueListener 的 ea 字段。
修改协调器的 main 函数
-
创建包级共享的 DatabaseConsumer 变量,在 main 里用构造函数进行创建并赋值。
-
创建 EventAggregator,并传递给 DatabaseConsumer 和 QueueListener,让他们共享同一个 EventAggregator。
出处:https://www.cnblogs.com/yangxu-pro/p/RabbitMQ-Go-6.html