Kafka实战 - 06 Kafka消费者:从指定Topic拉取工单处置记录信息并存入MongoDB数据库
生活随笔
收集整理的這篇文章主要介紹了
Kafka实战 - 06 Kafka消费者:从指定Topic拉取工单处置记录信息并存入MongoDB数据库
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 1. 處置記錄表 t_disposal_record
- 2. kafka 主題和消費者配置
- 3. 定義一個線程任務 KafkaTask
- 1. kafka Topic中的數據:KafkaDisposalRecordDto對象
- 2. 對外暴露的處置記錄參數的公共接口 IDisposalRecordDto
- 3. 將KafkaDisposalRecordDto對象轉為DisposalRecord對象
- 4. 計算一個處置記錄的md5值
- 5. md5加密算法 Md5Encrypt
- 6. 存入數據庫的處置記錄對象 DisposalRecord
- 4. 利用線程池執行線程任務 IncidentRecordServiceImpl
1. 處置記錄表 t_disposal_record
{"_id": ObjectId("63934cecde4dda7027bd8f6f"),"uuId": "fx-in_rec-fec81183a3934ce0424e28","opCode": "executeSoar","opUserId": "0","opUsername": "系統執行","opTags": [ ],"comment": "","additional": "執行劇本:自動劇本1個","attachFileId": "","attachFilename": "","whiteIds": [ ],"soarRunId": NumberInt("2"),"createTime": NumberLong("1670597868"),"incidentId": "0594d971-8c65-430a-a8b7-959b11dd825d","md5Repeat": "ee1877b2d431e973445a52a62486a427" }

2. kafka 主題和消費者配置
# Kafka cluster / topic / consumer configuration.
# Lines starting with "#{{...}}" are confd template directives expanded at deploy time.
ngsoc:
  kafka:
    clusters:
      - name: ngsoc
        #{{$data := json (getv "/ngsoc/kafka/common/cluster/conn_info")}}
        bootstrap-servers:
        #{{range $data.route}}
          - '{{.host}}:{{.port}}'
        #{{end}}
        topics:
          - name: NGSOC_INCIDENT_RECORD
            partition: 1
            replication: 2
        consumers:
          - name: incidentRecord
            group-id: restore-1
            topics:
              - NGSOC_INCIDENT_RECORD
            enable-auto-commit: false
