中文

杂谈 os包的功能 Amaze UI中的组件 Skeleton CSS的用武之地 SVG Introduction mysql数据库迁移 How agent and message bus work in the CloudStack platform? 剖析cloudstack中虚机的创建过程 AngularJS与LoopBack LoopBack实战 Sails使用手册 在NodeJS中使用MySQL数据库 在NodeJS中使用Sqlite 实战XLRD类库 上下文中给模板中的变量赋值 使用Pecan创建restful API 服务 使用python语言操作excel文件 Magento性能调优 Celery的世界 Linux下如何快速查找文件和内容 MySQL常用资源 为PHP设置服务器(Apache/Nginx)环境变量 MySQL大量数据插入各种方法性能分析与比较 Laravel中使用Redis作为队列系统的工作流程 使用Supervisor来管理你的Laravel队列 在Laravel中使用自己的类库三种方式 用Xdebug和Sublime调试PHP代码 用Laravel+Grunt+Bower管理你的应用 PHP Socket的使用 Python脚本--下载合并SAE日志 PHP命名空间及自动加载 SVN 常用资源 Shell 常用资源 Python 常用资源 PHP 常用资源 jQuery 常用资源 JavaScript 常用资源 HTML 常用资源 Git 常用资源 Linux下多个命令连续执行方法 基于CSS3实现尖角面包屑 部署Ceilometer到已有环境中 更新前端框架到Bootstrap3 OpenStack Ceilometer数据存储与API源码解析 OpenStack Ceilometer中的Pipeline机制 OpenStack Ceilometer Compute Agent源码解读 学习Python动态扩展包stevedore 学习Python的ABC模块 Python包管理工具setuptools详解 OpenStack Horizon 中文本地化 安装MySQL和MongoDB的WEB管理界面 给Git或者APT设置goagent代理 WSGI学习 在虚拟机单机部署OpenStack Grizzly python包工具之间的关系 给OpenStack创建Ubuntu镜像 OpenStack Grizzly Multihost部署文档 HTML中meta标签viewpoint的作用 交互式编程-IPython 页面提速之——数据缓存 给OpenStack创建Win7镜像 Ceilometer的命令行使用 给OpenStack创建Windows XP镜像 概念模型、逻辑模型、物理模型的区别 Bootstrap常用资源 OpenStack监控项目Ceilometer的一些术语 服务器自动化部署及运维常见工具 Linux下开启Libvirtd的tcp监控 VNC和远程桌面的区别 调试和修改OpenStack中的Horizon部分 win7快速打开应用程序或文件 JavaScript变量作用域 git创建远程库 MySQL远程访问 sae下的python开发部署和一个简单例子 OpenStack Nova内部机制【译】 PHP可变变量 JS中防止浏览器屏蔽window.open PHP操作Session的原理及提升安全性时的一个问题

中文/英文

一日十句

标签


OpenStack Ceilometer中的Pipeline机制

2013年06月11日

Pipeline作用

Pipeline翻译过来是管道的意思,它在ceilometer中的作用类似一个过滤器一样,或者说是转换器。它是一般是一个方法链,这个方法链前面一部分是transformer,transformer实现数据转换等功能,它可以有多个。在链尾是publisher,它负责将数据发送到AMQP中去。

Pipeline定义

在Agent的构造函数中,第一个创建的属性就是pipeline_manager

self.pipeline_manager = pipeline.setup_pipeline(
    transformer.TransformerExtensionManager(
        'ceilometer.transformer',
    ),
    publisher.PublisherExtensionManager(
        'ceilometer.publisher',
    ),
)

其中,transformer和publisher来自setup.cfg中

ceilometer.transformer =
    accumulator = ceilometer.transformer.accumulator:TransformerAccumulator

ceilometer.publisher =
    meter_publisher = ceilometer.publisher.meter:MeterPublisher
    meter = ceilometer.publisher.meter:MeterPublisher
    udp = ceilometer.publisher.udp:UDPPublisher

Pipeline设置

它调用了ceilometer.pipeline中的setup_pipline(),setup_pipeline()通过导入pipeline.yaml,获得pipeline的配置,默认配置如下

name: meter_pipeline
interval: 600
counters:
    - "*"
transformers:
publishers:
    - meter

最后它创建了一个PipelineManager给self.pipeline_manager

PipelineManager(pipeline_cfg,transformer_manager,publisher_manager)

PipelineManager做的事情如下:

self.pipelines = [Pipeline(pipedef, publisher_manager,transformer_manager) for pipedef in cfg]

它遍历cfg中对pipeline的定义(基本都是一个),然后生成一个Pipeline对象数组

def __init__(self, cfg, publisher_manager, transformer_manager):
    self.cfg = cfg
    self.name = cfg['name']
    self.interval = int(cfg['interval'])
    self.counters = cfg['counters']
    self.publishers = cfg['publishers']
    self.transformer_cfg = cfg['transformers'] or []
    self.publisher_manager = publisher_manager
    self._check_counters()
    self._check_publishers(cfg, publisher_manager)
    self.transformers = self._setup_transformers(cfg, transformer_manager)

Pipeline的构造函数如上,它的作用是处理transformer和publisher

Pipeline使用

pipeline的使用位置在agent.py中

def setup_polling_tasks(self):
    polling_tasks = {}
    for pipeline, pollster in itertools.product(
            self.pipeline_manager.pipelines,
            self.pollster_manager.extensions):
        for counter in pollster.obj.get_counter_names():
            if pipeline.support_counter(counter):
                polling_task = polling_tasks.get(pipeline.interval, None)
                if not polling_task:
                    polling_task = self.create_polling_task()
                    polling_tasks[pipeline.interval] = polling_task
                polling_task.add(pollster, [pipeline])
                break

    return polling_tasks

首先通过product生成pipeline和pollster的笛卡尔积,即将每一个pollster都和pipeline配对(一般只有一个pipeline)。

pipeline.support_counter(counter)用来检查这个counter是否同意进入pipeline

另外,每一个polling_task都在构造函数中

self.publish_context = pipeline.PublishContext(
    agent_manager.context,
    cfg.CONF.counter_source)

声明了一个pipeline.PublishContext()

在执行task.poll_and_publish前,会先执行

def add(self, pollster, pipelines):
    self.publish_context.add_pipelines(pipelines)
    self.pollsters.update([pollster])

即增加一个pipeline管理

最后是publish_context的使用位置

def poll_and_publish_instances(self, instances):
    with self.publish_context as publisher:
        for instance in instances:
            if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error':
                for pollster in self.pollsters:
                    publisher(list(pollster.obj.get_counters(
                        self.manager,
                        instance)))

这里用了with as作为pipeline的管理

__enter__()中,定义了一个函数

def p(counters):
    for p in self.pipelines:
        p.publish_counters(self.context,
                           counters,
                           self.source)

这个函数执行pipeline中的publish_counters,然后最终的执行代码来自

ext.obj.publish_counters(ctxt, counters, source)

即publisher的publish_counters,在这里是ceilometer.publisher.meter:publish_counters,它负责将数据发送到AMQP中去

总结

Pipeline机制一定程度上保证了数据的安全性,并且可以统一数据格式,了解它对于了解Ceilometer的数据流有一定帮助