SpringBoot 整合 Kafka

SpringBoot 整合 Kafka环境 ZooKeeper Kafka 如果使用 docker compose 部署 Kafka 集群 可以参考我的这篇文章 docker compose 安装 Kafka 集群依赖 pom xml 依赖文件如下 springboot 版本是 2 3 0 RELEASE dependencies dependency groupId org springframew boot groupId lt dependency dependencies

环境

ZooKeeper、Kafka。

如果使用 docker-compose 部署 Kafka 集群,可以参考我的这篇文章 docker-compose安装Kafka集群

依赖

pom.xml 依赖文件如下:springboot 版本是 2.3.0.RELEASE 。

<dependencies> <dependency> <groupId>org.springframework.boot 
     groupId> <artifactId>spring-boot-starter 
      artifactId>  
       dependency>  
       <dependency> <groupId>org.springframework.kafka 
        groupId> <artifactId>spring-kafka 
         artifactId> <version>2.4.0.RELEASE 
          version>  
           dependency>  
           <dependency> <groupId>org.projectlombok 
            groupId> <artifactId>lombok 
             artifactId>  
              dependency>  
              <dependency> <groupId>org.springframework.boot 
               groupId> <artifactId>spring-boot-starter-test 
                artifactId> <scope>test 
                 scope>  
                  dependency>  
                   dependencies> 

配置

可以通过Java类配置配置文件配置两种方式来配置 Kafka。

Java 类配置

配置主题(KafkaAdmin)

创建配置类 KafkaTopicConfiguration 代码如下:该配置可选,通常会事先通过 Kafka 提供的脚本创建主题。