# 3. 定義一個線程任務 KafkaTask
public interface IIncidentRecordService {/*** 寫處置記錄** @param recordDto 處置記錄內容*/void writeRecord(IDisposalRecordDto recordDto);/*** 批量寫處置記錄** @param recordDtos 處置記錄列表*/void writeRecords(List<IDisposalRecordDto> recordDtos); } @Slf4j public class KafkaTask implements Runnable {@Setterprivate KafkaConsumer<String, String> kafkaConsumer;@Setterprivate IIncidentRecordService incidentRecordService;private final AtomicBoolean runningFlag;public KafkaTask() {this.runningFlag = new AtomicBoolean(true);}public void stop() {synchronized (this.runningFlag) {log.info("stop kafka task");this.runningFlag.set(false);}}@Overridepublic void run() {while (runningFlag.get()) {try {// 目前kafka采用自動提交,后續要設計execute方法拋出的異常,然后采用手動提交的方式// execute方法拋出兩種異常,一種異常是不提交,數據需要重復消費,一種異常是提交,數據需要丟棄ConsumerRecords<String, String> records = this.kafkaConsumer.poll(Duration.ofMillis(10000));if (records.count() > 0) {log.info("poll {} record", records.count());execute(records);}this.kafkaConsumer.commitSync();} catch (Exception ignore) {// 可以識別出來的異常在execute方法中進行處理,無法識別的異常拋出來在外層處理// 此處需要抉擇一下,出現異常的時候是丟棄數據,還是再次消費// 如果再次消費,且這條數據自身有問題導致一直消費不成功,會造成后面的數據一直無法消費}}}private void execute(ConsumerRecords<String, String> records) {List<IDisposalRecordDto> recordDtoList = new ArrayList<>();if (!records.isEmpty()) {try {records.iterator().forEachRemaining(record -> {KafkaDisposalRecordDto kafkaDisposalRecordDto = checkAndGet(record.value());if (kafkaDisposalRecordDto != null) {recordDtoList.add(kafkaDisposalRecordDto);}});// 將拉取的處置記錄信息存入mongodb數據庫this.incidentRecordService.writeRecords(recordDtoList);} catch (DuplicateKeyException | MongoException e) {// 操作mongo數據庫出現的異常log.warn(e.getMessage());} catch (Exception other) {// 其他未知異常,記錄一下異常數據方便追溯問題log.warn("abnormal data: {}", recordDtoList, other);throw other;}}}/*** 反序列化kafka中的數據* * @param value kafka中的數據* @return 對象*/private KafkaDisposalRecordDto checkAndGet(String value) {try {// 反序列化Kafka Topic中的數據KafkaDisposalRecordDto recordDto = JsonTranscodeUtil.transcode(value, 
KafkaDisposalRecordDto.class);assert recordDto != null;recordDto.validate();return recordDto;} catch (JsonProcessingException e) {log.warn("failed to deserialize kafka value: {}", value, e);} catch (DisposalRecordValidateException e) {log.warn("failed to validate kafka value: {}", value, e);}return null;} }1. kafka Topic中的數據:KafkaDisposalRecordDto對象
@Data public class KafkaDisposalRecordDto implements IDisposalRecordDto {/*** 操作碼,{@link OperateTypeEnum#getOpCode}*/private String opType;/*** 操作用戶id*/private String opUserId;/*** 操作用戶名*/private String opUsername;/*** 處置記錄標簽*/private List<String> opTags;/*** 備注信息*/private String comment;/*** 更多信息*/private String additional;/*** 附件id*/private String attachFileId;/*** 附件名稱*/private String attachFilename;/*** 關聯的白名單id*/private List<String> whiteIds;/*** soar執行記錄關聯的任務ID*/private Integer soarRunId;/*** 關聯的工單ID*/private Integer orderId;/*** 秒級時間戳,{@link DisposalRecord#getCreateTime}*/private Long createTime;/*** 綁定的安全告警或事件id*/private String incidentId;@Overridepublic DisposalRecord toDisposalRecord() throws DisposalRecordConverterException {try {// 將KafkaDisposalRecordDto對象轉為存入數據庫的DisposalRecord對象return KafkaDisposalRecordDtoToDisposalRecordConverter.convert(this);} catch (UnsupportedEncodingException | NoSuchAlgorithmException e) {throw new DisposalRecordConverterException(e);}}@Overridepublic void validate() throws DisposalRecordValidateException {// 必填字段校驗this.notNullOrThrow(opType, "opType must exist !");this.notNullOrThrow(createTime, "createTime must exist !");this.notNullOrThrow(incidentId, "incidentId must exist !");} }2. 對外暴露的處置記錄參數的公共接口 IDisposalRecordDto
/*** 所有對外暴露的處置記錄參數的公共接口*/ public interface IDisposalRecordDto {/*** 將自身轉換為處置記錄對象* * @return 處置記錄對象* @throws DisposalRecordConverterException 轉換失敗時拋出此異常*/DisposalRecord toDisposalRecord() throws DisposalRecordConverterException;/*** 驗證當前對象是否合法* * @throws DisposalRecordValidateException 驗證失敗拋出此異常*/default void validate() throws DisposalRecordValidateException {}/*** 驗證給定對象是否為空,如果為空則拋出異常** @param obj 待驗證對象* @param errMsg 異常信息* @throws DisposalRecordValidateException ex*/default void notNullOrThrow(Object obj, String errMsg) throws DisposalRecordValidateException {Optional.ofNullable(obj).orElseThrow(() -> new DisposalRecordValidateException(String.format("record: %s, errMsg: %s", this, errMsg)));} }3. 將KafkaDisposalRecordDto對象轉為DisposalRecord對象
public class KafkaDisposalRecordDtoToDisposalRecordConverter {private KafkaDisposalRecordDtoToDisposalRecordConverter() {}public static DisposalRecord convert(KafkaDisposalRecordDto source)throws UnsupportedEncodingException, NoSuchAlgorithmException {DisposalRecord disposalRecord = DisposalRecord.builder().uuId(CommonUtil.incidentRecordUuid()).opCode(source.getOpType()).opUserId(source.getOpUserId()).opUsername(source.getOpUsername()).opTags(source.getOpTags()).comment(source.getComment()).additional(source.getAdditional()).attachFileId(source.getAttachFileId()).attachFilename(source.getAttachFilename()).whiteIds(source.getWhiteIds()).soarRunId(source.getSoarRunId()).orderId(source.getOrderId()).createTime(source.getCreateTime()).incidentId(source.getIncidentId()).build();disposalRecord.setMd5Repeat(CommonUtil.md5RepeatForDisposalRecord(disposalRecord));return disposalRecord;} }4. 計算一個處置記錄的md5值
public class CommonUtil {/*** 生成incident record模塊的uuId* * @return uuId字符串*/public static String incidentRecordUuid() {return UuidUtils.generated("in_rec");}/*** 計算一個處置記錄的md5值* * @param disposalRecord 處置記錄* @return md5* @throws UnsupportedEncodingException ex* @throws NoSuchAlgorithmException ex*/public static String md5RepeatForDisposalRecord(DisposalRecord disposalRecord)throws UnsupportedEncodingException, NoSuchAlgorithmException {return Md5Encrypt.getMd5HexString(disposalRecord.toString());} }5. md5加密算法 Md5Encrypt
/*** md5加密算法*/ public class Md5Encrypt {/*** md加密算法** @param str 加密字符串* @return md5加密生成的16進制字符串*/public static String getMd5HexString(String str) throws NoSuchAlgorithmException, UnsupportedEncodingException {MessageDigest m = MessageDigest.getInstance("MD5");m.update(str.getBytes("UTF8"));byte[] s = m.digest();String result = "";for (int i = 0; i < s.length; i++) {result += Integer.toHexString((0x000000FF & s[i]) | 0xFFFFFF00).substring(6);}return result;} }6. 存入數據庫的處置記錄對象 DisposalRecord
@Data @Document("t_disposal_record") @Builder @NoArgsConstructor @ToString(exclude = { "uuId", "md5Repeat" }) public class DisposalRecord {/*** 唯一標識*/@Field("uuId")@Indexed(unique = true)private String uuId;/*** 操作碼,對應唯一一種操作類型*/@Field("opCode")private String opCode;/*** 操作用戶(用戶id)*/@Field("opUserId")private String opUserId;/*** 操作用戶(用戶名)*/@Field("opUsername")private String opUsername;/*** 標簽*/@Field("opTags")private List<String> opTags;/*** 備注*/@Field("comment")private String comment;/*** 更多信息*/@Field("additional")private String additional;/*** 附件(文件id)*/@Field("attachFileId")private String attachFileId;/*** 附件(文件名)*/@Field("attachFilename")private String attachFilename;/*** 白名單id列表*/@Field("whiteIds")private List<String> whiteIds;/*** soar執行記錄關聯的任務ID*/@Field("soarRunId")private Integer soarRunId;/*** 關聯的工單ID*/@Field("orderId")private Integer orderId;/*** 創建時間(秒級時間戳)*/@Field("createTime")private Long createTime;/*** 關聯事件id*/@Field("incidentId")private String incidentId;/*** uuId 以外的其他字段計算md5值*/@Field("md5Repeat")@Indexed(unique = true)private String md5Repeat;public void setOpUserId(String opUserId) {this.opUserId = getOrDefault(opUserId);}public void setOpUsername(String opUsername) {this.opUsername = getOrDefault(opUsername);}public void setOpTags(List<String> opTags) {this.opTags = getOrDefault(opTags);}public void setComment(String comment) {this.comment = getOrDefault(comment);}public void setAdditional(String additional) {this.additional = getOrDefault(additional);}public void setAttachFileId(String attachFileId) {this.attachFileId = getOrDefault(attachFileId);}public void setAttachFilename(String attachFilename) {this.attachFilename = getOrDefault(attachFilename);}public void setWhiteIds(List<String> whiteIds) {this.whiteIds = getOrDefault(whiteIds);}private String getOrDefault(String str) {return str == null ? "" : str;}private <T> List<T> getOrDefault(List<T> list) {return list == null ? Collections.emptyList() : list;} }4. 
利用線程池執行線程任務 IncidentRecordServiceImpl
@Service @Slf4j public class IncidentRecordServiceImpl implements IIncidentRecordService, InitializingBean {@Setter(onMethod_ = @Autowired)private MongoTemplate incidentMongoTemplate;@Setter(onMethod_ = { @Autowired, @Qualifier("incidentRecordKafkaConsumer") })private KafkaConsumer<String, String> incidentRecordKafkaConsumer;// kafka 線程任務private KafkaTask kafkaTask;// 創建一個線程池private static final ExecutorService EXECUTOR = new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(),new SimpleThreadFactory("kafka-thread-"));@Overridepublic void afterPropertiesSet() {this.kafkaTask = new KafkaTask();kafkaTask.setIncidentRecordService(this);kafkaTask.setKafkaConsumer(this.incidentRecordKafkaConsumer);// 執行線程任務EXECUTOR.execute(kafkaTask);}@PreDestroypublic void destroy() {this.kafkaTask.stop();EXECUTOR.shutdown();}@Overridepublic void writeRecord(IDisposalRecordDto recordDto) {try {incidentMongoTemplate.insert(Objects.requireNonNull(recordDto.toDisposalRecord()));} catch (DisposalRecordConverterException e) {log.warn("處置記錄生成失敗,record: {}", recordDto);}}@Overridepublic void writeRecords(List<IDisposalRecordDto> recordDtos) {List<DisposalRecord> records = new ArrayList<>();List<String> existMd5Repeat = new ArrayList<>();recordDtos.forEach(recordDto -> {try {DisposalRecord record = recordDto.toDisposalRecord();if (!existMd5Repeat.contains(record.getMd5Repeat())) {// 根據md5Repeat去重records.add(record);existMd5Repeat.add(record.getMd5Repeat());}} catch (DisposalRecordConverterException e) {log.warn("處置記錄生成失敗, record: {}", recordDto);log.debug(e.getMessage(), e);}});incidentMongoTemplate.insert(records, DisposalRecord.class);} } public class SimpleThreadFactory implements ThreadFactory {private final AtomicInteger threadNumber = new AtomicInteger(1);private final String prefix;public SimpleThreadFactory(String prefix) {this.prefix = prefix;}@Overridepublic Thread newThread(@NotNull Runnable r) {return new Thread(r, prefix + 
threadNumber.getAndIncrement());} }總結
以上是生活随笔為你收集整理的Kafka实战 - 06 Kafka消费者:从指定Topic拉取工单处置记录信息并存入MongoDB数据库的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jsp中应用Aplication统计访问
- 下一篇: 不注册微服务号如何使用Java实现每日给