JMeter 添加 Kafka 支持
1,下载源码:https://github.com/BrightTag/kafkameter
主要代码:
/** Copyright 2014 Signal.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package co.signal.kafkameter;import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;import com.google.common.base.Strings;import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log.Logger;/*** A {@link org.apache.jmeter.samplers.Sampler Sampler} which produces Kafka messages.** @author codyaray* @since 6/27/14** @see "http://ilkinbalkanay.blogspot.com/2010/03/load-test-whatever-you-want-with-apache.html"* @see "http://newspaint.wordpress.com/2012/11/28/creating-a-java-sampler-for-jmeter/"* @see "http://jmeter.512774.n5.nabble.com/Custom-Sampler-Tutorial-td4490189.html"*/
public class KafkaProducerSampler extends AbstractJavaSamplerClient {private static final Logger log = LoggingManager.getLoggerForClass();/*** Parameter for setting the Kafka brokers; for example, "kafka01:9092,kafka02:9092".*/private static final String PARAMETER_KAFKA_BROKERS = "kafka_brokers";/*** Parameter for setting the Kafka topic name.*/private static final String PARAMETER_KAFKA_TOPIC = "kafka_topic";/*** Parameter for setting the Kafka key.*/private static final String PARAMETER_KAFKA_KEY = "kafka_key";/*** Parameter for setting the Kafka message.*/private static final String PARAMETER_KAFKA_MESSAGE = "kafka_message";/*** Parameter for setting Kafka's {@code serializer.class} property.*/private static final String PARAMETER_KAFKA_MESSAGE_SERIALIZER = "kafka_message_serializer";/*** Parameter for setting Kafka's {@code key.serializer.class} property.*/private static final String PARAMETER_KAFKA_KEY_SERIALIZER = "kafka_key_serializer";/*** Parameter for setting the Kafka ssl keystore (include path information); for example, "server.keystore.jks".*/private static final String PARAMETER_KAFKA_SSL_KEYSTORE = "kafka_ssl_keystore";/*** Parameter for setting the Kafka ssl keystore password.*/private static final String PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD = "kafka_ssl_keystore_password";/*** Parameter for setting the Kafka ssl truststore (include path information); for example, "client.truststore.jks".*/private static final String PARAMETER_KAFKA_SSL_TRUSTSTORE = "kafka_ssl_truststore";/*** Parameter for setting the Kafka ssl truststore password.*/private static final String PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD = "kafka_ssl_truststore_password";/*** Parameter for setting the Kafka security protocol; "true" or "false".*/private static final String PARAMETER_KAFKA_USE_SSL = "kafka_use_ssl";/*** Parameter for setting encryption. 
It is optional.*/private static final String PARAMETER_KAFKA_COMPRESSION_TYPE = "kafka_compression_type";/*** Parameter for setting the partition. It is optional.*/private static final String PARAMETER_KAFKA_PARTITION = "kafka_partition";//private Producer<Long, byte[]> producer;private KafkaProducer<String, String> producer;@Overridepublic void setupTest(JavaSamplerContext context) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(PARAMETER_KAFKA_BROKERS));props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getParameter(PARAMETER_KAFKA_KEY_SERIALIZER));props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(PARAMETER_KAFKA_MESSAGE_SERIALIZER));props.put(ProducerConfig.ACKS_CONFIG, "1");// check if kafka security protocol is SSL or PLAINTEXT (default)if(context.getParameter(PARAMETER_KAFKA_USE_SSL).equals("true")){log.info("Setting up SSL properties...");props.put("security.protocol", "SSL");props.put("ssl.keystore.location", context.getParameter(PARAMETER_KAFKA_SSL_KEYSTORE));props.put("ssl.keystore.password", context.getParameter(PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD));props.put("ssl.truststore.location", context.getParameter(PARAMETER_KAFKA_SSL_TRUSTSTORE));props.put("ssl.truststore.password", context.getParameter(PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD));}String compressionType = context.getParameter(PARAMETER_KAFKA_COMPRESSION_TYPE);if (!Strings.isNullOrEmpty(compressionType)) {props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);}producer = new KafkaProducer<String, String>(props);}@Overridepublic void teardownTest(JavaSamplerContext context) {producer.close();}@Overridepublic Arguments getDefaultParameters() {Arguments defaultParameters = new Arguments();defaultParameters.addArgument(PARAMETER_KAFKA_BROKERS, "${PARAMETER_KAFKA_BROKERS}");defaultParameters.addArgument(PARAMETER_KAFKA_TOPIC, 
"${PARAMETER_KAFKA_TOPIC}");defaultParameters.addArgument(PARAMETER_KAFKA_KEY, "${PARAMETER_KAFKA_KEY}");defaultParameters.addArgument(PARAMETER_KAFKA_MESSAGE, "${PARAMETER_KAFKA_MESSAGE}");defaultParameters.addArgument(PARAMETER_KAFKA_MESSAGE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");defaultParameters.addArgument(PARAMETER_KAFKA_KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_KEYSTORE, "${PARAMETER_KAFKA_SSL_KEYSTORE}");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD, "${PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD}");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_TRUSTSTORE, "${PARAMETER_KAFKA_SSL_TRUSTSTORE}");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD, "${PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD}");defaultParameters.addArgument(PARAMETER_KAFKA_USE_SSL, "${PARAMETER_KAFKA_USE_SSL}");defaultParameters.addArgument(PARAMETER_KAFKA_COMPRESSION_TYPE, null);defaultParameters.addArgument(PARAMETER_KAFKA_PARTITION, null);return defaultParameters;}@Overridepublic SampleResult runTest(JavaSamplerContext context) {SampleResult result = newSampleResult();String topic = context.getParameter(PARAMETER_KAFKA_TOPIC);String key = context.getParameter(PARAMETER_KAFKA_KEY);String message = context.getParameter(PARAMETER_KAFKA_MESSAGE);sampleResultStart(result, message);final ProducerRecord<String, String> producerRecord;String partitionString = context.getParameter(PARAMETER_KAFKA_PARTITION);if (Strings.isNullOrEmpty(partitionString)) {producerRecord = new ProducerRecord<String, String>(topic, key, message);} else {final int partitionNumber = Integer.parseInt(partitionString);producerRecord = new ProducerRecord<String, String>(topic, partitionNumber, key, message);}try {producer.send(producerRecord);sampleResultSuccess(result, null);} catch (Exception e) {sampleResultFailed(result, "500", e);}return result;}/*** Use UTF-8 for encoding of 
strings*/private static final String ENCODING = "UTF-8";/*** Factory for creating new {@link SampleResult}s.*/private SampleResult newSampleResult() {SampleResult result = new SampleResult();result.setDataEncoding(ENCODING);result.setDataType(SampleResult.TEXT);return result;}/*** Start the sample request and set the {@code samplerData} to {@code data}.** @param result* the sample result to update* @param data* the request to set as {@code samplerData}*/private void sampleResultStart(SampleResult result, String data) {result.setSamplerData(data);result.sampleStart();}/*** Mark the sample result as {@code end}ed and {@code successful} with an "OK" {@code responseCode},* and if the response is not {@code null} then set the {@code responseData} to {@code response},* otherwise it is marked as not requiring a response.** @param result sample result to change* @param response the successful result message, may be null.*/private void sampleResultSuccess(SampleResult result, /* @Nullable */ String response) {result.sampleEnd();result.setSuccessful(true);result.setResponseCodeOK();if (response != null) {result.setResponseData(response, ENCODING);}else {result.setResponseData("No response required", ENCODING);}}/*** Mark the sample result as @{code end}ed and not {@code successful}, and set the* {@code responseCode} to {@code reason}.** @param result the sample result to change* @param reason the failure reason*/private void sampleResultFailed(SampleResult result, String reason) {result.sampleEnd();result.setSuccessful(false);result.setResponseCode(reason);}/*** Mark the sample result as @{code end}ed and not {@code successful}, set the* {@code responseCode} to {@code reason}, and set {@code responseData} to the stack trace.** @param result the sample result to change* @param exception the failure exception*/private void sampleResultFailed(SampleResult result, String reason, Exception exception) {sampleResultFailed(result, reason);result.setResponseMessage("Exception: " + 
exception);result.setResponseData(getStackTrace(exception), ENCODING);}/*** Return the stack trace as a string.** @param exception the exception containing the stack trace* @return the stack trace*/private String getStackTrace(Exception exception) {StringWriter stringWriter = new StringWriter();exception.printStackTrace(new PrintWriter(stringWriter));return stringWriter.toString();}
}
2,编译kafkameter-master
3,运行 mvn test,Maven 会在运行时自动下载项目所需的依赖。
4,导出jar包。在src上右击,选择Export导出,jar文件名为:kafkameter-x.y.z.jar
5,将kafkameter-x.y.z.jar放到 $JMETER_HOME/lib/ext
6,运行jmeter,添加测试
Jmeter 添加kafka支持相关推荐
- 如何使用Jmeter对Kafka进行性能测试
目录 A Brief Overview of Apache Kafka 利用Pepper-Box插件配置生产者Producer Pepper-Box PlainText Config Pepper-B ...
- Jmeter向kafka发送数据
点击查看原文 Jmeter连接Kafka 在jmeter/lib/ext中导入一个kafka相关的jar包,如下图: 导入之后就可以打开JMeter进行操作了,首先在一个Thread group中添加 ...
- vim8支持的linux版本,Vim 8.0 版本安装方法及添加Python支持
利用Git安装 最简单也是最有效的方法 1. 获取Vim仓库: git clone https://github.com/vim/vim.git 2. 升级到最新的版本: cd vim git pul ...
- Java后台开发Tomcat添加https支持小程序开发过程
文章原文:blog.ouyangsihai.cn >> Java后台开发Tomcat添加https支持小程序开发过程 1 给自己的域名申请证书 注意:申请好了如果不是在腾讯注册的域名,不会 ...
- 如何使用vs将asp.net core项目添加容器支持并发布docker镜像到私有dockerhub和添加k8s/helm管理...
这篇文章介绍一下,如何使用VS2017给asp.net core添加容器支持,并发布镜像到私有docker hub,然后用chart管理容器镜像的操作流程. 话不多说,just do it. 新建项目 ...
- (转)springboot:添加JSP支持
转自: 14.springboot:添加JSP支持 - 简书(1)创建Maven web project 使用Eclipse新建一个Maven Web Project ,项目取名为:spring-bo ...
- Spring–添加AOP支持
我听到了一个有关一位高级(且酬劳颇丰)软件工程师的故事. 他的任务是记录他正在研究的项目中每个控制器中的每个方法. 工程师重写了所有控制器方法,因此使用如下代码: @RequestMapping(me ...
- wordpress主题ajax,为自制WordPress主题/插件的后台设置页面添加ajax支持
本文目录 [隐藏] 1PHP部分 1.1安全第一 1.2定义一些用得上的常量 1.3I18n=国际化支持 1.4添加菜单项 1.5美化下菜单项前面的icon 1.6设置页面/后台的HTML结构 1.7 ...
- Abp Vnext应用程序项目中添加docker支持的小结
文章目录 介绍 具体步骤 1.创建项目 2.添加docker支持 3.调整 4.运行实例 总结 介绍 abp vnext 里面的只有 module 项目里面是自动添加 docker 支持的,因为其是面 ...
最新文章
- LeetCode简单题之图片平滑器
- 是男人就过8题!楼教主出题,请接招!
- 探索可解释及稳定性,AI与博弈,自适应推理——“智源论坛:机器学习青年学者报告会”要点总结
- Android之自定义控件显示点赞用户并通过用户名称进入该用户主页的功能
- node js npm 和 cnpm的使用
- numpy基础——数组的组合与分割
- dropout理解(二)
- Java ObjectStreamClass getSerialVersionUID()方法(带示例)
- win7系统每次开机都需要疑难解答的原因与解决方法
- python程序设计基础第三版_Python程序设计(第三版)PPT及源码
- Casper 机制的历史起源:第一篇
- Java 面向对象 之 多态实例2
- mysql post 注入工具类_【Mysql sql inject】POST方法BASE64编码注入write-up
- oracle修改asm参数文件,修改asm中的spfile参数
- 黑马程序员—-C语言入门十重奏之十renascence
- 对996的一些看法与个人价值实现
- 到底买苹果XS还是XR_iPhone XS 和 XR 买哪个?10 个理由告诉你 XS 更好,贵是有原因的...
- linux如何使用磁盘阵列卡,Ubuntu 上创建常用磁盘阵列
- HDU 4622	Reincarnation (后缀数组|后缀自动机)
- 微信调支付宝支付常见问题