探索Java中的分布式消息队列与事件总线:架构、实现与最佳实践

引言

在现代分布式系统中,消息队列和事件总线已经成为实现松耦合、高扩展性和高可用性架构的关键组件。无论是微服务架构、事件驱动架构,还是实时数据处理,消息队列和事件总线都扮演着至关重要的角色。本文将深入探讨Java中的分布式消息队列与事件总线的概念、实现方法、技术选型以及实际应用中的最佳实践,附带代码示例以便读者更好地理解。

什么是分布式消息队列和事件总线?

分布式消息队列

分布式消息队列是一种为分布式系统提供异步通信机制的中间件。它允许系统中的不同组件通过发送和接收消息进行交流,从而实现高效的数据传输和任务调度。

常见用途:
  • 任务调度和执行
  • 数据流处理
  • 系统解耦
  • 事件驱动架构
事件总线

事件总线是一种发布-订阅模型的实现,允许不同组件订阅和发布事件。事件总线可以在同一进程中运行,也可以跨多个分布式系统运行。

常见用途:
  • 事件通知
  • 事件驱动编程
  • 系统解耦
  • 实时数据处理

常见技术选型

技术类型优点缺点
RabbitMQ消息队列高性能、强大的路由功能、良好的社区支持配置复杂,学习曲线陡峭
Apache Kafka消息队列高吞吐量、持久化、分布式特点配置和管理复杂,低延迟不适合实时应用
ActiveMQ消息队列易于使用、功能齐全性能和扩展性不如Kafka和RabbitMQ
Apache Pulsar消息队列多租户、支持Geo-replication较新的技术,社区和文档相对较少
Spring Cloud Bus事件总线易于集成Spring生态系统主要适用于Spring项目,通用性较差
Vert.x Event Bus事件总线轻量级、高性能、灵活对于大型分布式系统,可能需要自定义扩展

实现分布式消息队列

使用RabbitMQ实现消息队列

配置RabbitMQ

首先,确保RabbitMQ服务在本地或远程服务器上运行。可以通过Docker快速启动RabbitMQ:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

添加依赖

在你的pom.xml文件中添加RabbitMQ客户端的依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.0</version>
</dependency>

生产者代码

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码

import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); 
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
}

实现事件总线

使用Spring Cloud Bus实现事件总线

添加依赖

在你的pom.xml文件中添加Spring Cloud Bus和RabbitMQ的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

配置文件

application.yml中配置RabbitMQ连接信息:

spring:
  cloud:
    bus:
      enabled: true
  rabbitmq:
    host: localhost
    port: 5672

事件发布者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.cloud.bus.SpringCloudBusClient;
import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventPublisherController {
    
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @PostMapping("/publish-event")
    public String publishEvent() {
        applicationEventPublisher.publishEvent(new EnvironmentChangeRemoteApplicationEvent(this, "source", null));
        return "Event published";
    }
}

事件监听器

import org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class EventListenerComponent {

    @EventListener
    public void onEnvironmentChange(EnvironmentChangeRemoteApplicationEvent event) {
        System.out.println("Received event: " + event);
    }
}

总结

本文详细介绍了分布式消息队列和事件总线的概念、常见技术选型以及在Java中的实现方法。通过RabbitMQ和Spring Cloud Bus的代码示例,展示了如何在实际应用中使用这些技术来实现异步通信和事件驱动架构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/872436.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Stream 流式编程

优质博文&#xff1a;IT-BLOG-CN 大家都知道可以将Collection类转化成流Stream进行操作&#xff08;Map并不能创建流&#xff09;&#xff0c;代码变得简约流畅。我们先看下流的几个特点&#xff1a; 1、流并不存储元素。这些元素可能存储在底层的集合中&#xff0c;或者是按需…

VMwareWorkstation安装Kali系统教程

