spring 整合grpc maven自动构建

项目中遇到跨语言调度的情况,脱离了消息队列,使用远程调度的方式可以考虑使用rpc协议,推荐grpc,简单好用。

整合方式

  1. 通过工具输入命令,手动生成grpc代码,再复制到项目中–太麻烦
  2. java通过maven插件,编译生成class文件,生成项目依赖,自动构建–方便

下面提供maven自动生成grpc依赖的文件的方式,整合spring/kafka stream

1.项目配置

<!-- Inherit from the Spring Boot starter parent POM -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- Version variables shared by the plugins and dependencies declared below -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <!-- Used by the grpc-bom import and the protoc-gen-grpc-java plugin artifact -->
    <grpc.version>1.37.1</grpc.version>
    <!-- Used by protobuf-java-util -->
    <protobuf.version>3.17.3</protobuf.version>
    <!-- Used for the protoc compiler executable downloaded by protobuf-maven-plugin -->
    <protoc.version>3.17.3</protoc.version>
    <spring-cloud.version>2020.0.3</spring-cloud.version>
</properties>

<!-- Import the gRPC and Spring Cloud BOMs (type=pom, scope=import). -->
<!-- This section only declares managed versions; it does not add any dependency to the project by itself. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-bom</artifactId>
            <version>${grpc.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<!-- gRPC dependencies required by the project; the Spring dependencies are not listed explicitly here. -->
<!-- The io.grpc artifacts carry no <version>: it is resolved from the grpc-bom imported in dependencyManagement. -->
<dependencies>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
    </dependency>
     <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java-util</artifactId>
        <version>${protobuf.version}</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-testing</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<!-- The build uses a plugin that downloads and runs the protobuf/gRPC code generators: -->
<!-- 1. it fetches the protoc and protoc-gen-grpc-java executables as Maven artifacts -->
<!-- 2. it compiles the .proto files with those tools to generate the target classes -->
<build>
    <extensions>
        <!-- Provides the ${os.detected.classifier} property used in the artifact coordinates below -->
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.6.2</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <!-- compile generates the protobuf message classes; compile-custom runs the grpc-java plugin for the service stubs -->
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Extra: maven-jar-plugin sets the main class of the jar and adds manifest Class-Path entries prefixed with libs/ -->
        <!-- NOTE(review): this configuration only writes the manifest; copying the dependency jars into libs/ must be done elsewhere - confirm -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>libs/</classpathPrefix>
                        <mainClass>
                            org.haooho.com.StreamingApplication
                        </mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>

2.编写proto协议文件

// gRPC contract between the Java client and the Python prediction server.
syntax = "proto3";

// Java code-generation options: target package, outer class name, and
// one generated file per top-level type instead of a single outer class.
option java_package = "proto.kafka.service";
option java_outer_classname = "KafkaRpcPredictionProto";
option java_multiple_files = true;
option objc_class_prefix = "HLW";
package proto.kafka.service;

// Service definition
service KafkaRpcPrediction {
  // Unary RPC: one msgRequest in, one resultMsg out
  rpc Prediction (msgRequest) returns (resultMsg) {}
}

// Request message
message msgRequest {
  // Record key; the server copies it into the response
  string key = 1;
  // Payload handed to the target method
  string data = 2;
  // Name of the server-side module that implements the prediction
  string package = 3;
  // Name of the method to invoke on that module's entry class
  string method = 4;
}

// Response message
message resultMsg {
  // Key copied from the request
  string key = 1;
  // Human-readable status text or error description
  string message = 2;
  // Result payload (the Python server fills it with a JSON string on success)
  string data = 3;
  // Outcome of the call; 0 is the proto3 default when the field is not set
  enum STATUS {
    NOTING = 0;
    ERRORS = 500;
    SUCCESS = 200;
    INIT = 100;
    NOT_REGULAR = 501;
  }
  STATUS status = 4;
}
// 把文件放到 src/main/proto 目录下(protobuf-maven-plugin 默认的 proto 源码目录,也与下文 python 命令中 -I 指定的目录一致)

3.构建项目

mvn clean protobuf:compile protobuf:compile-custom package -DskipTests

·target\generated-sources\protobuf\grpc-java\ 含有生成的java文件

4.编辑代码,java这边我是作为客户端请求python预测模型的,使用了同步阻塞客户端实现,setRequest是主要的请求方法

@Log
public class KafkaTimeSequenceClient {

    private final KafkaRpcPredictionGrpc.KafkaRpcPredictionBlockingStub blockingStub;
    public KafkaTimeSequenceClient(Channel channel){
        this.blockingStub = KafkaRpcPredictionGrpc.newBlockingStub(channel);
    }
    public resultMsg setRequest(String key,String data,String packageName,String methodName) {
        msgRequest request = msgRequest.newBuilder().setData(data).setKey(key).setPackage(packageName).setMethod(methodName).build();
        resultMsg response;
        try {
            response = this.blockingStub.prediction(request);
        } catch (StatusRuntimeException e) {
            log.warning("RPC failed:"+ e.getStatus());
            throw e ;
        }
        return response;
    }


}

###下面提供python的实现

 ·requirements.txt == >python依赖需要包含: 
protobuf==3.17.3
grpcio==1.38.1
grpcio-tools==1.38.1


 ·安装依赖
pip install -r requirements.txt

 ·手动输入命令生成python rpc所需的文件:-I 指定proto文件的搜索(导入)目录;--python_out 指定protobuf生成代码的输出目录;--grpc_python_out 指定grpc生成代码的输出目录;最后一个参数指定proto协议文件
python -m grpc_tools.protoc -I  SteamingPrediction/src/main/proto --python_out=SteamingPrediction/src/main/python/grpc/proto/kafka/service  --grpc_python_out=SteamingPrediction/src/main/python/grpc/proto/kafka/service KafkaRpcPrediction.proto

最终在SteamingPrediction/src/main/python/grpc/proto/kafka/service 生成
1.protocol buffer code
2.Client and server classes corresponding to protobuf-defined services

###下面启动线程进行kafka stream流处理,createPredictionStream为具体调用grpc的实现方法

/**
 * Entry point: builds the Kafka Streams topology that forwards records to the
 * gRPC prediction service, starts it, and blocks until the JVM shuts down.
 *
 * <p>Exits with status 0 after a normal shutdown and status 1 when starting or
 * waiting fails.
 *
 * @param args command-line arguments, passed through to {@code getStreamsConfig}
 * @throws IOException declared by the original signature (presumably raised while loading the configuration; confirm)
 */
public static void main(final String[] args) throws IOException {
    final Properties props = getStreamsConfig(args);
    final ManagedChannel channel = ManagedChannelBuilder
            .forTarget(props.getProperty(PREDICTION_RPC_ADDR))
            .usePlaintext()
            .build();
    final KafkaTimeSequenceClient client = new KafkaTimeSequenceClient(channel);
    final StreamsBuilder builder = new StreamsBuilder();
    createPredictionStream(builder, client);
    final KafkaStreams streams = new KafkaStreams(builder.build(), props);
    final CountDownLatch latch = new CountDownLatch(1);
    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-sequence-shutdown-hook") {
        @Override
        public void run() {
            try {
                streams.close();
            } finally {
                // Shut the gRPC channel down only after the streams are closed,
                // so the topology is no longer issuing calls on it.
                channel.shutdown();
                latch.countDown();
            }
        }
    });
    try {
        streams.start();
        latch.await();
    } catch (final Throwable e) {
        // Report the failure before exiting; otherwise the process dies with
        // exit code 1 and no indication of the cause.
        System.err.println("Kafka Streams prediction application terminated abnormally: " + e);
        e.printStackTrace();
        System.exit(1);
    }
    System.exit(0);
}

