HEROJOON 블로그(히로블)

Java gRPC API를 사용하여 gRPC 통신 4가지 예제 만들어 보기 본문

Backend

Java gRPC API를 사용하여 gRPC 통신 4가지 예제 만들어 보기

herojoon 2024. 7. 7. 23:21
반응형

gRPC란?

gRPC는 Google Remote Procedure Call의 약자로, Google에서 개발한 고성능 오픈소스 범용 RPC 프레임워크입니다.

gRPC

 

gRPC의 간단한 특징

  • Protocol Buffers(Protobuf)를 인터페이스 정의 언어(IDL: Interface Definition Language)로 사용
  • Protocol Buffers(Protobuf)는 다른 메시지 방식과 비교해 압축률이 좋고 빠른 속도의 장점을 제공
  • 다양한 프로그래밍 언어 지원 (.proto 파일을 다양한 언어의 소스파일로 생성 해줌)
  • HTTP/2 기반 전송을 통한 양방향 스트리밍 지원
  • SSL/TLS,  ALTS, Google을 이용한 토큰 인증 등 다양한 보안 지원

 

Stub이란?

  • Stub(스텁)은 gRPC에서 중요한 개념입니다. Stub Layer(스텁 레이어)는 Call Layer(호출 레이어)을 감싸는 Wrapper(래퍼)입니다.
  • gRPC는 .proto파일을 컴파일하여 자동으로 Stub 클래스 코드를 생성해줍니다.
  • Stub은 서버와 클라이언트가 통신하는데 사용됩니다.
  • 클라이언트가 원격 서버의 메서드를 호출할 때, 
    • Stub은 호출하는 메서드의 인자들을 직렬화하여 네트워크 전송가능한 형태로 만들고(마샬링)
    • Stub은 직렬화된 데이터를 gRPC 채널을 통해 서버로 전송합니다.
    • 서버로부터의 응답이 오면 Stub은 응답 데이터를 역직렬화하여(언마샬링) 클라이언트에 반환합니다.

 

RPC란?

원격 프로시저 호출(Remote Procedure Call)로 원격지의 프로세스나 메서드를 호출하여 사용할 수 있도록 해줍니다.
프로세스 간 통신기술이라고 할 수 있습니다.

 

Protocol Buffers란?

프로토콜 버퍼는 구조화된 데이터를 직렬화하기 위한 Google의 언어 중립적, 플랫폼 중립적, 확장 가능한 메커니즘입니다.

// .proto파일 예시
syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 results_per_page = 3;
}

 

gRPC와 REST 차이

  gRPC REST
호출 방식 클라이언트가 서버의 특정 함수를 직,간접적으로 호출하는 방식 클라이언트가 서버에 데이터나 행위를 요청하는 방식
통신 프로토콜 http/2.0
- gRPC를 사용하기 위해서는 로드밸런서에서 http/2.0을 지원해야 합니다.
http/1.1
데이터 형식 .proto 데이터 형식 (Protocol Buffers)
- 바이너리로 데이터 직렬화하여 사용
json, xml 데이터 형식 (주로 json)
양방향 스트리밍 지원 지원하지 않음 (WebSocket을 사용하여 가능)
서비스 정의
(Service Contract)
- 클라이언트와 서버 간 통신에 대한 규약, 약속에 대한 정의, 문서
gRPC는 서비스를 정의하는 파일로 .proto를 사용해야 하며(강제), 정의한 .proto 파일을 컴파일하여 클라이언트와 서버의 소스코드를 자동 생성해줍니다. 정해진 규약을 사용해야하는 엄격함이 있습니다. REST의 경우는 엄격한 규약이 강제로 정해져 있지 않습니다.(느슨함)
REST의 경우는 한 예로 OpenAPI와 같은 도구를 이용하여 API문서를 작성하고 데이터에 대한 정의를 지정할 수 있습니다.
브라우저 지원 지원하지 않음 (별도 Proxy 사용하면 됨) 지원
사용 예 저지연, 고성능 사용이 필요한 서비스 통신
실시간 스트리밍

ex) MSA 구조의 백엔드 서비스끼리의 통신
웹 통신
ex) 프론트엔드와 백엔드 간 통신

 

gRPC 통신 간략 소개

gRPC에서는 아래와 같이 4가지의 통신을 지원합니다.

 

1. Unary RPC (단항 RPC)

