go语言消息中间件 go gin 中间件

为什么golang适合中间件

Golang是一种静态类型的编程语言,具有高效性、安全性和可扩展性。它特别适合用于构建中间件,因为它可以更快地生成和处理数据,而且它可以构建可靠的、可维护的系统。 Golang还具有跨平台的能力,可以在各种操作系统中使用,而且可以使用内置的库和框架来编写大量的代码,从而大大简化了中间件的开发过程。此外,Golang的性能也更好,能够更快地处理大量的数据。因此,Golang对于构建中间件来说是一个非常理想的选择。

网站建设哪家好,找创新互联公司!专注于网页设计、网站建设、微信开发、微信小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了金安免费建站欢迎大家使用!

golang使用Nsq

1. 介绍

最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。

官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库。

1.1 Features

1). Distributed

NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。

2). Scalable易于扩展

NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。

3). Ops Friendly

NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。

4.Integrated高度集成

官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。

1.2 组件

1.3 拓扑结构

NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。

NSQ

首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中。

事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel,它们其中之一作为档案channel。消费者会获取这些消息并且上传到S3。

nsqd

每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点。

nsqlookupd

2. Internals

2.1 消息传递担保

1)客户表示已经准备好接收消息

2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout)

3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息

这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程。在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失。

如何防止消息丢失是最重要的,即使是这个意外情况可以得到缓解。一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本。因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息。

2.2 简化配置和管理

单个 nsqd 实例被设计成可以同时处理多个数据流。流被称为“话题”和话题有 1 个或多个“通道”。每个通道都接收到一个话题中所有消息的拷贝。在实践中,一个通道映射到下行服务消费一个话题。

在更底的层面,每个 nsqd 有一个与 nsqlookupd 的长期 TCP 连接,定期推动其状态。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询。为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端。无需为添加任何新的消费者或生产者更改配置,大大降低了开销和复杂性。

2.3 消除单点故障

NSQ被设计以分布的方式被使用。nsqd 客户端(通过 TCP )连接到指定话题的所有生产者实例。没有中间人,没有消息代理,也没有单点故障。

这种拓扑结构消除单链,聚合,反馈。相反,你的消费者直接访问所有生产者。从技术上讲,哪个客户端连接到哪个 NSQ 不重要,只要有足够的消费者连接到所有生产者,以满足大量的消息,保证所有东西最终将被处理。对于 nsqlookupd,高可用性是通过运行多个实例来实现。他们不直接相互通信和数据被认为是最终一致。消费者轮询所有的配置的 nsqlookupd 实例和合并 response。失败的,无法访问的,或以其他方式故障的节点不会让系统陷于停顿。

2.4 效率

对于数据的协议,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据。这个概念,称之为 RDY 状态,基本上是客户端流量控制的一种形式。

efficiency

2.5 心跳和超时

组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,如果消费者是在后面的处理消息流的接收缓冲区中,操作系统将被填满,堵心跳)为了保证进度,所有的网络 IO 时间上限势必与配置的心跳间隔相关联。这意味着,你可以从字面上拔掉之间的网络连接 nsqd 和消费者,它会检测并正确处理错误。当检测到一个致命错误,客户端连接被强制关闭。在传输中的消息会超时而重新排队等待传递到另一个消费者。最后,错误会被记录并累计到各种内部指标。

2.6 分布式

因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生。个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布,即使面对网络分区。

这种“分布式优先”的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集。清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义。虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显。

2.7 no replication

不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证。我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列。但是这样仍然存在一个消息被发布后马上死亡,丢失了有效的写入的情况。

2.8 没有严格的顺序

虽然Kafka由一个有序的日志构成,但NSQ不是。消息可以在任何时间以任何顺序进入队列。在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况。

2.9 无数据重复删除功能

NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活还是死亡。很多原因会导致我们的consumer无法完成心跳检测,所以在consumer中必须有一个单独的步骤确保幂等性。

3. 实践安装过程

本文将nsq集群具体的安装过程略去,大家可以自行参考官网,比较简单。这部分介绍下笔者实验的拓扑,以及nsqadmin的相关信息。

3.1 拓扑结构

topology

实验采用3台NSQD服务,2台LOOKUPD服务。

采用官方推荐的拓扑,消息发布的服务和NSQD在一台主机。一共5台机器。

NSQ基本没有配置文件,配置通过命令行指定参数。

主要命令如下:

LOOKUPD命令

NSQD命令

工具类,消费后存储到本地文件。

发布一条消息

3.2 nsqadmin

对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,连接数等信息。

nsqadmin

channel

列出所有的NSQD节点:

nodes

消息的统计:

msgs

lookup主机的列表:

hosts

4. 总结

NSQ基本核心就是简单性,是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug。消费者可以自行处理故障事件而不会影响系统剩下的其余部分。

事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。

结合我们的业务系统来看,对于我们所需要传输的发票消息,相对比较敏感,无法容忍某个nsqd宕机,或者磁盘无法使用的情况,该节点堆积的消息无法找回。这是我们没有选择该消息中间件的主要原因。简单性和可靠性似乎并不能完全满足。相比Kafka,ops肩负起更多负责的运营。另一方面,它拥有一个可复制的、有序的日志可以提供给我们更好的服务。但对于其他适合NSQ的consumer,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础。

golangchannel和mq的区别

golangchannel和mq的区别

我是一个着迷于产品和运营的技术人,乐于跨界的终身学习者。欢迎关注我哟~

每周五12点 按时送达~

我的第「218」篇原创敬上

大家好,我是Z哥。

