github: https://github.com/containerd/containerd

1. 前言

  • dockerd 是 docker engine 守护进程,dockerd 启动时会启动 containerd 子进程,dockerd 与 containerd 通过 rpc 进行通信
  • ctr 是 containerd 的 cli
  • containerd 通过 shim 操作 runc,runc 真正控制容器生命周期,启动一个容器就会启动一个 shim 进程
  • shim 直接调用 runc 的包函数,shim 与 containerd 之前通过 rpc 通信
  • 真正用户想启动的进程由 runc 的 init 进程启动,即 runc init [args …]
docker     ctr|         |V         V
dockerd -> containerd ---> shim -> runc -> runc init -> process|-- > shim -> runc -> runc init -> process+-- > shim -> runc -> runc init -> process

containerd 只是一个守护进程,容器的实际运行时由 runC 控制。containerd 主要职责是镜像管理(镜像、元信息等)、容器执行(调用最终运行时组件执行)

2. 源码编译

需要安装依赖包:btrfs-tools,直接 make 即可生成 ctr containerd containerd-shim binaries 可执行文件

2.1 containerd main 函数

入口目录为 cmd/containerd/main.go 中 main 函数,默认配置文件 /etc/containerd/config.toml,包含三个子命令,configCommand,publishCommand,ociHook

func main() {app := command.App()if err := app.Run(os.Args); err != nil {fmt.Fprintf(os.Stderr, "containerd: %s\n", err)os.Exit(1)}
}

2.1.1 command App 函数