// 1.Unary RPC (단항 RPC)
rpc SayHello(HelloRequest) returns (HelloResponse);

 

2. Server Streaming RPC (서버 스트리밍 RPC)

// 2. Server Streaming RPC (서버 스트리밍 RPC)
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

 

3. Client Streaming RPC (클라이언트 스트리밍 RPC)

// 3. Client Streaming RPC (클라이언트 스트리밍 RPC)
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

 

4. Bidirectional Streaming RPC (양방향 스트리밍 RPC)

// 4. Bidirectional Streaming RPC (양방향 스트리밍 RPC)
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

gRPC 통신 4가지

 

Java를 이용한 gRPC 예제 구현 해보기

개발 환경

  • Java 17
  • Spring Boot 3.3.0
  • Gradle 8.6
  • Google ProtoBuffers (proto 3)

 

Java에서 gRPC를 사용하기 위한 조건

 

Java에서 gRPC를 사용하기 요약

  1. build.gradle에 gRPC 의존성 추가
  2. .proto파일 생성 (서비스 정의)
  3. Gradle Build하여 Protocol Buffer Complier로 Java 서버, 클라이언트 코드 생성
  4. Java gRPC API를 사용하여 gRPC 통신 4가지 예제 만들어 보기 (클라이언트, 서버)
  5. 예제 실행해보기 ( 서버 애플리케이션 실행 후 클라이언트 애플리케이션 실행)

 

예제 시나리오

클라이언트에서 자기소개 하면 서버에서 인사와 질문 응답주기

gRPC java 예제

 

build.gradle에 gRPC 의존성 추가

<build.gradle파일에 추가해줘야 하는 gRPC 코드>

plugins {
    /* for gRPC */
    id 'com.google.protobuf' version '0.9.4'
}

/* for gRPC */
protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.1"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.64.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}

dependencies {
    /* for gRPC */
    runtimeOnly 'io.grpc:grpc-netty-shaded:1.64.0'
    implementation 'io.grpc:grpc-protobuf:1.64.0'
    implementation 'io.grpc:grpc-stub:1.64.0'
    compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
}

 

<build.gradle파일 전체 코드>

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.0'
    id 'io.spring.dependency-management' version '1.1.5'

    /* for gRPC */
    id 'com.google.protobuf' version '0.9.4'
}

group = 'com.herojoon.grpc'
version = '0.0.1-SNAPSHOT'

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

/* for gRPC */
protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.25.1"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.64.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

    /* for gRPC */
    runtimeOnly 'io.grpc:grpc-netty-shaded:1.64.0'
    implementation 'io.grpc:grpc-protobuf:1.64.0'
    implementation 'io.grpc:grpc-stub:1.64.0'
    compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
}

tasks.named('test') {
    useJUnitPlatform()
}

 

.proto 파일 작성 (Google Protocol Buffers "proto 3"버전 사용)

<hello.proto파일 코드>

/*
gRPC 서비스 정의
1) .proto파일은 src/main/proto 경로 아래 작성해줍니다.
2) .proto파일 작성 후 build Task를 실행시켜줍니다.
3) .proto파일에 정의한 서비스에 대하여 Java언어로 소스코드가 자동 생성됩니다.
4) 생성된 자바 소스코드는 build/classes/generated/source/proto/main 아래 생성됩니다.
 */

// protocol buffurs의 버전을 명시해줍니다. gRPC에서는 주로 proto3을 사용합니다.
syntax = "proto3";

/*
java_multiple_files (default value: false)
- false일 경우 .proto 파일로 생성될 클래스 파일이 하나로 생성됨
- true일 경우 .proto 파일로 생성될 클래스 파일이 나뉘어 생성됨
 */
option java_multiple_files = true;
option java_package = "com.herojoon.grpc";  // .proto 파일로 생성될 클래스 파일이 위치할 패키지명
option java_outer_classname = "HelloProto";  // .proto 파일로 생성될 클래스 파일명


/*
통신할 서비스를 정의합니다.
- gRPC 종류: https://grpc.io/docs/what-is-grpc/core-concepts/#rpc-life-cycle
 */
service Hello {
  rpc SayHello (HelloRequest) returns (HelloResponse);  // Unary RPC (단방향 RPC)
  rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);  // Server streaming RPC (서버 스트리밍 RPC)
  rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);  // Client streaming RPC (클라이언트 스트리밍 RPC)
  rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);  // Bidirectional streaming RPC (양방향 스트리밍 RPC)
}

