containerd启动过程
github: https://github.com/containerd/containerd
1. 前言
- dockerd 是 docker engine 守护进程,dockerd 启动时会启动 containerd 子进程,dockerd 与 containerd 通过 rpc 进行通信
- ctr 是 containerd 的 cli
- containerd 通过 shim 操作 runc,runc 真正控制容器生命周期,启动一个容器就会启动一个 shim 进程
- shim 直接调用 runc 的包函数,shim 与 containerd 之前通过 rpc 通信
- 真正用户想启动的进程由 runc 的 init 进程启动,即 runc init [args …]
docker ctr| |V V
dockerd -> containerd ---> shim -> runc -> runc init -> process|-- > shim -> runc -> runc init -> process+-- > shim -> runc -> runc init -> process
containerd 只是一个守护进程,容器的实际运行时由 runC 控制。containerd 主要职责是镜像管理(镜像、元信息等)、容器执行(调用最终运行时组件执行)
2. 源码编译
需要安装依赖包:btrfs-tools,直接 make 即可生成 ctr containerd containerd-shim binaries 可执行文件
2.1 containerd main 函数
入口目录为 cmd/containerd/main.go 中 main 函数,默认配置文件 /etc/containerd/config.toml,包含三个子命令,configCommand,publishCommand,ociHook
func main() {app := command.App()if err := app.Run(os.Args); err != nil {fmt.Fprintf(os.Stderr, "containerd: %s\n", err)os.Exit(1)}
}
2.1.1 command App 函数
// App 函数返回 *cli.App 实例
func App() *cli.App {app := cli.NewApp()app.Name = "containerd"app.Version = version.Versionapp.Usage = usageapp.Description = `
containerd is a high performance container runtime whose daemon can be started
by using this command. If none of the *config*, *publish*, or *help* commands
are specified, the default action of the **containerd** command is to start the
containerd daemon in the foreground.A default configuration is used if no TOML configuration is specified or located
at the default file location. The *containerd config* command can be used to
generate the default configuration for containerd. The output of that command
can be used and modified as necessary as a custom configuration.`
// 如果未指定 TOML 配置或位于默认文件位置,则使用默认配置。容器配置命令可用于生成容器的默认配置。该命令的输出可以根据需要作为自定义配置使用和修改app.Flags = []cli.Flag{cli.StringFlag{Name: "config,c",Usage: "path to the configuration file",Value: filepath.Join(defaults.DefaultConfigDir, "config.toml"),},cli.StringFlag{Name: "log-level,l",Usage: "set the logging level [trace, debug, info, warn, error, fatal, panic]",},cli.StringFlag{Name: "address,a",Usage: "address for containerd's GRPC server",},cli.StringFlag{Name: "root",Usage: "containerd root directory",},cli.StringFlag{Name: "state",Usage: "containerd state directory",},}app.Flags = append(app.Flags, serviceFlags()...)// configCommand 用于生成配置文件 containerd config default > /etc/containerd/config.toml,publishCommand,ociHook // publishCommand 二进制方式推送数据到containerd// ociHook 为 OCI 运行时钩子提供基础,以允许注入参数app.Commands = []cli.Command{configCommand,publishCommand,ociHook,}//未指定子命令时要执行的操作 // 需要“cli.ActionFunc”,但可以接受“func(cli.Context) {}” // 注意:对已弃用的“Action”的支持将在将来的版本中删除app.Action = func(context *cli.Context) error {var (start = time.Now()signals = make(chan os.Signal, 2048)serverC = make(chan *server.Server, 1)ctx, cancel = gocontext.WithCancel(gocontext.Background())config = defaultConfig())defer cancel()// 仅当配置存在或用户明确告诉我们加载此路径时,才尝试加载配置。configPath := context.GlobalString("config")_, err := os.Stat(configPath)if !os.IsNotExist(err) || context.GlobalIsSet("config") {if err := srvconfig.LoadConfig(configPath, config); err != nil {return err}}// 将传入参数应用于配置if err := applyFlags(context, config); err != nil {return err}// 确定根目录被创建if err := server.CreateTopLevelDirectories(config); err != nil {return err}// Stop if we are registering or unregistering against Windows SCM.stop, err := registerUnregisterService(config.Root)if err != nil {logrus.Fatal(err)}if stop {return nil}done := handleSignals(ctx, signals, serverC, cancel)// start the signal handler as soon as we can to make sure that// we don't miss any signals during bootsignal.Notify(signals, handledSignals...)// 清理挂载点if err := mount.SetTempMountLocation(filepath.Join(config.Root, "tmpmounts")); err != nil {return fmt.Errorf("creating temp mount location: %w", err)}// unmount all temp mounts on boot for the serverwarnings, err := mount.CleanupTempMounts(0)if err != nil {log.G(ctx).WithError(err).Error("unmounting temp mounts")}for _, w := range warnings {log.G(ctx).WithError(w).Warn("cleanup temp mount")}// 配置文件中grpc address 不能为空if config.GRPC.Address == "" {return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)}if config.TTRPC.Address == "" {// If TTRPC was not explicitly configured, use defaults based on GRPC.config.TTRPC.Address = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)config.TTRPC.UID = config.GRPC.UIDconfig.TTRPC.GID = config.GRPC.GID}log.G(ctx).WithFields(logrus.Fields{"version": version.Version,"revision": version.Revision,}).Info("starting containerd")type srvResp struct {s *server.Servererr error}// run server initialization in a goroutine so we don't end up blocking important things like SIGTERM handling// while the server is initializing.// As an example opening the bolt database will block forever if another containerd is already running and containerd// will have to be be `kill -9`'ed to recover.// 在 goroutine 中运行服务器初始化,这样我们就不会在服务器初始化时阻止重要的事情,例如 SIGTERM 处理。// 例如,如果另一个 containerd 已经在运行,则打开 bolt 数据库将永远阻塞,并且 containerd 必须被“kill -9”才能恢复。chsrv := make(chan srvResp)go func() {defer close(chsrv)server, err := server.New(ctx, config)if err != nil {select {case chsrv <- srvResp{err: err}:case <-ctx.Done():}return}// Launch as a Windows Service if necessaryif err := launchService(server, done); err != nil {logrus.Fatal(err)}select {case <-ctx.Done():server.Stop()case chsrv <- srvResp{s: server}:}}()var server *server.Serverselect {case <-ctx.Done():return ctx.Err()case r := <-chsrv:if r.err != nil {return r.err}server = r.s}// We don't send the server down serverC directly in the goroutine above because we need it lower down.select {case <-ctx.Done():return ctx.Err()case serverC <- server:}if config.Debug.Address != "" {var l net.Listenerif isLocalAddress(config.Debug.Address) {if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {return fmt.Errorf("failed to get listener for debug endpoint: %w", err)}} else {if l, err = net.Listen("tcp", config.Debug.Address); err != nil {return fmt.Errorf("failed to get listener for debug endpoint: %w", err)}}serve(ctx, l, server.ServeDebug)}if config.Metrics.Address != "" {l, err := net.Listen("tcp", config.Metrics.Address)if err != nil {return fmt.Errorf("failed to get listener for metrics endpoint: %w", err)}serve(ctx, l, server.ServeMetrics)}// setup the ttrpc endpointtl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)if err != nil {return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)}serve(ctx, tl, server.ServeTTRPC)if config.GRPC.TCPAddress != "" {l, err := net.Listen("tcp", config.GRPC.TCPAddress)if err != nil {return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)}serve(ctx, l, server.ServeTCP)}// setup the main grpc endpointl, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)if err != nil {return fmt.Errorf("failed to get listener for main endpoint: %w", err)}serve(ctx, l, server.ServeGRPC)if err := notifyReady(ctx); err != nil {log.G(ctx).WithError(err).Warn("notify ready failed")}log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())<-donereturn nil}return app
2.2 server.New 函数创建以及初始化 containerd server
2.2.1 初始化 contaienrd server,加载 timeout 配置
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {if err := apply(ctx, config); err != nil {return nil, err}for key, sec := range config.Timeouts {d, err := time.ParseDuration(sec)if err != nil {return nil, fmt.Errorf("unable to parse %s into a time duration", sec)}timeout.Set(key, d)}
[timeouts]"io.containerd.timeout.shim.cleanup" = "5s""io.containerd.timeout.shim.load" = "5s""io.containerd.timeout.shim.shutdown" = "3s""io.containerd.timeout.task.state" = "2s"
2.2.2 LoadPlugins 加载插件
// LoadPlugins loads all plugins into containerd and generates an ordered graph
// of all plugins.
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {// load all plugins into containerdpath := config.PluginDirif path == "" {path = filepath.Join(config.Root, "plugins")}if err := plugin.Load(path); err != nil {return nil, err}
2.2.2.1 注册插件 io.containerd.content.v1
// load additional plugins that don't automatically register themselvesplugin.Register(&plugin.Registration{Type: plugin.ContentPlugin,ID: "content",InitFn: func(ic *plugin.InitContext) (interface{}, error) {ic.Meta.Exports["root"] = ic.Rootreturn local.NewStore(ic.Root)},})
**,**store 结构体 实现了 Store 接口
路径 containerd/content/local/store.go// Store combines the methods of content-oriented interfaces into a set that
// are commonly provided by complete implementations.
type Store interface {ManagerProviderIngestManagerIngester
}
2.2.2.2 注册插件 io.containerd.metadata.v1,使用 bolt 数据库存储
plugin.Register(&plugin.Registration{Type: plugin.MetadataPlugin,ID: "bolt",Requires: []plugin.Type{plugin.ContentPlugin,plugin.SnapshotPlugin,},Config: &srvconfig.BoltConfig{ContentSharingPolicy: srvconfig.SharingPolicyShared,},
2.2.2.3 遍历所有插件
类型为 io.containerd.snapshotter.v1,或者 snapshot,实现在 containerd/snapshots/proxy/proxy.go,proxySnapshotter 结构体实现了 Snapshotter 接口
clients := &proxyClients{}for name, pp := range config.ProxyPlugins {var (t plugin.Typef func(*grpc.ClientConn) interface{}address = pp.Address)switch pp.Type {case string(plugin.SnapshotPlugin), "snapshot":t = plugin.SnapshotPluginssname := namef = func(conn *grpc.ClientConn) interface{} {return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname)}case string(plugin.ContentPlugin), "content":t = plugin.ContentPluginf = func(conn *grpc.ClientConn) interface{} {return csproxy.NewContentStore(csapi.NewContentClient(conn))}default:log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type")}plugin.Register(&plugin.Registration{Type: t,ID: name,InitFn: func(ic *plugin.InitContext) (interface{}, error) {ic.Meta.Exports["address"] = addressconn, err := clients.getClient(address)if err != nil {return nil, err}return f(conn), nil},})}
[plugins][plugins."io.containerd.snapshotter.v1.devmapper"]root_path = ""pool_name = ""base_image_size = ""
type Snapshotter interface {Stat(ctx context.Context, key string) (Info, error)Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)Usage(ctx context.Context, key string) (Usage, error)Mounts(ctx context.Context, key string) ([]mount.Mount, error)Prepare(ctx context.Context, key string, parent string, opts ...Opt) ([]mount.Mount, error)View(ctx context.Context, key string, parent string, opts ...Opt) ([]mount.Mount, error)Commit(ctx context.Context, name string, key string, opts ...Opt) errorRemove(ctx context.Context, key string) errorWalk(ctx context.Context, fn WalkFunc, filters ...string) errorClose() error
}
2.2.2.4 Graph 函数返回一个注册有序的插件列表
// Plugins in disableList specified by id will be disabled.
func Graph(filter DisableFilter) (ordered []*Registration) {register.RLock()defer register.RUnlock()for _, r := range register.r {if filter(r) {r.Disable = true}}added := map[*Registration]bool{}for _, r := range register.r {if r.Disable {continue}children(r, added, &ordered)if !added[r] {ordered = append(ordered, r)added[r] = true}}return ordered
}
2.2.2.5 启动GRPC Server
func serve(ctx gocontext.Context, l net.Listener, serveFunc func(net.Listener) error) {path := l.Addr().String()log.G(ctx).WithField("address", path).Info("serving...")go func() {defer l.Close()if err := serveFunc(l); err != nil {log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")}}()
}
2.3 获取 GRPC server 配置参数
[grpc]address = "/run/containerd/containerd.sock"tcp_address = ""tcp_tls_cert = ""tcp_tls_key = ""uid = 0gid = 0max_recv_message_size = 16777216max_send_message_size = 16777216
serverOpts := []grpc.ServerOption{grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(otelgrpc.StreamServerInterceptor(),grpc.StreamServerInterceptor(grpc_prometheus.StreamServerInterceptor),streamNamespaceInterceptor,)),grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(otelgrpc.UnaryServerInterceptor(),grpc.UnaryServerInterceptor(grpc_prometheus.UnaryServerInterceptor),unaryNamespaceInterceptor,)),}if config.GRPC.MaxRecvMsgSize > 0 {serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))}if config.GRPC.MaxSendMsgSize > 0 {serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))}
2.4 实例化Server
var (grpcServer = grpc.NewServer(serverOpts...)tcpServer = grpc.NewServer(tcpServerOpts...)grpcServices []grpcServicetcpServices []tcpServicettrpcServices []ttrpcServices = &Server{grpcServer: grpcServer,tcpServer: tcpServer,ttrpcServer: ttrpcServer,config: config,}// TODO: Remove this in 2.0 and let event plugin crease itevents = exchange.NewExchange()initialized = plugin.NewPluginSet()required = make(map[string]struct{}))
2.5 需要开启设置的插件
required_plugins = []
for _, r := range config.RequiredPlugins {required[r] = struct{}{}
}
2.6 Init调用各个注册的 InitFn 函数初始化,分别实现了不同的接口
// Init the registered plugin
func (r *Registration) Init(ic *InitContext) *Plugin {p, err := r.InitFn(ic)return &Plugin{Registration: r,Config: ic.Config,Meta: ic.Meta,instance: p,err: err,}
}
2.7 对所有 服务分别调用 Register 方法注册(根据是否实现了 Register 接口,各个插件里面的实现的接口 Register)
// register services after all plugins have been initializedfor _, service := range grpcServices {if err := service.Register(grpcServer); err != nil {return nil, err}}for _, service := range ttrpcServices {if err := service.RegisterTTRPC(ttrpcServer); err != nil {return nil, err}}for _, service := range tcpServices {if err := service.RegisterTCP(tcpServer); err != nil {return nil, err}}
3. plugin
// Registration contains information for registering a plugin
type Registration struct {Type TypeID stringConfig interface{}Requires []TypeInitFn func(*InitContext) (interface{}, error)Disable bool
}
# ctr plugins ls
TYPE ID PLATFORMS STATUS
io.containerd.content.v1 content - ok
io.containerd.snapshotter.v1 btrfs linux/amd64 error
io.containerd.snapshotter.v1 devmapper linux/amd64 error
io.containerd.snapshotter.v1 aufs linux/amd64 error
io.containerd.snapshotter.v1 native linux/amd64 ok
io.containerd.snapshotter.v1 overlayfs linux/amd64 ok
io.containerd.snapshotter.v1 zfs linux/amd64 error
io.containerd.metadata.v1 bolt - ok
io.containerd.differ.v1 walking linux/amd64 ok
io.containerd.gc.v1 scheduler - ok
io.containerd.service.v1 containers-service - ok
io.containerd.service.v1 content-service - ok
io.containerd.service.v1 diff-service - ok
io.containerd.service.v1 images-service - ok
io.containerd.service.v1 leases-service - ok
io.containerd.service.v1 namespaces-service - ok
io.containerd.service.v1 snapshots-service - ok
io.containerd.runtime.v1 linux linux/amd64 ok
io.containerd.runtime.v2 task linux/amd64 ok
io.containerd.monitor.v1 cgroups linux/amd64 ok
io.containerd.service.v1 tasks-service - ok
io.containerd.internal.v1 restart - ok
io.containerd.grpc.v1 containers - ok
io.containerd.grpc.v1 content - ok
io.containerd.grpc.v1 diff - ok
io.containerd.grpc.v1 events - ok
io.containerd.grpc.v1 healthcheck - ok
io.containerd.grpc.v1 images - ok
io.containerd.grpc.v1 leases - ok
io.containerd.grpc.v1 namespaces - ok
io.containerd.internal.v1 opt - ok
io.containerd.grpc.v1 snapshots - ok
io.containerd.grpc.v1 tasks - ok
io.containerd.grpc.v1 version - ok
io.containerd.grpc.v1 cri linux/amd64 ok
可以在源码中看到具体实现
4. 以plugin:io.containerd.grpc.v1.containers为例
代码路径: services/containers/service.go,ID 为 containers,提供基础元素的存储
注册
func init() {plugin.Register(&plugin.Registration{Type: plugin.GRPCPlugin,ID: "containers",Requires: []plugin.Type{plugin.ServicePlugin,},InitFn: func(ic *plugin.InitContext) (interface{}, error) {plugins, err := ic.GetByType(plugin.ServicePlugin)if err != nil {return nil, err}p, ok := plugins[services.ContainersService]if !ok {return nil, errors.New("containers service not found")}i, err := p.Instance()if err != nil {return nil, err}return &service{local: i.(api.ContainersClient)}, nil},})
}
containerd启动过程相关推荐
- 用Dockerfile构建MySQL镜像并实现容器启动过程中MySQL数据库系统的初始化
前一段时间就在研究用Dockerfile构建MySQL镜像并实现容器启动过程中MySQL数据库系统的初始化,但被一些无关紧要的事儿给耽误了,经过查阅<dockerfile最佳实践>及MyS ...
- Android系统的启动过程
Android系统的启动过程可以简单地总结为以下几个流程: 加载BootLoader -> 初始化内核 -> 启动init进程 -> init进程fork出Zygote(孵化器)进程 ...
- Android系统默认Home应用程序(Launcher)的启动过程源代码分析
在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还需要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...
- Linux0.11内核引导启动过程概述
Linux0.11仅支持x86架构.它的内核引导启动程序在文件夹boot内,共有三个汇编代码文件.按照启动流程依次是: (1)bootsect.s.boot是启动引导的意思,sect即sector,是 ...
- linux启动sql server数据库,SQL Server数据库启动过程详解及启动不起来的问题分析及解决方法...
第五步.启动系统数据库model model系统数据库同样也是SQL Server启动过程中用到的一个非常关键的数据库,如果这个库损坏,SQL Server启动也会失败,关于model数据不能启动的原 ...
- Linux必知必会的目录与启动过程
第1章 /etc/目录 1.1 /etc/sysconfig/network-scripts/ifcfg-eth0 linux第一块网卡的配置文件 [root@znix ~]# cat /etc/sy ...
- Spring Boot启动过程(二)
书接上篇 该说refreshContext(context)了,首先是判断context是否是AbstractApplicationContext派生类的实例,之后调用了强转为AbstractAppl ...
- Linux X Window System运行原理和启动过程
本文主要说明X Window System的基本运行原理,其启动过程,及常见的跨网络运行X Window System. 一) 基本运行原理 X Window System采用C/S结构,但和我们常见 ...
- Spring 容器的启动过程
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 本文来源:http://r6f.cn/b47K 一. 前言 ...
最新文章
- 【iOS_Development】文件操作
- 真的,千万不要给女朋友解释 什么是 “羊群效应”
- Echarts的坐标调整,调整内部网格和外部的间隔
- 我是越来越喜欢做产品了
- 求方差时为什么要除以N—1,而不是除以N!【通俗理解-非数学专业】
- mysql 修改库的校对集_mysql数据库的基本操作(增删改查、字符集、校对集)
- 2.3基本算法之递归变递推 1188 菲波那契数列(2)
- php 转化js数组字符串,js数组怎么转为字符串
- android技巧:把自己的app变成手机系统自带的app[/system/app]
- 测试用例 集成测试增删改查_springBoot集成mongoDb并增删改查
- 使用Q-Vision软件Kvaser硬件产品,支持ADAS测试,实现对总线网络的分析
- 用python实现关机程序_python实现重启关机程序
- python中数字加引号和不加引号的区别_高考完小白自学Python,不太懂print语句中一个加引号,一个不加?...
- 程序员的自我进化:技术的广度与深度怎么权衡
- 计算机毕业设计Javahtml5健身房信息管理系统(源码+系统+mysql数据库+lw文档)
- java获取下周一_java 获取下周一日期
- 2010年9月2号安排~
- python枚举是什么意思_什么是枚举python
- 300PLC转以太网与MatrikonOPC以太网通讯
- mysql sdo geometry_SDO_GEOMETRY结构说明
热门文章
- 东方瑞丰:飞行模拟机才是虚拟现实桂冠上的明珠
- android 仿qq修改头像,Qt:小项目仿QQ修改头像界面,技术点记录
- MD5对文件进行加密,可以支持大文件
- HTML+CSS网页设计期末课程大作——体育篮球(5页面)
- 移动机器人的发展史 从世界首台移动机器人Shakey说起...
- 字符设备驱动开发的流程
- 西南大学计算机基础和数字电路,西南大学计算机基础和数字电路907复 习笔记.pdf...
- 无法加载文件 \env\Scripts\Activate.ps1,因为在此系统上禁止运行脚本。
- 关于zIndex的问题
- React 实现自定义日历