SpringBoot與Cassandra整合,實現高寫入吞吐量用戶事件存儲系統
作者:Java知識日歷
隨著我們的app使用人數快速發展,用戶行為數據變得越來越重要,這些數據可以用來優化產品設計、提升用戶體驗、實施精準營銷策略,并支持業務決策。然而,處理海量的用戶行為數據需要高效的存儲和分析能力,傳統的數據庫方案往往難以滿足這些需求。
隨著我們的app使用人數快速發展,用戶行為數據變得越來越重要,這些數據可以用來優化產品設計、提升用戶體驗、實施精準營銷策略,并支持業務決策。然而,處理海量的用戶行為數據需要高效的存儲和分析能力,傳統的數據庫方案往往難以滿足這些需求。
為什么選擇使用Cassandra作為用戶事件存儲解決方案?
- 高性能寫入:Cassandra的設計使得它非常適合高寫入場景。它的寫操作是無鎖的,并且所有節點都可以接受寫請求,這大大提高了寫入性能。
- 一致性模型:Cassandra提供了多種一致性級別(如ONE, QUORUM, ALL等),可以根據具體需求調整,以平衡一致性和可用性。
- 自動分片:Cassandra會自動將數據分布在集群中的多個節點上,確保負載均衡。
- 動態添加節點:可以隨時向集群中添加新的節點,以提高容量和性能,而無需停機維護。
- 快速讀取:Cassandra支持高效的點查詢和范圍查詢,適合讀取特定用戶的所有事件。
- 二級索引:雖然Cassandra不支持復雜的查詢,但它提供了一定程度的二級索引功能,可以幫助進行更靈活的查詢。
- 多副本復制:Cassandra默認會在多個節點之間復制數據,確保數據的安全性和可靠性。
- 強容錯能力:即使某些節點宕機,其他節點仍然可以繼續提供服務。
- 定期快照和增量備份:支持定期快照和增量備份,方便數據恢復和災難恢復。
- 免費開源:作為Apache軟件基金會的一個頂級項目,Cassandra是免費且開源的。
- 硬件利用率高:Cassandra能夠充分利用現有的硬件資源,減少額外的成本投入。
哪些公司選擇Cassandra?
- Capital One:用于存儲金融交易數據和客戶行為分析。
- Spotify:用于存儲音樂播放記錄、用戶偏好等數據。
- Rovio (Angry Birds):用于存儲游戲內玩家的行為數據和統計信息。
- Uber:用于存儲司機和乘客的位置數據、行程信息等。
- Netflix:用于存儲和處理大量的用戶行為數據,如觀看歷史、搜索記錄等。
- Instagram:用于存儲照片和視頻的元數據,以及用戶活動日志。
- Apple:用于移動設備的推送通知系統和其他內部應用的數據存儲。
- Reddit:用于存儲用戶提交的內容、評論和其他社交數據。
- Zynga:用于存儲在線游戲中的用戶活動和統計數據。
代碼實操
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.0.5</version>
<relativePath/><!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cassandra-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cassandra-demo</name>
<description>Demo project for Spring Boot and Cassandra integration</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties
# 配置Cassandra連接信息
spring.data.cassandra.contact-points=localhost # Cassandra節點地址
spring.data.cassandra.port=9042 # Cassandra端口號
spring.data.cassandra.keyspace-name=user_events # keyspace名稱
spring.data.cassandra.schema-action=create_if_not_exists # 如果keyspace不存在則創建
實體類
package com.example.cassandrareactivedemo.model;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.cassandra.core.mapping.Table;
import java.util.Date;
/**
* 用戶事件實體類
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Table("user_events")
publicclass UserEvent {
@Id// 標記為主鍵
private String eventId; // 事件ID
private String userId; // 用戶ID
private String eventType; // 事件類型
private Date eventTime; // 事件發生時間
}
Repository
package com.example.cassandrareactivedemo.repository;
import com.example.cassandrareactivedemo.model.UserEvent;
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import reactor.core.publisher.Flux;
public interface UserEventRepository extends ReactiveCassandraRepository<UserEvent, String> {
/**
* 根據用戶ID查找所有相關的用戶事件
* @param userId 用戶ID
* @return 包含所有匹配用戶的事件流
*/
Flux<UserEvent> findByUserId(String userId);
}
Service
package com.example.cassandrareactivedemo.service;
import com.example.cassandrareactivedemo.model.UserEvent;
import com.example.cassandrareactivedemo.repository.UserEventRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 用戶事件服務類,處理業務邏輯
*/
@Service
publicclass UserEventService {
@Autowired
private UserEventRepository userEventRepository;
/**
* 保存用戶事件到Cassandra數據庫
* @param userEvent 用戶事件對象
* @return 包含已保存用戶的Mono對象
*/
public Mono<UserEvent> save(UserEvent userEvent) {
return userEventRepository.save(userEvent);
}
/**
* 根據用戶ID查找所有相關的用戶事件
* @param userId 用戶ID
* @return 包含所有匹配用戶的事件流
*/
public Flux<UserEvent> findByUserId(String userId) {
return userEventRepository.findByUserId(userId);
}
}
Controller
package com.example.cassandrareactivedemo.controller;
import com.example.cassandrareactivedemo.model.UserEvent;
import com.example.cassandrareactivedemo.service.UserEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/api/user-events")
publicclass UserEventController {
@Autowired
private UserEventService userEventService;
/**
* 創建新的用戶事件
* @param userEvent 用戶事件對象
* @return 包含新創建用戶的Mono對象
*/
@PostMapping("/")
public Mono<UserEvent> createUserEvent(@RequestBody UserEvent userEvent) {
return userEventService.save(userEvent);
}
/**
* 根據用戶ID獲取所有相關用戶事件
* @param userId 用戶ID
* @return 包含所有匹配用戶的事件流
*/
@GetMapping("/{userId}")
public Flux<UserEvent> getUserEventsByUserId(@PathVariable String userId) {
return userEventService.findByUserId(userId);
}
}
Application
package com.example.cassandrareactivedemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CassandraDemoApplication {
public static void main(String[] args) {
SpringApplication.run(CassandraDemoApplication.class, args);
}
}
測試
創建新的用戶事件
curl -X POST http://localhost:8080/api/user-events/ \
-H "Content-Type: application/json" \
-d '{
"eventId": "event1",
"userId": "user1",
"eventType": "login",
"eventTime": "2025-05-22T20:10:06Z"
}'
Respons
{
"eventId": "event1",
"userId": "user1",
"eventType": "login",
"eventTime": "2025-05-22T20:10:06Z"
}
獲取特定用戶的用戶事件
curl -X GET http://localhost:8080/api/user-events/user1
Respons
[
{
"eventId": "event1",
"userId": "user1",
"eventType": "login",
"eventTime": "2025-05-22T20:10:06Z"
}
]
責任編輯:武曉燕
來源:
Java知識日歷