流处理同样可以在spring的@KafkaListener订阅方法下实现

/**
 * Wires the prediction topology: reads {@code INPUT_TOPIC}, sends every non-null
 * value to the gRPC prediction service, and writes the outcome to {@code OUTPUT_TOPIC}.
 *
 * <p>A failed call does not drop the record: the failure is logged and a
 * {@code ResultEntry} with code 500 and the exception message is emitted instead.
 *
 * @param builder topology builder the stream is registered on
 * @param client  blocking gRPC client used for the prediction call
 */
static void createPredictionStream(final StreamsBuilder builder, final KafkaTimeSequenceClient client) {
    final KStream<String, String> source =
            builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
    source.filter((key, value) -> value != null)
            .map((key, value) -> {
                ResultEntry re = new ResultEntry();
                try {
                    resultMsg response = client.setRequest(key, value, PY_PACKAGE, PY_METHOD);
                    re.setCode(response.getStatus().getNumber());
                    // The data payload is parsed only for successful predictions.
                    if (response.getStatus() == resultMsg.STATUS.SUCCESS) {
                        re.setData(new JSONObject(response.getData()));
                    }
                    re.setMsg(response.getMessage());
                } catch (Exception e) {
                    // Pass the exception itself so the stack trace is not lost.
                    log.error(e.getMessage(), e);
                    re.setCode(500);
                    re.setMsg(e.getMessage());
                }
                return new KeyValue<String, String>(key, re.toString());
            })
            .filter((key, value) -> value != null)
            .to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));
}

