Apache Pulsar 设计用于大规模实时数据处理,支持多种消息传递模型(发布/订阅、队列等),并提供了强大的功能来确保消息的可靠性和性能。
发布/订阅 (Pub/Sub): 支持多个消费者同时从同一个主题接收消息,适合实时数据分析和通知系统。
独占订阅 (Exclusive Subscription): 确保只有一个消费者能够消费某个分区的消息,从而保证消息的严格顺序。
共享订阅 (Shared Subscription): 多个消费者可以负载均衡地消费消息,提高吞吐量。
故障域感知路由: 根据地理位置和网络拓扑优化消息路由,确保高效的数据传输。
持久化消息: 所有消息都被持久化到磁盘,确保消息不会丢失。
分层存储: 使用分层存储策略,结合内存和磁盘存储,提高读写效率。
自动清理: 定期清理过期或不再需要的消息,节省存储空间。
事务消息: 支持事务性的消息发送和确认机制,确保数据一致性。
两阶段提交: 实现ACID特性,保证消息的一致性和可靠性。
死信队列 (Dead Letter Queue, DLQ): 对于无法成功处理的消息,将其放入死信队列以便后续排查和处理。
重试机制: 在消息处理失败时,进行一定次数的重试(默认最多3次),如果仍然失败,则将消息放入死信队列。
用户发起一笔交易请求,系统需要将该请求发送到交易处理系统,并确保请求按顺序被处理。而使用Pulsar的独占订阅模式确保交易请求按顺序被单一消费者处理,避免乱序导致的账务错误。
bin/pulsar standalone
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Apache Pulsar Client --><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.10.1</version></dependency><!-- Lombok for cleaner Java code --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- JUnit for testing --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
在application.properties文件中配置Pulsar的相关属性:
# Pulsar broker URLpulsar.service.url=pulsar://localhost:6650# Topic namepulsar.topic.name=finance-transaction-topic# Dead letter topic namepulsar.dead-letter.topic.name=dead-letter-topic# Max redelivery count before sending to dead letter queuepulsar.max.redeliver.count=3
创建一个服务类来处理生产和消费消息,包括事务消息和死信队列的处理逻辑。
importlombok.extern.slf4j.Slf4j;importorg.apache.pulsar.client.api.*;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Service;importjavax.annotation.PostConstruct;importjavax.annotation.PreDestroy;importjava.util.concurrent.CompletableFuture;@Service@Slf4jpublicclass FinanceTransactionService {@Value("${pulsar.service.url}")private String serviceUrl;@Value("${pulsar.topic.name}")private String topicName;@Value("${pulsar.dead-letter.topic.name}")private String deadLetterTopicName;@Value("${pulsar.max.redeliver.count}")privateintmaxRedeliverCount;private PulsarClient client;private Producer<String>producer;private Consumer<String>consumer;/**
* 初始化Pulsar客户端、生产者和消费者
*/@PostConstructpublicvoid init()throws Exception {// 初始化Pulsar客户端client=PulsarClient.builder().serviceUrl(serviceUrl).build();// 创建生产者producer=client.newProducer(Schema.STRING).topic(topicName).sendTimeout(0,java.util.concurrent.TimeUnit.SECONDS).enableBatching(false).create();// 创建消费者consumer=client.newConsumer(Schema.STRING).topic(topicName).subscriptionName("finance-subscription").subscriptionType(SubscriptionType.Exclusive).negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder().maxDelayMs(60_000).minDelayMs(1_000).multiplier(2).build()).deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliverCount).deadLetterTopic(deadLetterTopicName).build()).subscribe();// 开始消费消息consumeMessages();}/**
* 关闭Pulsar客户端、生产者和消费者
*/@PreDestroypublicvoidclose()throws Exception {if(producer!=null){
producer.close();}if(consumer!=null){
consumer.close();}if(client!=null){
client.close();}
}/**
* 发送事务消息
*
* @param message 消息内容
* @return 消息ID的CompletableFuture对象
*/publicCompletableFuture<MessageId>sendTransactionalMessage(String message){returnproducer.sendAsync(message);}/**
* 消费消息并处理
*/private void consumeMessages(){
new Thread(()->{while(!Thread.currentThread().isInterrupted()){
try {
Message<String>msg=consumer.receive();log.info("Received message: {}",msg.getValue());// 处理消息booleanprocessSuccess=processMessage(msg.getValue());if(processSuccess){// 确认消息consumer.acknowledgeAsync(msg.getMessageId());}else{// 负确认消息,触发重试机制consumer.negativeAcknowledge(msg.getMessageId(),new CustomException("Processing failed"));}
} catch(Exception e){
log.error("Error processing message",e);}
}
}).start();}/**
* 模拟消息处理逻辑
*
* @param message 消息内容
* @return 处理是否成功
*/privatebooleanprocessMessage(String message){// 模拟消息处理逻辑// 对于每三条消息中的一条模拟处理失败long messageId=Long.parseLong(message.split(":")[1]);returnmessageId%3!=0;}
static class CustomException extends Exception {publicCustomException(String message){
super(message);}
}// Getter methods for configuration properties (for testing purposes)publicString getServiceUrl(){returnserviceUrl;}publicString getTopicName(){returntopicName;}publicString getDeadLetterTopicName(){returndeadLetterTopicName;}publicintgetMaxRedeliverCount(){returnmaxRedeliverCount;}
}
创建一个控制器类来暴露API端点用于发送消息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
@RestController
public class FinanceTransactionController {
@Autowired
private FinanceTransactionService financeTransactionService;
/**
* 发送消息到Pulsar主题
*
* @param message 消息内容
* @return 发送结果
*/
@PostMapping("/send-message")
public String sendMessage(@RequestParam String message) {
try {
financeTransactionService.sendTransactionalMessage(message).get();
return"Message sent successfully";
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to send message", e);
return"Failed to send message";
}
}
}
为了验证上述功能是否正常工作,我们写了一些测试用例。
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.http.ResponseEntity;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class FinanceTransactionControllerTest {
@Autowired
private TestRestTemplate restTemplate;
@Autowired
private FinanceTransactionService financeTransactionService;
/**
* 清空主题中的消息,确保每次测试前环境干净
*/
@BeforeEach
public void setUp() throws Exception {
clearTopic(financeTransactionService.getTopicName());
clearTopic(financeTransactionService.getDeadLetterTopicName());
}
/**
* 关闭资源
*/
@AfterEach
public void tearDown() throws Exception {
financeTransactionService.close();
}
/**
* 测试成功发送的消息是否正确地出现在主主题中,并且没有出现在死信队列中
*/
@Test
public void testSendMessage_Success() {
ResponseEntity<String> response = restTemplate.postForEntity("/send-message?message=transaction:1", null, String.class);
assertEquals("Message sent successfully", response.getBody());
response = restTemplate.postForEntity("/send-message?message=transaction:2", null, String.class);
assertEquals("Message sent successfully", response.getBody());
response = restTemplate.postForEntity("/send-message?message=transaction:4", null, String.class);
assertEquals("Message sent successfully", response.getBody());
// 验证消息在主主题中
assertMessageInTopic("transaction:1");
assertMessageInTopic("transaction:2");
assertMessageInTopic("transaction:4");
// 验证死信队列中没有消息
assertNoMessagesInTopic(financeTransactionService.getDeadLetterTopicName());
}
/**
* 测试失败发送的消息是否在达到最大重试次数后进入死信队列
*/
@Test
public void testSendMessage_Failure() {
ResponseEntity<String> response = restTemplate.postForEntity("/send-message?message=transaction:3", null, String.class);
assertEquals("Message sent successfully", response.getBody());
// 验证消息在死信队列中(经过多次重试)
assertMessageInTopicWithRetries("transaction:3", financeTransactionService.getMaxRedeliverCount());
}
/**
* 清空指定主题中的所有消息
*
* @param topicName 主题名称
*/
private void clearTopic(String topicName) throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create();
while (reader.hasMessageAvailable()) {
reader.readNext();
}
reader.close();
client.close();
}
/**
* 验证指定主题中是否存在特定消息
*
* @param expectedMessage 预期消息内容
*/
private void assertMessageInTopic(String expectedMessage) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(financeTransactionService.getTopicName())
.startMessageId(MessageId.earliest)
.create()) {
while (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
if (msg.getValue().equals(expectedMessage)) {
return;
}
}
fail("Expected message not found in topic: " + expectedMessage);
} catch (Exception e) {
fail("Failed to read from topic: " + e.getMessage());
}
}
/**
* 验证指定主题中没有消息
*
* @param topicName 主题名称
*/
private void assertNoMessagesInTopic(String topicName) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(topicName)
.startMessageId(MessageId.earliest)
.create()) {
assertFalse(reader.hasMessageAvailable(), "Unexpected messages found in topic: " + topicName);
} catch (Exception e) {
fail("Failed to read from topic: " + e.getMessage());
}
}
/**
* 验证指定主题中是否存在特定消息(带有重试机制)
*
* @param expectedMessage 预期消息内容
* @param maxRetries 最大重试次数
*/
private void assertMessageInTopicWithRetries(String expectedMessage, int maxRetries) {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(financeTransactionService.getServiceUrl())
.build();
Reader<String> reader = client.newReader(Schema.STRING)
.topic(financeTransactionService.getDeadLetterTopicName())
.startMessageId(MessageId.earliest)
.create()) {
int retryCount = 0;
while (retryCount < maxRetries) {
if (reader.hasMessageAvailable()) {
Message<String> msg = reader.readNext();
if (msg.getValue().equals(expectedMessage)) {
return;
}
}
retryCount++;
Thread.sleep(1000); // 等待1秒后重试
}
fail("Expected message not found in dead letter topic after retries: " + expectedMessage);
} catch (Exception e) {
fail("Failed to read from dead letter topic: " + e.getMessage());
}
}
}
发送消息:
curl -X POST http://localhost:8080/send-message\?message\=transaction\:1 curl -X POST http://localhost:8080/send-message\?message\=transaction\:2 curl -X POST http://localhost:8080/send-message\?message\=transaction\:3 curl -X POST http://localhost:8080/send-message\?message\=transaction\:4
日志:
Received message: transaction:1 Received message: transaction:2 Received message: transaction:3 Received message: transaction:4