solon 集成 activemq-client (sdk)

原始状态的 activemq-client sdk 集成非常方便,也更适合定制。就是有些同学,可能对原始接口会比较陌生,会希望有个具体的示例。

<dependency>     <groupId>org.apache.activemq</groupId>     <artifactId>activemq-client</artifactId>     <version>${activemq.version}</version> </dependency>  <dependency>     <groupId>org.apache.activemq</groupId>     <artifactId>activemq-pool</artifactId>     <version>${activemq.version}</version> </dependency> 

希望更加简化使用的同学,可以使用:

activemq-solon-cloud-plugin (使用更简单,定制性弱些)

1、添加集成配置

先使用 Solon 初始器 先生成一个 Solon Web 模板项目,然后添加上面的 activemq-client 依赖。再做个配置约定(也可按需定义):

  • "solon.activemq",作为配置前缀
    • "properties",作为公共配置
    • "producer",作为生态者专属配置(估计用不到)
    • "consumer",作为消费者专属配置(估计用不到)

具体的配置属性,参考自:ActiveMQConnectionFactory

solon.app:   name: "demo-app"   group: "demo"  # 配置可以自由定义,与 @Bean 代码对应起来即可(以下为参考) solon.activemq:   properties:  #公共配置(配置项,参考:ActiveMQConnectionFactory)     brokerURL: "failover:tcp://localhost:61616"     redeliveryPolicy:       initialRedeliveryDelay: 5000       backOffMultiplier: 2       useExponentialBackOff: true       maximumRedeliveries: -1       maximumRedeliveryDelay: 3600_000 

添加 java 配置器

@Configuration public class ActivemqConfig {     @Bean(destroyMethod = "stop")     public Connection client(@Inject("${solon.activemq.properties}") Props common) throws Exception {         String brokerURL = (String) common.remove("brokerURL");         String userName = (String) common.remove("userName");         String password = (String) common.remove("password");          ActiveMQConnectionFactory factory;         if (Utils.isEmpty(userName)) {             factory = new ActiveMQConnectionFactory(brokerURL);         } else {             factory = new ActiveMQConnectionFactory(brokerURL, userName, password);         }          //绑定额外的配置并创建连接         Connection connection = common.bindTo(factory).createConnection();         connection.start();         return connection;     }      @Bean     public IProducer producer(Connection connection) throws Exception {         return new IProducer(connection);     }      @Bean     public void consumer(Connection connection,                          MessageListener messageListener) throws Exception {         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);          Destination destination = session.createTopic("topic.test");         MessageConsumer consumer = session.createConsumer(destination);          consumer.setMessageListener(messageListener);     } } 

activemq 的消息发送的代码比较复杂,所以我们可以做个包装处理(用于上面的配置构建),临时命名为 IProducer:

public class IProducer {     private Connection connection;      public IProducer(Connection connection) {         this.connection = connection;     }      public void send(String topic, MessageBuilder messageBuilder) throws JMSException {         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);          Destination destination = session.createTopic(topic);         MessageProducer producer = session.createProducer(destination);          producer.send(destination, messageBuilder.build(session));     }      @FunctionalInterface     public static interface MessageBuilder {         Message build(Session session) throws JMSException;     } } 

3、代码应用

发送(或生产),这里代控制器由用户请求再发送消息(仅供参考):

@Controller public class DemoController {     @Inject     private IProducer producer;      @Mapping("/send")     public void send(String msg) throws Exception {         //发送         producer.send("topic.test", s -> s.createTextMessage("test"));     } } 

监听(或消费),这里采用订阅回调的方式:(仅供参考)

@Component public class DemoMessageListener implements MessageListener {     @Override     public void onMessage(Message message) {         System.out.println(message);         RunUtil.runAndTry(message::acknowledge);     } } 

发表评论

相关文章