/*
요청 메시지 정의 (클라이언트 -> 서버)
 */
message HelloRequest {
  string name = 1;      // 이름
  int32 age = 2;        // 나이
  string message = 3;   // 하고 싶은 말 메시지
}

/*
응답 메시지 정의 (서버 -> 클라이언트)
 */
message HelloResponse {
  string greeting_message = 1;  // 인사 메시지
  string question_message = 2;  // 질문 메시지
}

 

● proto 네이밍 컨벤션

공식문서에서 proto3의 네이밍 컨벤션은 아래와 같습니다.

  • message는 UpperCamelCase (PascalCase)로 권장합니다.
  • field는 SnakeCase로 작성을 권장합니다.
네이밍 컨벤션(Naming Convention)이란?
개발에서 함수명이나 변수명, 파일명 등을 어떤식으로 표기할지에 대한 네이밍 표기 규칙입니다.


CamelCase와 SnakeCase는 네이밍 컨벤션의 종류를 뜻합니다.


CamelCase는 단어의 구분자를 대문자로 표시해주는 네이밍 표기법입니다.
CamelCase는 lowerCamelCase와 UpperCamelCase로 나뉘는데
lowerCamelCase는 앞 글자가 소문자인 CamelCase를 뜻합니다. ex) helloGreetingReq
UpperCamelCase는 앞 글자가 대문자인 CamelCase를 뜻한다. ex) HelloGreetingReq


SnakeCase는 단어의 구분자를 언더바(_)로 표시해주는 네이밍 표기법입니다.
ex) hello_greeting_req

 

.proto파일 생성 위치

.proto파일은 /src/main/proto폴더 아래 생성해줍니다.

 

 .proto파일을 Java 서버, 클라이언트 코드로 생성

Gradle Build 해주면 해당 위치에 있는 .proto파일을 java 코드로 변환해줍니다.

gRPC는 Protocol Buffer Complier를 사용하여 .proto파일을 서버와 클라이언트 코드를 생성해줍니다.

 

.proto파일로 생성된 코드는 아래와 같이 project/build/generated/source/proto/main/ 아래로 코드가 생성됩니다.

 

Java gRPC API를 사용하여 gRPC 통신 4가지 예제 만들어 보기 (클라이언트, 서버)

 

예제 코드는 아래와 같이 만들었습니다.

예제 코드 예제 코드 설명
HelloGrpcClient.java 클라이언트 애플리케이션 Main
HelloGrpcClientCaller.java 클라이언트 동작 코드
HelloGrpcServer.java 서버 애플리케이션 Main
HelloGrpcServiceImpl.java 서버 동작 코드

 

예제1. Unary RPC (단항 RPC)

  • 단항 RPC - 클라이언트 코드
// HelloGrpcClient.java
package com.herojoon.grpc.unary.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class HelloGrpcClient {

    public static void main(String[] args) {
        // 스텁에 대한 gRPC 채널을 생성하고 연결하려는 서버 주소와 포트를 지정
        // 채널을 생성하려면 ManagedChannelBuilder를 사용합니다.
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 8080)
                .usePlaintext()
                .build();

        HelloGrpcClientCaller helloGrpcClientCaller = new HelloGrpcClientCaller(channel);
        helloGrpcClientCaller.sendUnaryBlocking();
    }
}
// HelloGrpcClientCaller.java
package com.herojoon.grpc.unary.client;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.ManagedChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * 클라이언트 동작 코드
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcClientCaller {

    private ManagedChannel channel;
    private HelloGrpc.HelloBlockingStub blockingStub;

    public HelloGrpcClientCaller(ManagedChannel chl) {
        channel = chl;
        blockingStub = HelloGrpc.newBlockingStub(channel);
    }

    public void sendUnaryBlocking() {
        log.info(">>> Send Call");

        HelloResponse response = blockingStub.sayHello(HelloRequest.newBuilder()
                        .setName("herojoon")  // .proto에 정의한 request value
                        .setAge(10)  // .proto에 정의한 request value
                        .setMessage("Hello, Glad to meet you.")  // .proto에 정의한 request value
                        .build());

        log.info(">>> Response Data => [%s]".formatted(response));
    }
}

 

  • 단항 RPC - 서버 코드
