javascript
Kafka在Spring项目中的实战演练
一、消費者
1.配置類
我們定義了一個單獨的kafka配置類,然后在配置文件中創建該bean對象,讓Spring去統一管理。這里我們設計成一個topic一個配置類,我們就可以去根據不同的需求去定制化不同的配置。
MQConsumerConfig.java
public class MQConsumerConfig {private Properties props = new Properties();private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";private static final String GROUP_ID_CONFIG = "group.id";private static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";private static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";private static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";private static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";private static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";private static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";private static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";private static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";private static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";private static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";private static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";private static final String CLIENT_ID_CONFIG = "client.id";private static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";private static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";private static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";private static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";private static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";private static final String CHECK_CRCS_CONFIG = "check.crcs";private static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";private static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";private static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";private static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";private static final String COMMIT_INTERVAL_CONFIG = "commit_interval";private static final String CONSUMER_TIMEOUT_CONFIG = "consumer_timeout";private static final String RESUB_TIME_CONFIG = "resub_time";private static final String RESUB_INTERVAL_CONFIG = "resub_interval";private String bootstrap_servers = "";private String group_id = "hds";private int session_timeout_ms = 30000;private int heartbeat_interval_ms = 10000;private boolean enable_auto_commit = false;private int auto_commit_interval_ms = 1000;private String auto_offset_reset = "latest";private int fetch_min_bytes = 1;private int fetch_max_wait_ms = 500;private long metadata_max_age_ms = 300000L;private int max_partition_fetch_bytes = 1048576;private int send_buffer_bytes = 131072;private int receive_buffer_bytes = 32768;private String client_id = "";private long reconnect_backoff_ms = 50L;private long retry_backoff_ms = 100L;private long metrics_sample_window_ms = 30000L;private int metrics_num_samples = 2;private String metric_reporters = "";private boolean check_crcs = true;private long connections_max_idle_ms = 540000L;private int request_timeout_ms = 40000;private String key_deserializer = "org.apache.kafka.common.serialization.StringDeserializer";private String value_deserializer = "com.uxin.commons.kafka.hessian2.MQDeserializer";private int commit_interval = 1;private long consumer_timeout = 100L;private int resub_time = 1;private long resub_interval = 1000L; ?public MQConsumerConfig() {this.props.put("bootstrap.servers", this.bootstrap_servers);this.props.put("group.id", this.group_id);this.props.put("session.timeout.ms", this.session_timeout_ms);this.props.put("heartbeat.interval.ms", this.heartbeat_interval_ms);this.props.put("enable.auto.commit", this.enable_auto_commit);this.props.put("auto.commit.interval.ms", this.auto_commit_interval_ms);this.props.put("auto.offset.reset", this.auto_offset_reset);this.props.put("fetch.min.bytes", this.fetch_min_bytes);this.props.put("fetch.max.wait.ms", this.fetch_max_wait_ms);this.props.put("metadata.max.age.ms", this.metadata_max_age_ms);this.props.put("max.partition.fetch.bytes", this.max_partition_fetch_bytes);this.props.put("send.buffer.bytes", this.send_buffer_bytes);this.props.put("receive.buffer.bytes", this.receive_buffer_bytes);this.props.put("client.id", this.client_id);this.props.put("reconnect.backoff.ms", this.reconnect_backoff_ms);this.props.put("retry.backoff.ms", this.retry_backoff_ms);this.props.put("metrics.sample.window.ms", this.metrics_sample_window_ms);this.props.put("metrics.num.samples", this.metrics_num_samples);this.props.put("metric.reporters", this.metric_reporters);this.props.put("check.crcs", this.check_crcs);this.props.put("connections.max.idle.ms", this.connections_max_idle_ms);this.props.put("request.timeout.ms", this.request_timeout_ms);this.props.put("key.deserializer", this.key_deserializer);this.props.put("value.deserializer", this.value_deserializer);this.props.put("commit_interval", this.commit_interval);this.props.put("consumer_timeout", this.consumer_timeout);this.props.put("resub_time", this.resub_time);this.props.put("resub_interval", this.resub_interval);} ?public Properties getProps() {return this.props;} ?public String toString() {return this.getProps().toString();} ?public String getBootstrap_servers() {return this.bootstrap_servers;} ?public void setBootstrap_servers(String bootstrap_servers) {this.bootstrap_servers = bootstrap_servers;this.props.put("bootstrap.servers", bootstrap_servers);} }spring-main.xml
這里單獨對kafka的地址進行配置,在不同的開發環境中,比如dev、test、prod等環境中配置不同的地址。
<!-- 送禮kafka隊列 --><bean id="giftConsumerConfig" class="com.uxin.zb.slive.service.config.MQConsumerConfig"><property name="bootstrap_servers" value="${common.mq.kafka.ipport}"/></bean> ?<!-- 送禮kafka隊列(新) --><bean id="sliveGiftConsumerConfig" class="com.uxin.zb.slive.service.config.MQConsumerConfig"><property name="bootstrap_servers" value="${gift.mq.kafka.ipport}"/></bean>2.反序列化類
查看上面的配置類,可以看到我們針對消息的value自定義了我們的反序列化類,這里使用了hessian。
hessian跟Serializable序列化的比較:
hessian序列化的效率更高,且序列化的數據更小,在基于RPC的調用方式中性能更好。
MQDeserializer.java
public class MQDeserializer implements Deserializer<MQEntry> {public MQDeserializer() {} ?public void configure(Map<String, ?> configs, boolean isKey) {} ?public MQEntry deserialize(String topic, byte[] data) {try {return (MQEntry)Hessian2Serialization.deserialize(data, MQEntry.class);} catch (Exception var4) {throw new SerializationException(topic + " JsonConverter deserializer error", var4);}} ?public void close() {} }3.消息體
消息是我們自定義的一個類
MQEntry.java
public class MQEntry<T> implements Serializable {private static final long serialVersionUID = 7485793906662475274L;public String topic;public T body;public Long produceTime; ?public MQEntry(String topic, T body) {this.topic = topic;this.body = body;this.produceTime = System.currentTimeMillis();} ?public T getBody() {return this.body;} ?public void setBody(T body) {this.body = body;} ?public String getTopic() {return this.topic;} ?public void setTopic(String topic) {this.topic = topic;} ?public Long getProduceTime() {return this.produceTime;} ?public String toString() {return ToStringBuilder.reflectionToString(this);} }4.消息接收類
我們這里運用了Runnable的線程方法,設計出了一個抽象類。然后每個topic會單獨實現該抽象類,然后自己實現對應的消費消息process()方法。下面是整個方法的實現。
ConfigurableAbstractMQConsumer.java
public abstract class ConfigurableAbstractMQConsumer<T> implements Runnable {private final Logger logger = LoggerFactory.getLogger(this.getClass());private final AtomicBoolean running = new AtomicBoolean(false);private String topic;private String groupId;private Long afterStop = 1000L;private final AtomicBoolean restartAfterException = new AtomicBoolean(false);private AtomicLong threadNum = new AtomicLong(1L);private MQConsumerConfig consumerConfig;private ThreadPoolExecutor executor;private MQExceptionHandler handler;private KafkaConsumer<String, T> consumer; ?public ConfigurableAbstractMQConsumer() {} ?protected void init(MQConsumerConfig consumerConfig) {this.init(consumerConfig, (ThreadPoolExecutor)null, (MQExceptionHandler)null);} ?protected void init(MQConsumerConfig consumerConfig, ThreadPoolExecutor executor) {this.init(consumerConfig, executor, (MQExceptionHandler)null);} ?protected void init(MQConsumerConfig consumerConfig, MQExceptionHandler handler) {this.init(consumerConfig, (ThreadPoolExecutor)null, handler);} ?protected void init(MQConsumerConfig consumerConfig, ThreadPoolExecutor executor, MQExceptionHandler handler) {this.logger.debug("{} init begin ", this.getClass().getSimpleName());this.analysisAnnotation();if (StringUtils.isBlank(consumerConfig.getGroup_id())) {consumerConfig.setGroup_id(this.groupId);} else {this.groupId = consumerConfig.getGroup_id();} ?Preconditions.checkNotNull(this.topic, "[ConfigurableAbstractMQConsumer]topic is null");Preconditions.checkNotNull(this.groupId, "[ConfigurableAbstractMQConsumer]groupId is null");Preconditions.checkNotNull(consumerConfig, "[ConfigurableAbstractMQConsumer]consumerConfig is null");Preconditions.checkArgument(StringUtils.isNotBlank(consumerConfig.getBootstrap_servers()), "[ConfigurableAbstractMQConsumer]bootstrap.servers is null");this.consumerConfig = consumerConfig;if (executor == null) {this.executor = this.defaultExecutor();this.logger.warn("{} init, no executor found, use default executor", this.getClass().getSimpleName());} else {this.executor = executor;} ?if (handler == null) {this.handler = new ConfigurableAbstractMQConsumer.DefaultMQExceptionHandler();this.logger.warn("{} init, no exception handler found, use default handler", this.getClass().getSimpleName());} else {this.handler = handler;} ?this.logger.debug("{} init end , consumerConfig:{}, executor:{}, handler:{}", new Object[]{this.getClass().getSimpleName(), consumerConfig, executor, handler});} ?protected abstract void process(T var1); ?private void analysisAnnotation() {try {Class<T> paramClazz = (Class)((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];Method method = Class.forName(this.getClass().getName()).getDeclaredMethod("process", paramClazz);KafkaAnnotation kafkaAnnotation = (KafkaAnnotation)method.getAnnotation(KafkaAnnotation.class);this.topic = kafkaAnnotation.topic();this.afterStop = kafkaAnnotation.afterStop();this.groupId = StringUtils.isNotBlank(kafkaAnnotation.groupId()) ? kafkaAnnotation.groupId() : KafkaConfig.getGroupId();} catch (Exception var4) {this.logger.error("analysisAnnotation error", var4);} ?} ?private void initConsumer() {this.consumer = new KafkaConsumer(this.consumerConfig.getProps());} ?private void process0(ConsumerRecord<String, T> record) {this.executor.submit(() -> {try {this.process(record.value());} catch (Exception var3) {this.handlerException(new MQEvent("in-executor", this.groupId, record, var3));} ?});} ?private void handlerException(MQEvent event) {if (event != null && event.exception() != null && event.exception() instanceof IllegalStateException) {this.logger.error("MQ process error handle IllegalStateException Topic:({}) Group:({}) Consumer:({})", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName(), event.exception()});if (KafkaConfig.getConsumerRecreateSwitcher()) {this.running.set(false);this.restartAfterException.set(true);}} ?this.logger.error("MQ process error event:{}", event);this.handler.handler(this.consumer, event);} ?public void run() {if (this.running.get()) {throw new IllegalStateException("Topic:(" + this.topic + ") Group:(" + this.groupId + ") Consumer:(" + this.getClass().getSimpleName() + ") has already running.");} else {this.logger.info("begin subscribe Topic:({}) Group:({}) Consumer:({})", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName()}); ?try {this.consumer.subscribe(Arrays.asList(this.topic));AtomicLong count = new AtomicLong();this.running.set(true); ?while(this.running.get()) {long n1 = System.currentTimeMillis();long n2 = 0L;ConsumerRecords records = null; ?try {records = this.consumer.poll(this.consumerConfig.getConsumer_timeout());n2 = System.currentTimeMillis();} catch (WakeupException var29) {this.logger.error("Topic:({}) Group:({}) Consumer:({}) poll error 4 wakeup exception", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName()});} catch (IllegalStateException var30) {this.logger.error("Topic:({}) Group:({}) Consumer:({}) poll error 4 IllegalStateException", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName(), var30});if (KafkaConfig.getConsumerRecreateSwitcher()) {this.restartAfterException.set(true);break;}} catch (Exception var31) {this.logger.error("Topic:({}) Group:({}) Consumer:({}) poll error", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName(), var31});continue;} ?if (records != null && !records.isEmpty()) {Iterator var7 = records.iterator(); ?while(var7.hasNext()) {ConsumerRecord record = (ConsumerRecord)var7.next(); ?try {long t0 = System.currentTimeMillis();this.process0(record);long t1 = System.currentTimeMillis();long t2 = System.currentTimeMillis();long tp = 0L; ?try {if (record.value() instanceof MQEntry) {MQEntry entry = (MQEntry)record.value();tp = entry.getProduceTime() != null ? entry.getProduceTime() : 0L;} else if (record.value() instanceof String) {JSONObject object = JSONObject.parseObject((String)record.value());tp = object.getLongValue("produceTime");}} catch (Throwable var27) {this.logger.error("{} get produceTime error", record, var27);} ?long t3 = System.currentTimeMillis();this.logger.info("MQ get {}, group:{}, commit:{}, receive:{}, cost:{},{},{},{}", new Object[]{record, this.groupId, commit, n1 - tp, t3 - t0, t3 - t2, t2 - t1, t1 - t0});} catch (Exception var28) {this.handlerException(new MQEvent("do-executor", this.groupId, record, var28));}} ?long n3 = System.currentTimeMillis();this.logger.info("MQ schedule fetch message, Topic:{}, Group:{}, cost:{},{},{},{}", new Object[]{this.topic, this.groupId, n3 - n1, n3 - n2, n2 - n1});} else {this.logger.trace("Topic:({}) Group:({}) Consumer:({}) poll records is null", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName()});}boolean commit = count.incrementAndGet() >= (long)this.consumerConfig.getCommit_interval() && !this.consumerConfig.isEnable_auto_commit();if (commit) {this.consumer.commitSync();count.set(0L);}}} finally {this.running.set(false);this.consumer.close();this.logger.info("Topic:({}) Group:({}) Consumer:({}) closed.", new Object[]{this.topic, this.groupId, this.getClass().getSimpleName()});if (this.restartAfterException.compareAndSet(true, false) && KafkaConfig.getConsumerRecreateSwitcher()) {this.restartUp(this);} ?} ?}} ?public synchronized void restartUp(Runnable runnable) {this.logger.info("restart up, runnable:{}", runnable); ?try {Thread.sleep(3000L);} catch (InterruptedException var3) {} ?this.initConsumer();(new Thread(runnable, "ConfigurableAbstractMQConsumer-" + this.topic + "-" + this.groupId + "-" + this.threadNum.getAndIncrement())).start();} ?public synchronized void startup() {if (this.running.get()) {try {this.stop();Thread.sleep(this.afterStop);} catch (Exception var2) {}} ?this.initConsumer();StatLog.registerExecutor("MQ-Consumer-Executor-" + this.topic + "-" + this.groupId, this.executor);(new Thread(this, "ConfigurableAbstractMQConsumer-" + this.topic + "-" + this.groupId + "-" + this.threadNum.getAndIncrement())).start();} ?public boolean stop() {if (this.running.compareAndSet(true, false)) {this.consumer.wakeup();} ?return !this.running.get();} ?public boolean isRunning() {return this.running.get();} ?public String getTopic() {return this.topic;} ?public String getGroupId() {return this.groupId;} ?public MQConsumerConfig getConsumerConfig() {return this.consumerConfig;} ?private ThreadPoolExecutor defaultExecutor() {return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, 100, 30L, TimeUnit.MINUTES, new ArrayBlockingQueue(1000), new ThreadFactory() {private final AtomicLong threadNumber = new AtomicLong(0L); ?public Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("MQ-Consumer-" + ConfigurableAbstractMQConsumer.this.topic + "-" + ConfigurableAbstractMQConsumer.this.groupId + "-" + this.threadNumber.incrementAndGet());return thread;}}, new CallerRunsPolicy());} ?protected MQConsumerConfig deepClone(MQConsumerConfig consumerConfig) {MQConsumerConfig config = new MQConsumerConfig();config.setBootstrap_servers(consumerConfig.getBootstrap_servers());config.setGroup_id(consumerConfig.getGroup_id());config.setSession_timeout_ms(consumerConfig.getSession_timeout_ms());config.setHeartbeat_interval_ms(consumerConfig.getHeartbeat_interval_ms());config.setEnable_auto_commit(consumerConfig.isEnable_auto_commit());config.setAuto_commit_interval_ms(consumerConfig.getAuto_commit_interval_ms());config.setAuto_offset_reset(consumerConfig.getAuto_offset_reset());config.setFetch_min_bytes(consumerConfig.getFetch_min_bytes());config.setFetch_max_wait_ms(consumerConfig.getFetch_max_wait_ms());config.setMetadata_max_age_ms(consumerConfig.getMetadata_max_age_ms());config.setMax_partition_fetch_bytes(consumerConfig.getMax_partition_fetch_bytes());config.setSend_buffer_bytes(consumerConfig.getSend_buffer_bytes());config.setReceive_buffer_bytes(consumerConfig.getReceive_buffer_bytes());config.setClient_id(consumerConfig.getClient_id());config.setReconnect_backoff_ms(consumerConfig.getReconnect_backoff_ms());config.setRetry_backoff_ms(consumerConfig.getRetry_backoff_ms());config.setMetrics_sample_window_ms(consumerConfig.getMetrics_sample_window_ms());config.setMetrics_num_samples(consumerConfig.getMetrics_num_samples());config.setMetric_reporters(consumerConfig.getMetric_reporters());config.setCheck_crcs(consumerConfig.isCheck_crcs());config.setConnections_max_idle_ms(consumerConfig.getConnections_max_idle_ms());config.setRequest_timeout_ms(consumerConfig.getRequest_timeout_ms());config.setKey_deserializer(consumerConfig.getKey_deserializer());config.setValue_deserializer(consumerConfig.getValue_deserializer());config.setCommit_interval(consumerConfig.getCommit_interval());config.setConsumer_timeout(consumerConfig.getConsumer_timeout());config.setResub_time(consumerConfig.getResub_time());config.setResub_interval(consumerConfig.getResub_interval());return config;} ?protected class DefaultMQExceptionHandler implements MQExceptionHandler {protected DefaultMQExceptionHandler() {} ?public void handler(Consumer consumer, MQEvent event) {ConfigurableAbstractMQConsumer.this.logger.debug("[DefaultMQExceptionHandler] topic:{} groupId:{} record:{} event:{}", new Object[]{ConfigurableAbstractMQConsumer.this.topic, ConfigurableAbstractMQConsumer.this.groupId, event});}} }我們把上面這個方法單獨拆分出來分析一下:
我們首先自定義了一個注解:
KafkaAnnotation.java
該注解會在process方法中注解,用來定義該topic的一些屬性,比如topic、是否重試等,這個后面會貼一個消費者代碼作為示例。
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface KafkaAnnotation {String topic() default ""; ?String groupId() default "hds"; ?long afterStop() default 1000L; ?boolean retry() default false; }在初始化方法中,我們會獲取該注解的值,將定義的屬性設置到對應的變量中
? ?private void analysisAnnotation() {try {Class<T> paramClazz = (Class)((ParameterizedType)this.getClass().getGenericSuperclass()).getActualTypeArguments()[0];Method method = Class.forName(this.getClass().getName()).getDeclaredMethod("process", paramClazz);KafkaAnnotation kafkaAnnotation = (KafkaAnnotation)method.getAnnotation(KafkaAnnotation.class);this.topic = kafkaAnnotation.topic();this.afterStop = kafkaAnnotation.afterStop();this.groupId = StringUtils.isNotBlank(kafkaAnnotation.groupId()) ? kafkaAnnotation.groupId() : KafkaConfig.getGroupId();} catch (Exception var4) {this.logger.error("analysisAnnotation error", var4);}}本身是實現了Runnable接口,所以屬于線程類,我們需要有個接口去開啟線程。
? ?private void initConsumer() {this.consumer = new KafkaConsumer(this.consumerConfig.getProps());} ?public synchronized void startup() {if (this.running.get()) {try {this.stop();Thread.sleep(this.afterStop);} catch (Exception var2) {}} ?this.initConsumer();StatLog.registerExecutor("MQ-Consumer-Executor-" + this.topic + "-" + this.groupId, this.executor);(new Thread(this, "ConfigurableAbstractMQConsumer-" + this.topic + "-" + this.groupId + "-" + this.threadNum.getAndIncrement())).start();}在Spring啟動的時候,去獲取所有該實現類,調用startup()開啟線程。
String[] consumerNames = context.getBeanNamesForType(ConfigurableAbstractMQConsumer.class); ? if (consumerNames == null || consumerNames.length == 0) {out.println("there's no [ConfigurableAbstractMQConsumer] found.");return; } ? if (args == null || args.length == 0) { ?out.println("prepare to start all consumers."); ?for (String consumerName : consumerNames) {ConfigurableAbstractMQConsumer consumer = context.getBean(consumerName, ConfigurableAbstractMQConsumer.class);if (consumer != null) {consumer.startup();out.println("start consumer:" + consumer.getClass().getSimpleName() + " for topic:" + consumer.getTopic());}} ? } else { ?List<String> topics = Arrays.asList(StringUtils.split(args[0]));out.println("prepare to start consumers for topics:" + topics); ?if (topics != null && !topics.isEmpty()) {for (String consumerName : consumerNames) {ConfigurableAbstractMQConsumer consumer = context.getBean(consumerName, ConfigurableAbstractMQConsumer.class);if (consumer != null && topics.contains(consumer.getTopic())) {consumer.startup();out.println("start consumer:" + consumer.getClass().getSimpleName() + " for topic:" + consumer.getTopic());}}} } ? out.println("consumer startup done.");run方法里面實現了我們對消息的監聽
//監聽指定了topic的消息 this.consumer.subscribe(Arrays.asList(this.topic)); ? //... //... ? //拉取消息列表records = this.consumer.poll(this.consumerConfig.getConsumer_timeout()); ? ? Iterator var7 = records.iterator(); ? while(var7.hasNext()) {ConsumerRecord record = (ConsumerRecord)var7.next(); ?try {long t0 = System.currentTimeMillis();//調用process0方法,該方法會用線程池去調用process方法處理對應的消息this.process0(record);long t1 = System.currentTimeMillis();boolean commit = count.incrementAndGet() >= (long)this.consumerConfig.getCommit_interval() && !this.consumerConfig.isEnable_auto_commit();if (commit) {//同步提交this.consumer.commitSync();count.set(0L);}5.某個topic的消息接收類
我們將所有topic放入一個常量類來同意管理。
下面是某個topic的類,大概代碼結構如下:
@Component public class GiftBagGiftMQConsumer extends ConfigurableAbstractMQConsumer<MQEntry> {@Autowired@Overrideprotected void init(@Qualifier("giftConsumerConfig") MQConsumerConfig consumerConfig, @Qualifier("mqExecutor") ThreadPoolExecutor executor) {logger.info(">>>>>>>>>>>>>>> 查看kafka config對象:" + consumerConfig);super.init(consumerConfig, executor);} ?@TimerEntrance("big_gift_mq")@TimerMetric@Override@KafkaAnnotation(topic = KafkaTopic.GIFT_BAG_GIFT_QUEUE)public void process(MQEntry entry) {// 具體業務邏輯} }二、生產者
1.發送消息類
MQProducer.java
public class MQProducer {private static Logger Logger = LogProxy.getLogger(MQProducer.class);private static Properties props;private static KafkaProducer<String, MQEntry> producer;private static AtomicInteger recreateCount = new AtomicInteger(0);private static Integer RECREATE_COUNT_LIMIT = 5; ?public MQProducer() {} ?//進行一些必要參數的初始化設置public static synchronized void init() {props = new Properties();props.put("bootstrap.servers", KafkaConfig.getHost());props.put("acks", "1");props.put("retries", 3);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("value.serializer", MQSerializer.class);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");producer = new KafkaProducer(props);} ?public static void send(String group, final MQEntry entry) {if (producer == null) {init();} ?ProducerRecord<String, MQEntry> record = new ProducerRecord(entry.getTopic(), entry);Logger.debug("kafka config host:" + KafkaConfig.getHost()); ?try {//異步發送發送消息producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if (metadata == null) {MQProducer.Logger.error("MQ send topic, metadata is null", e);} else {MQProducer.Logger.info("MQ send topic:{}, offset:{}, entry:{}", new Object[]{entry.getTopic(), metadata.offset(), entry});} ?}});} catch (IllegalStateException var6) {Logger.error("kafka producer send failed catch by IllegalStateException, topic:{} entry:{}", new Object[]{entry.getTopic(), entry, var6});if (KafkaConfig.getProducerRecreateSwitcher()) {try {if (recreateCount.getAndIncrement() < RECREATE_COUNT_LIMIT) {producer.flush();producer.close(1000L, TimeUnit.MILLISECONDS);init();}} catch (Exception var5) {Logger.error("producer close failed, entry:{}", entry, var5);}}} ?} }由于消費者使用了我們自定義了反序列類,這里序列化也是使用我們自定義的。
MQSerializer.java
public class MQSerializer implements Serializer<Object> {public MQSerializer() {} ?public void configure(Map configs, boolean isKey) {} ?public byte[] serialize(String topic, Object data) {try {return Hessian2Serialization.serialize(data);} catch (Exception var4) {throw new SerializationException(topic + " JsonConverter serializer error", var4);}} ?public void close() {} }2.發送消息演示
MQEntry<String> entry = new MQEntry<String>(KafkaTopic.HUAJIAO_DIAMOND_GIFT_QUEUE, JSON.toJSONString(mqMap)); MQProducer.send("", entry);3.改進
這里我們生產者是將配置的參數和發送類放到一個類中的,我們也可以和消費者一樣,自定義一個配置類。
MQProducerConfig.java
public class MQProducerConfig {private Properties props = new Properties();private static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";private static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";private static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";private static final String BATCH_SIZE_CONFIG = "batch.size";private static final String BUFFER_MEMORY_CONFIG = "buffer.memory";private static final String ACKS_CONFIG = "acks";private static final String TIMEOUT_CONFIG = "timeout.ms";private static final String LINGER_MS_CONFIG = "linger.ms";private static final String CLIENT_ID_CONFIG = "client.id";private static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";private static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";private static final String MAX_REQUEST_SIZE_CONFIG = "max.request.size";private static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";private static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";private static final String RETRIES_CONFIG = "retries";private static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";private static final String COMPRESSION_TYPE_CONFIG = "compression.type";private static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";private static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";private static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";private static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";private static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";private static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";private static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";private static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";private static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";private String bootstrap_servers = "";private long metadata_fetch_timeout_ms = 60000L;private long metadata_max_age_ms = 300000L;private int batch_size = 16384;private long buffer_memory = 33554432L;private String acks = "1";private int timeout_ms = 30000;private long linger_ms = 1L;private String client_id = "";private int send_buffer_bytes = 131072;private int receive_buffer_bytes = 32768;private int max_request_size = 1048576;private long reconnect_backoff_ms = 50L;private boolean block_on_buffer_full = false;private int retries = 3;private long retry_backoff_ms = 100L;private String compression_type = "none";private long metrics_sample_window_ms = 30000L;private int metrics_num_samples = 2;private String metric_reporters = "";private int max_in_flight_requests_per_connection = 5;private long connections_max_idle_ms = 540000L;private String partitioner_class = "org.apache.kafka.clients.producer.internals.DefaultPartitioner";private long max_block_ms = 60000L;private int request_timeout_ms = 30000;private String key_serializer = "org.apache.kafka.common.serialization.StringSerializer";private String value_serializer = "com.uxin.commons.kafka.hessian2.MQSerializer"; ?public MQProducerConfig() {this.props.put("bootstrap.servers", this.bootstrap_servers);this.props.put("metadata.fetch.timeout.ms", this.metadata_fetch_timeout_ms);this.props.put("metadata.max.age.ms", this.metadata_max_age_ms);this.props.put("batch.size", this.batch_size);this.props.put("buffer.memory", this.buffer_memory);this.props.put("acks", this.acks);this.props.put("timeout.ms", this.timeout_ms);this.props.put("linger.ms", this.linger_ms);this.props.put("client.id", this.client_id);this.props.put("send.buffer.bytes", this.send_buffer_bytes);this.props.put("receive.buffer.bytes", this.receive_buffer_bytes);this.props.put("max.request.size", this.max_request_size);this.props.put("reconnect.backoff.ms", this.reconnect_backoff_ms);this.props.put("block.on.buffer.full", this.block_on_buffer_full);this.props.put("retries", this.retries);this.props.put("retry.backoff.ms", this.retry_backoff_ms);this.props.put("compression.type", this.compression_type);this.props.put("metrics.sample.window.ms", this.metrics_sample_window_ms);this.props.put("metrics.num.samples", this.metrics_num_samples);this.props.put("metric.reporters", this.metric_reporters);this.props.put("max.in.flight.requests.per.connection", this.max_in_flight_requests_per_connection);this.props.put("connections.max.idle.ms", this.connections_max_idle_ms);this.props.put("partitioner.class", this.partitioner_class);this.props.put("max.block.ms", this.max_block_ms);this.props.put("request.timeout.ms", this.request_timeout_ms);this.props.put("key.serializer", this.key_serializer);this.props.put("value.serializer", this.value_serializer);} ?public Properties getProps() {return this.props;} ?public String toString() {return this.getProps().toString();} ?public String getBootstrap_servers() {return this.bootstrap_servers;} ?public void setBootstrap_servers(String bootstrap_servers) {this.bootstrap_servers = bootstrap_servers;this.props.put("bootstrap.servers", bootstrap_servers);} }ConfigurableKafkaProducer.java
public class ConfigurableKafkaProducer {protected Logger logger = LogProxy.getLogger(this.getClass());private final ConcurrentMap<String, Producer<String, Object>> producer_instance = new ConcurrentHashMap();private MQProducerConfig producerConfig;private static AtomicInteger recreateCount = new AtomicInteger(0);private static Integer RECREATE_COUNT_LIMIT = 5; ?public ConfigurableKafkaProducer() {} ?public void init() {this.logger.debug("{} init begin ", this.getClass().getSimpleName());Preconditions.checkNotNull(this.producerConfig, "[ConfigurableKafkaProducer]producerConfig is null");Preconditions.checkArgument(StringUtils.isNotBlank(this.producerConfig.getBootstrap_servers()), "[ConfigurableKafkaProducer]bootstrap.servers is null");this.logger.debug("{} init end , producerConfig:{}", this.getClass().getSimpleName(), this.producerConfig);} ?private Producer<String, Object> getProducer(String topic) {Producer<String, Object> producer = (Producer)this.producer_instance.get(topic);if (producer != null) {return producer;} else {synchronized(this) {producer = (Producer)this.producer_instance.get(topic);if (producer != null) {return producer;} else {this.logger.info("KafkaProducer new producer, topic:{}, config:{}", topic, this.producerConfig);Producer<String, Object> newProducer = new KafkaProducer(this.producerConfig.getProps());Producer<String, Object> old = (Producer)this.producer_instance.putIfAbsent(topic, newProducer);if (old != null) {this.logger.warn("kafka new producer warn,too many producer ,but be killed");newProducer.close();return old;} else {return newProducer;}}}}} ?public void send(String topic, Object entry) {Producer producer = this.getProducer(topic); ?try {producer.send(new ProducerRecord(topic, entry), (metadata, e) -> {if (metadata == null) {this.logger.error("MQ send topic, metadata is null", e);} else {this.logger.info("MQ send topic:{}, offset:{}, entry:{}", new Object[]{topic, metadata.offset(), entry});} ?});} catch (IllegalStateException var7) {this.logger.error("MQ send failed, catch by IllegalStateException topic:{}, entry:{}", new Object[]{topic, entry, var7});if (KafkaConfig.getProducerRecreateSwitcher()) {try {if (recreateCount.getAndIncrement() < RECREATE_COUNT_LIMIT) {producer.flush();producer.close(1000L, TimeUnit.MILLISECONDS);this.producer_instance.remove(topic);}} catch (Exception var6) {this.logger.error("producer close failed, topic:{} entry:{}", new Object[]{topic, entry, var7});}}} ?} ?public void send(String topic, Object entry, boolean isLog) {Producer producer = this.getProducer(topic); ?try {producer.send(new ProducerRecord(topic, entry), (metadata, e) -> {if (metadata == null) {this.logger.error("MQ send topic, metadata is null", e);} else if (isLog) {this.logger.info("MQ send topic:{}, offset:{}, entry:{}", new Object[]{topic, metadata.offset(), entry});} ?});} catch (IllegalStateException var8) {this.logger.error("MQ send failed, catch by IllegalStateException topic:{}, entry:{}", new Object[]{topic, entry, var8});if (KafkaConfig.getProducerRecreateSwitcher()) {try {if (recreateCount.getAndIncrement() < RECREATE_COUNT_LIMIT) {producer.flush();producer.close(1000L, TimeUnit.MILLISECONDS);this.producer_instance.remove(topic);}} catch (Exception var7) {this.logger.error("producer close failed, topic:{} entry:{}", new Object[]{topic, entry, var8});}}} ?} ?public void setProducerConfig(MQProducerConfig producerConfig) {this.producerConfig = producerConfig;} ?public MQProducerConfig getProducerConfig() {return this.producerConfig;} }這樣,我們也可以自定義出不同配置的發送者,去根據不同情況使用不同的Producer。
<!--mq producer--><bean id="giftKafkaProducerConfig" class="com.uxin.commons.kafka.MQProducerConfig"><property name="bootstrap_servers" value="${gift.mq.kafka.ipport}" /></bean> <bean id="giftProducer" class="com.uxin.commons.kafka.ConfigurableKafkaProducer" init-method="init"><property name="producerConfig" ref="giftKafkaProducerConfig" /></bean>?
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Kafka在Spring项目中的实战演练的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kafka消费组与重平衡机制详解
- 下一篇: ZooKeeper的典型应用