SkyWalking Collector 入口
SkyWalking Collector 入口是 org.apache.skywalking.oap.server.starter.OAPServerStartUp 的 main() 方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public static void main(String[] args) {
// 创建配置加载器
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
// 创建模块管理器
ModuleManager manager = new ModuleManager();
try {
// 加载配置
ApplicationConfiguration applicationConfiguration = configLoader.load();
// 初始化组件
manager.init(applicationConfiguration);
String mode = System.getProperty("mode");
if ("init".equals(mode)) {
logger.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException | ModuleStartException e) {
logger.error(e.getMessage(), e);
System.exit(1);
}
}
简介
- Collector 使用组件管理器( ModuleManager ),管理多个组件( Module )。
一个组件有多种组件服务提供者( ModuleProvider ),同时一个组件只允许使用一个组件服务提供者。 - Collector 使用一个应用配置类( ApplicationConfiguration )。
一个应用配置类包含多个组件配置类( ModuleConfiguration )。每个组件对应一个组件配置类。
一个组件配置类包含多个组件服务提供者配置( ProviderConfiguration )。每个组件服务提供者对应一个组件配置类。
注意:因为一个组件只允许同时使用一个组件服务提供者,所以一个组件配置类只设置一个组件服务提供者配置。配置加载器
- ApplicationConfigLoader 作为配置加载器,负责加载配置创建 ApplicationConfiguration 。
- ApplicationConfiguration 作为一个配置类,持有一个组件名->组件配置类的 map 集合。
- 每一个组件配置类又持有一个组件服务提供配置名->组件服务提供配置类的 map 集合。
ApplicationConfiguration 和 ModuleConfiguration 是1:N
,ModuleConfiguration 和 ProviderConfiguration 也是1:N
。相关代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class ApplicationConfiguration {
/**
* moduleName -> ModuleConfiguration 对应关系集合
*/
private HashMap<String, ModuleConfiguration> modules = new HashMap<>();
/**
* 某个组件模块的配置,例如 storage 为一个组件模块
*/
public class ModuleConfiguration {
/**
* moduleProviderName -> ProviderConfiguration 对应关系集合
*/
private HashMap<String, ProviderConfiguration> providers = new HashMap<>();
}
/**
* 一个组件模块的提供者配置,例如 elasticsearch 和 h2 为 storage 组件的两个组件提供者
*/
public class ProviderConfiguration {
private Properties properties;
}
}
组件管理器
ModuleManager 作为组件模块管理器,负责创建各个组件模块,以及一个组件启动流程类 BootstrapFlow
其 init() 方法如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public void init(
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
String[] moduleNames = applicationConfiguration.moduleList();
ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
for (ModuleDefine module : moduleServiceLoader) {
for (String moduleName : moduleNames) {
if (moduleName.equals(module.name())) {
ModuleDefine newInstance;
try {
// 创建组件定义
newInstance = module.getClass().newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ModuleNotFoundException(e);
}
// 各个组件做准备工作
newInstance.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName));
// 保存各个组件到变量 loadedModules
loadedModules.put(moduleName, newInstance);
moduleList.remove(moduleName);
}
}
}
// Finish prepare stage
isInPrepareStage = false;
if (moduleList.size() > 0) {
throw new ModuleNotFoundException(moduleList.toString() + " missing.");
}
// 创建组件启动流程类,会计算 ModuleProvider 启动顺序
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
bootstrapFlow.start(this);
bootstrapFlow.notifyAfterCompleted();
}
组件管理
ModuleManager 作为组件管理器,持有已加载的组件集合。1
private final Map<String, ModuleDefine> loadedModules = new HashMap<>();
ModuleDefine
根据一个组件配置类创建出一个组件定义对象,该对象持有一个对应的组件提供者集合。相关代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41private final LinkedList<ModuleProvider> loadedProviders = new LinkedList<>();
void prepare(ModuleManager moduleManager,
ApplicationConfiguration.ModuleConfiguration configuration) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException, ModuleStartException {
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
boolean providerExist = false;
// 遍历所有 ModuleProvider 实现类的实例数组,创建在配置中的 ModuleProvider 实现类的实例
for (ModuleProvider provider : moduleProviderLoader) {
if (!configuration.has(provider.name())) {
continue;
}
providerExist = true;
if (provider.module().equals(getClass())) {
ModuleProvider newProvider;
try {
newProvider = provider.getClass().newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ProviderNotFoundException(e);
}
newProvider.setManager(moduleManager);
newProvider.setModuleDefine(this);
loadedProviders.add(newProvider);
}
}
if (!providerExist) {
throw new ProviderNotFoundException(this.name() + " module no provider exists.");
}
for (ModuleProvider moduleProvider : loadedProviders) {
logger.info("Prepare the {} provider in {} module.", moduleProvider.name(), this.name());
try {
copyProperties(moduleProvider.createConfigBeanIfAbsent(), configuration.getProviderConfiguration(moduleProvider.name()), this.name(), moduleProvider.name());
} catch (IllegalAccessException e) {
throw new ModuleConfigException(this.name() + " module config transport to config bean failure.", e);
}
// 执行 ModuleProvider 准备阶段的逻辑。会创建对应的 Service
moduleProvider.prepare();
}
}
ModuleProvider
ModuleProvider 组件提供者相关变量如下:1
2
3
4
5
6// 所属的组件管理器
private ModuleManager manager;
// 对应的组件定义
private ModuleDefine moduleDefine;
// 相关的 service 服务对象集合
private final Map<Class<? extends Service>, Service> services = new HashMap<>();
BootstrapFlow
BootstrapFlow 作为组件启动流程类,负责计算所有 ModuleProvider 的启动顺序,并在 start() 方法中执行所有 ModuleProvider 的启动阶段逻辑。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30BootstrapFlow(Map<String, ModuleDefine> loadedModules) throws CycleDependencyException {
this.loadedModules = loadedModules;
startupSequence = new LinkedList<>();
// 获得 ModuleProvider 启动顺序
makeSequence();
}
"unchecked") (
void start(
ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
// 校验依赖的 Module 是否都已经存在
String[] requiredModules = provider.requiredModules();
if (requiredModules != null) {
for (String module : requiredModules) {
if (!moduleManager.has(module)) {
throw new ModuleNotFoundException(module + " is required by " + provider.getModuleName()
+ "." + provider.name() + ", but not found.");
}
}
}
logger.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
// 校验 ModuleProvider 包含的 Service 们都创建成功。(Service 会在 moduleProvider.prepare 方法中创建)
provider.requiredCheck(provider.getModule().services());
// 执行 ModuleProvider 启动阶段逻辑
provider.start();
}
}
应用注册调用分析
RegisterModuleProvider
RegisterModuleProvider 作为注册组件提供者,其 start() 方法代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public void start() {
// 1. 从 ModuleManager 根据 CoreModule.NAME 获取 ModuleProviderHolder(ModuleDefine)
// 2. 从 ModuleDefine 对应的已加载的 ModuleProvider 集合中获取第一个 ModuleProvider
// 3. 从 ModuleProvider 的 services 中根据 GRPCHandlerRegister.class 获取 GRPCHandlerRegisterImpl
// GRPCHandlerRegisterImpl 以及 GRPCServer 的初始化在 CoreModuleProvider.prepare 方法中
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
// 添加服务注册处理器
grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
// 添加服务实例/心跳检测处理器
grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
// 添加服务名发现处理器
grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
// 添加网络地址注册处理器
grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
JettyHandlerRegister jettyHandlerRegister = getManager().find(CoreModule.NAME).provider().getService(JettyHandlerRegister.class);
jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
jettyHandlerRegister.addHandler(new InstanceHeartBeatServletHandler(getManager()));
jettyHandlerRegister.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
jettyHandlerRegister.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
}
以 GRPC 方式为例(REST 方式类似),GRPCHandlerRegisterImpl 在 CoreModuleProvider 的 prepare() 中创建。然后添加服务注册处理器、实例注册处理器等处理器。
相关处理器
ApplicationRegisterHandler 作为应用注册处理器,主要实现处理 agent 端监控应用启动后向 collector 端注册该应用服务的请求。相关代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24private final IServiceInventoryRegister serviceInventoryRegister;
public ApplicationRegisterHandler(ModuleManager moduleManager) {
serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
}
public void applicationCodeRegister(Application request, StreamObserver<ApplicationMapping> responseObserver) {
if (logger.isDebugEnabled()) {
logger.debug("Register application, application code: {}", request.getApplicationCode());
}
ApplicationMapping.Builder builder = ApplicationMapping.newBuilder();
String serviceName = request.getApplicationCode();
// 根据应用服务名获取id,有则获取无则创建
int serviceId = serviceInventoryRegister.getOrCreate(serviceName);
if (serviceId != Const.NONE) {
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(serviceName).setValue(serviceId).build();
builder.setApplication(value);
}
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
这里的 serviceInventoryRegister 具体实现是 CoreModuleProvider 中创建的 ServiceInventoryRegister。
通过 getOrCreate() 方法获取服务id,有则获取,无则创建。其他三个处理器功能如下:
- InstanceDiscoveryServiceHandler 对应服务实例注册/心跳检测。
- ServiceNameDiscoveryHandler 对应服务名查找。
- NetworkAddressRegisterServiceHandler 对应网络地址注册。
agent 调用 api
在分析 agent 的时候分析过一个 bootService: AppAndServiceRegisterClient,在 run() 方法里会调用对应的上述 api 方法。
grpc 学习推荐博文GRPC学习1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57public void run() {
logger.debug("AppAndServiceRegisterClient running, status:{}.", status);
boolean shouldTry = true;
// 在上面 statusChanged 方法改变状态为已连接后执行
while (GRPCChannelStatus.CONNECTED.equals(status) && shouldTry) {
shouldTry = false;
try {
// 服务注册
// APPLICATION_ID 默认值就是 DictionaryUtil.nullValue()
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
// 正常来说,states 为 CONNECTED 的话,applicationRegisterServiceBlockingStub 不会为 null
if (applicationRegisterServiceBlockingStub != null) {
// APPLICATION_CODE 在 skywalking-agent 的 agent.conf 文件中配置
Application request = Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build();
logger.debug("AppAndServiceRegisterClient request:{}.", request);
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(
Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
// 从 response 中获取 APPLICATION_ID 设置到 RemoteDownstreamConfig.Agent
if (applicationMapping != null) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
shouldTry = true;
}
}
} else {
// 服务实例注册
if (instanceDiscoveryServiceBlockingStub != null) {
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setAgentUUID(PROCESS_UUID)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(OSUtil.buildOSInfo())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
= instanceMapping.getApplicationInstanceId();
}
}
// 服务实例心跳检测
else {
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
.build());
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
}
}
}
} catch (Throwable t) {
logger.error(t, "AppAndServiceRegisterClient execute fail.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
}
}
总结
SkyWalking Collector 通过不同组件实现不同的功能,而每个组件又可以选择不同的组件提供者。
注册服务通过 RegisterModuleProvider 实现、存储服务通过 H2StorageProvider/StorageModuleElasticsearchProvider 实现等。
和 Agent 的通讯可采用 GRPC 和 REST 两种方式。