Kali系统&#xff0c;全名为Kali Linux&#xff0c;为渗透测试和网络安全领域提供一个全面的工具集合。Kali系统预装了各种用于渗透测试和漏洞利用的工具&#xff0c;包括端口扫描、密码破解、网络嗅探、漏洞分析等。这些工具可以帮助安全专业人员评估和测试网络的安全性&#…

实例讲解Simulink应用层开发CAN报文解包及CAN信号设置方法

在VCU应用层开发中&#xff0c;在输入信号中主要包括开关信号、模拟信号、CAN信号、PWM信号等&#xff0c;其中CAN通讯由于通讯质量高&#xff0c;传输数据量大&#xff0c;采用总线通讯方式节省大量线束&#xff0c;在汽车上尤其是电动汽车上大量应用&#xff0c;当然&#xf…

数图亮相第三届中国区域零售创新峰会:共绘零售新蓝图,携手迈向新征程

8月31日&#xff0c;备受瞩目的第三届中国区域零售创新峰会在历史悠久的湖北襄阳圆满落下帷幕。在这场零售行业的盛会上&#xff0c;数图信息科技作为重要参会企业&#xff0c;积极参与其中&#xff0c;与众多行业精英共聚一堂&#xff0c;共同擘画零售业的宏伟蓝图。以下是本次…

C/C++ 中的算术运算及其陷阱(详解,举例分析)

在C/C编程中&#xff0c;算术运算是非常基础且常用的操作。然而&#xff0c;这些看似简单的运算背后却隐藏着一些潜在的陷阱&#xff0c;如果不加以注意&#xff0c;可能会导致程序出现难以预料的错误。本文将探讨C/C中常见的算术运算及其潜在的陷阱&#xff0c;并通过实例进行…

告别格式不兼容烦恼!ape转换mp3,分享3个简单方法

各位读者们&#xff0c;你们是否有过这种体验&#xff1a;满怀期待地在网上下载一首好听的歌曲&#xff0c;结果怎么点击手机都播放不了&#xff0c;定睛一看&#xff0c;弹窗显示“无法播放该音频文件”。这是为什么呢&#xff1f;原来那首歌的音频格式是ape&#xff0c;不被手…

iOS——关联对象学习补充

分类 在分类中添加属性会生成对应的成员变量&#xff0c;会生成对应的setter和getter方法的声明&#xff0c;但是不会生成setter和getter方法的实现。分类中的可以写property&#xff0c;会编译通过&#xff0c;但是引用变量会报错。分类中可以/只能访问原有类中.h中的属性。如…

如何选择合适的变压吸附制氧设备

在选择合适的变压吸附(Pressure Swing Adsorption, PSA)制氧设备时&#xff0c;需要综合考虑多个因素以确保设备能够高效、稳定地运行&#xff0c;满足特定应用场景的需求。以下是一些关键步骤和考虑因素&#xff0c;帮助您做出明智的决策。 1. 明确应用需求 明确您的制氧需求至…

visual studio 2022更新以后,之前的有些工程编译出错,升级到Visual studio Enterprise 2022 Preview解决

系列文章目录 文章目录 系列文章目录前言一、解决方法 前言 今天遇到一个问题&#xff1a;visual studio 2022升级成预览版以后&#xff0c;之前的有些工程编译出错。首先代码、项目设置都没有改变&#xff0c;只是更新了visual studio 2022。 在编译工程时&#xff0c;编译器…

Team Render 上的 Redshift 照明与我的编辑机器上的不同(如何缓存 Redshift GI)

有时&#xff0c;您的灯光在另一台机器&#xff08;例如属于 Team Render 农场的机器&#xff09;上看起来会与在主/编辑机器上看起来不同。这是因为&#xff0c;即使使用相似或相同的硬件&#xff0c;一台机器计算全局照明的方式与另一台机器也会有所不同。 这可能会导致光线…

Docker 部署 Kibana (图文并茂超详细)

部署 Kibana ( Docker ) [Step 1] : 拉取 Kibana 镜像 docker pull kibana:7.14.0[Step 2] : 创建目录 ➡️ 启动容器 ➡️ 拷贝文件 ➡️ 授权文件 ➡️ 删除容器 # 创建目录 mkdir -p /data/kibana/{conf,plugins}# 启动容器 docker run --name kibana --restartalways \…

