Integrating Spring with Kafka: Maven Configuration

Project dependency configuration

Add the Spring Cloud Stream binder for Kafka; no versions are given because they come from the project's dependency management (typically the Spring Cloud BOM):

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Add the Kafka client dependencies. Note the Scala version: the 2.12 suffix of kafka_2.12 names the Scala version the artifact was built against, which must match any other Scala artifacts on the classpath:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.8.0</version>
    <exclusions>
        <exclusion>
            <artifactId>scala-reflect</artifactId>
            <groupId>org.scala-lang</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Because of a version conflict, exclude the kafka-streams that the Streams binder pulls in transitively (mvn dependency:tree shows which versions collide):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-streams</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>

Implement the stream-processing function. Spring Cloud Stream binds the Function<KStream, KStream> bean below to the configured topics; each record is forwarded to a Python predictor over gRPC:

@Slf4j
@Configuration
public class TimeSequenceStreamRunner {

    @Value("${gRPC.Server.ipaddr}")
    private String gRPCServerIpaddr;

    public static final String PY_PACKAGE = "pdd_sale_pred.sale_predict_run";
    public static final String PY_METHOD = "predictor_main";

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> timeSequenceDailySale() {
        // One gRPC channel and client are created when the bean is instantiated
        // and reused for every record that flows through the stream.
        ManagedChannel channel = ManagedChannelBuilder.forTarget(gRPCServerIpaddr).usePlaintext().build();
        KafkaTimeSequenceClient client = new KafkaTimeSequenceClient(channel);
        return kStream -> kStream
                .filter((key, value) -> value != null)
                .map((key, value) -> {
                    ResultEntry re = new ResultEntry();
                    try {
                        // Call the Python predictor over gRPC and wrap its reply.
                        resultMsg response = client.setRequest(key, value, PY_PACKAGE, PY_METHOD);
                        re.setCode(response.getStatus().getNumber());
                        if (response.getStatus() == resultMsg.STATUS.SUCCESS) {
                            re.setData(new JSONObject(response.getData()));
                        }
                        re.setMsg(response.getMessage());
                        log.warn("---------" + re.getCode());
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                        re.setCode(500);
                        re.setMsg(e.getMessage());
                    }
                    return new KeyValue<>(key, re.toString());
                })
                .filter((key, value) -> value != null);
    }
}
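
ResultEntry and KafkaTimeSequenceClient are project classes not shown here. For reference, a minimal sketch of what ResultEntry has to provide, assuming org.json for the payload (field names are illustrative):

import org.json.JSONObject;

// Hypothetical sketch of the ResultEntry helper used above: a status code,
// a message and an optional JSON payload, serialized to a JSON string for
// the outbound Kafka record.
public class ResultEntry {

    private int code;
    private String msg;
    private JSONObject data;

    public int getCode() { return code; }
    public void setCode(int code) { this.code = code; }
    public void setMsg(String msg) { this.msg = msg; }
    public void setData(JSONObject data) { this.data = data; }

    @Override
    public String toString() {
        JSONObject out = new JSONObject();
        out.put("code", code);
        out.put("msg", msg);
        out.put("data", data == null ? JSONObject.NULL : data);
        return out.toString();
    }
}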

Bind the Kafka connection and the stream processing in the Spring YAML configuration. Spring Cloud Stream derives the binding names from the function name: timeSequenceDailySale-in-0 is the function's input and timeSequenceDailySale-out-0 its output:

spring:
  kafka:
    bootstrap-servers: kafka01:9092,kafka02:9092,kafka03:9092
    producer:
      acks: all
      key-serializer: org.springframework.kafka.support.serializer.StringOrBytesSerializer
      value-serializer: org.springframework.kafka.support.serializer.StringOrBytesSerializer
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      group-id: springpMb1
    listener:
      ack-mode: MANUAL_IMMEDIATE
  cloud:
    stream:
      function:
        definition: timeSequenceDailySale
        bindings:
          timeSequenceDailySale-in-0:
            destination: jpdd-sale-pred-in
          timeSequenceDailySale-out-0:
            destination: jpdd-sale-pred-out
      kafka:
        binder:
          autoCreateTopics: true
          autoAddPartitions: true
          replicationFactor: 3
          brokers:
            - kafka01:9092
            - kafka02:9092
            - kafka03:9092
        streams:
          binder:
            configuration:
              num.stream.threads: 10
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
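
With these bindings in place, the stream can be smoke-tested by writing a record to the input topic and watching the output topic. A minimal sketch; the key and JSON payload are placeholders, since the real schema is whatever the Python predictor expects:

@Component
public class StreamSmokeTest {

    @Autowired
    private KafkaTemplate<String, String> template;

    public void send() {
        // Placeholder key/payload; the predictor defines the real schema.
        template.send("jpdd-sale-pred-in", "goods-1", "{}");
    }
}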

Define a listener that subscribes to Kafka topics and processes each message, here by downloading the image URL it carries; manual acknowledgment matches the MANUAL_IMMEDIATE ack-mode configured above:

@Component
@Slf4j
public class TBImgsDownlodRunner {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${app.tb-img-download.output.dir-path}")
    private String downloadDir;

    @Value("${app.tb-img-download.output.dir-spread-num}")
    private int dirSpreadNum;

    @Value("#{'${app.tb-img-download.msg}'.split(',')}")
    private String[] topics;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @KafkaListener(topics = "#{'${app.tb-img-download.msg}'.split(',')}",
            concurrency = "${app.tb-img-download.concurrency:3}",
            properties = {
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=20",
                ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=18000",
                ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + "=50000",
                ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + "=60000",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "=false"
            })
    public void processMessage(@Payload String msg,
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.OFFSET) Long offset,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
                               @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        try {
            JSONObject msgJson = new JSONObject(msg);
            String url = msgJson.getString("img_url");
            BigInteger id = msgJson.getBigInteger("goods_id");
            // The topic's position in the configured list identifies the platform.
            int platform = Arrays.asList(topics).indexOf(topic);
            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM));
            HttpEntity<String> entity = new HttpEntity<>(headers);
            ResponseEntity<byte[]> response = restTemplate.exchange(
                    url, HttpMethod.GET, entity, byte[].class, "");
            // The file name is the last path segment of the URL.
            String name = Arrays.stream(url.split("/")).reduce((first, second) -> second)
                    .orElse(null);
            if (response.getStatusCode() == HttpStatus.OK) {
                // Spread files across dirSpreadNum subdirectories, keyed by offset.
                String downloadDirid = downloadDir + offset % dirSpreadNum;
                File f = new File(downloadDirid);
                if (!f.exists()) {
                    f.mkdirs();
                }
                Files.write(Paths.get(downloadDirid + "/" + platform + "    " + id + "    " + name), response.getBody());
            } else {
                log.error(" StatusCode:" + response.getStatusCode() + "=====> id:" + id + " : " + url);
            }
            // Manual commit: only acknowledge once the message has been handled.
            if (acknowledgment != null) {
                acknowledgment.acknowledge();
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
}
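
The listener assumes a RestTemplate bean is defined elsewhere. A minimal sketch of one with explicit timeouts (the values are illustrative), so a slow image host cannot stall the poll loop past max.poll.interval.ms:

@Configuration
public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        // Illustrative timeouts; tune to the image hosts' latency.
        return builder
                .setConnectTimeout(Duration.ofSeconds(5))
                .setReadTimeout(Duration.ofSeconds(10))
                .build();
    }
}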

Manage Kafka topics through KafkaTemplate and AdminClient.


@Autowired
private KafkaTemplate<String, String> template;

// Send a single record; stm is a request object carrying topic, key and value.
this.template.send(stm.getTopic(), stm.getKey(), stm.getValue());
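
send() is asynchronous; to confirm delivery, attach a callback to the returned future (spring-kafka 2.x ListenableFuture API, sketch only):

ListenableFuture<SendResult<String, String>> future =
        template.send(stm.getTopic(), stm.getKey(), stm.getValue());
future.addCallback(
        result -> log.info("sent to partition {} at offset {}",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        ex -> log.error("send failed", ex));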



@Autowired
private KafkaAdmin kafkaAdmin;

// Build an AdminClient from the same connection properties Spring Boot uses.
AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties());
ListTopicsResult lr = admin.listTopics();
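
ListTopicsResult resolves asynchronously; names() yields the topic names:

Set<String> topicNames = lr.names().get();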


// Describe a topic to inspect its partitions, leaders and replicas.
DescribeTopicsResult dtr = admin.describeTopics(Collections.singleton(stm.getTopic()));
TopicDescription td = dtr.values().get(stm.getTopic()).get();
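
From the description, partitions() exposes per-partition detail; its size is the topic's partition count:

int partitionCount = td.partitions().size();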


// Query the latest offset of every partition of the topic.
Map<TopicPartition, OffsetSpec> mop = new HashMap<>();
for (int i = 0; i <= stm.getPartition(); i++) {
    mop.put(new TopicPartition(stm.getTopic(), i), OffsetSpec.latest());
}
ResultEntry re = new ResultEntry();
try {
    List<String> out = admin.listOffsets(mop).all().get().entrySet().stream()
            .map(k -> "partition:" + k.getKey().partition()
                    + " ==> offset:" + k.getValue().offset()
                    + " ==> timestamp:" + k.getValue().timestamp())
            .collect(Collectors.toList());
    re.setSuccess(out);
    return re;
} catch (Exception e) {
    re.setError(e.getMessage());
} finally {
    // Close the client even if the query fails.
    admin.close();
}


// Delete the topic; the brokers must allow it (delete.topic.enable=true).
admin.deleteTopics(Collections.singleton(stm.getTopic()));

admin.close();