When a project needs cross-language calls without going through a message queue, an RPC protocol is a good choice for remote invocation; gRPC is recommended because it is simple and easy to use.
Integration approaches:
- Run the command-line tools by hand, generate the gRPC code manually, and copy it into the project: too tedious.
- Let a Maven plugin compile the proto files into class files as part of the project build: convenient.
Below is the Maven setup that generates the gRPC dependencies automatically, integrated with Spring / Kafka Streams.
1. Project configuration
<!-- Inherit from the Spring Boot parent POM -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Version properties for plugins and dependencies -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<grpc.version>1.37.1</grpc.version>
<protobuf.version>3.17.3</protobuf.version>
<protoc.version>3.17.3</protoc.version>
<spring-cloud.version>2020.0.3</spring-cloud.version>
</properties>
<!-- dependencyManagement imports the gRPC and Spring Cloud BOMs: this only declares versions, it does not add the dependencies to the project -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>${grpc.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- The gRPC dependencies the project needs; the Spring dependencies are not explicitly listed here -->
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- The build uses the protobuf codegen plugin, which: -->
<!-- 1. downloads the protoc executable into the target directory -->
<!-- 2. runs protoc against the .proto files to generate the target classes -->
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Extra: maven-jar-plugin sets the main class in the manifest and a classpath prefix for the copied dependency jars -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>libs/</classpathPrefix>
<mainClass>
org.haooho.com.StreamingApplication
</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
2. Write the proto definition file
syntax = "proto3";
option java_package = "proto.kafka.service";
option java_outer_classname = "KafkaRpcPredictionProto";
option java_multiple_files = true;
option objc_class_prefix = "HLW";
package proto.kafka.service;
// Service definition
service KafkaRpcPrediction {
// RPC method
rpc Prediction (msgRequest) returns (resultMsg) {}
}
// Request message
message msgRequest {
string key = 1;
string data = 2;
string package = 3;
string method = 4;
}
// Response message
message resultMsg {
string key = 1;
string message = 2;
string data = 3;
enum STATUS {
NOTING = 0;
ERRORS = 500;
SUCCESS = 200;
INIT = 100;
NOT_REGULAR = 501;
}
STATUS status = 4;
}
// Place this file under src/main/proto, the protobuf-maven-plugin's default proto source root
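The STATUS values in resultMsg map directly to integer codes (proto3 requires the first enum value to be zero, which is why NOTING = 0 exists). As a standalone sketch of that mapping, here is a stdlib IntEnum mirroring the proto enum; this is an illustration, not the generated class:

```python
from enum import IntEnum

class Status(IntEnum):
    # Mirrors resultMsg.STATUS from the proto definition above.
    # proto3 forces the first declared value to be 0.
    NOTING = 0
    INIT = 100
    SUCCESS = 200
    ERRORS = 500
    NOT_REGULAR = 501
```

Comparing the wire value against 200 (as the Kafka Streams code does with `getStatus().getNumber()`) therefore checks for SUCCESS.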
3. Build the project
mvn clean protobuf:compile protobuf:compile-custom package -DskipTests
·target\generated-sources\protobuf\grpc-java\ contains the generated Java files
4. Client code. Here the Java side acts as the client calling the Python prediction model, using the synchronous blocking stub; setRequest is the main request method.
import io.grpc.Channel;
import io.grpc.StatusRuntimeException;
import lombok.extern.java.Log;
import proto.kafka.service.KafkaRpcPredictionGrpc;
import proto.kafka.service.msgRequest;
import proto.kafka.service.resultMsg;

@Log
public class KafkaTimeSequenceClient {
    private final KafkaRpcPredictionGrpc.KafkaRpcPredictionBlockingStub blockingStub;

    public KafkaTimeSequenceClient(Channel channel) {
        this.blockingStub = KafkaRpcPredictionGrpc.newBlockingStub(channel);
    }

    public resultMsg setRequest(String key, String data, String packageName, String methodName) {
        msgRequest request = msgRequest.newBuilder()
                .setData(data).setKey(key)
                .setPackage(packageName).setMethod(methodName)
                .build();
        resultMsg response;
        try {
            response = this.blockingStub.prediction(request);
        } catch (StatusRuntimeException e) {
            log.warning("RPC failed: " + e.getStatus());
            throw e;
        }
        return response;
    }
}
### Python implementation
·requirements.txt must include:
protobuf==3.17.3
grpcio==1.38.1
grpcio-tools==1.38.1
·Install the dependencies
pip install -r requirements.txt
·Generate the Python RPC files manually from the command line: -I sets the proto search path, --python_out the output directory for the protobuf message code, --grpc_python_out the output directory for the gRPC service code; the final argument is the proto file itself.
python -m grpc_tools.protoc -I SteamingPrediction/src/main/proto --python_out=SteamingPrediction/src/main/python/grpc/proto/kafka/service --grpc_python_out=SteamingPrediction/src/main/python/grpc/proto/kafka/service KafkaRpcPrediction.proto
This produces, under SteamingPrediction/src/main/python/grpc/proto/kafka/service:
1. the protocol buffer message code (KafkaRpcPrediction_pb2.py)
2. the client and server classes for the protobuf-defined service (KafkaRpcPrediction_pb2_grpc.py)
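One gotcha worth noting (an assumption based on how grpc_tools emits imports, not something covered above): the generated KafkaRpcPrediction_pb2_grpc.py imports KafkaRpcPrediction_pb2 as a top-level module, so the directory holding the generated files must be on sys.path when the server starts. A minimal sketch of the path fix:

```python
import os
import sys

# The output directory from the codegen command above.
GENERATED_DIR = os.path.join("SteamingPrediction", "src", "main",
                             "python", "grpc", "proto", "kafka", "service")

# Putting the directory itself on sys.path lets the generated
# `import KafkaRpcPrediction_pb2` line inside *_pb2_grpc.py resolve.
if GENERATED_DIR not in sys.path:
    sys.path.insert(0, GENERATED_DIR)
```

An alternative is to simply launch the server from inside that directory.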
### Starting the Kafka Streams processing thread; createPredictionStream is the method that actually invokes gRPC
public static void main(final String[] args) throws IOException {
    final Properties props = getStreamsConfig(args);
    ManagedChannel channel = ManagedChannelBuilder.forTarget(props.getProperty(PREDICTION_RPC_ADDR))
            .usePlaintext().build();
    KafkaTimeSequenceClient client = new KafkaTimeSequenceClient(channel);
    final StreamsBuilder builder = new StreamsBuilder();
    createPredictionStream(builder, client);
    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-sequence-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });
    try {
        streams.start();
        latch.await();
    } catch (final Throwable e) {
        System.exit(1);
    }
    System.exit(0);
}
The same stream processing could also be implemented inside a Spring @KafkaListener subscriber method.
static void createPredictionStream(final StreamsBuilder builder, final KafkaTimeSequenceClient client) {
    final KStream<String, String> source = builder.stream(INPUT_TOPIC,
            Consumed.with(stringSerde, stringSerde));
    source.filter((key, value) -> value != null).map((key, value) -> {
        ResultEntry re = new ResultEntry();
        try {
            resultMsg response = client.setRequest(key, value, PY_PACKAGE, PY_METHOD);
            re.setCode(response.getStatus().getNumber());
            if (response.getStatus() == resultMsg.STATUS.SUCCESS) {
                re.setData(new JSONObject(response.getData()));
            }
            re.setMsg(response.getMessage());
        } catch (Exception e) {
            log.error(e.getMessage());
            re.setCode(500);
            re.setMsg(e.getMessage());
        }
        return new KeyValue<String, String>(key, re.toString());
    }).filter((key, value) -> value != null).to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
}
5. Server code: the Python implementation receives the package name, method name, and payload, and maps them to the concrete implementation method via reflection.
import importlib
import json
import traceback
from concurrent import futures

import grpc

import KafkaRpcPrediction_pb2
import KafkaRpcPrediction_pb2_grpc


class kafkaTimeSequenceServer(KafkaRpcPrediction_pb2_grpc.KafkaRpcPredictionServicer):

    def Prediction(self, msg: KafkaRpcPrediction_pb2.msgRequest, context):
        """Run the prediction and return the result."""
        log = def_console_logger()
        try:
            package = msg.package
            method = msg.method
            result = self.__build_model_rule(package, method, msg.data)
            log.info(result)
            return KafkaRpcPrediction_pb2.resultMsg(
                key=msg.key,
                data=json.dumps(result, ensure_ascii=False),
                status=KafkaRpcPrediction_pb2.resultMsg.STATUS.SUCCESS,
                message="success")
        except Exception as e:
            print(traceback.format_exc())
            mob = 'prediction.%s' % msg.package
            log.error("error %s >>>> ask:package: '%s ==>main_rpc_class()' with method: '%s' with data: %s"
                      % (traceback.format_exc().replace("\n", ""), mob, msg.method, msg.data))
            return KafkaRpcPrediction_pb2.resultMsg(
                key=msg.key, message=str(e),
                status=KafkaRpcPrediction_pb2.resultMsg.STATUS.ERRORS)

    def __build_model_rule(self, package, method, data):
        # Import prediction.<package> and call main_rpc_class().<method>(data)
        mob = 'prediction.%s' % package
        uh = importlib.import_module(mob)
        clazz = getattr(uh, "main_rpc_class")()
        return getattr(clazz, method)(data)


def serve(ipaddr, max_worker):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_worker))
    KafkaRpcPrediction_pb2_grpc.add_KafkaRpcPredictionServicer_to_server(kafkaTimeSequenceServer(), server)
    server.add_insecure_port(ipaddr)
    server.start()
    server.wait_for_termination()
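The dynamic dispatch inside __build_model_rule (importlib.import_module plus getattr) can be tried out in isolation. This toy version uses the stdlib json module as a stand-in for the hypothetical prediction.* package, since the real package is project-specific:

```python
import importlib

def dispatch(module_name, method_name, data):
    """Mirror the server's dispatch pattern: resolve the module and
    method purely by name at call time, then invoke with the payload."""
    mod = importlib.import_module(module_name)
    return getattr(mod, method_name)(data)

# json.loads resolved and invoked entirely from strings:
result = dispatch("json", "loads", '{"value": 42}')
# result == {"value": 42}
```

This is what lets the Java client select any server-side model by sending only a package name and a method name in the request message.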