科普神文,一次性讲透AI大模型的核心概念

令牌&#xff0c;向量&#xff0c;嵌入&#xff0c;注意力&#xff0c;这些AI大模型名词是否一直让你感觉熟悉又陌生&#xff0c;如果答案肯定的话&#xff0c;那么朋友&#xff0c;今天这篇科普神文不容错过。我将结合大量示例及可视化的图形手段&#xff0c;为你由浅入深一次…

Centos Stream9系统安装及网络配置详解

1.镜像下载 如未拥有系统镜像文件的伙伴可通过前往下面的连接进行下载&#xff0c;下载完成后需将其刻录至U盘中。 PS&#xff1a;该U盘应为空盘&#xff0c;刻录文件会导该盘格式化&#xff0c;下载文件选择dvd1.iso完整包&#xff0c;适用于本地安装。 下载地址&#xff1…

恋爱相亲交友系统源码原生源码可二次开发APP 小程序 H5,web全适配

直播互动&#xff1a;平台设有专门的直播间&#xff0c;允许房间主人与其他异性用户通过视频连线的方式进行一对一互动。语音视频交流&#xff1a;异性用户可以发起语音或视频通话&#xff0c;以增进了解和交流。群组聊天&#xff1a;用户能够创建群聊&#xff0c;邀请自己关注…

【云计算】什么是云计算服务|为什么出现了云计算|云计算的服务模式

文章目录 什么是云计算服务本地部署VS云计算SaaS PaaS IaaS公有云、私有云、混合云为什么优先发展云计算服务的厂商是亚马逊、阿里巴巴等公司 什么是云计算服务 根据不同的目标用户&#xff0c;云计算服务&#xff08;Cloud Computing Services&#xff09;分为两种&#xff1…

探索动销方案创新路径,开启企业增长新引擎

在当今竞争激烈的市场中&#xff0c;动销方案的重要性不言而喻。然而&#xff0c;传统动销手段已难以应对多变的市场环境&#xff0c;企业急需探索创新路径。 当前动销方案面临哪些挑战呢&#xff1f; 首先&#xff0c;消费者需求越发多样化&#xff0c;他们追求个性化和多元化…

如何修复软件中的BUG

笔者上一篇博文《如何开发出一款优秀的软件》主要讲了如何开发一款优秀的软件及相应的必要条件。但对一个已上线&#xff0c;已经成型的产品&#xff0c;该如何解决存在的bug呢&#xff1f;这是本文要阐述的内容。 在这里&#xff0c;首先说一下bug的种类及bug严重程度分类&…

QT: Unable to create a debugging engine.

1.问题场景&#xff1a; 第一次安装QT&#xff0c;没有配置debug功能 打开控制面板》程序》找到Kit 重启电脑即可 2.问题场景&#xff1a; qt原本一直好好的&#xff0c;突然有天打开运行调试版本&#xff0c;提示Unable to create a debugging engine.错误。这个是指无法创…

【计算机网络】TCP连接如何确保传输的可靠性

一、确保可靠传输的机制 TCP&#xff08;传输控制协议&#xff09;是一种面向连接的、提供可靠交付的、面向字节流的、支持全双工的传输层通信协议 1、序列号 seq TCP头部中的序号&#xff0c;占32位&#xff08;4字节&#xff09;&#xff1b; 发送方给报文段分配一个序列号&a…

如何锻炼自己深度思考的能力?4个方法让你快速看清事物的本质!

我们每天都会接触到海量的信息&#xff0c;但真正的智慧并不在于掌握多少信息&#xff0c;而在于如何从中提炼出有价值的知识&#xff0c;并对其进行深刻的理解与运用。 本周想和大家探讨一下深度思考的重要性&#xff0c;同时分享一些实用的方法和技巧&#xff0c;希望能帮你…