/ * kafka 主题配置类 * * @author Leo * @create 2020/12/31 15:57 / @Configuration public class KafkaTopicConfiguration { 
    / * 创建 KafkaAmin,可以自动检测集群中是否存在topic,不存在则创建 * @return */ @Bean public KafkaAdmin kafkaAdmin() { 
    Map<String, Object> props = new HashMap<>(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); return new KafkaAdmin(props); } @Bean public NewTopic newTopic() { 
    // 创建 topic,指定 名称、分区数、副本数 return new NewTopic("hello-kafka-test-topic", 3, (short) 2); } } 

配置生产者

创建配置类KafkaProducerConfiguration 代码如下:里面涉及基本配置和自定义分区器、拦截器、事务等配置。

/ * kafka 生产者配置类 * * @author Leo * @create 2020/12/31 15:09 / @Configuration public class KafkaProducerConfiguration { 
    / * 不包含事务 producerFactory * @return */ public ProducerFactory<String, String> producerFactory() { 
    Map<String, Object> props = new HashMap<>(); //kafka 集群地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); //重试次数 props.put(ProducerConfig.RETRIES_CONFIG, 3); //应答级别 //acks=0 把消息发送到kafka就认为发送成功 //acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 //acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 props.put(ProducerConfig.ACKS_CONFIG, "all"); //KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); //批量处理的最大大小 单位 byte props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); //发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka props.put(ProducerConfig.LINGER_MS_CONFIG, 1000); //生产者可用缓冲区的最大值 单位 byte props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, ); //每条消息最大的大小 props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, ); //客户端ID props.put(ProducerConfig.CLIENT_ID_CONFIG, "hello-kafka"); //Key 序列化方式 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //Value 序列化方式 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //消息压缩:none、lz4、gzip、snappy,默认为 none。 props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); //自定义分区器 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); return new DefaultKafkaProducerFactory<>(props); } / * 包含事务 producerFactory * @return */ public ProducerFactory<String, String> producerFactoryWithTransaction() { 
    DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory = (DefaultKafkaProducerFactory<String, String>) producerFactory(); //设置事务Id前缀 defaultKafkaProducerFactory.setTransactionIdPrefix("tx"); return defaultKafkaProducerFactory; } / * 不包含事务 kafkaTemplate * @return */ @Bean("kafkaTemplate") public KafkaTemplate<String, String> kafkaTemplate() { 
    return new KafkaTemplate<>(producerFactory()); } / * 包含事务 kafkaTemplate * @return */ @Bean("kafkaTemplateWithTransaction") public KafkaTemplate<String, String> kafkaTemplateWithTransaction() { 
    return new KafkaTemplate<>(producerFactoryWithTransaction()); } / * 以该方式配置事务管理器:就不能以普通方式发送消息,只能通过 kafkaTemplate.executeInTransaction 或 * 在方法上加 @Transactional 注解来发送消息,否则报错 * @param producerFactory * @return */ // @Bean // public KafkaTransactionManager 
   
     kafkaTransactionManager(ProducerFactory 
    
      producerFactory) { 
      
     
    // return new KafkaTransactionManager<>(producerFactory); // } } 

配置消费者

创建配置类KafkaConsumerConfiguration代码如下:

/ * kafka 消费者配置类 * * @author Leo * @create 2020/12/31 15:09 / @Slf4j @Configuration public class KafkaConsumerConfiguration { 
    @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); //设置 consumerFactory factory.setConsumerFactory(consumerFactory()); //设置是否开启批量监听 factory.setBatchListener(false); //设置消费者组中的线程数量 factory.setConcurrency(1); return factory; } / * consumerFactory * @return */ public ConsumerFactory<String, Object> consumerFactory() { 
    Map<String, Object> props = new HashMap<>(); //kafka集群地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); //自动提交 offset 默认 true props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //自动提交的频率 单位 ms props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); //批量消费最大数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); //消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup"); //session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ); //请求超时 props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, ); //Key 反序列化类 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //Value 反序列化类 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //当kafka中没有初始offset或offset超出范围时将自动重置offset //earliest:重置为分区中最小的offset //latest:重置为分区中最新的offset(消费分区中新产生的数据) //none:只要有一个分区不存在已提交的offset,就抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //设置Consumer拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName()); return new DefaultKafkaConsumerFactory<>(props); } / * 消费异常处理器 * @return */ @Bean public ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler() { 
    return new ConsumerAwareListenerErrorHandler() { 
    @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { 
    //打印消费异常的消息和异常信息 log.error("consumer failed! message: {}, exceptionMsg: {}, groupId: {}", message, exception.getMessage(), exception.getGroupId()); return null; } }; } } 

配置文件配置

以配置文件的方式来进行配置的话,所有配置都在application.yml中,配置内容如下:

spring: application: name: hello-kafka kafka: listener: #设置是否批量消费,默认 single(单条),batch(批量) type: single # 集群地址 bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # 生产者配置 producer: # 重试次数 retries: 3 # 应答级别 # acks=0 把消息发送到kafka就认为发送成功 # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 acks: all # 批量处理的最大大小 单位 byte batch-size: 4096 # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka buffer-memory:  # 客户端ID client-id: hello-kafka # Key 序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer # Value 序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息压缩:none、lz4、gzip、snappy,默认为 none。 compression-type: gzip properties: partitioner: #指定自定义分区器 class: top.zysite.hello.kafka.partitioner.MyPartitioner linger: # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka ms: 1000 max: block: # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms ms: 6000 # 消费者配置 consumer: # 默认消费者组 group-id: testGroup # 自动提交 offset 默认 true enable-auto-commit: false # 自动提交的频率 单位 ms auto-commit-interval: 1000 # 批量消费最大数量 max-poll-records: 100 # Key 反序列化类 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value 反序列化类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset # latest:重置为分区中最新的offset(消费分区中新产生的数据) # none:只要有一个分区不存在已提交的offset,就抛出异常 auto-offset-reset: latest properties: interceptor: classes: top.zysite.hello.kafka.interceptor.MyConsumerInterceptor session: timeout: # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 ms:  request: timeout: # 请求超时 ms:  

这里采用Java类配置。在 SpringBoot 启动类上添加注解@EnableKafka启用KafkaTemplate

@SpringBootApplication @EnableKafka public class HelloKafkaApplication { 
    public static void main(String[] args) { 
    SpringApplication.run(HelloKafkaApplication.class, args); } } 

自定义分区器

可以看到上述配置包含自定义分区器的配置,自定义分区器需实现Partitioner接口并在生产者端进行配置,代码如下:

/ * 自定义分区器 * * @author Leo * @create 2021/5/26 13:40 / public class MyPartitioner implements Partitioner { 
    / * 分区策略核心方法 * @param topic * @param key * @param keyBytes * @param value * @param valueBytes * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 
    //具体分区逻辑,这里全部发送到0号分区 return 0; } @Override public void close() { 
    } @Override public void configure(Map<String, ?> configs) { 
    } } 

配置了之后所有的消息都会发送到 0 号分区。(这里只是为了演示)

消费者拦截器

可以看到上述配置也包含了消费者拦截器的配置,消费者拦截器需实现ConsumerInterceptor接口,代码如下:

/ * 消费者拦截器 * * @author Leo * @create 2021/5/27 16:30 / @Slf4j public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> { 
    / * KafkaConsumer 会在 poll 方法返回之前调用该方法,可以在该方法中对消息进行过滤 * @param records * @return */ @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { 
    System.out.println(" before interceptor: " + records.count() + ""); Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(); //遍历每个topic、partition for (TopicPartition topicPartition : records.partitions()) { 
    //获取特定topic、partition下的消息列表 List<ConsumerRecord<String, String>> recordList = records.records(topicPartition); //过滤 List<ConsumerRecord<String, String>> filteredList = recordList.stream() .filter(record -> !record.value().contains("filter")).collect(Collectors.toList()); //放入新的消息记录里 newRecords.put(topicPartition, filteredList); } ConsumerRecords<String, String> filteredRecords = new ConsumerRecords<>(newRecords); System.out.println(" after interceptor: " + filteredRecords.count() + ""); //返回过滤后的消息记录 return filteredRecords; } / * 提交完offset之后调用该方法 * @param offsets */ @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { 
    if (!offsets.isEmpty()) { 
    offsets.forEach(((topicPartition, offsetAndMetadata) -> { 
    log.info("partition : " + topicPartition + ", offset : " + offsetAndMetadata); })); } } @Override public void close() { 
    } @Override public void configure(Map<String, ?> configs) { 
    } } 

有消费者拦截器,自然也有生产者拦截器,只需要实现ProducerInterceptor接口并在生产者端进行配置即可,这里只演示消费者拦截器。

测试

为了方便看到效果,这里统一测试生产者和消费者。

创建生产者服务类

创建生产消息服务类KafkaProducerService代码如下:

/ * kafka 生产服务 * * @author Leo * @create 2020/12/31 16:06 / @Slf4j @Service public class KafkaProducerService { 
    @Qualifier("kafkaTemplate") @Resource private KafkaTemplate<String, String> kafkaTemplate; @Qualifier("kafkaTemplateWithTransaction") @Resource private KafkaTemplate<String, String> kafkaTemplateWithTransaction; / * 发送消息(同步) * @param topic 主题 * @param key 键 * @param message 值 */ public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException { 
    //可以指定最长等待时间,也可以不指定 kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS); log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message); //指定key,kafka根据key进行hash,决定存入哪个partition // kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS); //存入指定partition // kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS); } / * 发送消息并获取结果 * @param topic * @param message * @throws ExecutionException * @throws InterruptedException */ public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException { 
    SendResult<String, String> result = kafkaTemplate.send(topic, message).get(); log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message); log.info("The partition the message was sent to: " + result.getRecordMetadata().partition()); } / * 发送消息(异步) * @param topic 主题 * @param message 消息内容 */ public void sendMessageAsync(String topic, String message) { 
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); //添加回调 future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { 
    @Override public void onFailure(Throwable throwable) { 
    log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message); } @Override public void onSuccess(SendResult<String, String> stringStringSendResult) { 
    log.info("sendMessageAsync success! topic: {}, message: {}", topic, message); } }); } / * 可以将消息组装成 Message 对象和 ProducerRecord 对象发送 * @param topic * @param key * @param message * @throws InterruptedException * @throws ExecutionException * @throws TimeoutException */ public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException { 
    // 组装消息 Message msg = MessageBuilder.withPayload(message) .setHeader(KafkaHeaders.MESSAGE_KEY, key) .setHeader(KafkaHeaders.TOPIC, topic) .setHeader(KafkaHeaders.PREFIX,"kafka_") .build(); //同步发送 kafkaTemplate.send(msg).get(); // 组装消息 // ProducerRecord 
   
     producerRecord = new ProducerRecord<>(topic, message); 
    // kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS); } / * 以事务方式发送消息 * @param topic * @param key * @param message */ public void sendMessageInTransaction(String topic, String key, String message) { 
    kafkaTemplateWithTransaction.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() { 
    @Override public Object doInOperations(KafkaOperations<String, String> kafkaOperations) { 
    kafkaOperations.send(topic, key, message); //出现异常将会中断事务,消息不会发送出去 throw new RuntimeException("exception"); } }); } } 

创建消费者服务类

创建消费消息服务类KafkaConsumerService代码如下:

/ * kafka 消费服务 * * @author Leo * @create 2020/12/31 16:06 / @Slf4j @Service public class KafkaConsumerService { 
    / * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * @param message 消息 */ @KafkaListener(id = "consumerSingle", topics = "hello-kafka-test-topic") public void consumerSingle(String message) { 
    log.info("consumerSingle ====> message: {}", message); } /* @KafkaListener(id = "consumerBatch", topicPartitions = { @TopicPartition(topic = "hello-batch1", partitions = "0"), @TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4")) })*/ / * 批量消费消息 * @param messages */ @KafkaListener(id = "consumerBatch", topics = "hello-batch") public void consumerBatch(List<ConsumerRecord<String, String>> messages) { 
    log.info("consumerBatch =====> messageSize: {}", messages.size()); log.info(messages.toString()); } / * 指定消费异常处理器 * @param message */ @KafkaListener(id = "consumerException", topics = "hello-kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler") public void consumerException(String message) { 
    throw new RuntimeException("consumer exception"); } / * 验证ConsumerInterceptor * @param message */ @KafkaListener(id = "interceptor", topics = "consumer-interceptor") public void consumerInterceptor(String message) { 
    log.info("consumerInterceptor ====> message: {}", message); } } 

创建 Junit 测试类

使用 Junit 整合 SpringBoot 来测试:

@RunWith(SpringRunner.class) @SpringBootTest class HelloKafkaApplicationTests { 
    @Resource private KafkaProducerService kafkaProducerService; @Test void testSendMessageSync() throws Exception { 
    String topic = "hello-kafka-test-topic"; String key = "key1"; String message = "firstMessage"; kafkaProducerService.sendMessageSync(topic, key, message); } @Test public void testSendMessageGetResult() throws Exception { 
    String topic = "hello-kafka-test-topic"; String key = "key"; String message = "helloSendMessageGetResult"; kafkaProducerService.sendMessageGetResult(topic, key, message); kafkaProducerService.sendMessageGetResult(topic, null, message); } @Test public void testSendMessageAsync() { 
    String topic = "hello-kafka-test-topic"; String message = "firstAsyncMessage"; kafkaProducerService.sendMessageAsync(topic, message); } @Test public void testMessageBuilder() throws Exception { 
    String topic = "hello-kafka-test-topic"; String key = "key1"; String message = "helloMessageBuilder"; kafkaProducerService.testMessageBuilder(topic, key, message); } / * 测试事务 */ @Test public void testSendMessageInTransaction() { 
    String topic = "hello-kafka-test-topic"; String key = "key1"; String message = "helloSendMessageInTransaction"; kafkaProducerService.sendMessageInTransaction(topic, key, message); } / * 测试批量消费 * @throws Exception */ @Test public void testConsumerBatch() throws Exception { 
    //写入多条数据到批量topic:hello-batch String topic = "hello-batch"; for(int i = 0; i < 20; i++) { 
    kafkaProducerService.sendMessageSync(topic, null, "batchMessage" + i); } } / * 测试消费者拦截器 * @throws Exception */ @Test public void testConsumerInterceptor() throws Exception { 
    String topic = "consumer-interceptor"; for(int i = 0; i < 2; i++) { 
    kafkaProducerService.sendMessageSync(topic,null, "normalMessage" + i); } kafkaProducerService.sendMessageSync(topic, null, "filteredMessage"); kafkaProducerService.sendMessageSync(topic, null, "filterMessage"); } } 

测试结果

先启动应用程序,然后依次执行 Junit 中的测试方法:

同步发送消息

执行方法testSendMessageSync,可以看到日志打印:

发送单条消息

消费日志:

消费单条

发送消息并获取消息发往的分区

执行方法testSendMessageGetResult,可以看到日志打印:

消费消息并获得发往分区编号

由于配置了自定义分区器,可以看到消息都发往了 0 号分区,如果没有配置自定义分区器,并且主题包含多个分区的话,正常情况下多条消息不会全部发往同一分区。(可以自行测试)

异步发送消息

异步发送消息无法通过 Junit 来测试,因为 Junit 方法执行完就结束了,没法看到成功或失败的回调打印。(可以通过System.in.read来阻塞,或则 Thread.sleep)。这里就不演示了。

MessageBuilder 只是以不同的形式来组装消息,可以自行测试。

以事务的形式发送消息

KafkaTemplate封装了方法executeInTransaction方法,可以让我们以事务的形式发送消息。

执行方法testSendMessageInTransaction,可以看到:

以事务的形式发送消息

抛出了异常,通过异常说明和没有该消息的消费日志,可以证明该消息并没有发送到 Kafka。

测试批量消费

批量消费需要修改上述消费者端的配置:将配置类KafkaConsumerConfiguration中的配置稍作修改,往 setBatchListener 方法传入 true,表示开启批量监听。

@Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); //设置 consumerFactory factory.setConsumerFactory(consumerFactory()); //设置是否开启批量监听 factory.setBatchListener(true); //设置消费者组中的线程数量 factory.setConcurrency(1); return factory; } 

先将批量消费监听器@KafkaListener注解注释掉并停止 SpringBoot 应用程序,再执行testConsumerBatch方法往主题中写入 20 条消息。

//@KafkaListener(id = "consumerBatch", topics = "hello-batch") public void consumerBatch(List<String> messages) { 
    log.info("consumerBatch =====> messageSize: {}", messages.size()); log.info(messages.toString()); } 

批量发送消息

写入消息之后启动 SpringBoot ,可以看到批量消费的日志打印:

批量消费打印

消费者拦截器

将批量监听改回单条,即 setBatchListener 传入 false。执行方法testConsumerInterceptor,日志打印:

消费者拦截器

可以看到绿框中的消息可以被正常消费,红框中的消息被拦截器过滤了,没有消费,因为消息内容包含filter,这是前面自定义消费者拦截器的逻辑。

源码

源码地址:https://github.com/leo/hello-kafka

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/202067.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月20日 上午7:58
下一篇 2026年3月20日 上午7:58


相关推荐

  • ios激活成功教程软件_qt.qpa.plugin:Could not

    ios激活成功教程软件_qt.qpa.plugin:Could not注意:一定要手动创建文件夹,在相应文件夹下进行操作,否则无法成功生成注册码激活成功教程步骤:1.安装qtp,一路默认下来,到要求输入License的界面2.拷贝mgn-mqt82.exe(下载)到C:\ProgramFiles\MercuryInteractive(自己手动创建)文件夹下3.自己手动创建C:\ProgramFiles\CommonFiles\Mercury

    2022年10月1日
    3
  • GitHub 狂揽 6.3k Star!AI Agent 系统学习教程爆火!

    GitHub 狂揽 6.3k Star!AI Agent 系统学习教程爆火!

    2026年3月12日
    1
  • 108是几位数_印度尼西亚总人口数

    108是几位数_印度尼西亚总人口数求给定区间 [X,Y] 中满足下列条件的整数个数:这个数恰好等于 K 个互不相等的 B 的整数次幂之和。例如,设 X=15,Y=20,K=2,B=2,则有且仅有下列三个数满足题意:17=24+2018=24+2120=24+22输入格式第一行包含两个整数 X 和 Y,接下来两行包含整数 K 和 B。输出格式只包含一个整数,表示满足条件的数的个数。数据范围1≤X≤Y≤231−1,1≤K≤20,2≤B≤10输入样例:15 2022输出样例:3#include<bit

    2022年8月9日
    7
  • 正定矩阵与半正定矩阵定义性质与理解

    正定矩阵与半正定矩阵定义性质与理解正定矩阵在线性代数里 正定矩阵 positivedefi 有时会简称为正定阵 定义 AA 是 n 阶方阵 如果对任何非零向量 xx 都有 xTAx0x TAx0 其中 xTx T 表示 xx 的转置 就称 AA 正定矩阵 性质 正定矩阵的行列式恒为正 实对称矩阵 AA 正定当且仅当 AA 与单位矩阵合同 两个正定矩阵的和是正定矩阵 正实数与正定矩阵的乘积是正定矩阵

    2026年3月18日
    3
  • 微信小程序云开发|个人博客小程序

    微信小程序云开发|个人博客小程序这篇文章详细的介绍了个人博客小程序的云开发流程 包括博客展示页面 添加博客页面的创建 以及云函数的上传 数据库的创建和使用 同时使用到了 form text 等组件以及使用富文本添加博客 本程序所有数据都存储在云开发里面 不需要开发者自己的服务器 功能包括云数据库 云函数 云存储等 欢迎大家学习

    2026年3月19日
    1
  • msfconsole攻击工具_服务器console接口是干嘛的

    msfconsole攻击工具_服务器console接口是干嘛的?Msfconsole工具概括:???Msfconsole简称(msf)是一款常用的渗透测试工具,包含了常见的漏洞利用模块和生成各种木马,方便于安全测试人员的使用.(1)进行端口扫描.(2)进行服务的扫描.(3)扫描3306(Mysql)端口的弱口令.(4)在msf模块里也可以使用nmap进行扫描.(5)扫描了服务器是用WinXP,然后对服务器进行渗透测试.

    2025年9月28日
    5

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号