Redisson 管道批量发送命令流程分析
一、示例代碼
管道功能就是REDIS的批量發送,實際上是客戶端的功能,與服務端無關。相當于把多個請求的命令放在一個數據包通過TCP發送到服務端,然后客戶端再一次性讀取所有的命令回應,節省多次命令的網絡請求。
RBatch rBatch = redissonClient.createBatch();RBatch rBatch = redissonClient.createBatch();rBatch.getBucket("goodsName", StringCodec.INSTANCE).getAsync();rBatch.getSet("goodsSet",StringCodec.INSTANCE).readAllAsync();BatchResult<?> res = rBatch.execute();log.debug(" tt execute end. res:{}",JSONUtil.toJsonStr(res.getResponses()));二、創建批處理和添加命令流程
? ? ?1.創建批處理對象,包括批量命令執行器,并且創建的RBucket等各種REDIS容器對象都會傳入批量命令執行器。
public class RedissonBatch implements RBatch {private final EvictionScheduler evictionScheduler;private final CommandBatchService executorService;private final BatchOptions options;public RedissonBatch(EvictionScheduler evictionScheduler, ConnectionManager connectionManager, BatchOptions options) {this.executorService = new CommandBatchService(connectionManager, options);this.evictionScheduler = evictionScheduler;this.options = options;}@Overridepublic <V> RBucketAsync<V> getBucket(String name) {return new RedissonBucket<V>(executorService, name);}批量命令執行器繼承于通用命令執行器(CommandAsyncService) ,只是重寫了發送命令函數(SendCommand,async(異步發送命令)).
2.rBatch.getBucket("goodsName").getAsync()調用批量命令執行器的異步發送命令。
CommandBatchService @Overridepublic <V, R> void async(boolean readOnlyMode, NodeSource nodeSource,Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect) {if (isRedisBasedQueue()) {boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC;RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise,false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch);executor.execute();} else {RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, false, connectionManager, objectBuilder, commands, options, index, executed);executor.execute();}}3.接著調用了RedisBatchExecutor.execute方法,
和BaseRedisBatchExecutor.addBatchCommandData public class RedisBatchExecutor<V, R> extends BaseRedisBatchExecutor<V, R> {@Overridepublic void execute() {addBatchCommandData(params);}}public class BaseRedisBatchExecutor<V, R> extends RedisExecutor<V, R> {final ConcurrentMap<MasterSlaveEntry, Entry> commands;final BatchOptions options;final AtomicInteger index;final AtomicBoolean executed;protected final void addBatchCommandData(Object[] batchParams) {MasterSlaveEntry msEntry = getEntry(source);Entry entry = commands.get(msEntry);if (entry == null) {entry = new Entry();Entry oldEntry = commands.putIfAbsent(msEntry, entry);if (oldEntry != null) {entry = oldEntry;}}if (!readOnlyMode) {entry.setReadOnlyMode(false);}Codec codecToUse = getCodec(codec);BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet());entry.getCommands().add(commandData);}?這里的commands為以主節點為KEY,以待發送命令隊列列表為VALUE(Entry),保存一個MAP.然后會把命令都添加到entry的commands命令隊列中。
public static class Entry {Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<>();volatile boolean readOnlyMode = true;?三、批量執行命令
1.調用rBatch.executeAsync(),接著會調用到CommandBatchService.executeAsync
CommandBatchServicepublic RFuture<BatchResult<?>> executeAsync() {AtomicInteger slots = new AtomicInteger(commands.size());for (Map.Entry<RFuture<?>, List<CommandBatchService>> entry : nestedServices.entrySet()) {slots.incrementAndGet();for (CommandBatchService service : entry.getValue()) {service.executeAsync();}entry.getKey().onComplete((res, e) -> {handle(voidPromise, slots, entry.getKey());});}for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) {RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise,connectionManager, this.options, e.getValue(), slots);executor.execute();}return promise;}?2.接著跳到RedisCommonBatchExecutor.execute方法,這個調用了基類RedisExecutor的execute方法,
RedisExecutor public void execute() {codec = getCodec(codec);RFuture<RedisConnection> connectionFuture = getConnection();RPromise<R> attemptPromise = new RedissonPromise<R>();mainPromiseListener = (r, e) -> {if (mainPromise.isCancelled() && connectionFuture.cancel(false)) {log.debug("Connection obtaining canceled for {}", command);timeout.cancel();if (attemptPromise.cancel(false)) {free();}}};if (attempt == 0) {mainPromise.onComplete((r, e) -> {if (this.mainPromiseListener != null) {this.mainPromiseListener.accept(r, e);}});}scheduleRetryTimeout(connectionFuture, attemptPromise);connectionFuture.onComplete((connection, e) -> {if (connectionFuture.isCancelled()) {connectionManager.getShutdownLatch().release();return;}if (!connectionFuture.isSuccess()) {connectionManager.getShutdownLatch().release();exception = convertException(connectionFuture);return;}sendCommand(attemptPromise, connection);writeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {checkWriteFuture(writeFuture, attemptPromise, connection);}});releaseConnection(attemptPromise, connectionFuture);});attemptPromise.onComplete((r, e) -> {checkAttemptPromise(attemptPromise, connectionFuture);});}?3.接著調用到RedisCommonBatchExecutor.sendCommand方法,進行命令發送。
@Overrideprotected void sendCommand(RPromise<Void> attemptPromise, RedisConnection connection) {boolean isAtomic = options.getExecutionMode() != ExecutionMode.IN_MEMORY;List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size());for (CommandData<?, ?> c : entry.getCommands()) {if ((c.getPromise().isCancelled() || c.getPromise().isSuccess()) && !isWaitCommand(c) && !isAtomic) {// skip commandcontinue;}list.add(c);}writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0));}?注意這里的CommandsData里面的命令是一個列表,可以支持多個。
public class CommandsData implements QueueCommand {private final List<CommandData<?, ?>> commands;private final List<CommandData<?, ?>> attachedCommands;public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean queued, boolean syncSlaves) {this(promise, commands, null, false, false, queued, syncSlaves);}public CommandsData(RPromise<Void> promise, List<CommandData<?, ?>> commands, boolean skipResult, boolean atomic, boolean queued, boolean syncSlaves) {this(promise, commands, null, skipResult, atomic, queued, syncSlaves);}4.接著會調用RedisConnection.send方法來發送數據,其實是調用netty中生成的NioSocketChannel來寫入命令數據。
RedisConnection ???????public ChannelFuture send(CommandsData data) {return channel.writeAndFlush(data); }5.netty的channel的writeAndFlush會調用管道中所有的outHandler進行處理。那在這里就是
CommandEncoder,CommandBatchEncoder,這里首先會調用到CommandBatchEncoder的encode方法 CommandBatchEncoder @Overrideprotected void encode(ChannelHandlerContext ctx, CommandsData msg, ByteBuf out) throws Exception {CommandEncoder encoder = ctx.pipeline().get(CommandEncoder.class);for (CommandData<?, ?> commandData : msg.getCommands()) {encoder.encode(ctx, commandData, out);}}這里面就是循環取出命令,逐個調用CommandEncoder單個命令編碼器進行編碼,最后再加到out列表中,一個網絡包發送出去。
?這里就是對兩個命令編碼的過程,接著就是調用socket.write將命令數據發送到服務端。
四、接收命令回應。
1.這個是接收的結果為一次性接收到的網絡數據,格式為REDIS的協議。
2021-11-19 10:14:08.418 tt TRACE 28472 --- [sson-netty-2-23] ? o.r.c.h.CommandDecoder.decode(CommandDecoder.java:114) : reply: $5
xdwww
*2
$4
word
$5
hello
2.接收回調處理,CommandDecoder.decode,在handleResult中會通知結果。
protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {int code = in.readByte();if (code == '+') {String result = readString(in);handleResult(data, parts, result, skipConvertor);} else if (code == '$') {ByteBuf buf = readBytes(in);Object result = null;if (buf != null) {Decoder<Object> decoder = selectDecoder(data, parts);result = decoder.decode(buf, state());}handleResult(data, parts, result, false);} else if (code == '*') {long size = readLong(in);List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));state().incLevel();decodeList(in, data, parts, channel, size, respParts, skipConvertor, commandsData);state().decLevel();} else {String dataStr = in.toString(0, in.writerIndex(), CharsetUtil.UTF_8);throw new IllegalStateException("Can't decode replay: " + dataStr);}}?3.handleResult
private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor) {if (data != null && !skipConvertor) {result = data.getCommand().getConvertor().convert(result);}if (parts != null) {parts.add(result);} else {completeResponse(data, result);}}?4.解碼列表。
private void decodeList(ByteBuf in, CommandData<Object, Object> data, List<Object> parts,Channel channel, long size, List<Object> respParts, boolean skipConvertor, List<CommandData<?, ?>> commandsData)throws IOException {if (parts == null && commandsData != null) {for (int i = respParts.size(); i < size; i++) {int suffix = 0;if (RedisCommands.MULTI.getName().equals(commandsData.get(0).getCommand().getName())) {suffix = 1;}CommandData<Object, Object> commandData = (CommandData<Object, Object>) commandsData.get(i+suffix);decode(in, commandData, respParts, channel, skipConvertor, commandsData);if (commandData.getPromise().isDone() && !commandData.getPromise().isSuccess()) {data.tryFailure(commandData.cause());}}} else {for (int i = respParts.size(); i < size; i++) {decode(in, data, respParts, channel, skipConvertor, null);}}MultiDecoder<Object> decoder = messageDecoder(data, respParts);if (decoder == null) {return;}Object result = decoder.decode(respParts, state());decodeResult(data, parts, channel, result);}5.設置commandData的等待PROMISE的結果值。
protected void completeResponse(CommandData<Object, Object> data, Object result) {if (data != null) {data.getPromise().trySuccess(result);}}private void handleResult(CommandData<Object, Object> data, List<Object> parts, Object result, boolean skipConvertor) {if (data != null && !skipConvertor) {result = data.getCommand().getConvertor().convert(result);}if (parts != null) {parts.add(result);} else {completeResponse(data, result);}}?
?
總結
以上是生活随笔為你收集整理的Redisson 管道批量发送命令流程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redission收发命令流程分析
- 下一篇: springcloud ribbon @