//  App 函数返回 *cli.App 实例
func App() *cli.App {app := cli.NewApp()app.Name = "containerd"app.Version = version.Versionapp.Usage = usageapp.Description = `
containerd is a high performance container runtime whose daemon can be started
by using this command. If none of the *config*, *publish*, or *help* commands
are specified, the default action of the **containerd** command is to start the
containerd daemon in the foreground.A default configuration is used if no TOML configuration is specified or located
at the default file location. The *containerd config* command can be used to
generate the default configuration for containerd. The output of that command
can be used and modified as necessary as a custom configuration.`
// 如果未指定 TOML 配置或位于默认文件位置,则使用默认配置。容器配置命令可用于生成容器的默认配置。该命令的输出可以根据需要作为自定义配置使用和修改app.Flags = []cli.Flag{cli.StringFlag{Name:  "config,c",Usage: "path to the configuration file",Value: filepath.Join(defaults.DefaultConfigDir, "config.toml"),},cli.StringFlag{Name:  "log-level,l",Usage: "set the logging level [trace, debug, info, warn, error, fatal, panic]",},cli.StringFlag{Name:  "address,a",Usage: "address for containerd's GRPC server",},cli.StringFlag{Name:  "root",Usage: "containerd root directory",},cli.StringFlag{Name:  "state",Usage: "containerd state directory",},}app.Flags = append(app.Flags, serviceFlags()...)// configCommand 用于生成配置文件 containerd config default > /etc/containerd/config.toml,publishCommand,ociHook  // publishCommand 二进制方式推送数据到containerd// ociHook 为 OCI 运行时钩子提供基础,以允许注入参数app.Commands = []cli.Command{configCommand,publishCommand,ociHook,}//未指定子命令时要执行的操作 // 需要“cli.ActionFunc”,但可以接受“func(cli.Context) {}” // 注意:对已弃用的“Action”的支持将在将来的版本中删除app.Action = func(context *cli.Context) error {var (start       = time.Now()signals     = make(chan os.Signal, 2048)serverC     = make(chan *server.Server, 1)ctx, cancel = gocontext.WithCancel(gocontext.Background())config      = defaultConfig())defer cancel()// 仅当配置存在或用户明确告诉我们加载此路径时,才尝试加载配置。configPath := context.GlobalString("config")_, err := os.Stat(configPath)if !os.IsNotExist(err) || context.GlobalIsSet("config") {if err := srvconfig.LoadConfig(configPath, config); err != nil {return err}}// 将传入参数应用于配置if err := applyFlags(context, config); err != nil {return err}//  确定根目录被创建if err := server.CreateTopLevelDirectories(config); err != nil {return err}// Stop if we are registering or unregistering against Windows SCM.stop, err := registerUnregisterService(config.Root)if err != nil {logrus.Fatal(err)}if stop {return nil}done := handleSignals(ctx, signals, serverC, cancel)// start the signal handler as soon as we can to make sure that// we don't miss any signals during bootsignal.Notify(signals, handledSignals...)// 清理挂载点if err := mount.SetTempMountLocation(filepath.Join(config.Root, "tmpmounts")); err != nil {return fmt.Errorf("creating temp mount location: %w", err)}// unmount all temp mounts on boot for the serverwarnings, err := mount.CleanupTempMounts(0)if err != nil {log.G(ctx).WithError(err).Error("unmounting temp mounts")}for _, w := range warnings {log.G(ctx).WithError(w).Warn("cleanup temp mount")}// 配置文件中grpc address 不能为空if config.GRPC.Address == "" {return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)}if config.TTRPC.Address == "" {// If TTRPC was not explicitly configured, use defaults based on GRPC.config.TTRPC.Address = fmt.Sprintf("%s.ttrpc", config.GRPC.Address)config.TTRPC.UID = config.GRPC.UIDconfig.TTRPC.GID = config.GRPC.GID}log.G(ctx).WithFields(logrus.Fields{"version":  version.Version,"revision": version.Revision,}).Info("starting containerd")type srvResp struct {s   *server.Servererr error}// run server initialization in a goroutine so we don't end up blocking important things like SIGTERM handling// while the server is initializing.// As an example opening the bolt database will block forever if another containerd is already running and containerd// will have to be be `kill -9`'ed to recover.// 在 goroutine 中运行服务器初始化,这样我们就不会在服务器初始化时阻止重要的事情,例如 SIGTERM 处理。// 例如,如果另一个 containerd 已经在运行,则打开 bolt 数据库将永远阻塞,并且 containerd 必须被“kill -9”才能恢复。chsrv := make(chan srvResp)go func() {defer close(chsrv)server, err := server.New(ctx, config)if err != nil {select {case chsrv <- srvResp{err: err}:case <-ctx.Done():}return}// Launch as a Windows Service if necessaryif err := launchService(server, done); err != nil {logrus.Fatal(err)}select {case <-ctx.Done():server.Stop()case chsrv <- srvResp{s: server}:}}()var server *server.Serverselect {case <-ctx.Done():return ctx.Err()case r := <-chsrv:if r.err != nil {return r.err}server = r.s}// We don't send the server down serverC directly in the goroutine above because we need it lower down.select {case <-ctx.Done():return ctx.Err()case serverC <- server:}if config.Debug.Address != "" {var l net.Listenerif isLocalAddress(config.Debug.Address) {if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {return fmt.Errorf("failed to get listener for debug endpoint: %w", err)}} else {if l, err = net.Listen("tcp", config.Debug.Address); err != nil {return fmt.Errorf("failed to get listener for debug endpoint: %w", err)}}serve(ctx, l, server.ServeDebug)}if config.Metrics.Address != "" {l, err := net.Listen("tcp", config.Metrics.Address)if err != nil {return fmt.Errorf("failed to get listener for metrics endpoint: %w", err)}serve(ctx, l, server.ServeMetrics)}// setup the ttrpc endpointtl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)if err != nil {return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)}serve(ctx, tl, server.ServeTTRPC)if config.GRPC.TCPAddress != "" {l, err := net.Listen("tcp", config.GRPC.TCPAddress)if err != nil {return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)}serve(ctx, l, server.ServeTCP)}// setup the main grpc endpointl, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)if err != nil {return fmt.Errorf("failed to get listener for main endpoint: %w", err)}serve(ctx, l, server.ServeGRPC)if err := notifyReady(ctx); err != nil {log.G(ctx).WithError(err).Warn("notify ready failed")}log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())<-donereturn nil}return app

2.2 server.New 函数创建以及初始化 containerd server

2.2.1 初始化 contaienrd server,加载 timeout 配置

func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {if err := apply(ctx, config); err != nil {return nil, err}for key, sec := range config.Timeouts {d, err := time.ParseDuration(sec)if err != nil {return nil, fmt.Errorf("unable to parse %s into a time duration", sec)}timeout.Set(key, d)}
[timeouts]"io.containerd.timeout.shim.cleanup" = "5s""io.containerd.timeout.shim.load" = "5s""io.containerd.timeout.shim.shutdown" = "3s""io.containerd.timeout.task.state" = "2s"

2.2.2 LoadPlugins 加载插件


// LoadPlugins loads all plugins into containerd and generates an ordered graph
// of all plugins.
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {// load all plugins into containerdpath := config.PluginDirif path == "" {path = filepath.Join(config.Root, "plugins")}if err := plugin.Load(path); err != nil {return nil, err}

2.2.2.1 注册插件 io.containerd.content.v1

// load additional plugins that don't automatically register themselvesplugin.Register(&plugin.Registration{Type: plugin.ContentPlugin,ID:   "content",InitFn: func(ic *plugin.InitContext) (interface{}, error) {ic.Meta.Exports["root"] = ic.Rootreturn local.NewStore(ic.Root)},})

**,**store 结构体 实现了 Store 接口

路径 containerd/content/local/store.go// Store combines the methods of content-oriented interfaces into a set that
// are commonly provided by complete implementations.
type Store interface {ManagerProviderIngestManagerIngester
}

2.2.2.2 注册插件 io.containerd.metadata.v1,使用 bolt 数据库存储

 plugin.Register(&plugin.Registration{Type: plugin.MetadataPlugin,ID:   "bolt",Requires: []plugin.Type{plugin.ContentPlugin,plugin.SnapshotPlugin,},Config: &srvconfig.BoltConfig{ContentSharingPolicy: srvconfig.SharingPolicyShared,},

2.2.2.3 遍历所有插件

类型为 io.containerd.snapshotter.v1,或者 snapshot,实现在 containerd/snapshots/proxy/proxy.go,proxySnapshotter 结构体实现了 Snapshotter 接口

 clients := &proxyClients{}for name, pp := range config.ProxyPlugins {var (t plugin.Typef func(*grpc.ClientConn) interface{}address = pp.Address)switch pp.Type {case string(plugin.SnapshotPlugin), "snapshot":t = plugin.SnapshotPluginssname := namef = func(conn *grpc.ClientConn) interface{} {return ssproxy.NewSnapshotter(ssapi.NewSnapshotsClient(conn), ssname)}case string(plugin.ContentPlugin), "content":t = plugin.ContentPluginf = func(conn *grpc.ClientConn) interface{} {return csproxy.NewContentStore(csapi.NewContentClient(conn))}default:log.G(ctx).WithField("type", pp.Type).Warn("unknown proxy plugin type")}plugin.Register(&plugin.Registration{Type: t,ID:   name,InitFn: func(ic *plugin.InitContext) (interface{}, error) {ic.Meta.Exports["address"] = addressconn, err := clients.getClient(address)if err != nil {return nil, err}return f(conn), nil},})}
[plugins][plugins."io.containerd.snapshotter.v1.devmapper"]root_path = ""pool_name = ""base_image_size = ""
type Snapshotter interface {Stat(ctx context.Context, key string) (Info, error)Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)Usage(ctx context.Context, key string) (Usage, error)Mounts(ctx context.Context, key string) ([]mount.Mount, error)Prepare(ctx context.Context, key string, parent string, opts ...Opt) ([]mount.Mount, error)View(ctx context.Context, key string, parent string, opts ...Opt) ([]mount.Mount, error)Commit(ctx context.Context, name string, key string, opts ...Opt) errorRemove(ctx context.Context, key string) errorWalk(ctx context.Context, fn WalkFunc, filters ...string) errorClose() error
}

2.2.2.4 Graph 函数返回一个注册有序的插件列表

// Plugins in disableList specified by id will be disabled.
func Graph(filter DisableFilter) (ordered []*Registration) {register.RLock()defer register.RUnlock()for _, r := range register.r {if filter(r) {r.Disable = true}}added := map[*Registration]bool{}for _, r := range register.r {if r.Disable {continue}children(r, added, &ordered)if !added[r] {ordered = append(ordered, r)added[r] = true}}return ordered
}

2.2.2.5 启动GRPC Server

func serve(ctx gocontext.Context, l net.Listener, serveFunc func(net.Listener) error) {path := l.Addr().String()log.G(ctx).WithField("address", path).Info("serving...")go func() {defer l.Close()if err := serveFunc(l); err != nil {log.G(ctx).WithError(err).WithField("address", path).Fatal("serve failure")}}()
}

2.3 获取 GRPC server 配置参数

 [grpc]address = "/run/containerd/containerd.sock"tcp_address = ""tcp_tls_cert = ""tcp_tls_key = ""uid = 0gid = 0max_recv_message_size = 16777216max_send_message_size = 16777216
 serverOpts := []grpc.ServerOption{grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(otelgrpc.StreamServerInterceptor(),grpc.StreamServerInterceptor(grpc_prometheus.StreamServerInterceptor),streamNamespaceInterceptor,)),grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(otelgrpc.UnaryServerInterceptor(),grpc.UnaryServerInterceptor(grpc_prometheus.UnaryServerInterceptor),unaryNamespaceInterceptor,)),}if config.GRPC.MaxRecvMsgSize > 0 {serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))}if config.GRPC.MaxSendMsgSize > 0 {serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))}