// HelloGrpcServer.java
package com.herojoon.grpc.unary.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class HelloGrpcServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 클라이언트 요청을 수신하는데 사용할 포트 지정
        Server grpcServer = ServerBuilder
                .forPort(8080)
                .addService(new HelloGrpcServiceImpl())  // 서비스 구현 클래스의 인스턴스를 생성하여 .addService() 메서드에 전달
                .build();

        grpcServer.start();
        grpcServer.awaitTermination();
    }
}
// HelloGrpcServiceImpl.java
package com.herojoon.grpc.unary.server;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

/**
 * 서버 동작 코드 (클라이언트에서 요청을 받음)
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcServiceImpl extends HelloGrpc.HelloImplBase {

    // Unary RPC (단방향 RPC)
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        log.info("=== Get Request");
        log.info("=== Request Data => [%s]".formatted(request));

        // 응답 데이터 셋업
        HelloResponse response = HelloResponse.newBuilder()
                .setGreetingMessage("Hello, %s".formatted(request.getName()))  // .proto에 정의한 response value
                .setQuestionMessage("What do you do for fun?")  // .proto에 정의한 response value
                .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}

 

● 예제2. Server Streaming RPC (서버 스트리밍 RPC)

  • 서버 스트리밍 RPC - 클라이언트 코드
// HelloGrpcClient.java
package com.herojoon.grpc.serverstreaming.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class HelloGrpcClient {

    public static void main(String[] args) {
        // 스텁에 대한 gRPC 채널을 생성하고 연결하려는 서버 주소와 포트를 지정
        // 채널을 생성하려면 ManagedChannelBuilder를 사용합니다.
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 8080)
                .usePlaintext()
                .build();

        HelloGrpcClientCaller helloGrpcClientCaller = new HelloGrpcClientCaller(channel);
        helloGrpcClientCaller.sendServerStreamingBlocking();
    }
}
// HelloGrpcClientCaller.java
package com.herojoon.grpc.serverstreaming.client;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.ManagedChannel;
import lombok.extern.slf4j.Slf4j;

import java.util.Iterator;

/**
 * 클라이언트 동작 코드
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcClientCaller {

    private ManagedChannel channel;
    private HelloGrpc.HelloBlockingStub blockingStub;
    
    // 생성한 채널로 stub 생성
    public HelloGrpcClientCaller(ManagedChannel chl) {
        channel = chl;
        blockingStub = HelloGrpc.newBlockingStub(channel);
    }

    public void sendServerStreamingBlocking() {
        log.info(">>> Send Call");

        // 요청은 하나만 보내고, 여러 개의 응답을 받는다.
        Iterator<HelloResponse> helloResponseIterator = blockingStub.lotsOfReplies(HelloRequest.newBuilder()
                .setName("herojoon")  // .proto에 정의한 request value
                .setAge(10)  // .proto에 정의한 request value
                .setMessage("Hello, Glad to meet you.")  // .proto에 정의한 request value
                .build());

        // 응답 출력
        helloResponseIterator.forEachRemaining(helloResponse -> {
            log.info(">>> Response Data => [%s]".formatted(helloResponse));
        });
    }
}

 

  • 서버 스트리밍 RPC - 서버 코드
// HelloGrpcServer.java
package com.herojoon.grpc.serverstreaming.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class HelloGrpcServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 클라이언트 요청을 수신하는데 사용할 포트 지정
        Server grpcServer = ServerBuilder
                .forPort(8080)
                .addService(new HelloGrpcServiceImpl())  // 서비스 구현 클래스의 인스턴스를 생성하여 .addService() 메서드에 전달
                .build();

        grpcServer.start();
        grpcServer.awaitTermination();
    }
}

 

// HelloGrpcServiceImpl.java
package com.herojoon.grpc.serverstreaming.server;

import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import com.herojoon.grpc.HelloGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

/**
 * 서버 동작 코드 (클라이언트에서 요청을 받음)
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcServiceImpl extends HelloGrpc.HelloImplBase {

    private String[] questionMessages = {
            "What do you do for fun?",
            "What kind of books do you like?",
            "What kind of food do you like?",
            "What is your favoite color?",
            "What is your favoite sports?"
    };

    // Server Streaming RPC (서버 스트리밍 RPC)
    @Override
    public void lotsOfReplies(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        log.info("=== Get Request");
        log.info("=== Request Data => [%s]".formatted(request));

        // 응답 데이터 셋업
        for (String questionMessage: questionMessages) {
            HelloResponse response = HelloResponse.newBuilder()
                    .setGreetingMessage("Hello, %s".formatted(request.getName()))  // .proto에 정의한 response value
                    .setQuestionMessage(questionMessage)  // .proto에 정의한 response value
                    .build();

            responseObserver.onNext(response);
        }

        responseObserver.onCompleted();
    }
}

 

예제3. Client Streaming RPC (클라이언트 스트리밍 RPC)

  • 클라이언트 스트리밍 RPC - 클라이언트 코드
// HelloGrpcClient.java
package com.herojoon.grpc.clientstreaming.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class HelloGrpcClient {

    public static void main(String[] args) throws InterruptedException {
        // 스텁에 대한 gRPC 채널을 생성하고 연결하려는 서버 주소와 포트를 지정
        // 채널을 생성하려면 ManagedChannelBuilder를 사용합니다.
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 8080)
                .usePlaintext()
                .build();

        HelloGrpcClientCaller helloGrpcClientCaller = new HelloGrpcClientCaller(channel);
        helloGrpcClientCaller.sendClientStremingAsync();
    }
}
// HelloGrpcClientCaller.java
package com.herojoon.grpc.clientstreaming.client;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 클라이언트 동작 코드
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcClientCaller {

    private ManagedChannel channel;
    private HelloGrpc.HelloStub asyncStub;

    private String[] names = {
            "herojoon",
            "yejin",
            "jonghoon",
            "sophia",
            "woojin"
    };

    private int minAge = 1;  // 1살 부터

    private int maxAge = 100;  // 100살 까지

    public HelloGrpcClientCaller(ManagedChannel chl) {
        channel = chl;
        asyncStub = HelloGrpc.newStub(channel);
    }

    public void sendClientStremingAsync() throws InterruptedException {
        log.info(">>> Send Call");

        // 요청 데이터 목록 생성
        List<HelloRequest> helloRequestList = new ArrayList<>();
        for (String name: names) {
            helloRequestList.add(
                    HelloRequest.newBuilder()
                            .setName(name)  // .proto에 정의한 request value
                            .setAge(getRandomAge())  // .proto에 정의한 request value
                            .setMessage("Hello, Glad to meet you.")  // .proto에 정의한 request value
                            .build()
            );
        }

        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() {
            @Override
            public void onNext(HelloResponse helloResponse) {
                // 서버 응답 출력
                log.info(">>> Response Data => [%s]".formatted(helloResponse));
            }

            @Override
            public void onError(Throwable t) {  // 스트림에서 종료 오류 발생 시 수신
                Status status = Status.fromThrowable(t);
                log.warn(">>> Warning => [%s]".formatted(status));
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {  // 스트림이 성공적으로 완료되었다고 응답 받음
                log.info(">>> Finished.");
                finishLatch.countDown();
            }
        };

        StreamObserver<HelloRequest> requestObserver = asyncStub.lotsOfGreetings(responseObserver);
        try {
            for (HelloRequest req: helloRequestList) {
                requestObserver.onNext(req);
                log.info(">>> Req Name: " + req.getName());

                Thread.sleep(1000);
                if (finishLatch.getCount() == 0) {  // 오류 발생 시 다음 코드를 전송하더라도 처리되지 않기 때문에 전송하지 않도록 처리
                    // RPC completed or errored before we finished sending.
                    // Sending further requests won't error, but they will just be thrown away.
                    log.info(">>> Stop the next request");
                    return;
                }
            }
        } catch (RuntimeException e) {
            // Cancel RPC
            requestObserver.onError(e);
            throw e;
        }
        // Mark the end of requests
        requestObserver.onCompleted();

        // Receiving happens asynchronously
        finishLatch.await(1, TimeUnit.MINUTES);

        log.info(">>> End.");
    }

    // min 나이부터 max 나이까지 랜덤 값 return
    private int getRandomAge() {
        return (int) (Math.random() * (maxAge - minAge + 1)) + minAge;
    }
}

 

  • 클라이언트 스트리밍 RPC - 서버 코드
// HelloGrpcServer.java
package com.herojoon.grpc.clientstreaming.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class HelloGrpcServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 클라이언트 요청을 수신하는데 사용할 포트 지정
        Server grpcServer = ServerBuilder
                .forPort(8080)
                .addService(new HelloGrpcServiceImpl())  // 서비스 구현 클래스의 인스턴스를 생성하여 .addService() 메서드에 전달
                .build();

        grpcServer.start();
        grpcServer.awaitTermination();
    }
}
// HelloGrpcServiceImpl.java
package com.herojoon.grpc.clientstreaming.server;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;

/**
 * 서버 동작 코드 (클라이언트에서 요청을 받음)
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcServiceImpl extends HelloGrpc.HelloImplBase {

    // Client Streaming RPC (클라이언트 스트리밍 RPC)
    @Override
    public StreamObserver<HelloRequest> lotsOfGreetings(StreamObserver<HelloResponse> responseObserver) {
        return new StreamObserver<HelloRequest>() {

            List<String> nameList = new ArrayList<>();

            int i = 0;  // 강제 오류 발생 테스트를 위한 변수 (바로 오류를 발생시키지 않고 중간에 오류 발생시키기 위해 사용)

            // 요청 데이터에 대한 처리
            @Override
            public void onNext(HelloRequest helloRequest) {
                nameList.add(helloRequest.getName());  // 응답 시 전달을 위해 요청 데이터의 name 값을 nameList에 담는다.
                log.info("=== name is %s [age: %d]".formatted(helloRequest.getName(), helloRequest.getAge()));

                /* 강제 오류 발생 테스트를 위한 코드 START (오류 테스트 시 주석 해제하여 사용) */
                /*i++;
                if (i == 2) {
                    try {
                        throw new Exception("Error Error Error!!");
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }*/
                /* 강제 오류 발생 테스트를 위한 코드 END */
            }

            // 오류 처리
            @Override
            public void onError(Throwable t) {
                log.warn("=== Warning => [%s]".formatted(t.getMessage()));
            }

            // 응답 전달
            @Override
            public void onCompleted() {
                responseObserver.onNext(HelloResponse.newBuilder()
                        .setGreetingMessage("Hello, [%s]".formatted(String.join(",", nameList)))
                        .setQuestionMessage("What do you do for fun?")
                        .build());

                responseObserver.onCompleted();
            }
        };
    }
}

 

