com.github.cherrythefatbunny:packages

reactive support for dubbo based on reactor

License

License

GroupId

GroupId

com.github.cherrythefatbunny
ArtifactId

ArtifactId

packages
Last Version

Last Version

2.10.10.10
Release Date

Release Date

Type

Type

jar
Description

Description

reactive support for dubbo based on reactor
Project URL

Project URL

https://github.com/cherrythefatbunny/reactive-dubbo/packages/
Source Code Management

Source Code Management

https://github.com/cherrythefatbunny/reactive-dubbo/packages

Download packages

How to add to project

<!-- https://jarcasting.com/artifacts/com.github.cherrythefatbunny/packages/ -->
<dependency>
    <groupId>com.github.cherrythefatbunny</groupId>
    <artifactId>packages</artifactId>
    <version>2.10.10.10</version>
</dependency>
// https://jarcasting.com/artifacts/com.github.cherrythefatbunny/packages/
implementation 'com.github.cherrythefatbunny:packages:2.10.10.10'
// https://jarcasting.com/artifacts/com.github.cherrythefatbunny/packages/
implementation ("com.github.cherrythefatbunny:packages:2.10.10.10")
'com.github.cherrythefatbunny:packages:jar:2.10.10.10'
<dependency org="com.github.cherrythefatbunny" name="packages" rev="2.10.10.10">
  <artifact name="packages" type="jar" />
</dependency>
@Grapes(
@Grab(group='com.github.cherrythefatbunny', module='packages', version='2.10.10.10')
)
libraryDependencies += "com.github.cherrythefatbunny" % "packages" % "2.10.10.10"
[com.github.cherrythefatbunny/packages "2.10.10.10"]

Dependencies

There are no dependencies for this project. It is a standalone project that does not depend on any other jars.

Project Modules

There are no modules declared in this project.

logo

maven license

Reactive support for Dubbo based on REACTOR

Getting started

Install

# git clone https://github.com/cherrythefatbunny/reactive-dubbo.git
# cd reactive-dubbo
# mvn clean install

Run redis and zookeeper

# cd demo/redis-and-zookeeper
# nohup mvn spring-boot:run &

Run Demo Provider

# cd ../provider
# nohup mvn spring-boot:run &

Run Demo Consumer

# cd ../consumer
# nohup mvn spring-boot:run &

Getting involved

Maven dependency

Both provider and consumer should add reactive-dubbo-starter dependency

<dependency>
    <groupId>com.github.cherrythefatbunny</groupId>
    <artifactId>reactive-dubbo-starter</artifactId>
    <version>LATESTVERSION</version>
</dependency>

Service definition

As provider side,you can define reactive services without specifying a reactive proxy factory(the default proxy factory has been changed to reactivejavassist by ReactiveDefaultPropertiesEnvironmentPostProcessor.java )

@Service
public class ReactiveServiceImpl implements ReactiveService {
}

Architecture

Dubbo with Reactive Dubbo

To make dubbo reactive,Reactive Dubbo will replace invokers and proxies with the reactive ones by specifying a reactive proxy factory architecture

Reactive proxy

A reactive proxy (the reactiveInvokerInvocationHandler actually works) wraps a formal functional invocation into a reactive publisher

ReactiveInvokerInvocationHandler.java:

public class ReactiveInvokerInvocationHandler extends InvokerInvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //if the invocation returns a publisher,make a publisher wrapping the real invocation
        Class returnType = method.getReturnType();
        if (Publisher.class.isAssignableFrom(returnType)) {
            RpcInvocation invocation = createInvocation(method, args);
            if (Mono.class.isAssignableFrom(returnType)) {
                invocation.setAttachment(KEY_PUBLISHER_TYPE,VALUE_PUBLISHER_MONO);
                return Mono.create(monoSink -> {
                    try {
                        CompletableFuture<Object> future
                                = (CompletableFuture<Object>) invoker.invoke(invocation).recreate();
                        future.whenComplete((v, t) -> {
                            if (t != null) {
                                monoSink.error(t);
                            } else {
                                monoSink.success(v);
                            }
                        });
                    } catch (Throwable throwable) {
                        if(LOGGER.isErrorEnabled()) {
                            LOGGER.error("mono invocation",throwable);
                        }
                        monoSink.error(throwable);
                    }
                });
            } else if (Flux.class.isAssignableFrom(returnType)) {
                invocation.setAttachment(KEY_PUBLISHER_TYPE,VALUE_PUBLISHER_FLUX);
                return Flux.create(fluxSink -> {
                    try {
                        CompletableFuture<Object> future
                                = (CompletableFuture<Object>) invoker.invoke(invocation).recreate();
                        future.whenComplete((v, t) -> {
                            if (t != null) {
                                fluxSink.error(t);
                            } else if (v instanceof List){
                                List list = (List) v;
                                if(list!=null) {
                                    list.forEach(fluxSink::next);
                                }
                                fluxSink.complete();
                            } else {
                                Exception ex = new IllegalArgumentException("unexpected return type:"+v.getClass());
                                fluxSink.error(ex);
                            }
                        });
                    } catch (Throwable throwable) {
                        if(LOGGER.isErrorEnabled()) {
                            LOGGER.error("flux invocation",throwable);
                        }
                        fluxSink.error(throwable);
                    }
                });
            } else {
                //TODO other publishers support
                throw new IllegalArgumentException(
                        String.format("%s not supported now", method.getReturnType().getSimpleName()));
            }
        }
        return super.invoke(proxy, method, args);
    }
}

