1,下载源码:https://github.com/BrightTag/kafkameter

主要代码:

/*
 * Copyright 2014 Signal.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package co.signal.kafkameter;import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;import com.google.common.base.Strings;import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jorphan.logging.LoggingManager;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log.Logger;/*** A {@link org.apache.jmeter.samplers.Sampler Sampler} which produces Kafka messages.** @author codyaray* @since 6/27/14** @see "http://ilkinbalkanay.blogspot.com/2010/03/load-test-whatever-you-want-with-apache.html"* @see "http://newspaint.wordpress.com/2012/11/28/creating-a-java-sampler-for-jmeter/"* @see "http://jmeter.512774.n5.nabble.com/Custom-Sampler-Tutorial-td4490189.html"*/
public class KafkaProducerSampler extends AbstractJavaSamplerClient {private static final Logger log = LoggingManager.getLoggerForClass();/*** Parameter for setting the Kafka brokers; for example, "kafka01:9092,kafka02:9092".*/private static final String PARAMETER_KAFKA_BROKERS = "kafka_brokers";/*** Parameter for setting the Kafka topic name.*/private static final String PARAMETER_KAFKA_TOPIC = "kafka_topic";/*** Parameter for setting the Kafka key.*/private static final String PARAMETER_KAFKA_KEY = "kafka_key";/*** Parameter for setting the Kafka message.*/private static final String PARAMETER_KAFKA_MESSAGE = "kafka_message";/*** Parameter for setting Kafka's {@code serializer.class} property.*/private static final String PARAMETER_KAFKA_MESSAGE_SERIALIZER = "kafka_message_serializer";/*** Parameter for setting Kafka's {@code key.serializer.class} property.*/private static final String PARAMETER_KAFKA_KEY_SERIALIZER = "kafka_key_serializer";/*** Parameter for setting the Kafka ssl keystore (include path information); for example, "server.keystore.jks".*/private static final String PARAMETER_KAFKA_SSL_KEYSTORE = "kafka_ssl_keystore";/*** Parameter for setting the Kafka ssl keystore password.*/private static final String PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD = "kafka_ssl_keystore_password";/*** Parameter for setting the Kafka ssl truststore (include path information); for example, "client.truststore.jks".*/private static final String PARAMETER_KAFKA_SSL_TRUSTSTORE = "kafka_ssl_truststore";/*** Parameter for setting the Kafka ssl truststore password.*/private static final String PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD = "kafka_ssl_truststore_password";/*** Parameter for setting the Kafka security protocol; "true" or "false".*/private static final String PARAMETER_KAFKA_USE_SSL = "kafka_use_ssl";/*** Parameter for setting encryption. 
It is optional.*/private static final String PARAMETER_KAFKA_COMPRESSION_TYPE = "kafka_compression_type";/*** Parameter for setting the partition. It is optional.*/private static final String PARAMETER_KAFKA_PARTITION = "kafka_partition";//private Producer<Long, byte[]> producer;private KafkaProducer<String, String> producer;@Overridepublic void setupTest(JavaSamplerContext context) {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, context.getParameter(PARAMETER_KAFKA_BROKERS));props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, context.getParameter(PARAMETER_KAFKA_KEY_SERIALIZER));props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, context.getParameter(PARAMETER_KAFKA_MESSAGE_SERIALIZER));props.put(ProducerConfig.ACKS_CONFIG, "1");// check if kafka security protocol is SSL or PLAINTEXT (default)if(context.getParameter(PARAMETER_KAFKA_USE_SSL).equals("true")){log.info("Setting up SSL properties...");props.put("security.protocol", "SSL");props.put("ssl.keystore.location", context.getParameter(PARAMETER_KAFKA_SSL_KEYSTORE));props.put("ssl.keystore.password", context.getParameter(PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD));props.put("ssl.truststore.location", context.getParameter(PARAMETER_KAFKA_SSL_TRUSTSTORE));props.put("ssl.truststore.password", context.getParameter(PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD));}String compressionType = context.getParameter(PARAMETER_KAFKA_COMPRESSION_TYPE);if (!Strings.isNullOrEmpty(compressionType)) {props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);}producer = new KafkaProducer<String, String>(props);}@Overridepublic void teardownTest(JavaSamplerContext context) {producer.close();}@Overridepublic Arguments getDefaultParameters() {Arguments defaultParameters = new Arguments();defaultParameters.addArgument(PARAMETER_KAFKA_BROKERS, "${PARAMETER_KAFKA_BROKERS}");defaultParameters.addArgument(PARAMETER_KAFKA_TOPIC, 
"${PARAMETER_KAFKA_TOPIC}");defaultParameters.addArgument(PARAMETER_KAFKA_KEY, "${PARAMETER_KAFKA_KEY}");defaultParameters.addArgument(PARAMETER_KAFKA_MESSAGE, "${PARAMETER_KAFKA_MESSAGE}");defaultParameters.addArgument(PARAMETER_KAFKA_MESSAGE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");defaultParameters.addArgument(PARAMETER_KAFKA_KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_KEYSTORE, "${PARAMETER_KAFKA_SSL_KEYSTORE}");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD, "${PARAMETER_KAFKA_SSL_KEYSTORE_PASSWORD}");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_TRUSTSTORE, "${PARAMETER_KAFKA_SSL_TRUSTSTORE}");defaultParameters.addArgument(PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD, "${PARAMETER_KAFKA_SSL_TRUSTSTORE_PASSWORD}");defaultParameters.addArgument(PARAMETER_KAFKA_USE_SSL, "${PARAMETER_KAFKA_USE_SSL}");defaultParameters.addArgument(PARAMETER_KAFKA_COMPRESSION_TYPE, null);defaultParameters.addArgument(PARAMETER_KAFKA_PARTITION, null);return defaultParameters;}@Overridepublic SampleResult runTest(JavaSamplerContext context) {SampleResult result = newSampleResult();String topic = context.getParameter(PARAMETER_KAFKA_TOPIC);String key = context.getParameter(PARAMETER_KAFKA_KEY);String message = context.getParameter(PARAMETER_KAFKA_MESSAGE);sampleResultStart(result, message);final ProducerRecord<String, String> producerRecord;String partitionString = context.getParameter(PARAMETER_KAFKA_PARTITION);if (Strings.isNullOrEmpty(partitionString)) {producerRecord = new ProducerRecord<String, String>(topic, key, message);} else {final int partitionNumber = Integer.parseInt(partitionString);producerRecord = new ProducerRecord<String, String>(topic, partitionNumber, key, message);}try {producer.send(producerRecord);sampleResultSuccess(result, null);} catch (Exception e) {sampleResultFailed(result, "500", e);}return result;}/*** Use UTF-8 for encoding of 
strings*/private static final String ENCODING = "UTF-8";/*** Factory for creating new {@link SampleResult}s.*/private SampleResult newSampleResult() {SampleResult result = new SampleResult();result.setDataEncoding(ENCODING);result.setDataType(SampleResult.TEXT);return result;}/*** Start the sample request and set the {@code samplerData} to {@code data}.** @param result*          the sample result to update* @param data*          the request to set as {@code samplerData}*/private void sampleResultStart(SampleResult result, String data) {result.setSamplerData(data);result.sampleStart();}/*** Mark the sample result as {@code end}ed and {@code successful} with an "OK" {@code responseCode},* and if the response is not {@code null} then set the {@code responseData} to {@code response},* otherwise it is marked as not requiring a response.** @param result sample result to change* @param response the successful result message, may be null.*/private void sampleResultSuccess(SampleResult result, /* @Nullable */ String response) {result.sampleEnd();result.setSuccessful(true);result.setResponseCodeOK();if (response != null) {result.setResponseData(response, ENCODING);}else {result.setResponseData("No response required", ENCODING);}}/*** Mark the sample result as @{code end}ed and not {@code successful}, and set the* {@code responseCode} to {@code reason}.** @param result the sample result to change* @param reason the failure reason*/private void sampleResultFailed(SampleResult result, String reason) {result.sampleEnd();result.setSuccessful(false);result.setResponseCode(reason);}/*** Mark the sample result as @{code end}ed and not {@code successful}, set the* {@code responseCode} to {@code reason}, and set {@code responseData} to the stack trace.** @param result the sample result to change* @param exception the failure exception*/private void sampleResultFailed(SampleResult result, String reason, Exception exception) {sampleResultFailed(result, 
reason);result.setResponseMessage("Exception: " + exception);result.setResponseData(getStackTrace(exception), ENCODING);}/*** Return the stack trace as a string.** @param exception the exception containing the stack trace* @return the stack trace*/private String getStackTrace(Exception exception) {StringWriter stringWriter = new StringWriter();exception.printStackTrace(new PrintWriter(stringWriter));return stringWriter.toString();}
}