예제4. Bidirectional Streaming RPC (양방향 스트리밍 RPC)

  • 양방향 스트리밍 RPC - 클라이언트 코드
// HelloGrpcClient.java
package com.herojoon.grpc.bidirectional.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.extern.slf4j.Slf4j;

@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcClient {

    public static void main(String[] args) throws InterruptedException {
        // 스텁에 대한 gRPC 채널을 생성하고 연결하려는 서버 주소와 포트를 지정
        // 채널을 생성하려면 ManagedChannelBuilder를 사용합니다.
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", 8080)
                .usePlaintext()
                .build();

        HelloGrpcClientCaller helloGrpcClientCaller = new HelloGrpcClientCaller(channel);
        helloGrpcClientCaller.sendBidirectionalStremingAsync();
    }
}
// HelloGrpcClientCaller.java
package com.herojoon.grpc.bidirectional.client;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 클라이언트 동작 코드
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcClientCaller {

    private ManagedChannel channel;
    private HelloGrpc.HelloStub asyncStub;

    private String[] names = {
            "herojoon",
            "yejin",
            "jonghoon",
            "sophia",
            "woojin"
    };

    private int minAge = 1;  // 1살 부터

    private int maxAge = 100;  // 100살 까지

    public HelloGrpcClientCaller(ManagedChannel chl) {
        channel = chl;
        asyncStub = HelloGrpc.newStub(channel);
    }

    public void sendBidirectionalStremingAsync() throws InterruptedException {
        log.info(">>> Send Call");

        // 요청 데이터 목록 생성
        List<HelloRequest> helloRequestList = new ArrayList<>();
        for (String name: names) {
            helloRequestList.add(
                    HelloRequest.newBuilder()
                            .setName(name)  // .proto에 정의한 request value
                            .setAge(getRandomAge())  // .proto에 정의한 request value
                            .setMessage("Hello, Glad to meet you.")  // .proto에 정의한 request value
                            .build()
            );
        }

        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<HelloResponse> responseObserver = new StreamObserver<HelloResponse>() {
            @Override
            public void onNext(HelloResponse helloResponse) {
                // 서버 응답 출력
                log.info(">>> Response Data => [%s]".formatted(helloResponse));
            }

            @Override
            public void onError(Throwable t) {  // 스트림에서 종료 오류 발생 시 수신
                Status status = Status.fromThrowable(t);
                log.warn(">>> Warning => [%s]".formatted(status));
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {  // 스트림이 성공적으로 완료되었다고 응답 받음
                log.info(">>> Finished.");
                finishLatch.countDown();
            }
        };

        StreamObserver<HelloRequest> requestObserver = asyncStub.bidiHello(responseObserver);
        try {
            for (HelloRequest req: helloRequestList) {
                requestObserver.onNext(req);
            }
        } catch (RuntimeException e) {
            // Cancel RPC
            requestObserver.onError(e);
            throw e;
        }
        // Mark the end of requests
        requestObserver.onCompleted();

        // Receiving happens asynchronously
        finishLatch.await(1, TimeUnit.MINUTES);
    }

    // min 나이부터 max 나이까지 랜덤 값 return
    private int getRandomAge() {
        return (int) (Math.random() * (maxAge - minAge + 1)) + minAge;
    }
}

 

  • 양방향 스트리밍 RPC - 서버 코드
// HelloGrpcServer.java
package com.herojoon.grpc.bidirectional.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;

public class HelloGrpcServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        // 클라이언트 요청을 수신하는데 사용할 포트 지정
        Server grpcServer = ServerBuilder
                .forPort(8080)
                .addService(new HelloGrpcServiceImpl())  // 서비스 구현 클래스의 인스턴스를 생성하여 .addService() 메서드에 전달
                .build();

        grpcServer.start();
        grpcServer.awaitTermination();
    }
}
// HelloGrpcServiceImpl.java
package com.herojoon.grpc.bidirectional.server;