Reactive invoker

A reactive invoker intercepts a formal remote request and convert its result into publisher's parameterized type(e.g. Mono<String> -> String,Flux<String> -> List<String>).The invoker combines reactive provider apis with the CompletableFuture result which is greatly improved since Dubbo 2.7.0 AbstractReactiveProxyInvoker.java:

public abstract class AbstractReactiveProxyInvoker<T> extends AbstractProxyInvoker<T> {
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        String publisher = invocation.getAttachment(KEY_PUBLISHER_TYPE);
        if(StringUtils.isBlank(publisher)) {
            return super.invoke(invocation);
        }
        RpcContext rpcContext = RpcContext.getContext();
        try {
            Object obj = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            Mono mono = null;
            if(publisher.equals(VALUE_PUBLISHER_MONO)) {
                mono = (Mono) obj;
            } else if(publisher.equals(VALUE_PUBLISHER_FLUX)) {
                Flux<Object> flux = (Flux<Object>) obj;
                mono = flux.collect(ArrayList::new,ArrayList::add);
            }
            if(mono==null) {
                CompletableFuture future = new CompletableFuture();
                Exception ex = new IllegalArgumentException("unexpected publisher type:"+publisher);
                future.completeExceptionally(ex);
                return new AsyncRpcResult(future);
            } else {
                return new AsyncRpcResult(mono.toFuture());
            }
        } catch (InvocationTargetException e) {
            // TODO async throw exception before async thread write back, should stop asyncContext
            if (rpcContext.isAsyncStarted() && !rpcContext.stopAsync()) {
                LOGGER.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}

RpcUtilsCracker

Since reactive responses will be the publisher's parameterized type which can cause HessianProtocolException ,the consumer side should convert the decode types.Here is my solution: crack the RpcUtils.getReturnTypes method with javassist.

RpcUtilsCracker.java:

public class RpcUtilsCracker {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcUtilsCracker.class);
    private static final String RPCUTILS_CLASS_NAME = "com.alibaba.dubbo.rpc.support.RpcUtils";
    public static void crack() {
        try {
            ClassPool classPool = ClassPool.getDefault();
            classPool.appendClassPath(new LoaderClassPath(RpcUtilsCracker.class.getClassLoader()));
            CtClass ctClass = classPool.get(RPCUTILS_CLASS_NAME);
            CtMethod ctMethod = ctClass.getDeclaredMethod("getReturnTypes");
            //rename method `getReturnTypes` to `getReturnTypes0`
            ctClass.removeMethod(ctMethod);
            ctMethod.setName("getReturnTypes0");
            ctClass.addMethod(ctMethod);
            //add new method `getReturnTypes` according to RpcUtilsCracker.getReturnTypes
            CtClass ctClass1 = classPool.get(RpcUtilsCracker.class.getName());
            ctMethod = new CtMethod(ctClass1.getDeclaredMethod("getReturnTypes"),ctClass,null);
            ctClass.addMethod(ctMethod);
            ctClass.toClass();
        } catch (NotFoundException|CannotCompileException e) {
            LOGGER.error("crack RpcUtils failed",e);
        }
    }
    public static Type[] getReturnTypes0(Invocation invocation) {
        return null;
    }

    /**
     * template method of RpcUtils.getReturnTypes
     * @param invocation the invocation instance of the interface
     * @return return types of the invokee
     * */
    public static Type[] getReturnTypes(Invocation invocation) {
        Type[] types = getReturnTypes0(invocation);
        if(types==null)return null;
        if (types[0] == Mono.class) {
            Class<Mono> returnType = (Class<Mono>) types[0];
            ParameterizedType genericReturnType = (ParameterizedType) types[1];
            return new Type[]{genericReturnType.getActualTypeArguments()[0]};
        }
        if (types[0] == Flux.class) {
            Class<Flux> returnType = (Class<Flux>) types[0];
            ParameterizedType genericReturnType = (ParameterizedType) types[1];
            genericReturnType = ParameterizedTypeImpl.make(List.class,new Type[]{genericReturnType.getActualTypeArguments()[0]},null);
            return new Type[]{List.class, genericReturnType};
        }
        return types;
    }
}

Versions

Version
2.10.10.10