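A Brief Look at the SkyWalking Agent Source Code

SkyWalking Agent Entry Point

SkyWalking Agent is built on the JavaAgent mechanism; how that mechanism works is widely documented and easy to look up.
The agent's entry point is the premain() method of org.apache.skywalking.apm.agent.SkyWalkingAgent. The three calls that matter most are: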

// Initialize the configuration
SnifferConfigInitializer.initialize(agentArgs);

// Initialize the plugins
pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());

// Initialize the service manager
ServiceManager.INSTANCE.boot();

Initializing the Configuration

public static void initialize(String agentOptions) throws ConfigNotFoundException, AgentPackageNotFoundException {
    InputStreamReader configFileStream;

    try {
        // Read the config file; the default is /config/agent.config
        configFileStream = loadConfig();
        Properties properties = new Properties();
        // Load the config file
        properties.load(configFileStream);
        // Initialize the Config class from the file contents
        ConfigInitializer.initialize(properties, Config.class);
    } catch (Exception e) {
        logger.error(e, "Failed to read the config file, skywalking is going to run in default config.");
    }

    try {
        // Override the configuration with system environment variables
        overrideConfigBySystemEnv();
    } catch (Exception e) {
        logger.error(e, "Failed to read the system env.");
    }

    if (!StringUtil.isEmpty(agentOptions)) {
        try {
            agentOptions = agentOptions.trim();
            logger.info("Agent options is {}.", agentOptions);

            // Override the configuration with agentOptions
            overrideConfigByAgentOptions(agentOptions);
        } catch (Exception e) {
            logger.error(e, "Failed to parse the agent options, val is {}.", agentOptions);
        }
    }

    if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) {
        throw new ExceptionInInitializerError("`agent.application_code` is missing.");
    }
    if (StringUtil.isEmpty(Config.Collector.BACKEND_SERVICE)) {
        throw new ExceptionInInitializerError("`collector.direct_servers` and `collector.servers` cannot be empty at the same time.");
    }

    IS_INIT_COMPLETED = true;
}

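This method reads the config file (by default /config/agent.config) and uses it to initialize the Config class. Matching system environment variables and then agent arguments override the values from the file, in that order.

Initializing the Plugins

SkyWalking Agent ships with many plugins that let different frameworks be traced by SkyWalking transparently.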
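
To make the override order concrete, here is a minimal sketch of the same layering idea using plain java.util.Properties. It is not the agent's implementation: the class name ConfigLayeringSketch, the resolve() helper, and the comma-separated key=value format assumed for the agent options are all illustrative assumptions.

```java
import java.util.Properties;

// Hypothetical illustration of "file, then environment, then agent options".
class ConfigLayeringSketch {

    // Later sources overwrite earlier ones, key by key.
    static Properties resolve(Properties fileProps, Properties envProps, String agentOptions) {
        Properties merged = new Properties();
        merged.putAll(fileProps);   // 1. values from the config file
        merged.putAll(envProps);    // 2. environment overrides
        if (agentOptions != null && !agentOptions.trim().isEmpty()) {
            // 3. agent options, assumed here to look like "k1=v1,k2=v2"
            for (String pair : agentOptions.trim().split(",")) {
                String[] kv = pair.split("=", 2);
                if (kv.length == 2) {
                    merged.setProperty(kv[0].trim(), kv[1].trim());
                }
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties file = new Properties();
        file.setProperty("agent.application_code", "from-file");
        Properties env = new Properties();
        env.setProperty("agent.application_code", "from-env");
        Properties merged = resolve(file, env, "logging.level=DEBUG");
        System.out.println(merged.getProperty("agent.application_code"));
        System.out.println(merged.getProperty("logging.level"));
    }
}
```

The point of the sketch is only the precedence: a key present in all three sources ends up with the agent-options value.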

PluginBootstrap

PluginBootstrap is the plugin bootstrap class; it builds the list of plugin objects to load. Its loadPlugins() method looks like this:

public List<AbstractClassEnhancePluginDefine> loadPlugins() throws AgentPackageNotFoundException {
    // Initialize AgentClassLoader, which is responsible for locating plugins and interceptors
    AgentClassLoader.initDefaultLoader();

    // Collect the URLs of the plugin definition files
    PluginResourcesResolver resolver = new PluginResourcesResolver();
    List<URL> resources = resolver.getResources();

    if (resources == null || resources.size() == 0) {
        logger.info("no plugin files (skywalking-plugin.def) found, continue to start application.");
        return new ArrayList<AbstractClassEnhancePluginDefine>();
    }

    // Load the plugin definitions from each file
    for (URL pluginUrl : resources) {
        try {
            PluginCfg.INSTANCE.load(pluginUrl.openStream());
        } catch (Throwable t) {
            logger.error(t, "plugin file [{}] init failure.", pluginUrl);
        }
    }

    // Get the list of plugin definitions
    List<PluginDefine> pluginClassList = PluginCfg.INSTANCE.getPluginClassList();

    // Instantiate the class-enhancement plugin definitions
    List<AbstractClassEnhancePluginDefine> plugins = new ArrayList<AbstractClassEnhancePluginDefine>();
    for (PluginDefine pluginDefine : pluginClassList) {
        try {
            logger.debug("loading plugin class {}.", pluginDefine.getDefineClass());
            AbstractClassEnhancePluginDefine plugin =
                (AbstractClassEnhancePluginDefine)Class.forName(pluginDefine.getDefineClass(),
                    true,
                    AgentClassLoader.getDefault())
                    .newInstance();
            plugins.add(plugin);
        } catch (Throwable t) {
            logger.error(t, "load plugin [{}] failure.", pluginDefine.getDefineClass());
        }
    }

    return plugins;
}
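
The two steps in loadPlugins(), reading definition entries and then instantiating each class by name, can be sketched in isolation. The sketch below assumes that a definition file contains one name=className entry per line and that lines starting with # are comments; that format, and the names PluginDefSketch, Entry, parse(), and instantiate(), are assumptions for illustration rather than the agent's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for reading a plugin definition file and loading its classes.
class PluginDefSketch {

    static final class Entry {
        final String name;
        final String defineClass;

        Entry(String name, String defineClass) {
            this.name = name;
            this.defineClass = defineClass;
        }
    }

    // Parse "name=className" lines, skipping blanks, comments, and malformed lines.
    static List<Entry> parse(String defContent) {
        List<Entry> entries = new ArrayList<>();
        for (String raw : defContent.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            String[] parts = line.split("=", 2);
            if (parts.length == 2) {
                entries.add(new Entry(parts[0].trim(), parts[1].trim()));
            }
        }
        return entries;
    }

    // Load the class through the given loader and call its no-arg constructor.
    static Object instantiate(String className, ClassLoader loader) {
        try {
            return Class.forName(className, true, loader).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot load " + className, e);
        }
    }

    public static void main(String[] args) {
        // java.util.ArrayList stands in for a real plugin definition class.
        for (Entry entry : parse("# demo\nlist-plugin=java.util.ArrayList\n")) {
            Object plugin = instantiate(entry.defineClass, PluginDefSketch.class.getClassLoader());
            System.out.println(entry.name + " -> " + plugin.getClass().getName());
        }
    }
}
```

As in the original method, passing an explicit class loader is what lets plugin classes live outside the application's own classpath.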

PluginFinder

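PluginFinder is a plugin lookup helper: given the collection of class-enhancement plugin definitions, it finds the plugins that apply to a class. The relevant code is: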

private final Map<String, LinkedList<AbstractClassEnhancePluginDefine>> nameMatchDefine = new HashMap<String, LinkedList<AbstractClassEnhancePluginDefine>>();
private final List<AbstractClassEnhancePluginDefine> signatureMatchDefine = new LinkedList<AbstractClassEnhancePluginDefine>();

public PluginFinder(List<AbstractClassEnhancePluginDefine> plugins) {
    for (AbstractClassEnhancePluginDefine plugin : plugins) {
        ClassMatch match = plugin.enhanceClass();

        if (match == null) {
            continue;
        }

        // Plugins that match by class name (NameMatch) go into `nameMatchDefine`.
        if (match instanceof NameMatch) {
            NameMatch nameMatch = (NameMatch)match;
            LinkedList<AbstractClassEnhancePluginDefine> pluginDefines = nameMatchDefine.get(nameMatch.getClassName());
            if (pluginDefines == null) {
                pluginDefines = new LinkedList<AbstractClassEnhancePluginDefine>();
                nameMatchDefine.put(nameMatch.getClassName(), pluginDefines);
            }
            pluginDefines.add(plugin);
        }
        // Plugins that match in any other way go into `signatureMatchDefine`.
        else {
            signatureMatchDefine.add(plugin);
        }
    }
}

Initializing the Service Manager

ServiceManager.INSTANCE.boot() initializes every BootService implementation and then runs their prepare, startup, and completion phases. Details follow.
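
The constructor only builds the two indexes. A lookup over them could work roughly as follows: an exact-name hit comes straight from the map, while every signature-style matcher has to be tested one by one. This is a sketch with hypothetical names (PluginIndexSketch, find(), string plugin labels, and a Predicate standing in for a non-name ClassMatch), not the project's find logic.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical two-index lookup: O(1) by exact class name, linear scan for everything else.
class PluginIndexSketch {
    private final Map<String, List<String>> byName = new HashMap<>();
    private final List<Map.Entry<Predicate<String>, String>> bySignature = new ArrayList<>();

    void addNameMatch(String className, String plugin) {
        byName.computeIfAbsent(className, k -> new LinkedList<>()).add(plugin);
    }

    void addSignatureMatch(Predicate<String> matcher, String plugin) {
        bySignature.add(new AbstractMap.SimpleEntry<>(matcher, plugin));
    }

    // Collect every plugin whose matcher accepts the given class name.
    List<String> find(String className) {
        List<String> matched =
            new ArrayList<>(byName.getOrDefault(className, Collections.<String>emptyList()));
        for (Map.Entry<Predicate<String>, String> entry : bySignature) {
            if (entry.getKey().test(className)) {
                matched.add(entry.getValue());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        PluginIndexSketch index = new PluginIndexSketch();
        index.addNameMatch("com.example.HttpClient", "http-plugin");
        index.addSignatureMatch(name -> name.endsWith("Controller"), "mvc-plugin");
        System.out.println(index.find("com.example.HttpClient"));
        System.out.println(index.find("com.example.UserController"));
    }
}
```

Splitting the index this way keeps the common case (an exact class name) cheap, since the agent has to answer this question for every class the JVM loads.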

ServiceManager

ServiceManager is the BootService manager. It loads every BootService implementation and drives each one through its prepare, startup, and completion phases.
The relevant code is:

private Map<Class, BootService> bootedServices = Collections.emptyMap();

public void boot() {
    // Load every BootService
    bootedServices = loadAllServices();
    // Prepare phase
    prepare();
    // Startup phase
    startup();
    // Completion phase
    onComplete();
}

/**
 * Loads all BootService implementations:
 * {@link org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient}
 * {@link org.apache.skywalking.apm.agent.core.context.ContextManager}
 * {@link org.apache.skywalking.apm.agent.core.sampling.SamplingService}
 * {@link org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager}
 * {@link org.apache.skywalking.apm.agent.core.jvm.JVMService}
 * {@link org.apache.skywalking.apm.agent.core.remote.AppAndServiceRegisterClient}
 * {@link org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService}
 *
 * @return bootedServices
 */
private Map<Class, BootService> loadAllServices() {
    Map<Class, BootService> bootedServices = new LinkedHashMap<Class, BootService>();
    List<BootService> allServices = new LinkedList<BootService>();
    // Load every BootService
    load(allServices);
    Iterator<BootService> serviceIterator = allServices.iterator();
    while (serviceIterator.hasNext()) {
        BootService bootService = serviceIterator.next();

        Class<? extends BootService> bootServiceClass = bootService.getClass();
        // A bootService annotated with DefaultImplementor is added unless one is already registered
        boolean isDefaultImplementor = bootServiceClass.isAnnotationPresent(DefaultImplementor.class);
        if (isDefaultImplementor) {
            if (!bootedServices.containsKey(bootServiceClass)) {
                bootedServices.put(bootServiceClass, bootService);
            } else {
                //ignore the default service
            }
        } else {
            OverrideImplementor overrideImplementor = bootServiceClass.getAnnotation(OverrideImplementor.class);
            // Neither DefaultImplementor nor OverrideImplementor is present: add it as well
            if (overrideImplementor == null) {
                if (!bootedServices.containsKey(bootServiceClass)) {
                    bootedServices.put(bootServiceClass, bootService);
                } else {
                    throw new ServiceConflictException("Duplicate service define for :" + bootServiceClass);
                }
            } else {
                // No DefaultImplementor annotation, but OverrideImplementor is present.
                // If the service named by OverrideImplementor's value carries DefaultImplementor, replace it with this one
                Class<? extends BootService> targetService = overrideImplementor.value();
                if (bootedServices.containsKey(targetService)) {
                    boolean presentDefault = bootedServices.get(targetService).getClass().isAnnotationPresent(DefaultImplementor.class);
                    if (presentDefault) {
                        bootedServices.put(targetService, bootService);
                    } else {
                        throw new ServiceConflictException("Service " + bootServiceClass + " overrides conflict, " +
                            "exist more than one service want to override :" + targetService);
                    }
                } else {
                    bootedServices.put(targetService, bootService);
                }
            }
        }

    }
    return bootedServices;
}

/**
 * Iterates over bootedServices and runs each BootService's prepare phase.
 * startup, onComplete, and shutdown follow the same pattern.
 */
private void prepare() {
    for (BootService service : bootedServices.values()) {
        try {
            service.prepare();
        } catch (Throwable e) {
            logger.error(e, "ServiceManager try to pre-start [{}] fail.", service.getClass().getName());
        }
    }
}

public <T extends BootService> T findService(Class<T> serviceClass) {
    return (T)bootedServices.get(serviceClass);
}

/**
 * Loads the BootService implementations through ServiceLoader
 */
void load(List<BootService> allServices) {
    Iterator<BootService> iterator = ServiceLoader.load(BootService.class, AgentClassLoader.getDefault()).iterator();
    while (iterator.hasNext()) {
        allServices.add(iterator.next());
    }
}
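
The default/override resolution in loadAllServices() is easier to see with the conflict checks stripped away. The sketch below is a simplified assumption of that idea: the annotations @Default and @Overrides and the sampler classes are hypothetical stand-ins for DefaultImplementor, OverrideImplementor, and real services, and the duplicate and conflict exceptions of the original are deliberately omitted.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified default/override resolution keyed by service class.
class ServiceResolutionSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @interface Default {
    }

    @Retention(RetentionPolicy.RUNTIME)
    @interface Overrides {
        Class<?> value();
    }

    @Default
    static class BasicSampler {
    }

    @Overrides(BasicSampler.class)
    static class CustomSampler {
    }

    static Map<Class<?>, Object> resolve(List<Object> candidates) {
        Map<Class<?>, Object> booted = new LinkedHashMap<>();
        for (Object candidate : candidates) {
            Class<?> type = candidate.getClass();
            Overrides overrides = type.getAnnotation(Overrides.class);
            if (overrides != null) {
                // An override is registered under the key of the service it replaces.
                booted.put(overrides.value(), candidate);
            } else {
                // A default (or plain) service only fills a key that is still empty.
                booted.putIfAbsent(type, candidate);
            }
        }
        return booted;
    }

    public static void main(String[] args) {
        Map<Class<?>, Object> booted =
            resolve(Arrays.<Object>asList(new BasicSampler(), new CustomSampler()));
        System.out.println(booted.get(BasicSampler.class).getClass().getSimpleName());
    }
}
```

Because the override is stored under the default's class, callers that ask for the default type (as findService() does) receive the replacement without knowing it exists, regardless of the order in which the two were loaded.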

BootService

public interface BootService {
    void prepare() throws Throwable;

    void boot() throws Throwable;

    void onComplete() throws Throwable;

    void shutdown() throws Throwable;
}

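BootService is an interface. The sections below walk through how each implementation fills in its four abstract methods.

TraceSegmentServiceClient

TraceSegmentServiceClient sends TraceSegments to the Collector asynchronously. The relevant code is: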

private long lastLogTime;
private long segmentUplinkedCounter;
private long segmentAbandonedCounter;
// In-memory queue built on a producer/consumer model
private volatile DataCarrier<TraceSegment> carrier;
private volatile TraceSegmentServiceGrpc.TraceSegmentServiceStub serviceStub;
// Connection status of the GRPCChannel
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;

/**
 * Registers this object as a listener on GRPCChannelManager
 */
@Override
public void prepare() throws Throwable {
    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}

@Override
public void boot() throws Throwable {
    lastLogTime = System.currentTimeMillis();
    segmentUplinkedCounter = 0;
    segmentAbandonedCounter = 0;
    // Create the DataCarrier; internally this creates the Channels, Buffers, and so on
    carrier = new DataCarrier<TraceSegment>(CHANNEL_SIZE, BUFFER_SIZE);
    // Set the buffer strategy
    carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
    // Attach a consumer to the DataCarrier; internally this creates the ConsumerPool, ConsumerThreads, and so on
    carrier.consume(this, 1);
}

@Override
public void onComplete() throws Throwable {
    // Add this object to TracingContext's listener set
    TracingContext.ListenerManager.add(this);
}

@Override
public void shutdown() throws Throwable {
    // Shut down the consumerPool
    carrier.shutdownConsumers();
}

@Override
public void consume(List<TraceSegment> data) {
    if (CONNECTED.equals(status)) {
        // Create the GRPCStreamServiceStatus object
        final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
        // Create the StreamObserver object
        StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Downstream>() {
            @Override
            public void onNext(Downstream downstream) {

            }

            @Override
            public void onError(Throwable throwable) {
                status.finished();
                if (logger.isErrorEnable()) {
                    logger.error(throwable, "Send UpstreamSegment to collector fail with a grpc internal exception.");
                }
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
            }

            @Override
            public void onCompleted() {
                status.finished();
            }
        });

        // Send the TraceSegment requests one by one without blocking
        for (TraceSegment segment : data) {
            try {
                // Convert the TraceSegment into an UpstreamSegment for gRPC transport
                UpstreamSegment upstreamSegment = segment.transform();
                upstreamSegmentStreamObserver.onNext(upstreamSegment);
            } catch (Throwable t) {
                logger.error(t, "Transform and send UpstreamSegment to collector fail.");
            }
        }
        // Mark all requests as sent
        upstreamSegmentStreamObserver.onCompleted();

        // Wait for the Collector to finish processing
        status.wait4Finish();
        // Add the count to segmentUplinkedCounter
        segmentUplinkedCounter += data.size();
    } else {
        // Add the count to segmentAbandonedCounter
        segmentAbandonedCounter += data.size();
    }

    printUplinkStatus();
}

DataCarrier

DataCarrier is an in-memory queue built on a producer/consumer model. The relevant code is:

private final int bufferSize;
private final int channelSize;
// Holds a Buffer<T>[]
private Channels<T> channels;
// Holds a ConsumerThread[]
private ConsumerPool<T> consumerPool;
private String name;

public DataCarrier(int channelSize, int bufferSize) {
    this("default", channelSize, bufferSize);
}

public DataCarrier(String name, int channelSize, int bufferSize) {
    this.name = name;
    this.bufferSize = bufferSize;
    this.channelSize = channelSize;
    // Create the Channels
    channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
}

public boolean produce(T data) {
    if (consumerPool != null) {
        if (!consumerPool.isRunning()) {
            return false;
        }
    }

    // Save the data into one of the channels' buffers
    return this.channels.save(data);
}

/**
 * set consumers to this Carrier. consumer begin to run when {@link DataCarrier#produce} begin to work.
 *
 * @param consumer single instance of consumer, all consumer threads will all use this instance.
 * @param num number of consumer threads
 * @return
 */
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
    if (consumerPool != null) {
        consumerPool.close();
    }
    // Create a ConsumerPool, which creates a ConsumerThread[num]; all ConsumerThreads share the same consumer.
    // The consumer here is the TraceSegmentServiceClient
    consumerPool = new ConsumerPool<T>(this.name, this.channels, consumer, num, consumeCycle);
    // Start the consumerPool
    consumerPool.begin();
    return this;
}

Channels

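The DataCarrier constructor creates a Channels object, which holds all of the Buffers that belong to it. Several strategies are supported for the case where a Buffer is full; the constructor defaults to BLOCKING.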

channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);

public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
    this.dataPartitioner = partitioner;
    this.strategy = strategy;
    // Create an array of channelSize Buffers
    bufferChannels = new Buffer[channelSize];
    for (int i = 0; i < channelSize; i++) {
        bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
    }
}

// Save the data
public boolean save(T data) {
    int index = dataPartitioner.partition(bufferChannels.length, data);
    int retryCountDown = 1;
    if (BufferStrategy.IF_POSSIBLE.equals(strategy)) {
        int maxRetryCount = dataPartitioner.maxRetryCount();
        if (maxRetryCount > 1) {
            retryCountDown = maxRetryCount;
        }
    }
    for (; retryCountDown > 0; retryCountDown--) {
        // Delegate to the Buffer's save() method
        if (bufferChannels[index].save(data)) {
            return true;
        }
    }
    return false;
}
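
Channels.save() relies on a partitioner to pick which Buffer receives each item. The source of SimpleRollingPartitioner is not shown in this post, so the following is only a guess at what a rolling (round-robin) partitioner could look like; the class name RollingPartitionSketch and its method are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin partitioner: each call returns the next channel index.
class RollingPartitionSketch {
    private final AtomicInteger next = new AtomicInteger();

    // floorMod keeps the result in [0, total) even after the counter overflows.
    int partition(int total) {
        return Math.floorMod(next.getAndIncrement(), total);
    }

    public static void main(String[] args) {
        RollingPartitionSketch partitioner = new RollingPartitionSketch();
        for (int i = 0; i < 5; i++) {
            System.out.println(partitioner.partition(3));
        }
    }
}
```

Spreading writes evenly across channels matters because each consumer thread is assigned its own subset of buffers.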

Buffer

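When saving data, Buffer treats its backing array as a ring: index records the most recent write position and keeps advancing, wrapping around to the start of the array.
This gives good write performance and avoids resizing. The catch is that a write can collide: the target slot may still hold a value that has not been consumed yet. In that case, the configured BufferStrategy decides what happens.
The relevant code is: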

private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
private List<QueueBlockingCallback<T>> callbacks;

Buffer(int bufferSize, BufferStrategy strategy) {
    buffer = new Object[bufferSize];
    this.strategy = strategy;
    index = new AtomicRangeInteger(0, bufferSize);
    callbacks = new LinkedList<QueueBlockingCallback<T>>();
}

/**
 * When saving data, the buffer is treated as a ring: index records the most recent write
 * position and keeps advancing, wrapping around to the start.
 * This gives good write performance and avoids resizing.
 * A write can collide with a slot that still holds an unconsumed value; the BufferStrategy
 * decides how that case is handled.
 */
boolean save(T data) {
    int i = index.getAndIncrement();
    if (buffer[i] != null) {
        switch (strategy) {
            case BLOCKING:
                boolean isFirstTimeBlocking = true;
                while (buffer[i] != null) {
                    if (isFirstTimeBlocking) {
                        isFirstTimeBlocking = false;
                        for (QueueBlockingCallback<T> callback : callbacks) {
                            callback.notify(data);
                        }
                    }
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                    }
                }
                break;
            case IF_POSSIBLE:
                return false;
            case OVERRIDE:
            default:
        }
    }
    buffer[i] = data;
    return true;
}
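
A stripped-down ring buffer makes the collision behaviour easy to experiment with. The sketch below is an illustration under simplifying assumptions, not the agent's Buffer: it uses synchronized methods instead of an atomic range counter, leaves out the BLOCKING strategy, and adds a drain() method (hypothetical) to play the consumer's role.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fixed-size ring buffer with two collision strategies.
class RingBufferSketch<T> {
    enum Strategy { IF_POSSIBLE, OVERRIDE }

    private final Object[] slots;
    private final Strategy strategy;
    private int writeIndex;

    RingBufferSketch(int size, Strategy strategy) {
        this.slots = new Object[size];
        this.strategy = strategy;
    }

    // Write to the next slot; the index advances even when the write is rejected.
    synchronized boolean save(T data) {
        int i = writeIndex;
        writeIndex = (writeIndex + 1) % slots.length;
        if (slots[i] != null && strategy == Strategy.IF_POSSIBLE) {
            return false; // slot not consumed yet: drop the new item
        }
        slots[i] = data;  // empty slot, or OVERRIDE replacing the old value
        return true;
    }

    // Hand every stored item to the caller and free the slots.
    @SuppressWarnings("unchecked")
    synchronized List<T> drain() {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] != null) {
                out.add((T) slots[i]);
                slots[i] = null;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        RingBufferSketch<String> buffer = new RingBufferSketch<>(2, Strategy.IF_POSSIBLE);
        System.out.println(buffer.save("a"));
        System.out.println(buffer.save("b"));
        System.out.println(buffer.save("c")); // collides with the slot holding "a"
        System.out.println(buffer.drain());
    }
}
```

With IF_POSSIBLE the producer never waits, which is why TraceSegmentServiceClient selects that strategy: dropping a trace segment is preferable to slowing down the traced application.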

ConsumerPool

ConsumerPool holds an array of ConsumerThreads and a reference to the channels.
It has two public constructors: in one, every ConsumerThread shares the same consumer instance; in the other, each ConsumerThread gets its own consumer instance.

private boolean running;
private ConsumerThread[] consumerThreads;
private Channels<T> channels;
private ReentrantLock lock;

/**
 * Each ConsumerThread gets its own consumer instance
 */
public ConsumerPool(String name, Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num,
    long consumeCycle) {
    this(channels, num);
    for (int i = 0; i < num; i++) {
        consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
        consumerThreads[i].setDaemon(true);
    }
}

/**
 * Every ConsumerThread shares the same consumer instance
 */
public ConsumerPool(String name, Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
    this(channels, num);
    prototype.init();
    for (int i = 0; i < num; i++) {
        consumerThreads[i] = new ConsumerThread("DataCarrier." + name + ".Consumser." + i + ".Thread", prototype, consumeCycle);
        consumerThreads[i].setDaemon(true);
    }

}

private ConsumerPool(Channels<T> channels, int num) {
    running = false;
    this.channels = channels;
    consumerThreads = new ConsumerThread[num];
    lock = new ReentrantLock();
}

public void begin() {
    if (running) {
        return;
    }
    try {
        lock.lock();
        // Assign the Buffers to the threads
        this.allocateBuffer2Thread();
        // Start the ConsumerThreads
        for (ConsumerThread consumerThread : consumerThreads) {
            consumerThread.start();
        }
        running = true;
    } finally {
        lock.unlock();
    }
}

ConsumerThread

private volatile boolean running;
private IConsumer<T> consumer;
// The Buffers assigned to this thread, each with its start and end positions
private List<DataSource> dataSources;
private long consumeCycle;

ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
    super(threadName);
    this.consumer = consumer;
    running = false;
    dataSources = new LinkedList<DataSource>();
    this.consumeCycle = consumeCycle;
}

@Override
public void run() {
    running = true;

    while (running) {
        boolean hasData = consume();

        if (!hasData) {
            try {
                Thread.sleep(consumeCycle);
            } catch (InterruptedException e) {
            }
        }
    }

    // consumer thread is going to stop
    // consume the last time
    consume();

    consumer.onExit();
}

private boolean consume() {
    boolean hasData = false;
    LinkedList<T> consumeList = new LinkedList<T>();
    for (DataSource dataSource : dataSources) {
        LinkedList<T> data = dataSource.obtain();
        if (data.size() == 0) {
            continue;
        }
        for (T element : data) {
            consumeList.add(element);
        }
        hasData = true;
    }

    if (consumeList.size() > 0) {
        try {
            // TraceSegmentServiceClient is one IConsumer implementation; see its consume() method above
            consumer.consume(consumeList);
        } catch (Throwable t) {
            consumer.onError(consumeList, t);
        }
    }
    return hasData;
}
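
The loop above follows a common pattern: drain everything available into one batch, sleep briefly when nothing was found, and do one last drain on the way out so queued items are not lost at shutdown. Here is a small self-contained sketch of that pattern. It swaps the agent's DataSource/Buffer plumbing for a plain java.util.Queue, and the names BatchConsumerSketch, drainOnce(), and shutdownAndWait() are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical batch-draining consumer thread with a final drain on shutdown.
class BatchConsumerSketch<T> extends Thread {
    private final Queue<T> source;
    private final Consumer<List<T>> handler;
    private final long idleMillis;
    private volatile boolean running = true;

    BatchConsumerSketch(Queue<T> source, Consumer<List<T>> handler, long idleMillis) {
        this.source = source;
        this.handler = handler;
        this.idleMillis = idleMillis;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (running) {
            if (!drainOnce()) {
                try {
                    Thread.sleep(idleMillis); // nothing to do: back off briefly
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        drainOnce(); // last pass so items queued before shutdown are still handled
    }

    // Move everything currently queued into one batch and hand it over.
    private boolean drainOnce() {
        List<T> batch = new ArrayList<>();
        T item;
        while ((item = source.poll()) != null) {
            batch.add(item);
        }
        if (batch.isEmpty()) {
            return false;
        }
        handler.accept(batch);
        return true;
    }

    void shutdownAndWait() {
        running = false;
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        List<String> seen = Collections.synchronizedList(new ArrayList<String>());
        BatchConsumerSketch<String> worker = new BatchConsumerSketch<>(queue, seen::addAll, 20L);
        worker.start();
        queue.add("segment-1");
        queue.add("segment-2");
        worker.shutdownAndWait();
        System.out.println(seen.size());
    }
}
```

Batching is what lets TraceSegmentServiceClient open one gRPC stream per batch instead of one per segment.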

ContextManager

ContextManager is the context manager for TraceSegment (the details of TraceSegment are out of scope here). Its relevant fields are:

// Holds the TraceSegment context. A TraceSegment is bound to the thread that produces it, so the context is kept in a ThreadLocal.
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
// Holds a runtime context, also in a ThreadLocal.
private static ThreadLocal<RuntimeContext> RUNTIME_CONTEXT = new ThreadLocal<RuntimeContext>();
// ContextManagerExtendService is an extension class that creates contexts on behalf of ContextManager. It is also a BootService implementation.
private static ContextManagerExtendService EXTEND_SERVICE;

Its prepare(), onComplete(), and shutdown() methods have empty bodies, so look at boot():

@Override
public void boot() {
    // Fetch ContextManagerExtendService from ServiceManager and register this object as one of its listeners.
    // When a TracingContext finishes, ContextManager's afterFinished() method is called
    ContextManagerExtendService service = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
    service.registerListeners(this);
}
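
The ThreadLocal fields are the core of the design: each thread lazily gets its own context, and the context is removed once the trace finishes so that pooled threads do not leak state into the next request. A minimal sketch of that get-or-create pattern follows; ThreadLocalContextSketch, TraceContext, getOrCreate(), and finish() are hypothetical names, not the ContextManager API.

```java
// Hypothetical per-thread context holder using ThreadLocal.
class ThreadLocalContextSketch {

    static final class TraceContext {
        final String owner = Thread.currentThread().getName();
        int spanDepth;
    }

    private static final ThreadLocal<TraceContext> CONTEXT = new ThreadLocal<>();

    // Return this thread's context, creating it on first use.
    static TraceContext getOrCreate() {
        TraceContext context = CONTEXT.get();
        if (context == null) {
            context = new TraceContext();
            CONTEXT.set(context);
        }
        return context;
    }

    // Called when the trace finishes: drop the context so the thread can be reused safely.
    static void finish() {
        CONTEXT.remove();
    }

    public static void main(String[] args) {
        TraceContext first = getOrCreate();
        System.out.println(first == getOrCreate()); // same thread, same context
        finish();
        System.out.println(first == getOrCreate()); // a fresh context after finish()
    }
}
```

The afterFinished() callback mentioned in boot() plays the role of finish() here: it is the hook where the per-thread context gets cleared.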

SamplingService

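SamplingService controls how TraceSegments are sampled.
Collecting every single TraceSegment costs CPU (for serialization and deserialization) and network bandwidth.
The Config.Agent.SAMPLE_N_PER_3_SECS property sets how many TraceSegments are collected in each three-second window.
The relevant code is: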

// On/off switch
private volatile boolean on = false;
private volatile AtomicInteger samplingFactorHolder;
private volatile ScheduledFuture<?> scheduledFuture;

@Override
public void boot() throws Throwable {
    if (scheduledFuture != null) {
        /**
         * If {@link #boot()} invokes twice, mostly in test cases,
         * cancel the old one.
         */
        scheduledFuture.cancel(true);
    }
    // SAMPLE_N_PER_3_SECS can be set in the /config/agent.config file
    if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
        on = true;
        this.resetSamplingFactor();
        ScheduledExecutorService service = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
        // Call resetSamplingFactor() every 3s to reset the counter
        scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
            @Override
            public void run() {
                resetSamplingFactor();
            }
        }, new RunnableWithExceptionProtection.CallbackWhenException() {
            @Override public void handle(Throwable t) {
                logger.error("unexpected exception.", t);
            }
        }), 0, 3, TimeUnit.SECONDS);
        logger.debug("Agent sampling mechanism started. Sample {} traces in 10 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
    }
}

@Override
public void shutdown() throws Throwable {
    // Cancel the scheduledFuture
    if (scheduledFuture != null) {
        scheduledFuture.cancel(true);
    }
}
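
The code above only shows the timer that resets the counter; the other half of the mechanism is the check that decides whether one more segment fits into the current window. A sketch of such a windowed counter is shown below. It is an assumption about the general technique, not a copy of SamplingService: WindowSamplerSketch, trySample(), and resetWindow() are hypothetical names, and the timer is left out so the logic can be exercised directly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical "at most N per window" sampler; a timer is expected to call resetWindow().
class WindowSamplerSketch {
    private final int maxPerWindow;
    private final AtomicInteger used = new AtomicInteger();

    WindowSamplerSketch(int maxPerWindow) {
        this.maxPerWindow = maxPerWindow;
    }

    // True if this trace may be collected in the current window.
    boolean trySample() {
        if (maxPerWindow <= 0) {
            return true; // sampling limit disabled: collect everything
        }
        while (true) {
            int current = used.get();
            if (current >= maxPerWindow) {
                return false; // quota for this window is exhausted
            }
            if (used.compareAndSet(current, current + 1)) {
                return true;  // claimed one slot without locking
            }
        }
    }

    // What the scheduled task would do every three seconds.
    void resetWindow() {
        used.set(0);
    }

    public static void main(String[] args) {
        WindowSamplerSketch sampler = new WindowSamplerSketch(2);
        System.out.println(sampler.trySample());
        System.out.println(sampler.trySample());
        System.out.println(sampler.trySample()); // third request in the same window
        sampler.resetWindow();
        System.out.println(sampler.trySample());
    }
}
```

The compare-and-set loop keeps the hot path lock-free, which matters because the check runs on application threads.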

GRPCChannelManager

GRPCChannelManager manages the GRPCChannel: it creates the channel and notifies every registered listener when the connection status changes. The relevant code is:

/**
 * Creates a ScheduledFuture;
 * this object's run() method executes every 30s
 * @throws Throwable
 */
@Override
public void boot() throws Throwable {
    if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
        logger.error("Collector server addresses are not set.");
        logger.error("Agent will not uplink any data.");
        return;
    }
    // Derive the Collector addresses from BACKEND_SERVICE
    grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
    connectCheckFuture = Executors
        .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
        .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
            @Override
            public void handle(Throwable t) {
                logger.error("unexpected exception.", t);
            }
        }), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}

/**
 * Shuts down the managedChannel
 */
@Override
public void shutdown() throws Throwable {
    connectCheckFuture.cancel(true);
    if (managedChannel != null) {
        managedChannel.shutdownNow();
    }
    logger.debug("Selected collector grpc service shutdown.");
}

@Override
public void run() {
    logger.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
    if (reconnect) {
        if (grpcServers.size() > 0) {
            String server = "";
            try {
                // Pick one server at random
                int index = Math.abs(random.nextInt()) % grpcServers.size();
                server = grpcServers.get(index);
                String[] ipAndPort = server.split(":");

                // Create a GRPCChannel from the ip and port
                managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
                    .addManagedChannelBuilder(new StandardChannelBuilder())
                    .addManagedChannelBuilder(new TLSChannelBuilder())
                    .addChannelDecorator(new AuthenticationDecorator())
                    .build();

                if (!managedChannel.isShutdown() && !managedChannel.isTerminated()) {
                    reconnect = false;
                    // Tell every listener that the GRPCChannel is connected
                    notify(GRPCChannelStatus.CONNECTED);
                } else {
                    // Tell every listener that the GRPCChannel is disconnected
                    notify(GRPCChannelStatus.DISCONNECT);
                }
                return;
            } catch (Throwable t) {
                logger.error(t, "Create channel to {} fail.", server);
                notify(GRPCChannelStatus.DISCONNECT);
            }
        }

        logger.debug("Selected collector grpc service is not available. Wait {} seconds to retry", Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL);
    }
}

/*
 * Sends the new connection status to the listeners whenever it changes
 */
private void notify(GRPCChannelStatus status) {
    // Known listeners: AppAndServiceRegisterClient, TraceSegmentServiceClient, JVMService.Send
    for (GRPCChannelListener listener : listeners) {
        try {
            listener.statusChanged(status);
        } catch (Throwable t) {
            logger.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
        }
    }
}
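
The notify() method is a plain observer pattern with one important detail: each listener call is wrapped in its own try/catch, so a misbehaving listener cannot prevent the others from learning about the status change. The sketch below isolates that detail. ChannelStatusNotifierSketch, its Status enum, and publish() are hypothetical names chosen for the illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical status broadcaster that isolates listener failures from each other.
class ChannelStatusNotifierSketch {
    enum Status { CONNECTED, DISCONNECT }

    interface Listener {
        void statusChanged(Status status);
    }

    // Copy-on-write keeps iteration safe if listeners register concurrently.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Notify every listener; returns how many were notified without throwing.
    int publish(Status status) {
        int delivered = 0;
        for (Listener listener : listeners) {
            try {
                listener.statusChanged(status);
                delivered++;
            } catch (RuntimeException e) {
                System.err.println("listener failed: " + e.getMessage());
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ChannelStatusNotifierSketch notifier = new ChannelStatusNotifierSketch();
        notifier.addListener(status -> {
            throw new IllegalStateException("broken listener");
        });
        notifier.addListener(status -> System.out.println("second listener saw " + status));
        System.out.println(notifier.publish(Status.CONNECTED));
    }
}
```

This is how TraceSegmentServiceClient and AppAndServiceRegisterClient each keep their own status field in sync with the channel without polling it.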

JVMService

JVMService collects JVM CPU, memory, memory pool, and GC information and sends it to the Collector.
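
The JVMService source is not reproduced in this post, but the kind of data it reports is available from the standard java.lang.management API. The following sketch reads heap usage, memory pool usage, and GC counters through the platform MXBeans. How the agent actually gathers and encodes these metrics is not shown here, so treat the class JvmSnapshotSketch and its text output format as assumptions.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical one-shot reader for the JVM metrics exposed by the platform MXBeans.
class JvmSnapshotSketch {

    static String snapshot() {
        StringBuilder out = new StringBuilder();

        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        out.append("heap.used=").append(heap.getUsed()).append('\n');

        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage(); // may be null if the pool is no longer valid
            if (usage != null) {
                out.append("pool[").append(pool.getName()).append("].used=")
                    .append(usage.getUsed()).append('\n');
            }
        }

        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            out.append("gc[").append(gc.getName()).append("].count=")
                .append(gc.getCollectionCount())
                .append(" timeMs=").append(gc.getCollectionTime()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(snapshot());
    }
}
```

A real reporter would call something like this on a fixed schedule and ship the values over the gRPC channel instead of formatting them as text.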

AppAndServiceRegisterClient

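AppAndServiceRegisterClient handles service registration, service instance registration, and heartbeats. It does the following:

  • In the prepare phase, prepare() registers it as a listener on GRPCChannelManager.
  • It receives the GRPCChannel connection status published by GRPCChannelManager.
  • It creates a ScheduledFuture that runs a task every 3s.
  • Each run uses the GRPCChannel connection status and APPLICATION_ID to decide between service registration, service instance registration, and a service instance heartbeat.

The relevant code is: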
/**
 * Registration client for services and service instances
 * @author wusheng
 */
@DefaultImplementor
public class AppAndServiceRegisterClient implements BootService, Runnable, GRPCChannelListener {
    private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
    private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");

    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
    private volatile ApplicationRegisterServiceGrpc.ApplicationRegisterServiceBlockingStub applicationRegisterServiceBlockingStub;
    private volatile InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceBlockingStub instanceDiscoveryServiceBlockingStub;
    private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
    private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub;
    private volatile ScheduledFuture<?> applicationRegisterFuture;

    /**
     * Receives the connection status from GRPCChannelManager's notify()
     * @param status
     */
    @Override
    public void statusChanged(GRPCChannelStatus status) {
        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            // Get the channel decorated with AuthenticationDecorator
            Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
            applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
            instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
            serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
            networkAddressRegisterServiceBlockingStub = NetworkAddressRegisterServiceGrpc.newBlockingStub(channel);
        } else {
            applicationRegisterServiceBlockingStub = null;
            instanceDiscoveryServiceBlockingStub = null;
            serviceNameDiscoveryServiceBlockingStub = null;
        }
        // Record the status
        this.status = status;
    }

    /**
     * Registers this object as a listener on GRPCChannelManager
     */
    @Override
    public void prepare() throws Throwable {
        ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
    }

    /**
     * Creates a ScheduledFuture;
     * this object's run() method executes every 3s
     * @throws Throwable
     */
    @Override
    public void boot() throws Throwable {
        applicationRegisterFuture = Executors
            .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
            .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
                @Override
                public void handle(Throwable t) {
                    logger.error("unexpected exception.", t);
                }
            }), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
    }

    @Override
    public void onComplete() throws Throwable {
    }

    @Override
    public void shutdown() throws Throwable {
        applicationRegisterFuture.cancel(true);
    }

    @Override
    public void run() {
        logger.debug("AppAndServiceRegisterClient running, status:{}.", status);
        boolean shouldTry = true;
        // Does real work only after statusChanged() above has switched the status to CONNECTED
        while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
            shouldTry = false;
            try {
                // Service registration
                // APPLICATION_ID defaults to DictionaryUtil.nullValue()
                if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
                    // Normally, when status is CONNECTED, applicationRegisterServiceBlockingStub is not null
                    if (applicationRegisterServiceBlockingStub != null) {
                        // APPLICATION_CODE is configured in the agent's agent.config file
                        Application request = Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build();
                        logger.debug("AppAndServiceRegisterClient request:{}.", request);
                        ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(
                            Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
                        // Read APPLICATION_ID from the response and store it in RemoteDownstreamConfig.Agent
                        if (applicationMapping != null) {
                            RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
                            shouldTry = true;
                        }
                    }
                } else {
                    // Service instance registration
                    if (instanceDiscoveryServiceBlockingStub != null) {
                        if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {

                            ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
                                .setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
                                .setAgentUUID(PROCESS_UUID)
                                .setRegisterTime(System.currentTimeMillis())
                                .setOsinfo(OSUtil.buildOSInfo())
                                .build());
                            if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
                                RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
                                    = instanceMapping.getApplicationInstanceId();
                            }
                        }
                        // Service instance heartbeat
                        else {
                            instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
                                .setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
                                .setHeartbeatTime(System.currentTimeMillis())
                                .build());

                            NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
                            OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
                        }
                    }
                }
            } catch (Throwable t) {
                logger.error(t, "AppAndServiceRegisterClient execute fail.");
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
            }
        }
    }
}
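
Stripped of gRPC details, run() is a three-state progression: no application ID yet, application ID but no instance ID, and fully registered (heartbeat only). The sketch below models just that progression against a fake backend. Everything in it is hypothetical (RegistrationFlowSketch, Backend, tick(), and the NULL_ID sentinel standing in for DictionaryUtil.nullValue()), and unlike the original it performs exactly one step per call rather than looping straight on to instance registration.

```java
// Hypothetical model of the register-application, register-instance, heartbeat progression.
class RegistrationFlowSketch {
    static final int NULL_ID = 0; // assumed "not registered yet" sentinel

    interface Backend {
        int registerApplication(String applicationCode);

        int registerInstance(int applicationId);

        void heartbeat(int instanceId);
    }

    int applicationId = NULL_ID;
    int instanceId = NULL_ID;

    // One scheduled run: perform whichever step is still missing and report it.
    String tick(Backend backend, String applicationCode) {
        if (applicationId == NULL_ID) {
            applicationId = backend.registerApplication(applicationCode);
            return "register-application";
        }
        if (instanceId == NULL_ID) {
            instanceId = backend.registerInstance(applicationId);
            return "register-instance";
        }
        backend.heartbeat(instanceId);
        return "heartbeat";
    }

    public static void main(String[] args) {
        Backend fake = new Backend() {
            @Override
            public int registerApplication(String applicationCode) {
                return 7;
            }

            @Override
            public int registerInstance(int applicationId) {
                return 42;
            }

            @Override
            public void heartbeat(int instanceId) {
            }
        };
        RegistrationFlowSketch flow = new RegistrationFlowSketch();
        for (int i = 0; i < 4; i++) {
            System.out.println(flow.tick(fake, "demo-app"));
        }
    }
}
```

If the backend returns the sentinel (registration not ready yet), the same step is simply retried on the next tick, which mirrors how the real client keeps trying every few seconds.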

ContextManagerExtendService

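ContextManagerExtendService is an extension helper for ContextManager. Its code is fairly simple and serves two purposes:

  1. It adds ContextManager to the listener sets of TracingContext and IgnoredTracerContext, so that ContextManager is notified when a context finishes.
  2. It creates AbstractTracerContext instances on behalf of ContextManager.

Summary

  • SkyWalking Agent has rich plugin support; the list of supported frameworks is in the official GitHub project
  • SkyWalking Agent manages multiple BootService implementations through ServiceManager
  • TraceSegmentServiceClient sends TraceSegments to the Collector
  • ContextManager manages the TraceSegment context
  • SamplingService controls how TraceSegments are sampled
  • GRPCChannelManager creates the GRPCChannel and publishes its status to the listeners
  • JVMService sends JVM and GC information to the Collector
  • AppAndServiceRegisterClient is the client that performs service registration, service instance registration, and heartbeats
  • ContextManagerExtendService is the extension helper for ContextManager

Reference blog: 芋道源码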