import com.herojoon.grpc.HelloGrpc;
import com.herojoon.grpc.HelloRequest;
import com.herojoon.grpc.HelloResponse;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

/**
 * 서버 동작 코드 (클라이언트에서 요청을 받음)
 */
@Slf4j  // 로그를 사용하기 위해 추가
public class HelloGrpcServiceImpl extends HelloGrpc.HelloImplBase {

    // Bidirectional Streaming RPC (양방향 스트리밍 RPC)
    @Override
    public StreamObserver<HelloRequest> bidiHello(StreamObserver<HelloResponse> responseObserver) {
        return new StreamObserver<HelloRequest>() {
            @Override
            public void onNext(HelloRequest helloRequest) {
                log.info("=== name is %s".formatted(helloRequest.getName()));

                responseObserver.onNext(HelloResponse.newBuilder()
                        .setGreetingMessage("Hello, %s".formatted(helloRequest.getName()))
                        .setQuestionMessage("What do you do for fun?")
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                log.warn("=== Warning => [%s]".formatted(t.getMessage()));
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }
}

 

● 예제 실행해보기 ( 서버 애플리케이션 실행 후 클라이언트 애플리케이션 실행)

  • 서버 애플리케이션 실행

서버 애플리케이션 실행

 

서버 애플리케이션의 main() task가 실행 완료되면 클라이언트 애플리케이션을 실행합니다.

서버 애플리케이션 실행 완료

 

  • 클라이언트 애플리케이션 실행

클라이언트 애플리케이션 실행

 

 

클라이언트 애플리케이션이 실행되며 클라이언트에서 서버로 요청 후 서버에서 응답을 전달해줍니다.

클라이언트 애플리케이션 실행 완료

 

서버 애플리케이션에서도 클라이언트 요청을 받아 처리한 로그를 확인할 수 있습니다.

서버 애플리케이션 로그

 

 

위 gRPC 예제에 나오는 Java CountDownLatch에 대해서는 아래 포스팅으로 작성해놓았습니다.

https://herojoon-dev.tistory.com/216

 

Java CountDownLatch를 이용한 Thread 대기 예제

목표Java에서 제공하는 CountDownLatch를 이해하고 CountDownLatch를 이용하여 Thread 대기 예제 해보기 이해하기CountDownLatch란?: CountDownLatch는 Java에서 일련의 스레드 작업이 끝난 후 다음 작업이 진행될 수

herojoon-dev.tistory.com

 

반응형
Comments