seata TC 会给每个客户端(TM及RM)分配一个ID,这个ID是在TC端根据RPC context来设置的,客户端无法自行设置,具体相关代码如下:
/**
 * Builds the client id as {@code applicationId} + split char + the remote address of the channel
 * (IP:Port), e.g. {@code tcc-sample:127.0.0.1:61605}.
 *
 * @param applicationId the application id reported by the client
 * @param channel       the netty channel the client connected through
 * @return the composed client id
 */
private static String buildClientId(String applicationId, Channel channel) {
    StringBuilder clientId = new StringBuilder();
    clientId.append(applicationId)
            .append(Constants.CLIENT_ID_SPLIT_CHAR)
            .append(getAddressFromChannel(channel));
    return clientId.toString();
}
/**
 * Creates an {@link RpcContext} holder describing one connected client (TM or RM) and binds it
 * to its channel. The client id is derived on the TC side from the application id and the
 * channel's remote address; it cannot be set by the client itself.
 *
 * @param clientRole     role of the client (TM or RM)
 * @param version        client version string
 * @param applicationId  application id reported by the client
 * @param txServiceGroup transaction service group of the client
 * @param dbkeys         comma-style resource-id keys reported by an RM (parsed by dbKeytoSet)
 * @param channel        the netty channel of the client connection
 * @return the populated context holder
 */
private static RpcContext buildChannelHolder(NettyPoolKey.TransactionRole clientRole, String version, String applicationId,
String txServiceGroup, String dbkeys, Channel channel) {
    RpcContext context = new RpcContext();
    context.setClientRole(clientRole);
    context.setVersion(version);
    context.setApplicationId(applicationId);
    // Client id is computed here from applicationId + channel address, never client-supplied.
    context.setClientId(buildClientId(applicationId, channel));
    context.setTransactionServiceGroup(txServiceGroup);
    context.setChannel(channel);
    context.addResources(dbKeytoSet(dbkeys));
    return context;
}
}
如果客户端在branch 报告成功后异常退出,可能出现如下错误:
java.lang.RuntimeException: rm client is not connected. dbkey:TccActionOne,clientId:tcc-sample:127.0.0.1:61605
at io.seata.core.rpc.netty.RpcServer.sendSyncRequest(RpcServer.java:217)
at io.seata.core.rpc.netty.RpcServer.sendSyncRequest(RpcServer.java:236)
at io.seata.server.coordinator.DefaultCoordinator.branchRollback(DefaultCoordinator.java:258)
at io.seata.server.coordinator.DefaultCore.doGlobalRollback(DefaultCore.java:309)
at io.seata.server.coordinator.DefaultCoordinator.handleRetryRollbacking(DefaultCoordinator.java:336)
at io.seata.server.coordinator.DefaultCoordinator.lambda$1(DefaultCoordinator.java:436)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Unknown Source)
这个错误其实不用担心,重新启动client(或者存在注册了相同resourceId的其它RM client)后会自动rollback,有关处理逻辑可以看DefaultCore类的doGlobalRollback函数。TC选择channel(及RM client)的逻辑为,依次选择:
- 原有IP及端口
- 原有IP上相同应用(application ID)及相同resourceId(RMCLIENT)
- 其它IP上相同应用(application ID)及相同resourceId(RMCLIENT)
- 其它IP上其它应用(application ID)及相同resourceId(RMCLIENT)
具体见如下代码
/**
 * Gets a channel for the given resource, preferring the exact client the branch was registered
 * from and progressively falling back to alternatives. Selection order:
 * <ol>
 *   <li>the original IP:Port of {@code clientId};</li>
 *   <li>another port on the same IP for the same application;</li>
 *   <li>another IP for the same application;</li>
 *   <li>another application registered for the same resource (via {@code tryOtherApp}).</li>
 * </ol>
 * Inactive channels discovered along the way are removed from the registry.
 *
 * @param resourceId Resource ID
 * @param clientId Client ID - ApplicationId:IP:Port
 * @return Corresponding channel, NULL if not found.
 * @throws FrameworkException if {@code clientId} does not split into exactly 3 parts
 */
public static Channel getChannel(String resourceId, String clientId) {
    Channel resultChannel = null;
    String[] clientIdInfo = readClientId(clientId);
    if (clientIdInfo == null || clientIdInfo.length != 3) {
        throw new FrameworkException("Invalid Client ID: " + clientId);
    }
    String targetApplicationId = clientIdInfo[0];
    String targetIP = clientIdInfo[1];
    int targetPort = Integer.parseInt(clientIdInfo[2]);
    ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
        RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);
    if (targetApplicationId == null || applicationIdMap == null || applicationIdMap.isEmpty()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("No channel is available for resource[" + resourceId + "]");
        }
        return null;
    }
    ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);
    if (null != ipMap && !ipMap.isEmpty()) {
        // Firstly, try to find the original channel through which the branch was registered.
        ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
        if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {
            RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
            if (exactRpcContext != null) {
                Channel channel = exactRpcContext.getChannel();
                if (channel.isActive()) {
                    resultChannel = channel;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Just got exactly the one " + channel + " for " + clientId);
                    }
                } else {
                    // remove(key, value) only removes if still mapped to this exact context,
                    // so a concurrent re-registration on the same port is not clobbered.
                    if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Removed inactive " + channel);
                        }
                    }
                }
            }
            // The original channel was broken, try another port on the same IP.
            // BUGFIX: this loop used to sit OUTSIDE the null/empty guard above, so when
            // ipMap.get(targetIP) returned null (the original IP has no live registration,
            // e.g. the client exited after a successful branch report), iterating
            // portMapOnTargetIP threw a NullPointerException instead of falling through
            // to the other-IP / other-application alternatives.
            if (resultChannel == null) {
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP
                    .entrySet()) {
                    Channel channel = portMapOnTargetIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info(
                                "Choose " + channel + " on the same IP[" + targetIP + "] as alternative of "
                                    + clientId);
                        }
                        break;
                    } else {
                        if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(),
                            portMapOnTargetIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive " + channel);
                            }
                        }
                    }
                }
            }
        }
        // No channel on the this app node, try another one.
        if (resultChannel == null) {
            for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap
                .entrySet()) {
                if (ipMapEntry.getKey().equals(targetIP)) { continue; }
                ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
                if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
                    continue;
                }
                for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
                    Channel channel = portMapOnOtherIPEntry.getValue().getChannel();
                    if (channel.isActive()) {
                        resultChannel = channel;
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Choose " + channel + " on the same application[" + targetApplicationId
                                + "] as alternative of "
                                + clientId);
                        }
                        break;
                    } else {
                        if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(),
                            portMapOnOtherIPEntry.getValue())) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("Removed inactive " + channel);
                            }
                        }
                    }
                }
                if (resultChannel != null) { break; }
            }
        }
    }
    // Last resort: any other application registered for this resource.
    if (resultChannel == null) {
        resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);
        if (resultChannel == null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("No channel is available for resource[" + resourceId
                    + "] as alternative of "
                    + clientId);
            }
        } else {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Choose " + resultChannel + " on the same resource[" + resourceId
                    + "] as alternative of "
                    + clientId);
            }
        }
    }
    return resultChannel;
}
本文暂时没有评论,来添加一个吧(●'◡'●)