最近在项目中遇到了一个使用 RabbitMQ 时的问题,这个问题我觉得还是有一定普适性的,和大家分享一下,避免大家后续在同一个问题上犯错。

消息队列(MQ)是在软件开发中很常用的中间件,如果一个程序需要协调另一个程序进行数据的“write”操作,并且不关心“write”的结果,则便会选择它。它是一个保存消息(数据)的容器,由它来确保消息一定被送达到目标程序。

打个比喻来说,消息队列就是一个邮差,它负责将信件(消息)从源头送往目的地,并且根据信件重要性的不同,提供当面签收确认或者直接投放两种服务。

RabbitMQ 就是一个典型的消息队列,以 AMQP 为标准。历史也比较悠久,大概是从 2007年研发出来的,用的编程语言Erlang也同样具有年代感。

需要简单介绍一下 Erlang 的特点,它对我们理解 RabbitMQ 有很大的帮助。

Erlang 是一种运行于“虚拟机”(类似 JVM)的解释性语言。是一个结构化,动态类型编程语言,内建并行计算支持。使用 Erlang 编写出的应用运行时通常由成千上万个轻量级“进程”(并非传统意义上的进程)组成,并通过消息传递相互通讯。进程间上下文切换对于 Erlang 来说仅仅 只是一两个环节,比起 C 程序的线程切换要高效得多得多了。

——整理于百度百科的资料

不管是什么 MQ 中间件,作为消息的生产方和消费方都需要和 MQ 的服务端建立连接进行通讯。

一般这个连接都会使用 TCP 协议,在 RabbitMQ 里也不例外。大多数 RabbitMQ 的 SDK 都会将连接封装为一个「Connection」对象。

还没完,大多数的 MQ 中间件还会在「Connection」的基础上增加一个「Channel」的概念,以通过复用的方式提高 TCP 连接的利用率,因为建立和销毁 TCP 连接是非常昂贵的开销。在 RabbitMQ 中的复用 TCP 连接方式是「Non-blocking I/O」的模式。

关于NIO,「Non-blocking I/O」的概念,有感兴趣的话可以跳转去看之前写的这篇文章。(用最通俗的话讲明白阻塞/非阻塞/异步/同步,到底啥区别?)

多说一句,任何方案都不是“银弹”。当每个 Channel 的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是 Channel 本身的流量很大时,这时候多个 Channel 复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些 Channel 均摊到这些 Connection 中,至于哪些 Channel 使用那个 Connection 以及Connection 与 Channel 之间的数量关系是多少,需要根据业务自身的实际情况进行调节。

Channel 在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。比如, channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

可能你要问了,Channel 是不是也能像 Connection 一样被复用?这是个好问题,也是我们这次遇到问题的关键点。

结论是:可以,但是需要自己保证客户端对 Channel 访问的线程安全问题,因为在 Channel 的另一端,在 RabbitMQ 的服务端,每个 Channel 由一个单独的“进程”所管理,如果由于多线程复用Channel 导致数据帧乱序了,RabbitMQ 的服务端会主动关闭整个 Connection 。

因此,我们这次犯的错误就是多线程复用了同一个 Channel 导致的问题。所以,如果你也用到 streadway/amqp 这个库的话,需要特别注意这点。

不过,不同语言的SDK内部实现不同,我们分别使用 Golang 的 AMQP 库 streadway/amqp,和 RabbitMQ 官方提供的 C# 版本的库分别模拟过同样的场景,前者出现问题,后者却没有问题。

受限于时间原因,没有具体去核实 C# 库的源码,主观猜测是 C# 库内部多做了一些对于单个 Channel 的线程安全处理。

最后,我整理了三点使用 streadway/amqp 库的最佳实践,你可以看看:

01

golang 中使用 streadway/amqp 时,需要保证每一个线程单独一个 Channel。

streadway/amqp 库中的获取一个 Channel 的方法「Connection.channel()」是线程安全的。但是内部有一个 defaultChannelMax 的参数对 Channel 的数量进行了限制,默认是 (2 10) - 1,2047。这个需要注意:

02

我们可以通过调用 amqp.DialConfig(url string, config Config) 来调整个限制。

但是,并不是你调整了多少就是多少,还需要和 RabbitMQ 服务端的配置进行 min() 函数的处理,最终为两者的最小值。

Tips:特别是用云厂商的 MQ 产品,因为阶梯收费的原因会对很多性能参数做限制,需要格外关注这点,比如某版本的阿里云 RabbitMQ 实例限制是单个 Connection 最多 64 个 Channel)

03

正如前面对 Erlang 的简单介绍,Erlang 是一个天然支持多“进程”设计的语言,所以在 RabbitMQ 的服务端设计中,每一个 Queue,每一个 Connection 都是单独的一个“进程”。因此如果你想尽可能地压榨 RabbitMQ 性能,可以通过建立更多的 Connection 或者创建更多的 Queue 来实现,当然需要注意到 Connection 的创建和销毁的性能开销问题。

推荐阅读:

减少联调、高效集成,试试这个工具

golang使用3周总结

也可以「关注」我,带你以技术思维看世界~

想更进一步和我一起玩耍,欢迎「搜索微信公号:跨界架构师」。

内容包括:架构设计丨分布式系统丨产品丨运营丨个人深度思考。

网站题目:go语言消息中间件 go gin 中间件
网站路径:/article38/ddsicpp.html

成都网站建设公司_创新互联,为您提供域名注册小程序开发网站设计营销型网站建设品牌网站建设

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

星空体育app最新版本(2024已更新)建设