2.4 实例化Server

 var (grpcServer = grpc.NewServer(serverOpts...)tcpServer  = grpc.NewServer(tcpServerOpts...)grpcServices  []grpcServicetcpServices   []tcpServicettrpcServices []ttrpcServices = &Server{grpcServer:  grpcServer,tcpServer:   tcpServer,ttrpcServer: ttrpcServer,config:      config,}// TODO: Remove this in 2.0 and let event plugin crease itevents      = exchange.NewExchange()initialized = plugin.NewPluginSet()required    = make(map[string]struct{}))

2.5 需要开启设置的插件

required_plugins = []
for _, r := range config.RequiredPlugins {required[r] = struct{}{}
}

2.6 Init调用各个注册的 InitFn 函数初始化,分别实现了不同的接口

// Init the registered plugin
func (r *Registration) Init(ic *InitContext) *Plugin {p, err := r.InitFn(ic)return &Plugin{Registration: r,Config:       ic.Config,Meta:         ic.Meta,instance:     p,err:          err,}
}

2.7 对所有 服务分别调用 Register 方法注册(根据是否实现了 Register 接口,各个插件里面的实现的接口 Register)

 // register services after all plugins have been initializedfor _, service := range grpcServices {if err := service.Register(grpcServer); err != nil {return nil, err}}for _, service := range ttrpcServices {if err := service.RegisterTTRPC(ttrpcServer); err != nil {return nil, err}}for _, service := range tcpServices {if err := service.RegisterTCP(tcpServer); err != nil {return nil, err}}

3. plugin

// Registration contains information for registering a plugin
type Registration struct {Type     TypeID       stringConfig   interface{}Requires []TypeInitFn   func(*InitContext) (interface{}, error)Disable  bool
}
# ctr plugins ls
TYPE                            ID                    PLATFORMS      STATUS
io.containerd.content.v1        content               -              ok
io.containerd.snapshotter.v1    btrfs                 linux/amd64    error
io.containerd.snapshotter.v1    devmapper             linux/amd64    error
io.containerd.snapshotter.v1    aufs                  linux/amd64    error
io.containerd.snapshotter.v1    native                linux/amd64    ok
io.containerd.snapshotter.v1    overlayfs             linux/amd64    ok
io.containerd.snapshotter.v1    zfs                   linux/amd64    error
io.containerd.metadata.v1       bolt                  -              ok
io.containerd.differ.v1         walking               linux/amd64    ok
io.containerd.gc.v1             scheduler             -              ok
io.containerd.service.v1        containers-service    -              ok
io.containerd.service.v1        content-service       -              ok
io.containerd.service.v1        diff-service          -              ok
io.containerd.service.v1        images-service        -              ok
io.containerd.service.v1        leases-service        -              ok
io.containerd.service.v1        namespaces-service    -              ok
io.containerd.service.v1        snapshots-service     -              ok
io.containerd.runtime.v1        linux                 linux/amd64    ok
io.containerd.runtime.v2        task                  linux/amd64    ok
io.containerd.monitor.v1        cgroups               linux/amd64    ok
io.containerd.service.v1        tasks-service         -              ok
io.containerd.internal.v1       restart               -              ok
io.containerd.grpc.v1           containers            -              ok
io.containerd.grpc.v1           content               -              ok
io.containerd.grpc.v1           diff                  -              ok
io.containerd.grpc.v1           events                -              ok
io.containerd.grpc.v1           healthcheck           -              ok
io.containerd.grpc.v1           images                -              ok
io.containerd.grpc.v1           leases                -              ok
io.containerd.grpc.v1           namespaces            -              ok
io.containerd.internal.v1       opt                   -              ok
io.containerd.grpc.v1           snapshots             -              ok
io.containerd.grpc.v1           tasks                 -              ok
io.containerd.grpc.v1           version               -              ok
io.containerd.grpc.v1           cri                   linux/amd64    ok

可以在源码中看到具体实现

4. 以plugin:io.containerd.grpc.v1.containers为例

代码路径: services/containers/service.go,ID 为 containers,提供基础元素的存储

注册

func init() {plugin.Register(&plugin.Registration{Type: plugin.GRPCPlugin,ID:   "containers",Requires: []plugin.Type{plugin.ServicePlugin,},InitFn: func(ic *plugin.InitContext) (interface{}, error) {plugins, err := ic.GetByType(plugin.ServicePlugin)if err != nil {return nil, err}p, ok := plugins[services.ContainersService]if !ok {return nil, errors.New("containers service not found")}i, err := p.Instance()if err != nil {return nil, err}return &service{local: i.(api.ContainersClient)}, nil},})
}

containerd启动过程相关推荐

  1. 用Dockerfile构建MySQL镜像并实现容器启动过程中MySQL数据库系统的初始化

    前一段时间就在研究用Dockerfile构建MySQL镜像并实现容器启动过程中MySQL数据库系统的初始化,但被一些无关紧要的事儿给耽误了,经过查阅<dockerfile最佳实践>及MyS ...

  2. Android系统的启动过程

    Android系统的启动过程可以简单地总结为以下几个流程: 加载BootLoader -> 初始化内核 -> 启动init进程 -> init进程fork出Zygote(孵化器)进程 ...

  3. Android系统默认Home应用程序(Launcher)的启动过程源代码分析

    在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还需要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应 ...

  4. Linux0.11内核引导启动过程概述

    Linux0.11仅支持x86架构.它的内核引导启动程序在文件夹boot内,共有三个汇编代码文件.按照启动流程依次是: (1)bootsect.s.boot是启动引导的意思,sect即sector,是 ...

  5. linux启动sql server数据库,SQL Server数据库启动过程详解及启动不起来的问题分析及解决方法...

    第五步.启动系统数据库model model系统数据库同样也是SQL Server启动过程中用到的一个非常关键的数据库,如果这个库损坏,SQL Server启动也会失败,关于model数据不能启动的原 ...

  6. Linux必知必会的目录与启动过程

    第1章 /etc/目录 1.1 /etc/sysconfig/network-scripts/ifcfg-eth0 linux第一块网卡的配置文件 [root@znix ~]# cat /etc/sy ...

  7. Spring Boot启动过程(二)

    书接上篇 该说refreshContext(context)了,首先是判断context是否是AbstractApplicationContext派生类的实例,之后调用了强转为AbstractAppl ...

  8. Linux X Window System运行原理和启动过程

    本文主要说明X Window System的基本运行原理,其启动过程,及常见的跨网络运行X Window System. 一) 基本运行原理 X Window System采用C/S结构,但和我们常见 ...

  9. Spring 容器的启动过程

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 本文来源:http://r6f.cn/b47K 一. 前言 ...

