原始状态的 activemq-client sdk 集成非常方便,也更适合定制。就是有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>${activemq.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>${activemq.version}</version> </dependency>
希望更加简化使用的同学,可以使用:
activemq-solon-cloud-plugin (使用更简单,定制性弱些)
1、添加集成配置
先使用 Solon 初始器 先生成一个 Solon Web 模板项目,然后添加上面的 activemq-client 依赖。再做个配置约定(也可按需定义):
- "solon.activemq",作为配置前缀
- "properties",作为公共配置
- "producer",作为生态者专属配置(估计用不到)
- "consumer",作为消费者专属配置(估计用不到)
具体的配置属性,参考自:ActiveMQConnectionFactory
solon.app: name: "demo-app" group: "demo" # 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考) solon.activemq: properties: #公共配置(配置项,参考:ActiveMQConnectionFactory) brokerURL: "failover:tcp://localhost:61616" redeliveryPolicy: initialRedeliveryDelay: 5000 backOffMultiplier: 2 useExponentialBackOff: true maximumRedeliveries: -1 maximumRedeliveryDelay: 3600_000
添加 java 配置器
@Configuration public class ActivemqConfig { @Bean(destroyMethod = "stop") public Connection client(@Inject("${solon.activemq.properties}") Props common) throws Exception { String brokerURL = (String) common.remove("brokerURL"); String userName = (String) common.remove("userName"); String password = (String) common.remove("password"); ActiveMQConnectionFactory factory; if (Utils.isEmpty(userName)) { factory = new ActiveMQConnectionFactory(brokerURL); } else { factory = new ActiveMQConnectionFactory(brokerURL, userName, password); } //绑定额外的配置并创建连接 Connection connection = common.bindTo(factory).createConnection(); connection.start(); return connection; } @Bean public IProducer producer(Connection connection) throws Exception { return new IProducer(connection); } @Bean public void consumer(Connection connection, MessageListener messageListener) throws Exception { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createTopic("topic.test"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(messageListener); } }
activemq 的消息发送的代码比较复杂,所以我们可以做个包装处理(用于上面的配置构建),临时命名为 IProducer:
public class IProducer { private Connection connection; public IProducer(Connection connection) { this.connection = connection; } public void send(String topic, MessageBuilder messageBuilder) throws JMSException { Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination destination = session.createTopic(topic); MessageProducer producer = session.createProducer(destination); producer.send(destination, messageBuilder.build(session)); } @FunctionalInterface public static interface MessageBuilder { Message build(Session session) throws JMSException; } }
3、代码应用
发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):
@Controller public class DemoController { @Inject private IProducer producer; @Mapping("/send") public void send(String msg) throws Exception { //发送 producer.send("topic.test", s -> s.createTextMessage("test")); } }
监听(或消费),这里采用订阅回调的方式:(仅供参考)
@Component public class DemoMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(message); RunUtil.runAndTry(message::acknowledge); } }