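SkyWalking Agent entry point
The SkyWalking Agent is built on the Java agent (JavaAgent) mechanism. Its underlying principles are well documented elsewhere and are not covered here.
The agent's entry point is the premain() method of org.apache.skywalking.apm.agent.SkyWalkingAgent. The key part is the following three lines: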
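```java
// Initialize configuration
// ...
```
Initializing the configuration
```java
public static void initialize(String agentOptions) throws ConfigNotFoundException, AgentPackageNotFoundException {
    // ...
}
```
This method reads the configuration file (by default /config/agent.config) and initializes the Config class. If matching system properties or agent arguments are present, they override the values from the file.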
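The override behavior can be illustrated with a small, self-contained sketch. It assumes the file is a `key=value` properties file, that system properties carry a `skywalking.` prefix, and that agent options arrive as `key1=value1,key2=value2`; the class name, the prefix and the "later source wins" order are assumptions for illustration, not the agent's actual code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ConfigOverrideSketch {
    // Assumed prefix that marks a system property as an agent setting.
    static final String SYS_PREFIX = "skywalking.";

    /** Merges three sources; in this sketch each later source overrides the earlier ones. */
    static Properties load(String fileContent, Properties sysProps, String agentOptions) {
        Properties result = new Properties();
        // 1. Values from the config file (e.g. the contents of agent.config)
        try {
            result.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // 2. System properties such as -Dskywalking.agent.application_code=demo
        for (String name : sysProps.stringPropertyNames()) {
            if (name.startsWith(SYS_PREFIX)) {
                result.setProperty(name.substring(SYS_PREFIX.length()), sysProps.getProperty(name));
            }
        }
        // 3. Agent options from -javaagent:...=key1=value1,key2=value2
        if (agentOptions != null) {
            for (String pair : agentOptions.split(",")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    result.setProperty(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties sys = new Properties();
        sys.setProperty("skywalking.agent.application_code", "from-sysprop");
        Properties merged = load(
                "agent.application_code=from-file\ncollector.backend_service=127.0.0.1:11800\n",
                sys,
                "collector.backend_service=10.0.0.1:11800");
        System.out.println(merged.getProperty("agent.application_code"));   // from-sysprop
        System.out.println(merged.getProperty("collector.backend_service")); // 10.0.0.1:11800
    }
}
```

In a real premain() the second argument would be `System.getProperties()` and the third the `agentArgs` string passed to premain.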
Initializing plugins
The SkyWalking Agent ships many plugins that let different frameworks connect to SkyWalking transparently.
PluginBootstrap
PluginBootstrap is the plugin bootstrap class; it builds the list of plugin objects that need to be loaded. Its loadPlugins() method:
```java
public List<AbstractClassEnhancePluginDefine> loadPlugins() throws AgentPackageNotFoundException {
    // Initialize AgentClassLoader, which is responsible for finding plugins and interceptors
    AgentClassLoader.initDefaultLoader();
    // Collect the URLs of the plugin definition files
    PluginResourcesResolver resolver = new PluginResourcesResolver();
    List<URL> resources = resolver.getResources();
    if (resources == null || resources.size() == 0) {
        logger.info("no plugin files (skywalking-plugin.def) found, continue to start application.");
        return new ArrayList<AbstractClassEnhancePluginDefine>();
    }
    // Load the plugin definitions from each file
    for (URL pluginUrl : resources) {
        try {
            PluginCfg.INSTANCE.load(pluginUrl.openStream());
        } catch (Throwable t) {
            logger.error(t, "plugin file [{}] init failure.", pluginUrl);
        }
    }
    // Get the list of plugin definitions
    List<PluginDefine> pluginClassList = PluginCfg.INSTANCE.getPluginClassList();
    // Instantiate each class-enhancement plugin definition
    List<AbstractClassEnhancePluginDefine> plugins = new ArrayList<AbstractClassEnhancePluginDefine>();
    for (PluginDefine pluginDefine : pluginClassList) {
        try {
            logger.debug("loading plugin class {}.", pluginDefine.getDefineClass());
            AbstractClassEnhancePluginDefine plugin =
                (AbstractClassEnhancePluginDefine) Class.forName(pluginDefine.getDefineClass(),
                    true,
                    AgentClassLoader.getDefault())
                    .newInstance();
            plugins.add(plugin);
        } catch (Throwable t) {
            logger.error(t, "load plugin [{}] failure.", pluginDefine.getDefineClass());
        }
    }
    return plugins;
}
```
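To make the two steps concrete (parse definition files, then instantiate each class and skip failures), here is a sketch that assumes each line of `skywalking-plugin.def` has the form `pluginName=fully.qualified.DefineClass`. `PluginDefSketch`, `Entry` and the sample class names are hypothetical; the sketch uses `getDeclaredConstructor().newInstance()`, since `Class.newInstance()` is deprecated from Java 9 on.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class PluginDefSketch {
    // One "name=class" line from a plugin definition file (hypothetical type).
    static final class Entry {
        final String name;
        final String defineClass;

        Entry(String name, String defineClass) {
            this.name = name;
            this.defineClass = defineClass;
        }
    }

    // Parses "name=class" lines, ignoring blank lines, comments and malformed lines.
    static List<Entry> parse(String content) {
        List<Entry> entries = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
            String line;
            while ((line = reader.readLine()) != null) {
                line = line.trim();
                if (line.isEmpty() || line.startsWith("#")) {
                    continue;
                }
                int eq = line.indexOf('=');
                if (eq <= 0 || eq == line.length() - 1) {
                    continue; // malformed line: skip it
                }
                entries.add(new Entry(line.substring(0, eq).trim(), line.substring(eq + 1).trim()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries;
    }

    // Instantiates each define class; a failure is reported and skipped, as loadPlugins() does.
    static List<Object> instantiate(List<Entry> entries, ClassLoader loader) {
        List<Object> plugins = new ArrayList<>();
        for (Entry e : entries) {
            try {
                plugins.add(Class.forName(e.defineClass, true, loader).getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | LinkageError ex) {
                System.err.println("load plugin [" + e.defineClass + "] failure: " + ex);
            }
        }
        return plugins;
    }

    public static void main(String[] args) {
        String def = "# sample\njdk-sample=java.lang.StringBuilder\nbroken=com.example.DoesNotExist\n";
        List<Entry> entries = parse(def);
        List<Object> plugins = instantiate(entries, PluginDefSketch.class.getClassLoader());
        System.out.println(entries.size() + " entries, " + plugins.size() + " instantiated"); // 2 entries, 1 instantiated
    }
}
```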
PluginFinder
PluginFinder is a plugin lookup helper: it indexes a list of class-enhancement plugin definitions so that the plugins for a given class can be found. The relevant code:
```java
private final Map<String, LinkedList<AbstractClassEnhancePluginDefine>> nameMatchDefine = new HashMap<String, LinkedList<AbstractClassEnhancePluginDefine>>();
private final List<AbstractClassEnhancePluginDefine> signatureMatchDefine = new LinkedList<AbstractClassEnhancePluginDefine>();

public PluginFinder(List<AbstractClassEnhancePluginDefine> plugins) {
    for (AbstractClassEnhancePluginDefine plugin : plugins) {
        ClassMatch match = plugin.enhanceClass();
        if (match == null) {
            continue;
        }
        if (match instanceof NameMatch) {
            // Plugins matched by NameMatch are grouped by class name in `nameMatchDefine`
            NameMatch nameMatch = (NameMatch) match;
            LinkedList<AbstractClassEnhancePluginDefine> pluginDefines = nameMatchDefine.get(nameMatch.getClassName());
            if (pluginDefines == null) {
                pluginDefines = new LinkedList<AbstractClassEnhancePluginDefine>();
                nameMatchDefine.put(nameMatch.getClassName(), pluginDefines);
            }
            pluginDefines.add(plugin);
        } else {
            // All other (non-NameMatch) plugins go into `signatureMatchDefine`
            signatureMatchDefine.add(plugin);
        }
    }
}
```
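The constructor only builds the two indexes. A lookup can then combine a constant-time map hit on the class name with a linear scan of the signature matchers. The sketch below models that with a hypothetical `Plugin` type; a signature matcher is reduced to a `Predicate<String>` on the class name, which is a simplification of real signature matching.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class PluginFinderSketch {
    // Hypothetical plugin: matches either by exact class name or by a predicate.
    static final class Plugin {
        final String id;
        final String exactName;            // non-null => behaves like NameMatch
        final Predicate<String> signature; // used when exactName is null

        Plugin(String id, String exactName, Predicate<String> signature) {
            this.id = id;
            this.exactName = exactName;
            this.signature = signature;
        }
    }

    private final Map<String, LinkedList<Plugin>> nameMatchDefine = new HashMap<>();
    private final List<Plugin> signatureMatchDefine = new LinkedList<>();

    PluginFinderSketch(List<Plugin> plugins) {
        for (Plugin p : plugins) {
            if (p.exactName != null) {
                nameMatchDefine.computeIfAbsent(p.exactName, k -> new LinkedList<>()).add(p);
            } else if (p.signature != null) {
                signatureMatchDefine.add(p);
            } // no matcher at all: skipped, like match == null
        }
    }

    // Every plugin that applies to the given class name.
    List<Plugin> find(String className) {
        List<Plugin> result = new ArrayList<>(nameMatchDefine.getOrDefault(className, new LinkedList<>()));
        for (Plugin p : signatureMatchDefine) {
            if (p.signature.test(className)) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Plugin> plugins = new ArrayList<>();
        plugins.add(new Plugin("http", "org.example.HttpClient", null));
        plugins.add(new Plugin("servlet", null, n -> n.endsWith("Servlet")));
        PluginFinderSketch finder = new PluginFinderSketch(plugins);
        System.out.println(finder.find("org.example.HttpClient").size()); // 1
        System.out.println(finder.find("org.example.MyServlet").size());  // 1
    }
}
```

Splitting the two kinds keeps the common case (exact class name) cheap, while the comparatively rare signature matchers pay for a scan.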
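Initializing service management
ServiceManager.INSTANCE.boot() initializes every BootService implementation and runs each one through its prepare, boot and completion phases, as described below.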
ServiceManager
ServiceManager is the BootService manager. It loads every BootService implementation and drives them through the prepare, boot and completion phases.
The relevant code begins:
```java
private Map<Class, BootService> bootedServices = Collections.emptyMap();
// ...
```
BootService
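```java
public interface BootService {
    // ...
}
```
BootService is an interface with four abstract methods: prepare(), boot(), onComplete() and shutdown(). The following sections look at how each implementation fills them in.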
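The point of separate phases is ordering: every service is registered before any phase runs, and each phase runs on all services before the next one starts, so a service can look up another one in prepare() (TraceSegmentServiceClient does exactly that with GRPCChannelManager). The sketch below models this with a simplified `SimpleBootService` interface and a map keyed by class, mirroring `bootedServices`; how the real ServiceManager discovers implementations is not shown, and all class names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the agent's BootService interface.
interface SimpleBootService {
    void prepare() throws Throwable;
    void boot() throws Throwable;
    void onComplete() throws Throwable;
    void shutdown() throws Throwable;
}

class MiniServiceManager {
    // Keyed by implementation class, like bootedServices; insertion order is kept.
    private final Map<Class<?>, SimpleBootService> bootedServices = new LinkedHashMap<>();
    final List<String> log = new ArrayList<>();

    void boot(List<SimpleBootService> services) {
        // Register everything first, so prepare() can already find any service.
        for (SimpleBootService s : services) {
            bootedServices.put(s.getClass(), s);
        }
        runPhase("prepare");
        runPhase("boot");
        runPhase("onComplete");
    }

    void shutdown() {
        runPhase("shutdown");
    }

    <T extends SimpleBootService> T findService(Class<T> type) {
        return type.cast(bootedServices.get(type));
    }

    // Runs one phase on every service before the next phase starts;
    // a failing service is logged and does not stop the others.
    private void runPhase(String phase) {
        for (SimpleBootService s : bootedServices.values()) {
            String name = s.getClass().getSimpleName() + "." + phase;
            try {
                switch (phase) {
                    case "prepare": s.prepare(); break;
                    case "boot": s.boot(); break;
                    case "onComplete": s.onComplete(); break;
                    default: s.shutdown(); break;
                }
                log.add(name);
            } catch (Throwable t) {
                log.add(name + " failed: " + t);
            }
        }
    }
}

// Example service with no behavior of its own.
class ChannelManagerService implements SimpleBootService {
    public void prepare() {}
    public void boot() {}
    public void onComplete() {}
    public void shutdown() {}
}

// Example service that, like TraceSegmentServiceClient, looks up another service in prepare().
class SegmentClient implements SimpleBootService {
    private final MiniServiceManager manager;
    boolean sawChannelManager;

    SegmentClient(MiniServiceManager manager) {
        this.manager = manager;
    }

    public void prepare() {
        sawChannelManager = manager.findService(ChannelManagerService.class) != null;
    }
    public void boot() {}
    public void onComplete() {}
    public void shutdown() {}
}
```

Even when `SegmentClient` is listed before `ChannelManagerService`, its prepare() finds the channel manager, because registration completes before the first phase starts.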
TraceSegmentServiceClient
TraceSegmentServiceClient sends TraceSegments to the Collector asynchronously. The relevant code:
```java
private long lastLogTime;
private long segmentUplinkedCounter;
private long segmentAbandonedCounter;
// In-memory queue implementing the producer/consumer model
private volatile DataCarrier<TraceSegment> carrier;
private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub;
// Connection status of the GRPCChannel
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;

/**
 * Registers this client as a listener of GRPCChannelManager.
 */
public void prepare() throws Throwable {
    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}

public void boot() throws Throwable {
    lastLogTime = System.currentTimeMillis();
    segmentUplinkedCounter = 0;
    segmentAbandonedCounter = 0;
    // Create the DataCarrier; this internally creates the Channels, Buffers, etc.
    carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);
    // Set the buffer strategy
    carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
    // Attach this object as the consumer; this internally creates the ConsumerPool, ConsumerThreads, etc.
    carrier.consume(this, 1);
}

public void onComplete() throws Throwable {
    // Add this client to TracingContext's listener set
    TracingContext.ListenerManager.add(this);
}

public void shutdown() throws Throwable {
    // Shut down the consumerPool
    carrier.shutdownConsumers();
}

public void consume(List<TraceSegment> data) {
    if (CONNECTED.equals(status)) {
        // Create a GRPCStreamServiceStatus object that tracks when the stream has finished
        final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
        // Create the StreamObserver; the anonymous observer receives the Collector's responses
        StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
            public void onNext(Downstream downstream) {
            }

            public void onError(Throwable throwable) {
                status.finished();
                if (logger.isErrorEnable()) {
                    logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                }
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
            }

            public void onCompleted() {
                status.finished();
            }
        });
        // Send the TraceSegments one by one, without blocking
        for (TraceSegment segment : data) {
            try {
                // Convert the TraceSegment into an UpstreamSegment for gRPC transport
                UpstreamSegment upstreamSegment = segment.transform();
                upstreamSegmentStreamObserver.onNext(upstreamSegment);
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
        }
        // Signal that all requests have been sent
        upstreamSegmentStreamObserver.onCompleted();
        // Wait for the Collector to finish processing
        status.wait4Finish();
        // Add the batch size to segmentUplinkedCounter
        segmentUplinkedCounter += data.size();
    } else {
        // Add the batch size to segmentAbandonedCounter
        segmentAbandonedCounter += data.size();
    }
    printUplinkStatus();
}
```
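Stripped of gRPC, consume() makes one decision per batch: if the channel is CONNECTED, send every item and count the batch as uplinked; otherwise drop it and count it as abandoned. The sketch below keeps only that bookkeeping and replaces the gRPC stream with a hypothetical `sender` callback; it is not the gRPC streaming API.

```java
import java.util.List;
import java.util.function.Consumer;

class SegmentUplinkSketch<T> {
    enum Status { CONNECTED, DISCONNECT }

    private volatile Status status = Status.DISCONNECT;
    private final Consumer<T> sender; // hypothetical stand-in for the stream's onNext()
    long segmentUplinkedCounter;
    long segmentAbandonedCounter;

    SegmentUplinkSketch(Consumer<T> sender) {
        this.sender = sender;
    }

    // Called by the channel manager when the connection state changes.
    void statusChanged(Status newStatus) {
        this.status = newStatus;
    }

    // Called by the consumer thread with a batch drained from the buffers.
    void consume(List<T> data) {
        if (status == Status.CONNECTED) {
            for (T item : data) {
                try {
                    sender.accept(item);
                } catch (RuntimeException e) {
                    // As in the original, one failed item does not abort the batch...
                    System.err.println("send failed: " + e);
                }
            }
            // ...and the whole batch is still counted as uplinked.
            segmentUplinkedCounter += data.size();
        } else {
            segmentAbandonedCounter += data.size();
        }
    }
}
```

Dropping data while disconnected, rather than holding it, keeps memory bounded in the application being traced; the abandoned counter makes the loss visible in the agent's status log.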
DataCarrier
DataCarrier is an in-memory queue built on the producer/consumer model. The relevant code:
```java
private final int bufferSize;
private final int channelSize;
// Holds a Buffer<T>[]
private Channels<T> channels;
// Holds a ConsumerThread[]
private ConsumerPool<T> consumerPool;
private String name;

public DataCarrier(int channelSize, int bufferSize) {
    this("default", channelSize, bufferSize);
}

public DataCarrier(String name, int channelSize, int bufferSize) {
    this.name = name;
    this.bufferSize = bufferSize;
    this.channelSize = channelSize;
    // Create the Channels
    channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
}

public boolean produce(T data) {
    if (consumerPool != null) {
        if (!consumerPool.isRunning()) {
            return false;
        }
    }
    // Save the data into one of the buffers in channels
    return this.channels.save(data);
}

/**
 * Sets consumers for this carrier. Consumers begin to run when {@link DataCarrier#produce} begins to work.
 *
 * @param consumer     a single consumer instance; all consumer threads use this instance
 * @param num          number of consumer threads
 * @param consumeCycle how long (ms) a consumer thread sleeps when it finds no data
 * @return this carrier
 */
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
    if (consumerPool != null) {
        consumerPool.close();
    }
    // Create a ConsumerPool, which internally creates ConsumerThread[num]; all ConsumerThreads share the same consumer.
    // Here, the consumer is TraceSegmentServiceClient.
    consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
    // Start the consumerPool
    consumerPool.begin();
    return this;
}
```
Channels
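The DataCarrier constructor creates a Channels object, which holds all of the carrier's Buffers. When a Buffer is full, several strategies are supported; the default is BLOCKING.
```java
// In the DataCarrier constructor:
channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);

// The Channels constructor:
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
    // ...
}
```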
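`SimpleRollingPartitioner` decides which buffer in the Channels receives each item. Assuming "rolling" means plain round-robin that ignores the data, a minimal version looks like the sketch below; the `Partitioner` interface is a guess at the shape of `IDataPartitioner`, not its real signature.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical partitioner contract: pick a buffer index in [0, total).
interface Partitioner<T> {
    int partition(int total, T data);
}

// Round-robin ("rolling") choice that ignores the data itself.
class RollingPartitioner<T> implements Partitioner<T> {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int partition(int total, T data) {
        // floorMod keeps the index non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), total);
    }

    public static void main(String[] args) {
        RollingPartitioner<String> p = new RollingPartitioner<>();
        for (int i = 0; i < 4; i++) {
            System.out.print(p.partition(3, "segment") + " "); // 0 1 2 0
        }
        System.out.println();
    }
}
```

Round-robin spreads writes evenly across buffers, so concurrent producers rarely compete for the same slot.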
Buffer
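When saving data, Buffer treats its array as a ring: an index records the last write position and keeps advancing, wrapping back to the start when it reaches the end.
This gives good write performance and avoids resizing. The catch is a possible conflict: the slot about to be written may still hold a value that has not been consumed yet. What happens then depends on the BufferStrategy.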
The relevant code:
```java
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
private List<QueueBlockingCallback<T>> callbacks;

Buffer(int bufferSize, BufferStrategy strategy) {
    buffer = new Object[bufferSize];
    this.strategy = strategy;
    index = new AtomicRangeInteger(0, bufferSize);
    callbacks = new LinkedList<QueueBlockingCallback<T>>();
}

/**
 * Writes into buffer as a ring, using index to track the next position.
 * If the target slot still holds an unconsumed value, the BufferStrategy decides what happens.
 */
boolean save(T data) {
    int i = index.getAndIncrement();
    if (buffer[i] != null) {
        switch (strategy) {
            case BLOCKING:
                boolean isFirstTimeBlocking = true;
                while (buffer[i] != null) {
                    if (isFirstTimeBlocking) {
                        isFirstTimeBlocking = false;
                        for (QueueBlockingCallback<T> callback : callbacks) {
                            callback.notify(data);
                        }
                    }
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        // Interruption is ignored; keep waiting for the slot to be consumed
                    }
                }
                break;
            case IF_POSSIBLE:
                // Slot still occupied: drop the new data
                return false;
            case OVERRIDE:
            default:
                // Overwrite the unconsumed value
        }
    }
    buffer[i] = data;
    return true;
}
```
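To see the three strategies side by side, here is a single-threaded ring buffer that follows the same save() logic, with `AtomicInteger` plus `Math.floorMod` standing in for `AtomicRangeInteger`. To keep the example deterministic, BLOCKING here gives up after a bounded number of 1 ms waits; that bound is an addition for illustration, since the original keeps waiting until the slot is freed. `RingBufferSketch` and `Strategy` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RingBufferSketch<T> {
    enum Strategy { BLOCKING, IF_POSSIBLE, OVERRIDE }

    private final Object[] buffer;
    private final Strategy strategy;
    private final AtomicInteger index = new AtomicInteger();
    private final int maxBlockingWaits; // bound added for the demo only

    RingBufferSketch(int size, Strategy strategy, int maxBlockingWaits) {
        this.buffer = new Object[size];
        this.strategy = strategy;
        this.maxBlockingWaits = maxBlockingWaits;
    }

    boolean save(T data) {
        // Next slot in the ring: 0, 1, ..., size-1, 0, 1, ...
        int i = Math.floorMod(index.getAndIncrement(), buffer.length);
        if (buffer[i] != null) {
            switch (strategy) {
                case BLOCKING:
                    for (int waits = 0; buffer[i] != null; waits++) {
                        if (waits >= maxBlockingWaits) {
                            return false;
                        }
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                    break;
                case IF_POSSIBLE:
                    return false; // slot still occupied: drop the new data
                case OVERRIDE:
                default:
                    break; // overwrite the old value below
            }
        }
        buffer[i] = data;
        return true;
    }

    // Consumer side: take every stored element (in slot order) and free its slot.
    @SuppressWarnings("unchecked")
    List<T> drain() {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < buffer.length; i++) {
            if (buffer[i] != null) {
                out.add((T) buffer[i]);
                buffer[i] = null;
            }
        }
        return out;
    }
}
```

With a buffer of size 2, saving `a`, `b`, `c` under IF_POSSIBLE rejects `c`, while OVERRIDE replaces `a` with `c`.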
ConsumerPool
ConsumerPool holds an array of ConsumerThread and a reference to channels.
ConsumerPool has two public constructors: with one, every ConsumerThread shares the same consumer instance; with the other, each ConsumerThread gets its own consumer instance.
```java
private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;

/**
 * Each ConsumerThread gets its own consumer instance.
 */
public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
                    long consumeCycle) {
    this(channels, num);
    for (int i = 0; i < num; i++) {
        consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
        consumerThreads[i].setDaemon(true);
    }
}

/**
 * All ConsumerThreads share the same consumer instance.
 */
public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
    this(channels, num);
    prototype.init();
    for (int i = 0; i < num; i++) {
        consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
        consumerThreads[i].setDaemon(true);
    }
}

private ConsumerPool(Channels<T> channels, int num) {
    running = false;
    this.channels = channels;
    consumerThreads = new ConsumerThread[num];
    lock = new ReentrantLock();
}

public void begin() {
    if (running) {
        return;
    }
    try {
        lock.lock();
        // Assign the buffers to the threads
        this.allocateBuffer2Thread();
        // Start the ConsumerThreads
        for (ConsumerThread consumerThread : consumerThreads) {
            consumerThread.start();
        }
        running = true;
    } finally {
        lock.unlock();
    }
}
```
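The body of `allocateBuffer2Thread()` is not shown above. One natural scheme, assumed here rather than taken from the source, is round-robin: buffer *i* goes to consumer thread `i % threadCount`, so every buffer has exactly one reader and no two threads drain the same buffer. The sketch only computes the assignment; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class BufferAllocationSketch {
    /**
     * Returns, for each consumer thread, the indexes of the buffers it will drain.
     * Buffers are dealt out round-robin; surplus threads end up with no buffers.
     */
    static List<List<Integer>> allocate(int bufferCount, int threadCount) {
        if (threadCount <= 0) {
            throw new IllegalArgumentException("threadCount must be positive");
        }
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int b = 0; b < bufferCount; b++) {
            perThread.get(b % threadCount).add(b);
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(allocate(5, 2)); // [[0, 2, 4], [1, 3]]
    }
}
```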
ConsumerThread
```java
private volatile boolean running;
// ...
```
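Only the `running` flag of ConsumerThread is shown. The loop below sketches what such a thread plausibly does, based on how ConsumerPool passes it buffers and a `consumeCycle`: drain the assigned buffers, hand any data to the consumer, and sleep `consumeCycle` milliseconds when a pass found nothing. Queue-based buffers (which must be thread-safe, e.g. `ConcurrentLinkedQueue`) and the class name are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class ConsumerThreadSketch<T> extends Thread {
    private volatile boolean running = true;
    private final List<Queue<T>> dataSources = new ArrayList<>();
    private final Consumer<List<T>> consumer;
    private final long consumeCycle; // ms to sleep when a pass found no data

    ConsumerThreadSketch(String name, Consumer<List<T>> consumer, long consumeCycle) {
        super(name);
        this.consumer = consumer;
        this.consumeCycle = consumeCycle;
        setDaemon(true);
    }

    // Call before start(); the list itself is not thread-safe.
    void addDataSource(Queue<T> source) {
        dataSources.add(source);
    }

    @Override
    public void run() {
        while (running) {
            if (!consumeOnce()) {
                try {
                    Thread.sleep(consumeCycle);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        consumeOnce(); // one last pass to pick up data saved before shutdown
    }

    // Drains every assigned buffer once; returns true if anything was handed to the consumer.
    private boolean consumeOnce() {
        List<T> batch = new ArrayList<>();
        for (Queue<T> source : dataSources) {
            T item;
            while ((item = source.poll()) != null) {
                batch.add(item);
            }
        }
        if (batch.isEmpty()) {
            return false;
        }
        consumer.accept(batch);
        return true;
    }

    void shutdown() {
        running = false;
    }
}
```

Sleeping only when a pass comes back empty keeps latency low under load while avoiding a busy loop when the application is idle.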
ContextManager
ContextManager is the context manager for TraceSegments (TraceSegment itself is not explored in detail here). Its relevant fields are:
```java
// Holds the context of the current TraceSegment. Because tracing involves multiple threads, each thread keeps its own context in a ThreadLocal.
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
// Holds a runtime context, also in a ThreadLocal.
private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
// ContextManagerExtendService is a helper that creates contexts for ContextManager; it is also a BootService implementation.
private static ContextManagerExtendService EXTEND_SERVICE;
```
Its prepare(), onComplete() and shutdown() methods are empty. Here is boot():
```java
public void boot() {
    // Look up ContextManagerExtendService in ServiceManager and register this object as its listener.
    // When a TracingContext finishes, ContextManager's afterFinished() method is called.
    ContextManagerExtendService service = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
    service.registerListeners(this);
}
```
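Because CONTEXT is a ThreadLocal, each thread that enters traced code sees only its own context, and finishing a context on one thread cannot disturb another. A minimal illustration with a hypothetical `TraceContext` class (not the agent's AbstractTracerContext); clearing the ThreadLocal in `afterFinished()` is an assumption about what that callback is for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ContextManagerSketch {
    // Hypothetical per-thread tracing context.
    static final class TraceContext {
        private static final AtomicInteger IDS = new AtomicInteger();
        final int id = IDS.incrementAndGet();
        final List<String> spans = new ArrayList<>();
    }

    private static final ThreadLocal<TraceContext> CONTEXT = new ThreadLocal<>();

    // Lazily creates the calling thread's context.
    static TraceContext getOrCreate() {
        TraceContext ctx = CONTEXT.get();
        if (ctx == null) {
            ctx = new TraceContext();
            CONTEXT.set(ctx);
        }
        return ctx;
    }

    static void createSpan(String operationName) {
        getOrCreate().spans.add(operationName);
    }

    // Plays the role of the afterFinished() callback: drop this thread's context
    // so the next request on a pooled thread starts fresh.
    static void afterFinished() {
        CONTEXT.remove();
    }
}
```

Removing the value when the segment finishes matters with thread pools: a reused worker thread would otherwise keep the previous request's context.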
SamplingService
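SamplingService controls how TraceSegments are sampled.
Tracing every single TraceSegment would cost CPU (for serialization and deserialization) as well as network bandwidth.
The Config.Agent.SAMPLE_N_PER_3_SECS property sets how many TraceSegments are collected every three seconds.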
The relevant code:
```java
// On/off switch
private volatile boolean on = false;
private volatile AtomicInteger samplingFactorHolder;
private volatile ScheduledFuture<?> scheduledFuture;

public void boot() throws Throwable {
    if (scheduledFuture != null) {
        /**
         * If {@link #boot()} invokes twice, mostly in test cases,
         * cancel the old one.
         */
        scheduledFuture.cancel(true);
    }
    // SAMPLE_N_PER_3_SECS can be set in /config/agent.config
    if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
        on = true;
        this.resetSamplingFactor();
        ScheduledExecutorService service = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
        // Call resetSamplingFactor() every 3 s to reset the counter
        scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
            public void run() {
                resetSamplingFactor();
            }
        }, new RunnableWithExceptionProtection.CallbackWhenException() {
            public void handle(Throwable t) {
                logger.error("unexpected exception.", t);
            }
        }), 0, 3, TimeUnit.SECONDS);
        // The message says "10 seconds", but the counter is reset every 3 seconds
        logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
    }
}

public void shutdown() throws Throwable {
    // Cancel scheduledFuture
    if (scheduledFuture != null) {
        scheduledFuture.cancel(true);
    }
}
```
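boot() only schedules the counter reset; the per-segment decision is made elsewhere. Below is a sketch of such a decision, assuming a compare-and-set on the counter so that at most N segments are accepted per window even under concurrency. `trySampling()`, the retry loop and the class name are illustrative, and the scheduled reset is replaced by a direct call so the behavior is easy to check.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SamplingSketch {
    private final int samplePer3Secs;   // like Config.Agent.SAMPLE_N_PER_3_SECS
    private final boolean on;           // sampling is only active when N > 0
    private final AtomicInteger samplingFactorHolder = new AtomicInteger();

    SamplingSketch(int samplePer3Secs) {
        this.samplePer3Secs = samplePer3Secs;
        this.on = samplePer3Secs > 0;
    }

    // Returns true if the current segment should be traced.
    boolean trySampling() {
        if (!on) {
            return true; // no limit configured: trace everything
        }
        while (true) {
            int current = samplingFactorHolder.get();
            if (current >= samplePer3Secs) {
                return false; // quota for this window is used up
            }
            if (samplingFactorHolder.compareAndSet(current, current + 1)) {
                return true;
            }
            // Another thread won the race: re-read the counter and retry.
        }
    }

    // In the agent this runs every 3 seconds on a scheduled executor.
    void resetSamplingFactor() {
        samplingFactorHolder.set(0);
    }
}
```

With N = 2, the first two calls in a window return true and the third returns false until the next reset.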
GRPCChannelManager
GRPCChannelManager manages the GRPCChannel: it creates the channel and notifies every listener when the connection status changes. The relevant code:
```java
/**
 * Creates a ScheduledFuture that runs this object's run() method every
 * Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL seconds (30 s by default).
 * @throws Throwable
 */
public void boot() throws Throwable {
    if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
        logger.error("Collector server addresses are not set.");
        logger.error("Agent will not uplink any data.");
        return;
    }
    // Get the Collector addresses from BACKEND_SERVICE
    grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
    connectCheckFuture = Executors
        .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
        .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
            public void handle(Throwable t) {
                logger.error("unexpected exception.", t);
            }
        }), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}

/**
 * Shuts down managedChannel.
 */
public void shutdown() throws Throwable {
    connectCheckFuture.cancel(true);
    if (managedChannel != null) {
        managedChannel.shutdownNow();
    }
    logger.debug("Selected collector grpc service shutdown.");
}

public void run() {
    logger.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
    if (reconnect) {
        if (grpcServers.size() > 0) {
            String server = "";
            try {
                // Pick a server at random.
                // Note: Math.abs(Integer.MIN_VALUE) is negative, so random.nextInt(grpcServers.size()) would be safer.
                int index = Math.abs(random.nextInt()) % grpcServers.size();
                server = grpcServers.get(index);
                String[] ipAndPort = server.split(":");
                // Build a GRPCChannel from the ip and port
                managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
                    .addManagedChannelBuilder(new StandardChannelBuilder())
                    .addManagedChannelBuilder(new TLSChannelBuilder())
                    .addChannelDecorator(new AuthenticationDecorator())
                    .build();
                if (!managedChannel.isShutdown() && !managedChannel.isTerminated()) {
                    reconnect = false;
                    // Notify all listeners: the GRPCChannel is connected
                    notify(GRPCChannelStatus.CONNECTED);
                } else {
                    // Notify all listeners: the GRPCChannel has lost its connection
                    notify(GRPCChannelStatus.DISCONNECT);
                }
                return;
            } catch (Throwable t) {
                logger.error(t, "Create channel to {} fail.", server);
                notify(GRPCChannelStatus.DISCONNECT);
            }
        }
        logger.debug("Selected collector grpc service is not available. Wait {} seconds to retry", Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL);
    }
}

/*
 * Sends the new connection status to the listeners when it changes.
 */
private void notify(GRPCChannelStatus status) {
    // Known listeners include AppAndServiceRegisterClient, TraceSegmentServiceClient and JVMService.Send
    for (GRPCChannelListener listener : listeners) {
        try {
            listener.statusChanged(status);
        } catch (Throwable t) {
            logger.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
        }
    }
}
```
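Two ideas in run() and notify() can be shown without gRPC: picking a random backend from a comma-separated list, and fanning a status change out to listeners while isolating listener failures. This hypothetical sketch picks the server with `random.nextInt(bound)`, which always returns a value in `[0, bound)`, and uses a copy-on-write list so listeners can register while a notification is in progress; the method is named `notifyListeners` only to avoid confusion with `Object.notify()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;

class ChannelManagerSketch {
    enum Status { CONNECTED, DISCONNECT }

    interface Listener {
        void statusChanged(Status status);
    }

    private final List<String> servers = new ArrayList<>();
    private final Random random;
    // Copy-on-write, so a listener can be added while a notification is running.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    // backendService uses the same "ip:port,ip:port" form as BACKEND_SERVICE.
    ChannelManagerSketch(String backendService, Random random) {
        for (String s : backendService.split(",")) {
            if (!s.trim().isEmpty()) {
                servers.add(s.trim());
            }
        }
        this.random = random;
    }

    void addChannelListener(Listener listener) {
        listeners.add(listener);
    }

    // Uniform random pick, or null when no server is configured.
    String pickServer() {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(random.nextInt(servers.size()));
    }

    // Tells every listener about the new status; a failing listener does not stop the rest.
    void notifyListeners(Status status) {
        for (Listener listener : listeners) {
            try {
                listener.statusChanged(status);
            } catch (RuntimeException e) {
                System.err.println("Fail to notify " + listener.getClass().getName() + ": " + e);
            }
        }
    }
}
```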
JVMService
JVMService collects JVM CPU, memory, memory pool and GC metrics and sends them to the Collector.
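The same kinds of metrics are exposed by the standard `java.lang.management` API. The sketch below reads heap usage, per-pool usage and GC counts from the platform MXBeans; whether JVMService gathers them exactly this way, and how it packages them for the Collector, is not covered here, so treat it as illustrative. CPU is left out because the standard `OperatingSystemMXBean` only offers the system load average, not per-process CPU usage.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.LinkedHashMap;
import java.util.Map;

class JvmMetricsSketch {
    // Heap memory currently used, in bytes.
    static long heapUsed() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        return heap.getUsed();
    }

    // Used bytes per memory pool (pool names depend on the garbage collector in use).
    static Map<String, Long> memoryPools() {
        Map<String, Long> pools = new LinkedHashMap<>();
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage(); // null if the pool is no longer valid
            if (usage != null) {
                pools.put(pool.getName(), usage.getUsed());
            }
        }
        return pools;
    }

    // Cumulative collection count per collector; -1 means "undefined" for that collector.
    static Map<String, Long> gcCounts() {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            counts.put(gc.getName(), gc.getCollectionCount());
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("heap used: " + heapUsed());
        System.out.println("pools: " + memoryPools());
        System.out.println("gc: " + gcCounts());
    }
}
```

GC counts are cumulative, so a periodic sender would typically report the difference between two samples rather than the raw value.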
AppAndServiceRegisterClient
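AppAndServiceRegisterClient handles service and service-instance registration as well as heartbeats. It does the following:
- In the prepare phase, prepare() registers it as a listener of GRPCChannelManager.
- It receives the GRPCChannel connection status sent by GRPCChannelManager.
- It creates a ScheduledFuture that runs a task every 3 s.
- Each run, depending on the GRPCChannel connection status and APPLICATION_ID, it performs service registration, service-instance registration or a service-instance heartbeat.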
The relevant code:
```java
/**
 * Client for service and service-instance registration.
 * @author wusheng
 */
public class AppAndServiceRegisterClient implements BootService, Runnable, GRPCChannelListener {
    private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
    private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");
    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
    private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
    private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
    private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub;
    private volatile ScheduledFuture<?> applicationRegisterFuture;

    /**
     * Receives the connection status from GRPCChannelManager.notify().
     * @param status the new channel status
     */
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            // Get the channel decorated with AuthenticationDecorator
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
            instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
            serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
            networkAddressRegisterServiceBlockingStub = NetworkAddressRegisterServiceGrpc.newBlockingStub(channel);
        } else {
            applicationRegisterServiceBlockingStub = null;
            instanceDiscoveryServiceBlockingStub = null;
            serviceNameDiscoveryServiceBlockingStub = null;
        }
        // Record the new status
        this.status = status;
    }

    /**
     * Registers this client as a listener of GRPCChannelManager.
     */
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    /**
     * Creates a ScheduledFuture that runs this object's run() method every
     * Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL seconds (3 s by default).
     * @throws Throwable
     */
    public void boot() throws Throwable {
        applicationRegisterFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
    }

    public void onComplete() throws Throwable {
    }

    public void shutdown() throws Throwable {
        applicationRegisterFuture.cancel(true);
    }

    public void run() {
        logger.debug("AppAndServiceRegisterClient running, status:{}.", status);
        boolean shouldTry = true;
        // Only runs once statusChanged() above has set the status to CONNECTED
        while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
            shouldTry = false;
            try {
                // Service registration
                // APPLICATION_ID defaults to DictionaryUtil.nullValue()
                if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
                    // Normally, when status is CONNECTED, applicationRegisterServiceBlockingStub is not null
                    if (applicationRegisterServiceBlockingStub != null) {
                        // APPLICATION_CODE is configured in skywalking-agent's agent.config file
                        Application request = Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build();
                        logger.debug("AppAndServiceRegisterClient request:{}.", request);
                        ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(request);
                        // Take APPLICATION_ID from the response and store it in RemoteDownstreamConfig.Agent
                        if (applicationMapping != null) {
                            RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
                            shouldTry = true;
                        }
                    }
                } else {
                    // Service instance registration
                    if (instanceDiscoveryServiceBlockingStub != null) {
                        if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
                            ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
                                .setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
                                .setAgentUUID(PROCESS_UUID)
                                .setRegisterTime(System.currentTimeMillis())
                                .setOsinfo(OSUtil.buildOSInfo())
                                .build());
                            if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
                                RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
                                    = instanceMapping.getApplicationInstanceId();
                            }
                        } else {
                            // Service instance heartbeat
                            instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
                                .setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
                                .setHeartbeatTime(System.currentTimeMillis())
                                .build());
                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
                            OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error(t, "AppAndServiceRegisterClient execute fail.");
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }
}
```
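Without gRPC, run() is a small state machine: no application ID means register the application; an application ID without an instance ID means register the instance; both IDs mean send a heartbeat. A successful application registration sets `shouldTry`, so the same run continues straight on to instance registration. The sketch models that with a hypothetical `Registry` interface and uses `0` as the "not yet assigned" marker in place of `DictionaryUtil.nullValue()`; both are assumptions for illustration.

```java
class RegisterClientSketch {
    static final int NULL_VALUE = 0; // stand-in for DictionaryUtil.nullValue()

    // Hypothetical backend API; the real agent calls gRPC blocking stubs.
    interface Registry {
        int registerApplication(String applicationCode);          // id, or NULL_VALUE
        int registerInstance(int applicationId, String agentUuid); // id, or NULL_VALUE
        void heartbeat(int instanceId);
    }

    volatile boolean connected;
    int applicationId = NULL_VALUE;
    int instanceId = NULL_VALUE;
    private final String applicationCode;
    private final String agentUuid;
    private final Registry registry;

    RegisterClientSketch(String applicationCode, String agentUuid, Registry registry) {
        this.applicationCode = applicationCode;
        this.agentUuid = agentUuid;
        this.registry = registry;
    }

    // One scheduled tick.
    void run() {
        boolean shouldTry = true;
        while (connected && shouldTry) {
            shouldTry = false;
            try {
                if (applicationId == NULL_VALUE) {
                    int id = registry.registerApplication(applicationCode);
                    if (id != NULL_VALUE) {
                        applicationId = id;
                        shouldTry = true; // go straight on to instance registration
                    }
                } else if (instanceId == NULL_VALUE) {
                    int id = registry.registerInstance(applicationId, agentUuid);
                    if (id != NULL_VALUE) {
                        instanceId = id;
                    }
                } else {
                    registry.heartbeat(instanceId);
                }
            } catch (RuntimeException e) {
                // The agent logs the error and reports it to GRPCChannelManager.
                System.err.println("register client execute fail: " + e);
            }
        }
    }
}
```

On a fresh connection, the first tick registers both the application and the instance; every later tick sends a heartbeat.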
ContextManagerExtendService
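ContextManagerExtendService is a helper class for ContextManager. Its code is simple, and it has two main jobs:
- Adding ContextManager to the listener sets of TracingContext and IgnoredTracerContext, so that it is notified when a context finishes.
- Helping ContextManager create AbstractTracerContext instances.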
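For the second job, a context factory typically chooses between a recording context and a no-op one. The sketch below assumes two rules for that choice: an ignored operation-name suffix (for example static resources) and a sampling decision that can be forced (for example when an upstream call was already traced). The rules, the suffix list and all class names are illustrative assumptions, not the service's documented behavior.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

class ContextFactorySketch {
    interface TracerContext {
        boolean isRecording();
    }

    // Records spans and is reported when finished (stand-in for TracingContext).
    static final class TracingContext implements TracerContext {
        public boolean isRecording() { return true; }
    }

    // Records nothing (stand-in for IgnoredTracerContext).
    static final class IgnoredTracerContext implements TracerContext {
        public boolean isRecording() { return false; }
    }

    private final List<String> ignoreSuffixes;
    private final BooleanSupplier sampler; // e.g. a SamplingService-like trySampling()

    ContextFactorySketch(List<String> ignoreSuffixes, BooleanSupplier sampler) {
        this.ignoreSuffixes = ignoreSuffixes;
        this.sampler = sampler;
    }

    TracerContext createTraceContext(String operationName, boolean forceSampling) {
        // Ignored operations never produce a trace, even when sampling is forced.
        for (String suffix : ignoreSuffixes) {
            if (operationName.endsWith(suffix)) {
                return new IgnoredTracerContext();
            }
        }
        // Otherwise trace if forced, or if the sampler still has quota.
        if (forceSampling || sampler.getAsBoolean()) {
            return new TracingContext();
        }
        return new IgnoredTracerContext();
    }
}
```

Returning a no-op context instead of null lets instrumented code call the same methods unconditionally, whether or not the request is traced.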
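Summary
- The SkyWalking Agent has rich plugin support; the list of supported frameworks is on the official GitHub project.
- The agent manages multiple BootServices through ServiceManager.
- TraceSegmentServiceClient sends TraceSegments to the Collector.
- ContextManager manages the TraceSegment context.
- SamplingService decides how TraceSegments are sampled.
- GRPCChannelManager creates the GRPCChannel and sends its status to every listener.
- JVMService sends JVM and GC metrics to the Collector.
- AppAndServiceRegisterClient is a client that performs service registration, service-instance registration and heartbeats.
- ContextManagerExtendService is a helper class for ContextManager.
Reference: blog posts by 芋道源码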