2,编译kafkameter-master

3,运行maven test,运行时会自动下载项目中一些需要的文件。

4,导出jar包。在src上右击,选取export导出,包名为:kafkameter-x.y.z.jar

5,将kafkameter-x.y.z.jar放到 $JMETER_HOME/lib/ext

6,运行jmeter,添加测试

Jmeter 添加kafka支持相关推荐

  1. 如何使用Jmeter对Kafka进行性能测试

    目录 A Brief Overview of Apache Kafka 利用Pepper-Box插件配置生产者Producer Pepper-Box PlainText Config Pepper-B ...

  2. Jmeter向kafka发送数据

    点击查看原文 Jmeter连接Kafka 在jmeter/lib/ext中导入一个kafka相关的jar包,如下图: 导入之后就可以打开JMeter进行操作了,首先在一个Thread group中添加 ...

  3. vim8支持的linux版本,Vim 8.0 版本安装方法及添加Python支持

    利用Git安装 最简单也是最有效的方法 1. 获取Vim仓库: git clone https://github.com/vim/vim.git 2. 升级到最新的版本: cd vim git pul ...

  4. Java后台开发Tomcat添加https支持小程序开发过程

    文章原文:blog.ouyangsihai.cn >> Java后台开发Tomcat添加https支持小程序开发过程 1 给自己的域名申请证书 注意:申请好了如果不是在腾讯注册的域名,不会 ...

  5. 如何使用vs将asp.net core项目添加容器支持并发布docker镜像到私有dockerhub和添加k8s/helm管理...

    这篇文章介绍一下,如何使用VS2017给asp.net core添加容器支持,并发布镜像到私有docker hub,然后用chart管理容器镜像的操作流程. 话不多说,just do it. 新建项目 ...

  6. (转)springboot:添加JSP支持

    转自: 14.springboot:添加JSP支持 - 简书(1)创建Maven web project 使用Eclipse新建一个Maven Web Project ,项目取名为:spring-bo ...

  7. Spring–添加AOP支持

    我听到了一个有关一位高级(且酬劳颇丰)软件工程师的故事. 他的任务是记录他正在研究的项目中每个控制器中的每个方法. 工程师重写了所有控制器方法,因此使用如下代码: @RequestMapping(me ...

  8. wordpress主题ajax,为自制WordPress主题/插件的后台设置页面添加ajax支持

    本文目录 [隐藏] 1PHP部分 1.1安全第一 1.2定义一些用得上的常量 1.3I18n=国际化支持 1.4添加菜单项 1.5美化下菜单项前面的icon 1.6设置页面/后台的HTML结构 1.7 ...

  9. Abp Vnext应用程序项目中添加docker支持的小结

    文章目录 介绍 具体步骤 1.创建项目 2.添加docker支持 3.调整 4.运行实例 总结 介绍 abp vnext 里面的只有 module 项目里面是自动添加 docker 支持的,因为其是面 ...

最新文章

  1. LeetCode简单题之图片平滑器
  2. 是男人就过8题!楼教主出题,请接招!
  3. 探索可解释及稳定性,AI与博弈,自适应推理——“智源论坛:机器学习青年学者报告会”要点总结
  4. Android之自定义控件显示点赞用户并通过用户名称进入该用户主页的功能
  5. node js npm 和 cnpm的使用
  6. numpy基础——数组的组合与分割
  7. dropout理解(二)
  8. Java ObjectStreamClass getSerialVersionUID()方法(带示例)
  9. win7系统每次开机都需要疑难解答的原因与解决方法
  10. python程序设计基础第三版_Python程序设计(第三版)PPT及源码
  11. Casper 机制的历史起源:第一篇
  12. Java 面向对象 之 多态实例2
  13. mysql post 注入工具类_【Mysql sql inject】POST方法BASE64编码注入write-up
  14. oracle修改asm参数文件,修改asm中的spfile参数
  15. 黑马程序员—-C语言入门十重奏之十renascence
  16. 对996的一些看法与个人价值实现
  17. 到底买苹果XS还是XR_iPhone XS 和 XR 买哪个?10 个理由告诉你 XS 更好,贵是有原因的...
  18. linux如何使用磁盘阵列卡,Ubuntu 上创建常用磁盘阵列
  19. HDU 4622 Reincarnation (后缀数组|后缀自动机)
  20. 微信调支付宝支付常见问题

热门文章

  1. 《一封神气的情书》李敖
  2. 爱奇艺笔试题之成长值计算
  3. CAN通信稳定性开发分析
  4. 案例6-1.3 哥尼斯堡的“七桥问题”
  5. 穹顶之下——大数据下生活
  6. 刚开始做斗音掌握这5点至少让你少走半年弯路
  7. android平板游戏隐藏功能,平板电脑怎么隐藏游戏
  8. java 宝箱概率问题
  9. OWASP十大漏洞之一
  10. 电脑快捷键快速关机方法,电脑如何快速关机