最新文章

  1. 【iOS_Development】文件操作
  2. 真的,千万不要给女朋友解释 什么是 “羊群效应”
  3. Echarts的坐标调整,调整内部网格和外部的间隔
  4. 我是越来越喜欢做产品了
  5. 求方差时为什么要除以N—1,而不是除以N!【通俗理解-非数学专业】
  6. mysql 修改库的校对集_mysql数据库的基本操作(增删改查、字符集、校对集)
  7. 2.3基本算法之递归变递推 1188 菲波那契数列(2)
  8. php 转化js数组字符串,js数组怎么转为字符串
  9. android技巧:把自己的app变成手机系统自带的app[/system/app]
  10. 测试用例 集成测试增删改查_springBoot集成mongoDb并增删改查
  11. 使用Q-Vision软件Kvaser硬件产品,支持ADAS测试,实现对总线网络的分析
  12. 用python实现关机程序_python实现重启关机程序
  13. python中数字加引号和不加引号的区别_高考完小白自学Python,不太懂print语句中一个加引号,一个不加?...
  14. 程序员的自我进化:技术的广度与深度怎么权衡
  15. 计算机毕业设计Javahtml5健身房信息管理系统(源码+系统+mysql数据库+lw文档)
  16. java获取下周一_java 获取下周一日期
  17. 2010年9月2号安排~
  18. python枚举是什么意思_什么是枚举python
  19. 300PLC转以太网与MatrikonOPC以太网通讯
  20. mysql sdo geometry_SDO_GEOMETRY结构说明

热门文章

  1. 东方瑞丰:飞行模拟机才是虚拟现实桂冠上的明珠
  2. android 仿qq修改头像,Qt:小项目仿QQ修改头像界面,技术点记录
  3. MD5对文件进行加密,可以支持大文件
  4. HTML+CSS网页设计期末课程大作——体育篮球(5页面)
  5. 移动机器人的发展史 从世界首台移动机器人Shakey说起...
  6. 字符设备驱动开发的流程
  7. 西南大学计算机基础和数字电路,西南大学计算机基础和数字电路907复 习笔记.pdf...
  8. 无法加载文件 \env\Scripts\Activate.ps1,因为在此系统上禁止运行脚本。
  9. 关于zIndex的问题
  10. React 实现自定义日历