Project dependency configuration
Bring in the Spring Cloud Stream Kafka binder framework:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Bring in the Kafka client dependencies; note the Scala version of the kafka artifact:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.8.0</version>
    <exclusions>
        <exclusion>
            <artifactId>scala-reflect</artifactId>
            <groupId>org.scala-lang</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Because of a version conflict, exclude the kafka-streams artifact pulled in by the Kafka Streams binder:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-streams</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
Implement a stream processing function
@Slf4j
@Configuration
class TimeSequenceStreamRunner {

    @Value("${gRPC.Server.ipaddr}")
    private String gRPCServerIpaddr;

    public static final String PY_PACKAGE = "pdd_sale_pred.sale_predict_run";
    public static final String PY_METHOD = "predictor_main";

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> timeSequenceDailySale() {
        // One gRPC channel/client shared by every record flowing through this stream.
        ManagedChannel channel = ManagedChannelBuilder.forTarget(gRPCServerIpaddr).usePlaintext().build();
        KafkaTimeSequenceClient client = new KafkaTimeSequenceClient(channel);
        return kStream -> kStream
                .filter((key, value) -> value != null)
                .map((key, value) -> {
                    ResultEntry re = new ResultEntry();
                    try {
                        // Forward the record to the Python predictor over gRPC.
                        resultMsg response = client.setRequest(key, value, PY_PACKAGE, PY_METHOD);
                        re.setCode(response.getStatus().getNumber());
                        if (response.getStatus() == resultMsg.STATUS.SUCCESS) {
                            re.setData(new JSONObject(response.getData()));
                        }
                        re.setMsg(response.getMessage());
                        log.warn("---------" + re.getCode());
                    } catch (Exception e) {
                        log.error(e.getMessage());
                        re.setCode(500);
                        re.setMsg(e.getMessage());
                    }
                    return new KeyValue<>(key, re.toString());
                })
                .filter((key, value) -> value != null);
    }
}
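The function above references a ResultEntry wrapper and a KafkaTimeSequenceClient gRPC client that are not shown in this article. A minimal sketch of a ResultEntry that would satisfy the calls made in this function (the field names and the org.json dependency are assumptions, not the project's actual class):

// Hypothetical sketch of the ResultEntry wrapper used by timeSequenceDailySale.
// Field names and JSON serialization are assumptions for illustration only.
import org.json.JSONObject;

public class ResultEntry {
    private int code;
    private String msg;
    private JSONObject data;

    public void setCode(int code) { this.code = code; }
    public int getCode() { return code; }
    public void setMsg(String msg) { this.msg = msg; }
    public void setData(JSONObject data) { this.data = data; }

    @Override
    public String toString() {
        // Serialize as JSON so the output topic receives a plain string payload.
        JSONObject out = new JSONObject();
        out.put("code", code);
        out.put("msg", msg);
        out.put("data", data == null ? JSONObject.NULL : data);
        return out.toString();
    }
}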
Spring YAML configuration: bind the Kafka connection and the stream processing function
spring:
  kafka:
    bootstrap-servers: kafka01:9092,kafka02:9092,kafka03:9092
    producer:
      acks: all
      key-serializer: org.springframework.kafka.support.serializer.StringOrBytesSerializer
      value-serializer: org.springframework.kafka.support.serializer.StringOrBytesSerializer
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      group-id: springpMb1
    listener:
      ack-mode: MANUAL_IMMEDIATE
  cloud:
    stream:
      function:
        definition: timeSequenceDailySale
      bindings:
        timeSequenceDailySale-in-0:
          destination: jpdd-sale-pred-in
        timeSequenceDailySale-out-0:
          destination: jpdd-sale-pred-out
      kafka:
        binder:
          autoCreateTopics: true
          autoAddPartitions: true
          replicationFactor: 3
          brokers:
            - kafka01:9092
            - kafka02:9092
            - kafka03:9092
        streams:
          binder:
            configuration:
              num.stream.threads: 10
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
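With this binding, timeSequenceDailySale consumes from jpdd-sale-pred-in and publishes its results to jpdd-sale-pred-out; the -in-0/-out-0 suffixes are derived from the function name declared in spring.cloud.stream.function.definition. One quick way to exercise the pipeline is to push a test record into the input topic with the KafkaTemplate configured above; a sketch follows (the payload shown is an assumption, the real format depends on what the Python predictor expects):

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

// Send a sample record into the stream's input topic; the processed
// result should then appear on jpdd-sale-pred-out.
public void sendTestRecord() {
    kafkaTemplate.send("jpdd-sale-pred-in", "test-key", "{\"goods_id\":1}");
}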
Define a Kafka listener that consumes messages from the topics and processes them
@Component
@Slf4j
public class TBImgsDownlodRunner {

    @Autowired
    private RestTemplate restTemplate;

    @Value("${app.tb-img-download.output.dir-path}")
    private String downloadDir;

    @Value("${app.tb-img-download.output.dir-spread-num}")
    private int dirSpreadNum;

    @Value("#{'${app.tb-img-download.msg}'.split(',')}")
    private String[] topics;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @KafkaListener(topics = "#{'${app.tb-img-download.msg}'.split(',')}",
            concurrency = "${app.tb-img-download.concurrency:3}",
            properties = {
                    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=20",
                    ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + "=18000",
                    ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + "=50000",
                    ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + "=60000",
                    // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG + "=7000",
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + "=false"
            })
    public void processMessage(@Payload String msg,
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.OFFSET) Long offset,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment,
                               @Header(KafkaHeaders.CONSUMER) Consumer consumer) {
        try {
            // Each message carries the image URL and the goods id of one item.
            JSONObject msgJson = new JSONObject(msg);
            String url = msgJson.getString("img_url");
            BigInteger id = msgJson.getBigInteger("goods_id");
            // The topic's position in the configured topic list identifies the platform.
            int platform = Arrays.asList(topics).indexOf(topic);

            HttpHeaders headers = new HttpHeaders();
            headers.setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM));
            HttpEntity<String> entity = new HttpEntity<String>(headers);
            ResponseEntity<byte[]> response = restTemplate.exchange(url, HttpMethod.GET, entity, byte[].class, "");
            String name = Arrays.stream(url.split("/")).reduce((first, second) -> second).orElse(null);

            if (response.getStatusCode() == HttpStatus.OK) {
                // Spread downloaded files across dirSpreadNum sub-directories by offset.
                String downloadDirid = downloadDir + offset % dirSpreadNum;
                File f = new File(downloadDirid);
                if (!f.exists()) {
                    f.mkdir();
                }
                Files.write(Paths.get(downloadDirid + "/" + platform + " " + id + " " + name), response.getBody());
                //log.debug(url);
            } else {
                //throw new IOException(url + " StatusCode:" + response.getStatusCode());
                log.error(" StatusCode:" + response.getStatusCode() + "=====> id:" + id + " : " + url);
            }
            // Manual commit: acknowledge only after the message has been handled.
            if (acknowledgment != null) {
                acknowledgment.acknowledge();
            }
        } catch (IOException e) {
            e.printStackTrace();
            log.error(e.getMessage());
        }
    }
}
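The listener autowires a RestTemplate, which has to be declared as a bean somewhere in the application. A minimal sketch of such a configuration (the timeout values are illustrative assumptions, not taken from this project):

import java.time.Duration;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

@Configuration
public class RestTemplateConfig {

    // Hypothetical RestTemplate bean required by TBImgsDownlodRunner;
    // the timeout values below are illustrative.
    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder
                .setConnectTimeout(Duration.ofSeconds(5))
                .setReadTimeout(Duration.ofSeconds(30))
                .build();
    }
}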
Manage Kafka topics with KafkaTemplate and AdminClient
@Autowired
private KafkaTemplate<String, String> template;

// Publish a record to the topic carried in the request.
this.template.send(stm.getTopic(), stm.getKey(), stm.getValue());

@Autowired
private KafkaAdmin kafkaAdmin;

// Build an AdminClient from the same connection properties Spring Kafka uses.
AdminClient admin = AdminClient.create(kafkaAdmin.getConfigurationProperties());
ListTopicsResult lr = admin.listTopics();

// Describe the topic and read the latest offset of each partition.
DescribeTopicsResult dtr = admin.describeTopics(Collections.singleton(stm.getTopic()));
TopicDescription td = dtr.values().get(stm.getTopic()).get();
Map<TopicPartition, OffsetSpec> mop = new HashMap<TopicPartition, OffsetSpec>();
for (int i = 0; i <= stm.getPartition(); i++) {
    mop.put(new TopicPartition(stm.getTopic(), i), OffsetSpec.latest());
}
ResultEntry re = new ResultEntry();
try {
    List<String> out = admin.listOffsets(mop).all().get().entrySet().stream()
            .map(k -> "partition:" + k.getKey().partition()
                    + " ==> offset:" + k.getValue().offset()
                    + " ==> timestamp:" + k.getValue().timestamp())
            .collect(Collectors.toList());
    admin.close();
    re.setSuccess(out);
    return re;
} catch (Exception e) {
    re.setError(e.getMessage());
}

// Delete a topic and close the client (reached here only when listOffsets threw).
admin.deleteTopics(Collections.singleton(stm.getTopic()));
admin.close();
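The same AdminClient can also create topics up front instead of relying on autoCreateTopics. A short sketch (the topic name, partition count, and replication factor below are placeholders):

// Create a topic explicitly; the name, partition count, and replication
// factor used here are illustrative placeholders.
try (AdminClient admin2 = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
    NewTopic newTopic = new NewTopic("jpdd-sale-pred-in", 3, (short) 3);
    admin2.createTopics(Collections.singleton(newTopic)).all().get();
} catch (InterruptedException | ExecutionException e) {
    log.error("topic creation failed: " + e.getMessage());
}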