5.服务端代码,Python的实现,接受包名,方法名,参数,映射实现方法

class kafkaTimeSequenceServer(KafkaRpcPrediction_pb2_grpc.KafkaRpcPredictionServicer):
    """gRPC servicer that dispatches each request to a ``prediction.<package>`` module.

    The request names a module and a method; the module's ``main_rpc_class`` is
    instantiated and the named method is called with the request data.
    """

    def Prediction(self, msg: KafkaRpcPrediction_pb2.msgRequest, context):
        """Run the requested prediction and return the result.

        Args:
            msg: request carrying ``key``, ``data``, ``package`` and ``method``.
            context: gRPC call context (unused).

        Returns:
            A ``resultMsg`` with status ``SUCCESS`` and the JSON-encoded result, or
            a ``resultMsg`` with status ``ERRORS`` and the exception text when
            anything fails; exceptions are not propagated to the gRPC layer.
        """
        log = def_console_logger()
        try:
            result = self.__build_model_rule(msg.package, msg.method, msg.data)
            log.info(result)
            return KafkaRpcPrediction_pb2.resultMsg(key=msg.key, data=json.dumps(result, ensure_ascii=False), status=KafkaRpcPrediction_pb2.resultMsg.STATUS.SUCCESS, message="success")
        except Exception as e:
            # Format the traceback once and reuse it for both outputs.
            tb = traceback.format_exc()
            print(tb)
            mob = 'prediction.%s' % msg.package
            log.error("error %s >>>> ask:package: '%s ==>main_rpc_class()' with method: '%s' with data: %s" % (tb.replace("\n", ""), mob, msg.method, msg.data))
            return KafkaRpcPrediction_pb2.resultMsg(key=msg.key, message=str(e), status=KafkaRpcPrediction_pb2.resultMsg.STATUS.ERRORS)

    def __build_model_rule(self, package, method, data):
        """Import ``prediction.<package>`` and call ``main_rpc_class().<method>(data)``.

        ``package`` and ``method`` come straight from the RPC request, so they are
        validated before being handed to ``importlib``/``getattr``.

        Raises:
            ValueError: if ``package`` is not a dotted identifier path, or if
                ``method`` is not a public identifier.
        """
        if not package or not all(part.isidentifier() for part in package.split('.')):
            raise ValueError("invalid package name: %r" % package)
        # Refuse private and dunder names so a caller cannot reach internals
        # such as __class__ or __init__ through getattr.
        if not method.isidentifier() or method.startswith('_'):
            raise ValueError("invalid method name: %r" % method)
        mob = 'prediction.%s' % package
        uh = importlib.import_module(mob)
        clazz = getattr(uh, "main_rpc_class")()
        return getattr(clazz, method)(data)

def serve(ipaddr, max_worker):
    """Start the prediction gRPC server and block until it terminates.

    Args:
        ipaddr: address string passed to ``add_insecure_port``.
        max_worker: maximum number of worker threads in the server's thread pool.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=max_worker)
    rpc_server = grpc.server(worker_pool)
    servicer = kafkaTimeSequenceServer()
    KafkaRpcPrediction_pb2_grpc.add_KafkaRpcPredictionServicer_to_server(servicer, rpc_server)
    rpc_server.add_insecure_port(ipaddr)
    rpc_server.start()
    rpc_server.wait_for_termination()
Share