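CAT Source Code Analysis
mufiye, kernel newbie

CAT is an open-source monitoring project from Dianping (大众点评); its overall architecture is shown in the figure below. This post walks through the source code of two parts, the CAT Client and the CAT Consumer, to help you understand how CAT works.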

[Figure: cat-architecture, the overall CAT architecture diagram]

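I. CAT Client

CAT instruments application code through an API, and every method exposed to callers lives in the Cat class. The core methods are listed below:

  • Cat.newTransaction: marks a unit of work whose duration you care about, or for which you want to display a nested tree of call messages
  • Cat.logError: records an error, usually used together with log.error; under the hood it is also an event
  • Cat.logEvent: records an event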
public class Cat {
private static Cat CAT = new Cat();

public static Transaction newTransaction(String type, String name) {
try {
return TraceContextHelper.threadLocal().newTransaction(type, name);
} catch (Exception e) {
errorHandler(e);
return NullMessage.TRANSACTION;
}
}

public static void logError(Throwable cause) {
logError(null, cause);
}

public static void logError(String message, Throwable cause) {
try {
Event event = TraceContextHelper.threadLocal().newEvent(message, cause);

event.complete();
} catch (Exception e) {
errorHandler(e);
}
}

public static void logEvent(String type, String name, String status, String nameValuePairs) {
try {
Event event = TraceContextHelper.threadLocal().newEvent(type, name);

event.addData(nameValuePairs);
event.setStatus(status);
event.complete();
} catch (Exception e) {
errorHandler(e);
}
}
}

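All three methods create a message. newTransaction creates a message of the transaction kind, while logError and logEvent create messages of the event kind. Unlike an event, a transaction records its duration, and it can nest other transactions or events to form a message tree.

[Figure: message_class_tree, the message class hierarchy]

The Message interface and the AbstractMessage abstract class define what all messages have in common. The shared fields are m_type (type), m_name (name), m_status (status), m_completed (whether the message has completed), m_timestampInMillis (creation timestamp in milliseconds), and m_data (data); the shared methods read and write these fields.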

public interface Message {
/**
* add one key-value pair to the message.
*
* @param key
* @param value
*/
public void addData(String key, Object value);

/**
* Complete the message construction.
*/
public void complete();

/**
* Set the message status.
*
* @param status
* message status. "0" means success, otherwise error code.
*/
public void setStatus(String status);

// Only some methods are shown; the interface also has getters and setter overloads for other parameter types
}
public abstract class AbstractMessage implements Message {

private String m_type;

private String m_name;

protected String m_status = "unset";

private long m_timestampInMillis;

private CharSequence m_data;

private boolean m_completed;

public AbstractMessage(String type, String name) {
m_type = String.valueOf(type);
m_name = String.valueOf(name);
m_timestampInMillis = MilliSecondTimer.currentTimeMillis();
}

@Override
public void addData(String keyValuePairs) {
if (m_data == null) {
m_data = keyValuePairs;
} else if (m_data instanceof StringBuilder) {
((StringBuilder) m_data).append('&').append(keyValuePairs);
} else {
StringBuilder sb = new StringBuilder(m_data.length() + keyValuePairs.length() + 16);

sb.append(m_data).append('&');
sb.append(keyValuePairs);
m_data = sb;
}
}

@Override
public void addData(String key, Object value) {
if (m_data instanceof StringBuilder) {
((StringBuilder) m_data).append('&').append(key).append('=').append(value);
} else {
String str = String.valueOf(value);
int old = m_data == null ? 0 : m_data.length();
StringBuilder sb = new StringBuilder(old + key.length() + str.length() + 16);

if (m_data != null) {
sb.append(m_data).append('&');
}

sb.append(key).append('=').append(str);
m_data = sb;
}
}

@Override
public CharSequence getData() {
if (m_data == null) {
return "";
} else {
return m_data;
}
}

@Override
public String getName() {
return m_name;
}

@Override
public String getStatus() {
return m_status;
}

@Override
public long getTimestamp() {
return m_timestampInMillis;
}

@Override
public String getType() {
return m_type;
}

@Override
public boolean isCompleted() {
return m_completed;
}

@Override
public boolean isSuccess() {
return Message.SUCCESS.equals(m_status);
}

public void setCompleted() {
m_completed = true;
}

protected void setData(CharSequence data) {
m_data = data;
}

public void setName(String name) {
m_name = name;
}

@Override
public void setStatus(String status) {
m_status = status;
}

@Override
public void setStatus(Throwable e) {
if (this instanceof Transaction) {
Cat.logError(e);
}

m_status = e.getClass().getName();
}

public void setTimestamp(long timestamp) {
m_timestampInMillis = timestamp;
}

public void setType(String type) {
m_type = type;
}

@Override
public Message success() {
m_status = SUCCESS;
return this;
}

@Override
public String toString() {
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

new PlainTextMessageTreeEncoder().encodeMessage(this, buf);

return buf.toString(Charset.forName("utf-8"));
}
}
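
The addData methods above keep the first value as a plain String and only switch to a StringBuilder once a second value arrives. The following is a minimal standalone sketch of that accumulation strategy; DataHolderSketch is a hypothetical name used for illustration and is not a CAT class.

```java
// Hypothetical standalone sketch of the addData strategy; not part of CAT.
class DataHolderSketch {
    private CharSequence data; // stays null until the first addData call

    void addData(String keyValuePairs) {
        if (data == null) {
            data = keyValuePairs; // first value: keep the String, no copy needed
        } else if (data instanceof StringBuilder) {
            ((StringBuilder) data).append('&').append(keyValuePairs);
        } else {
            // second value: upgrade the String to a StringBuilder
            StringBuilder sb = new StringBuilder(data.length() + keyValuePairs.length() + 16);
            sb.append(data).append('&').append(keyValuePairs);
            data = sb;
        }
    }

    void addData(String key, Object value) {
        addData(key + "=" + value);
    }

    String getData() {
        return data == null ? "" : data.toString();
    }

    public static void main(String[] args) {
        DataHolderSketch holder = new DataHolderSketch();
        holder.addData("a=1");
        holder.addData("b", 2);
        System.out.println(holder.getData()); // prints a=1&b=2
    }
}
```

The point of the design is that a message with zero or one data entry, which is the common case, never allocates a builder.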

The newTransaction method

newTransaction exercises the main message-processing path on the client, so tracing it is a good way to learn how the CAT Client works.

public class Cat {
public static Transaction newTransaction(String type, String name) {
try {
return TraceContextHelper.threadLocal().newTransaction(type, name);
} catch (Exception e) {
errorHandler(e);
return NullMessage.TRANSACTION;
}
}
}

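A ThreadLocal holds the CAT message context of each thread, so messages are isolated between threads (the one exception is forkTransaction, which can copy messages and pass them across threads).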

public class TraceContextHelper {
private static ThreadLocal<TraceContext> s_threadLocalContext = new ThreadLocal<TraceContext>();

public static TraceContext threadLocal() {
initialize();

TraceContext ctx = s_threadLocalContext.get();

if (ctx == null) {
ctx = new DefaultTraceContext(s_pipeline, s_factory);

s_threadLocalContext.set(ctx);
}

return ctx;
}
}
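
To make the isolation concrete, here is a small sketch of the same lazy per-thread lookup. The Context class is a hypothetical stand-in for TraceContext and only collects message names; none of these names come from CAT.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of per-thread context isolation; names are not from CAT.
class ThreadLocalContextSketch {
    // Stand-in for TraceContext: it only collects message names.
    static class Context {
        final List<String> messages = new ArrayList<>();
    }

    private static final ThreadLocal<Context> CONTEXT = new ThreadLocal<>();

    // Lazily creates the context that belongs to the calling thread.
    static Context threadLocal() {
        Context ctx = CONTEXT.get();
        if (ctx == null) {
            ctx = new Context();
            CONTEXT.set(ctx);
        }
        return ctx;
    }

    // Records one message on a fresh thread and returns how many
    // messages that thread's own context holds afterwards.
    static int countOnNewThread(String message) {
        int[] count = new int[1];
        Thread worker = new Thread(() -> {
            threadLocal().messages.add(message);
            count[0] = threadLocal().messages.size();
        });
        worker.start();
        try {
            worker.join(); // join makes count[0] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count[0];
    }

    public static void main(String[] args) {
        threadLocal().messages.add("main-1");
        threadLocal().messages.add("main-2");
        System.out.println(countOnNewThread("worker-1"));   // prints 1
        System.out.println(threadLocal().messages.size());  // prints 2
    }
}
```

The worker thread never sees the two messages recorded on the main thread, which is the behavior the paragraph above describes.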

The TraceContext interface defines the basic operations on messages:

public interface TraceContext {
void add(Message message);

void end(Transaction transaction);

MessageTree getMessageTree();

boolean hasPeekTransaction();

Transaction newTransaction(String type, String name);

Transaction peekTransaction();

void start(Transaction transaction);
}

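DefaultTransaction is the default implementation of Transaction:

  • Construction: records the start time and calls the context's start method
  • addChild(): adds a child Transaction or an Event as a child node of the parent Transaction
  • complete(): marks the current Transaction as finished, computes its duration, completes any ForkableTransaction children that are still open, and calls the context's end method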
public class DefaultTransaction extends AbstractMessage implements Transaction {
public DefaultTransaction(TraceContext ctx, String type, String name) {
super(type, name);

m_ctx = ctx;
m_durationInMicros = System.nanoTime() / 1000L;
m_ctx.start(this);
}

@Override
public DefaultTransaction addChild(Message message) {
if (m_children == null) {
m_children = new ArrayList<Message>();
}

if (message != null) {
m_children.add(message);
} else {
Cat.logError(new Exception("Null child message."));
}

return this;
}

@Override
public void complete() {
if (m_durationInMicros > 1e9) { // duration is not set
long end = System.nanoTime();

m_durationInMicros = end / 1000L - m_durationInMicros;
}

super.setCompleted();

if (m_children != null) {
List<Message> children = new ArrayList<Message>(m_children);

for (Message child : children) {
if (!child.isCompleted() && child instanceof ForkableTransaction) {
child.complete();
}
}
}

m_ctx.end(this);
}
}

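DefaultTraceContext holds the core logic. It uses m_tree to hold the message tree, whose root is the first transaction started, and m_stack to track how transactions are nested:

  • start(): if the stack is empty, the transaction being started becomes the root message of m_tree and is pushed onto the stack; otherwise it is added as a child of the transaction on top of the stack and then pushed as well
  • end(): pops transactions off the stack until it reaches the one being ended, so children that were never completed explicitly are popped along with it; once the stack is empty, the root transaction and its nested children form a complete message tree, which is handed to the pipeline for processing (reporting)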
public class DefaultTraceContext implements TraceContext {
private MessagePipeline m_pipeline;

private DefaultMessageTree m_tree;

private Stack<Transaction> m_stack = new Stack<Transaction>();

private Set<Integer> m_exceptions = new HashSet<Integer>();

@Override
public Transaction newTransaction(String type, String name) {
return new DefaultTransaction(this, type, name);
}

private void deliver(DefaultMessageTree tree) {
m_pipeline.headContext(tree).fireMessage(tree);
tree.reset();
}

@Override
public void end(Transaction transaction) {
Transaction child = m_stack.pop();

// in case of child transactions are not completed explicitly
while (transaction != child && !m_stack.isEmpty()) {
Transaction parent = m_stack.pop();

child = parent;
}

if (m_stack.isEmpty()) {
deliver(m_tree);
}
}

@Override
public Transaction peekTransaction() {
if (m_stack.isEmpty()) {
throw new RuntimeException("Stack is empty!");
} else {
return m_stack.peek();
}
}

@Override
public void start(Transaction transaction) {
if (m_stack.isEmpty()) {
m_tree.setMessage(transaction);
} else {
m_stack.peek().addChild(transaction);
}

m_stack.push(transaction);
}
}
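
The start/end stack discipline can be tried out in isolation. The sketch below uses hypothetical names (TraceStackSketch, Node, and a delivered list standing in for the pipeline) and adds an empty-stack guard in end() that the code above does not have.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the start/end stack discipline; not CAT's classes.
class TraceStackSketch {
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }

        // Renders the subtree as name(child,child,...).
        String render() {
            if (children.isEmpty()) {
                return name;
            }
            StringBuilder sb = new StringBuilder(name).append('(');
            for (int i = 0; i < children.size(); i++) {
                if (i > 0) {
                    sb.append(',');
                }
                sb.append(children.get(i).render());
            }
            return sb.append(')').toString();
        }
    }

    private final Deque<Node> stack = new ArrayDeque<>();
    private Node root;
    final List<String> delivered = new ArrayList<>(); // stands in for the pipeline

    Node start(String name) {
        Node node = new Node(name);
        if (stack.isEmpty()) {
            root = node;                     // first transaction becomes the root
        } else {
            stack.peek().children.add(node); // child of the innermost open transaction
        }
        stack.push(node);
        return node;
    }

    void end(Node node) {
        if (stack.isEmpty()) {
            return; // guard added in this sketch only
        }
        Node popped = stack.pop();
        // children that were never ended explicitly are popped along the way
        while (popped != node && !stack.isEmpty()) {
            popped = stack.pop();
        }
        if (stack.isEmpty()) {
            delivered.add(root.render()); // the whole tree is complete
            root = null;
        }
    }

    public static void main(String[] args) {
        TraceStackSketch ctx = new TraceStackSketch();
        Node url = ctx.start("URL");
        Node sql = ctx.start("SQL");
        ctx.end(sql);
        Node rpc = ctx.start("RPC");
        ctx.start("Cache"); // never ended explicitly
        ctx.end(rpc);
        ctx.end(url);
        System.out.println(ctx.delivered); // prints [URL(SQL,RPC(Cache))]
    }
}
```

Nothing is delivered until the outermost transaction ends, so one request produces one message tree.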

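Reporting messages

Message trees that pass sampling are encoded and placed in a bounded in-memory queue; a sender task polls that queue in a loop and reports the data over Netty.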

@Sharable
public class MessageTransporter extends ChannelInboundHandlerAdapter implements Initializable, LogEnabled, Task {
private ByteBufQueue m_queue;

private List<Channel> m_channels = new CopyOnWriteArrayList<>();

private ByteBuf m_buf;

private AtomicBoolean m_enabled = new AtomicBoolean(true);

private CountDownLatch m_latch = new CountDownLatch(1);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();

m_channels.add(channel);
m_logger.info("Connected to CAT server %s, %s", channel.remoteAddress(), channel);

super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();

m_channels.remove(channel);
m_logger.info("Disconnected from CAT server %s, %s", channel.remoteAddress(), channel);

super.channelInactive(ctx);
}

@Override
public void initialize(ComponentContext ctx) {
m_queue = ctx.lookup(ByteBufQueue.class);
}

private ByteBuf next() throws InterruptedException {
if (m_buf == null) {
m_buf = m_queue.poll();
}

return m_buf;
}

@Override
public void run() {
try {
while (m_enabled.get()) {
ByteBuf buf = next();

if (buf != null) {
boolean success = write(buf);

if (success) {
m_buf = null;
continue;
}
}

TimeUnit.MILLISECONDS.sleep(5);
}

// if shutdown in progress
if (!m_enabled.get()) {
ByteBuf buf = next();

while (buf != null) {
boolean success = write(buf);

if (success) {
buf = next();
} else {
break;
}
}
}
} catch (InterruptedException e) {
// ignore it
} finally {
m_latch.countDown();
}
}

@Override
public void shutdown() {
m_enabled.set(false);

try {
m_latch.await();
} catch (InterruptedException e) {
// ignore it
}
}

private boolean write(ByteBuf tree) {
if (!m_channels.isEmpty()) {
Channel channel = m_channels.get(0);

if (channel.isActive() && channel.isWritable()) {
channel.writeAndFlush(tree);
return true;
}
}

return false;
}
}
public class DefaultByteBufQueue implements ByteBufQueue, Initializable {
private BlockingQueue<ByteBuf> m_queue;

@Override
public boolean offer(ByteBuf buf) {
return m_queue.offer(buf);
}

@Override
public ByteBuf poll() {
try {
return m_queue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
}

@Override
public void initialize(ComponentContext ctx) {
ConfigureManager configureManager = ctx.lookup(ConfigureManager.class);
int size = configureManager.getIntProperty(ConfigureProperty.SENDER_MESSAGE_QUEUE_SIZE, 5000);

m_queue = new ArrayBlockingQueue<ByteBuf>(size);
}
}
public class MessageConveyer extends MessageHandlerAdaptor implements Initializable {
public static String ID = "message-conveyer";

// Inject
private ByteBufQueue m_queue;

// Inject
private MessageStatistics m_statistics;

@Override
public int getOrder() {
return 400;
}

@Override
public void handleMessage(MessageHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;

m_statistics.onBytes(buf.readableBytes());

if (!m_queue.offer(buf)) {
m_statistics.onOverflowed();
}
} else {
ctx.fireMessage(msg);
}
}

@Override
public void initialize(ComponentContext ctx) {
m_queue = ctx.lookup(ByteBufQueue.class);
m_statistics = ctx.lookup(MessageStatistics.class);
}
}
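
The three classes above cooperate as a non-blocking producer and a polling consumer around a bounded queue. The sketch below shows only that pattern, with Strings in place of ByteBufs and a plain counter in place of MessageStatistics; BoundedQueueSketch and its method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the bounded report queue; not CAT's classes.
class BoundedQueueSketch {
    private final BlockingQueue<String> queue;
    private int overflowed; // stands in for MessageStatistics.onOverflowed()

    BoundedQueueSketch(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: never blocks the business thread; a full queue drops the message.
    boolean offer(String message) {
        boolean accepted = queue.offer(message);
        if (!accepted) {
            overflowed++;
        }
        return accepted;
    }

    // Consumer side: sends everything currently queued and returns how many were sent.
    int drain() {
        int sent = 0;
        while (queue.poll() != null) {
            sent++; // a real sender would write the buffer to a channel here
        }
        return sent;
    }

    int overflowed() {
        return overflowed;
    }

    public static void main(String[] args) {
        BoundedQueueSketch q = new BoundedQueueSketch(2);
        q.offer("tree-1");
        q.offer("tree-2");
        q.offer("tree-3"); // queue is full, so this one is dropped
        System.out.println(q.overflowed()); // prints 1
        System.out.println(q.drain());      // prints 2
    }
}
```

Dropping on overflow trades completeness of monitoring data for a guarantee that instrumentation never stalls the application.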

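Generating the message id

Each message tree has a unique message id, made up of the service name (domain), the IP address, the hour timestamp, and a sequence number, joined with '-'.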

public class MessageIdFactory implements Initializable {
public static final long HOUR = 3600 * 1000L;

// ...

public String getNextId(String domain) {
if (m_initialized.get()) {
Builder builder = findOrCreateBuilder(domain);

if (builder != null) {
return builder.buildNextId();
} else {
return "";
}
} else {
throw new IllegalStateException("Please call MessageIdFactory.initialize(String) first!");
}
}

class Builder {
private String m_domain;

private AtomicLong m_lastHour = new AtomicLong();

private AtomicInteger m_batchStart;

private AtomicInteger m_batchOffset;

private RandomAccessFile m_markFile;

private MappedByteBuffer m_byteBuffer;

public Builder(String domain) {
File file = new File(m_baseDir, domain + ".mark");

m_domain = domain;
m_batchStart = new AtomicInteger();
m_batchOffset = new AtomicInteger();

try {
m_markFile = new RandomAccessFile(file, "rw");
m_byteBuffer = m_markFile.getChannel().map(MapMode.READ_WRITE, 0, 20);
} catch (Throwable e) {
throw new IllegalStateException(String.format("Unable to access mark file(%s)!", file), e);
}
}

public String buildNextId() {
StringBuilder sb = new StringBuilder(m_domain.length() + 32);
long hour = getHour();

sb.append(m_domain);
sb.append('-');
sb.append(getIpAddress());
sb.append('-');
sb.append(hour);
sb.append('-');
sb.append(getIndex(hour));

return sb.toString();
}

private synchronized int getIndex(long hour) {
int offset = m_batchOffset.incrementAndGet();

if (m_lastHour.get() != hour || offset >= getBatchSize()) {
FileLock lock = null;

try {
// lock could be null in case of CAT is stopping in progress
lock = lock();

int limit = m_byteBuffer.limit();
long lastHour = limit >= 12 ? m_byteBuffer.getLong(4) : 0;

if (lastHour == hour) { // same hour
int start = limit >= 4 ? m_byteBuffer.getInt(0) : 0;

m_batchStart.set(start);
} else {
m_batchStart.set(0);
}

offset = 0;
m_lastHour.set(hour);
m_batchOffset.set(0);
m_byteBuffer.putInt(0, m_batchStart.get() + getBatchSize());
m_byteBuffer.putLong(4, hour);

if (lock != null) {
m_markFile.getChannel().force(false);
}
} catch (InterruptedException e) {
// ignore it
} catch (Throwable e) {
e.printStackTrace();
} finally {
if (lock != null) {
try {
lock.release();
} catch (Exception e) {
// ignore it
}
}
}
}

return m_batchStart.get() + offset;
}
}
}
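
The id layout can be illustrated without the mark file. The sketch below is a simplification under two assumptions: the hour field is taken to be epoch milliseconds divided by HOUR (getHour() is not shown above), and the sequence number is kept only in memory, so it does not survive a restart the way the mapped mark file is meant to. MessageIdSketch is a hypothetical name.

```java
// Hypothetical, in-memory-only sketch of the id layout; not CAT's MessageIdFactory.
class MessageIdSketch {
    static final long HOUR = 3600 * 1000L;

    private final String domain;
    private final String ip;
    private long lastHour = -1;
    private int index;

    MessageIdSketch(String domain, String ip) {
        this.domain = domain;
        this.ip = ip;
    }

    // Assumption: the hour field is the epoch timestamp in milliseconds divided by HOUR.
    synchronized String nextId(long timestampMillis) {
        long hour = timestampMillis / HOUR;
        if (hour != lastHour) { // a new hour restarts the sequence at 0
            lastHour = hour;
            index = 0;
        }
        return domain + "-" + ip + "-" + hour + "-" + index++;
    }

    public static void main(String[] args) {
        MessageIdSketch ids = new MessageIdSketch("shop-web", "0a010203");
        System.out.println(ids.nextId(10 * HOUR + 5)); // prints shop-web-0a010203-10-0
        System.out.println(ids.nextId(10 * HOUR + 9)); // prints shop-web-0a010203-10-1
        System.out.println(ids.nextId(11 * HOUR));     // prints shop-web-0a010203-11-0
    }
}
```

In the real Builder the sequence is reserved in batches and persisted to the mark file, which this sketch leaves out.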

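II. CAT Consumer

The CAT Consumer collects, processes, and stores the data reported by CAT Clients.

The CAT Consumer processes incoming messages by time period and handles all messages of the same period together. For the data of one hour or one day, for example, it starts an hourly or daily Period and runs specific logic in startPeriod and endPeriod; an hourly period, for instance, produces an hourly report when it ends. Each Period creates a number of PeriodTasks; each PeriodTask is bound to one message analyzer and processes the messages that analyzer is eligible for.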

public class PeriodManager implements Task {
public static long EXTRATIME = 3 * 60 * 1000L;

private PeriodStrategy m_strategy;

private List<Period> m_periods = new ArrayList<Period>();

private boolean m_active;

@Inject
private MessageAnalyzerManager m_analyzerManager;

@Inject
private ServerStatisticManager m_serverStateManager;

@Inject
private Logger m_logger;

public PeriodManager(long duration, MessageAnalyzerManager analyzerManager, ServerStatisticManager serverStateManager,
Logger logger) {
m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
m_active = true;
m_analyzerManager = analyzerManager;
m_serverStateManager = serverStateManager;
m_logger = logger;
}

private void endPeriod(long startTime) {
int len = m_periods.size();

for (int i = 0; i < len; i++) {
Period period = m_periods.get(i);

if (period.isIn(startTime)) {
period.finish();
m_periods.remove(i);
break;
}
}
}

public Period findPeriod(long timestamp) {
for (Period period : m_periods) {
if (period.isIn(timestamp)) {
return period;
}
}

return null;
}

@Override
public String getName() {
return "RealtimeConsumer-PeriodManager";
}

public void init() {
long startTime = m_strategy.next(System.currentTimeMillis());

startPeriod(startTime);
}

@Override
public void run() {
while (m_active) {
try {
long now = System.currentTimeMillis();
long value = m_strategy.next(now);

if (value > 0) {
startPeriod(value);
} else if (value < 0) {
// last period is over,make it asynchronous
Threads.forGroup("cat").start(new EndTaskThread(-value));
}
} catch (Throwable e) {
Cat.logError(e);
}

try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
break;
}
}
}

@Override
public void shutdown() {
m_active = false;
}

private void startPeriod(long startTime) {
long endTime = startTime + m_strategy.getDuration();
Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);

m_periods.add(period);
period.start();
}

private class EndTaskThread implements Task {

private long m_startTime;

public EndTaskThread(long startTime) {
m_startTime = startTime;
}

@Override
public String getName() {
return "End-Consumer-Task";
}

@Override
public void run() {
endPeriod(m_startTime);
}

@Override
public void shutdown() {
}
}
}
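
PeriodStrategy is not shown here, so the following is only a simplified sketch of the underlying idea: a timestamp is mapped to a fixed-length window, and membership uses the same half-open test as Period.isIn. PeriodWindowSketch and the alignment by modulo are assumptions made for illustration, not CAT's actual strategy, which also accounts for extra time before and after each period.

```java
// Hypothetical sketch of mapping a timestamp to a fixed-length period; not CAT's PeriodStrategy.
class PeriodWindowSketch {
    final long start;
    final long end;

    PeriodWindowSketch(long start, long duration) {
        this.start = start;
        this.end = start + duration;
    }

    // Assumption: periods are aligned to multiples of the duration since the epoch.
    static PeriodWindowSketch containing(long timestamp, long duration) {
        return new PeriodWindowSketch(timestamp - timestamp % duration, duration);
    }

    // Half-open interval [start, end): the end instant belongs to the next period.
    boolean isIn(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    public static void main(String[] args) {
        long hour = 3600 * 1000L;
        PeriodWindowSketch p = PeriodWindowSketch.containing(5 * hour + 1234, hour);
        System.out.println(p.start == 5 * hour); // prints true
        System.out.println(p.isIn(p.end - 1));   // prints true
        System.out.println(p.isIn(p.end));       // prints false
    }
}
```
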
public class Period {
private static final int QUEUE_SIZE = 30000;

private long m_startTime;

private long m_endTime;

private Map<String, List<PeriodTask>> m_tasks;

@Inject
private MessageAnalyzerManager m_analyzerManager;

@Inject
private ServerStatisticManager m_serverStateManager;

@Inject
private Logger m_logger;

public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager,
ServerStatisticManager serverStateManager, Logger logger) {
m_startTime = startTime;
m_endTime = endTime;
m_analyzerManager = analyzerManager;
m_serverStateManager = serverStateManager;
m_logger = logger;

List<String> names = m_analyzerManager.getAnalyzerNames();

m_tasks = new HashMap<String, List<PeriodTask>>();
for (String name : names) {
List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime);

for (MessageAnalyzer analyzer : messageAnalyzers) {
MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE);
PeriodTask task = new PeriodTask(analyzer, queue, startTime);

task.enableLogging(m_logger);

List<PeriodTask> analyzerTasks = m_tasks.get(name);

if (analyzerTasks == null) {
analyzerTasks = new ArrayList<PeriodTask>();
m_tasks.put(name, analyzerTasks);
}
analyzerTasks.add(task);
}
}
}

public void distribute(MessageTree tree) {
m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
boolean success = true;
String domain = tree.getDomain();

for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
List<PeriodTask> tasks = entry.getValue();
int length = tasks.size();
int index = 0;
boolean manyTasks = length > 1;

if (manyTasks) {
index = Math.abs(domain.hashCode()) % length;
}
PeriodTask task = tasks.get(index);
boolean enqueue = task.enqueue(tree);

if (!enqueue) {
if (manyTasks) {
task = tasks.get((index + 1) % length);
enqueue = task.enqueue(tree);

if (!enqueue) {
success = false;
}
} else {
success = false;
}
}
}

if ((!success) && (!tree.isProcessLoss())) {
m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);

tree.setProcessLoss(true);
}
}

public void finish() {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date startDate = new Date(m_startTime);
Date endDate = new Date(m_endTime - 1);

m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate), df.format(endDate)));

try {
for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
for (PeriodTask task : tasks.getValue()) {
task.finish();
}
}
} catch (Throwable e) {
Cat.logError(e);
} finally {
m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate), df.format(endDate)));
}
}

public List<MessageAnalyzer> getAnalyzer(String name) {
List<MessageAnalyzer> analyzers = new ArrayList<MessageAnalyzer>();
List<PeriodTask> tasks = m_tasks.get(name);

if (tasks != null) {
for (PeriodTask task : tasks) {
analyzers.add(task.getAnalyzer());
}
}
return analyzers;
}

public List<MessageAnalyzer> getAnalyzers() {
List<MessageAnalyzer> analyzers = new ArrayList<MessageAnalyzer>(m_tasks.size());

for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
for (PeriodTask task : tasks.getValue()) {
analyzers.add(task.getAnalyzer());
}
}

return analyzers;
}

public long getStartTime() {
return m_startTime;
}

public boolean isIn(long timestamp) {
return timestamp >= m_startTime && timestamp < m_endTime;
}

public void start() {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(), df.format(new Date(m_startTime)),
df.format(new Date(m_endTime - 1))));

for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
List<PeriodTask> taskList = tasks.getValue();

for (int i = 0; i < taskList.size(); i++) {
PeriodTask task = taskList.get(i);

task.setIndex(i);

Threads.forGroup("Cat-RealtimeConsumer").start(task);
}
}
}

}
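
The routing inside distribute() can be reduced to a few lines: pick a task by hashing the domain and fall back to the next task once if its queue is full. The sketch below uses hypothetical names and Strings in place of message trees. It also swaps Math.abs(hash) % length for Math.floorMod, a deliberate change, because Math.abs returns a negative number for Integer.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of hash routing with one fallback; not CAT's Period class.
class DistributeSketch {
    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    DistributeSketch(int taskCount, int capacity) {
        for (int i = 0; i < taskCount; i++) {
            queues.add(new ArrayBlockingQueue<>(capacity));
        }
    }

    // Returns the index of the queue that accepted the tree, or -1 if it was lost.
    int distribute(String domain, String tree) {
        int length = queues.size();
        // floorMod is used here instead of Math.abs(hash) % length (see lead-in)
        int index = length > 1 ? Math.floorMod(domain.hashCode(), length) : 0;

        if (queues.get(index).offer(tree)) {
            return index;
        }
        if (length > 1) {
            int next = (index + 1) % length; // single fallback to the neighbouring task
            if (queues.get(next).offer(tree)) {
                return next;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        DistributeSketch d = new DistributeSketch(2, 1);
        System.out.println(d.distribute("a", "tree-1")); // prints 1 ("a".hashCode() is 97)
        System.out.println(d.distribute("a", "tree-2")); // prints 0 (fallback)
        System.out.println(d.distribute("a", "tree-3")); // prints -1 (both queues full)
    }
}
```

Hashing by domain keeps all messages of one service on the same analyzer instance in the normal case, so that instance can aggregate them without coordination.
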
public class PeriodTask implements Task, LogEnabled {

private MessageAnalyzer m_analyzer;

private MessageQueue m_queue;

private long m_startTime;

private int m_queueOverflow;

private Logger m_logger;

private int m_index;

public PeriodTask(MessageAnalyzer analyzer, MessageQueue queue, long startTime) {
m_analyzer = analyzer;
m_queue = queue;
m_startTime = startTime;
}

public void setIndex(int index) {
m_index = index;
}

@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}

public boolean enqueue(MessageTree tree) {
if (m_analyzer.isEligable(tree)) {
boolean result = m_queue.offer(tree);

if (!result) { // trace queue overflow
m_queueOverflow++;

if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(m_analyzer.getStartTime()));

m_logger
.warn(m_analyzer.getClass()
.getSimpleName() + " queue overflow number " + m_queueOverflow + " analyzer time:"
+ date);
}
}
return result;
} else {
return true;
}
}

public void finish() {
try {
m_analyzer.doCheckpoint(true);
m_analyzer.destroy();
} catch (Exception e) {
Cat.logError(e);
}
}

public MessageAnalyzer getAnalyzer() {
return m_analyzer;
}

@Override
public String getName() {
Calendar cal = Calendar.getInstance();

cal.setTimeInMillis(m_startTime);
return m_analyzer.getClass().getSimpleName() + "-" + cal.get(Calendar.HOUR_OF_DAY) + "-" + m_index;
}

@Override
public void run() {
try {
m_analyzer.analyze(m_queue);
} catch (Exception e) {
Cat.logError(e);
}
}

@Override
public void shutdown() {
if (m_analyzer instanceof AbstractMessageAnalyzer) {
((AbstractMessageAnalyzer<?>) m_analyzer).shutdown();
}
}
}

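The MessageAnalyzer interface has implementations for many kinds of analysis, such as Transaction, Event, Problem (failed Events and Transactions, plus slow Transactions of certain types), Logview (the raw message tree data), Dependency (service topology), and so on.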

public interface MessageAnalyzer {
public boolean isEligable(MessageTree tree);

public void analyze(MessageQueue queue);

public void destroy();

public void doCheckpoint(boolean atEnd);

public long getStartTime();

public void initialize(long startTime, long duration, long extraTime);

public int getAnanlyzerCount(String name);

public void setIndex(int index);

public ReportManager<?> getReportManager();
}

Transaction Analyzer

The Transaction report is organized along four dimensions: domain (service name), machine (IP address), type, and name. Aggregated metrics are computed along these dimensions.

public class TransactionReport extends BaseEntity<TransactionReport> {
private String m_domain;

private java.util.Date m_startTime;

private java.util.Date m_endTime;

private Set<String> m_domainNames = new LinkedHashSet<String>();

private Map<String, Machine> m_machines = new ConcurrentHashMap<String, Machine>();
}
public class Machine extends BaseEntity<Machine> {
private String m_ip;

private Map<String, TransactionType> m_types = new ConcurrentHashMap<String, TransactionType>();
}
public class TransactionType extends BaseEntity<TransactionType> {
private String m_id;

private long m_totalCount;

private long m_failCount;

private double m_failPercent;

private double m_min = 86400000d;

private double m_max = -1d;

private double m_avg;

private double m_sum;

private double m_sum2;

private double m_std;

private String m_successMessageUrl;

private String m_failMessageUrl;

private Map<String, TransactionName> m_names = new ConcurrentHashMap<String, TransactionName>();

private double m_tps;

private double m_line95Value;

private double m_line99Value;

private double m_line999Value;

private double m_line90Value;

private Map<Integer, Graph2> m_graph2s = new ConcurrentHashMap<Integer, Graph2>();

private transient Map<Integer, AllDuration> m_allDurations = new ConcurrentHashMap<Integer, AllDuration>();

private GraphTrend m_graphTrend;

private Map<Integer, Range2> m_range2s = new ConcurrentHashMap<Integer, Range2>();

private double m_line50Value;

private double m_line9999Value;

private String m_longestMessageUrl;

private Map<String, String> m_dynamicAttributes = new LinkedHashMap<String, String>();
}
public class TransactionName extends BaseEntity<TransactionName> {
// fields are similar to those of TransactionType
}
public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionReport> implements LogEnabled {
@Inject(ID)
private ReportManager<TransactionReport> m_reportManager;

@Override
public void process(MessageTree tree) {
String domain = tree.getDomain();
TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
List<Transaction> transactions = tree.findOrCreateTransactions();

for (Transaction t : transactions) {
String data = String.valueOf(t.getData());

if (data.length() > 0 && data.charAt(0) == CatConstants.BATCH_FLAG) {
processBatchTransaction(tree, report, t, data);
} else {
processTransaction(report, tree, t);
}
}

if (System.currentTimeMillis() > m_nextClearTime) {
m_nextClearTime = m_nextClearTime + TimeHelper.ONE_MINUTE;

Threads.forGroup("cat").start(new Runnable() {

@Override
public void run() {
cleanUpReports();
}
});
}
}

private void processBatchTransaction(MessageTree tree, TransactionReport report, Transaction t, String data) {
String[] tabs = data.substring(1).split(CatConstants.SPLIT);
int total = Integer.parseInt(tabs[0]);
int fail = Integer.parseInt(tabs[1]);
long sum = Long.parseLong(tabs[2]);
String type = t.getType();
String name = t.getName();

String ip = tree.getIpAddress();
TransactionType transactionType = findOrCreateType(report.findOrCreateMachine(ip), type);
TransactionName transactionName = findOrCreateName(transactionType, name, report.getDomain());
DurationMeta durations = computeBatchDuration(t, tabs, transactionType, transactionName, report.getDomain());

processTypeAndName(tree, t, transactionType, transactionName, total, fail, sum, durations);
}

private void processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
String type = t.getType();
String name = t.getName();

if (!m_filterConfigManager.discardTransaction(type, name)) {
boolean valid = checkForTruncatedMessage(tree, t);

if (valid) {
String ip = tree.getIpAddress();
TransactionType transactionType = findOrCreateType(report.findOrCreateMachine(ip), type);
TransactionName transactionName = findOrCreateName(transactionType, name, report.getDomain());

processTypeAndName(t, transactionType, transactionName, tree, t.getDurationInMillis());
}
}
}

private void processTypeAndName(Transaction t, TransactionType type, TransactionName name, MessageTree tree,
double duration) {
String messageId = tree.getMessageId();

type.incTotalCount();
name.incTotalCount();

type.setSuccessMessageUrl(messageId);
name.setSuccessMessageUrl(messageId);

if (!t.isSuccess()) {
type.incFailCount();
name.incFailCount();

String statusCode = formatStatus(t.getStatus());

findOrCreateStatusCode(name, statusCode).incCount();
}

int allDuration = computeDuration((int) duration);
double sum = duration * duration;

if (type.getMax() <= duration) {
type.setLongestMessageUrl(messageId);
}
if (name.getMax() <= duration) {
name.setLongestMessageUrl(messageId);
}
name.setMax(Math.max(name.getMax(), duration));
name.setMin(Math.min(name.getMin(), duration));
name.setSum(name.getSum() + duration);
name.setSum2(name.getSum2() + sum);
name.findOrCreateAllDuration(allDuration).incCount();

type.setMax(Math.max(type.getMax(), duration));
type.setMin(Math.min(type.getMin(), duration));
type.setSum(type.getSum() + duration);
type.setSum2(type.getSum2() + sum);
type.findOrCreateAllDuration(allDuration).incCount();

long current = t.getTimestamp() / 1000 / 60;
int min = (int) (current % (60));
boolean statistic = m_statisticManager.shouldStatistic(type.getId(), tree.getDomain());

processNameGraph(t, name, min, duration, statistic, allDuration);
processTypeRange(t, type, min, duration, statistic, allDuration);
}
}
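
processTypeAndName maintains count, sum, sum of squares, min, and max incrementally, which is enough to derive the average and standard deviation later without keeping every duration. The code that fills m_avg and m_std is not shown here, so the sketch below simply assumes the standard identity std = sqrt(sum2 / n - avg^2); DurationStatsSketch is a hypothetical name.

```java
// Hypothetical sketch of incremental duration statistics; not CAT's TransactionType.
class DurationStatsSketch {
    long count;
    double sum;
    double sum2;            // sum of squared durations
    double min = 86400000d; // same initial values as the fields shown above
    double max = -1d;

    void add(double duration) {
        count++;
        sum += duration;
        sum2 += duration * duration;
        min = Math.min(min, duration);
        max = Math.max(max, duration);
    }

    double avg() {
        return count == 0 ? 0 : sum / count;
    }

    // Assumption: population standard deviation derived from sum and sum2.
    double std() {
        if (count == 0) {
            return 0;
        }
        double avg = avg();
        // clamp at 0 to absorb tiny negative values from floating-point rounding
        return Math.sqrt(Math.max(0, sum2 / count - avg * avg));
    }

    public static void main(String[] args) {
        DurationStatsSketch stats = new DurationStatsSketch();
        stats.add(10);
        stats.add(20);
        stats.add(30);
        System.out.println(stats.avg()); // prints 20.0
        System.out.println(stats.std()); // roughly 8.165
    }
}
```

Because every field is a running aggregate, two partial results can also be merged by adding their counts and sums and taking the min and max.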

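Dump Analyzer

For each reported message tree, the dump analyzer decides whether the tree should be discarded. If not, the tree is placed in a queue, and background threads drain that queue and write the message trees out in batches.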

public class DumpAnalyzer extends AbstractMessageAnalyzer<Object> implements LogEnabled {
@Override
public void process(MessageTree tree) {
try {
MessageId messageId = MessageId.parse(tree.getMessageId());

if (!shouldDiscard(messageId)) {
processWithStorage(tree, messageId, messageId.getHour());
}
} catch (Exception ignored) {
}
}

private void processWithStorage(MessageTree tree, MessageId messageId, int hour) {
MessageDumper dumper = m_dumperManager.find(hour);

tree.setFormatMessageId(messageId);

if (dumper != null) {
dumper.process(tree);
} else {
m_serverStateManager.addPigeonTimeError(1);
}
}
}
public interface MessageDumper {
public void awaitTermination(int hour) throws InterruptedException;

public void initialize(int hour);

public void process(MessageTree tree);
}
public class DefaultMessageDumper extends ContainerHolder implements MessageDumper, LogEnabled {
@Override
public void process(MessageTree tree) {
MessageId id = tree.getFormatMessageId();
String domain = id.getDomain();
// hash by ip address and block hash by domain
// int index = getIndex(id.getDomain());
int index = getIndex(id.getIpAddressInHex());

BlockingQueue<MessageTree> queue = m_queues.get(index);
boolean success = queue.offer(tree);

if (!success) {
m_statisticManager.addMessageDumpLoss(1);

if ((m_failCount.incrementAndGet() % 100) == 0) {
Cat.logError(new MessageQueueFullException("Error when adding message to queue, fails: " + m_failCount));

m_logger.info("message tree queue is full " + m_failCount + " index " + index);
// tree.getBuffer().release();
}
} else {
m_statisticManager.addMessageSize(domain, tree.getBuffer().readableBytes());

if ((++m_total) % CatConstants.SUCCESS_COUNT == 0) {
m_statisticManager.addMessageDump(CatConstants.SUCCESS_COUNT);
}
}
}
}

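Reference

  1. CAT source code repository
  2. 深度剖析开源分布式监控CAT (An In-Depth Look at CAT, the Open-Source Distributed Monitoring System)
  • Title: CAT Source Code Analysis
  • Author: mufiye
  • Created: 2024-09-22 21:11:56
  • Link: http://mufiye.github.io/2024/09/22/CAT源码解析/
  • License: Unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when reposting.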