ActiveMQ-based Protobuffer RPC

Protobuffer RPC communication library using ActiveMQ.

License: BSD 3-Clause
Categories: Protobuf Data Data Structures
GroupId: com.mediamiser
ArtifactId: protobuf-activemq-rpc
Last Version: 1.0.6
Type: jar

How to add to project

Maven:

<!-- https://jarcasting.com/artifacts/com.mediamiser/protobuf-activemq-rpc/ -->
<dependency>
    <groupId>com.mediamiser</groupId>
    <artifactId>protobuf-activemq-rpc</artifactId>
    <version>1.0.6</version>
</dependency>

Gradle (Groovy DSL):

// https://jarcasting.com/artifacts/com.mediamiser/protobuf-activemq-rpc/
implementation 'com.mediamiser:protobuf-activemq-rpc:1.0.6'

Gradle (Kotlin DSL):

// https://jarcasting.com/artifacts/com.mediamiser/protobuf-activemq-rpc/
implementation ("com.mediamiser:protobuf-activemq-rpc:1.0.6")

Buildr:

'com.mediamiser:protobuf-activemq-rpc:jar:1.0.6'

Ivy:

<dependency org="com.mediamiser" name="protobuf-activemq-rpc" rev="1.0.6">
  <artifact name="protobuf-activemq-rpc" type="jar" />
</dependency>

Grape:

@Grapes(
@Grab(group='com.mediamiser', module='protobuf-activemq-rpc', version='1.0.6')
)

SBT:

libraryDependencies += "com.mediamiser" % "protobuf-activemq-rpc" % "1.0.6"

Leiningen:

[com.mediamiser/protobuf-activemq-rpc "1.0.6"]

Dependencies

compile (6)

| Group / Artifact | Type | Version |
| --- | --- | --- |
| com.google.protobuf : protobuf-java | jar | 2.5.0 |
| org.apache.activemq : activemq-client | jar | 5.11.1 |
| com.google.guava : guava | jar | 18.0 |
| org.apache.commons : commons-lang3 | jar | 3.3.2 |
| org.apache.geronimo.specs : geronimo-jms_1.1_spec | jar | 1.1.1 |
| org.slf4j : slf4j-api | jar | 1.7.13 |

test (8)

| Group / Artifact | Type | Version |
| --- | --- | --- |
| org.apache.activemq : activemq-broker | jar | 5.11.1 |
| com.mediamiser : avogadro | jar | 1.0.3 |
| junit : junit | jar | 4.12 |
| org.mockito : mockito-core | jar | 1.10.8 |
| io.takari.junit : takari-cpsuite | jar | 1.2.7 |
| net.sf.jopt-simple : jopt-simple | jar | 4.7 |
| log4j : log4j | jar | 1.2.17 |
| org.slf4j : slf4j-log4j12 | jar | 1.7.13 |

Project Modules

There are no modules declared in this project.

Protobuf-ActiveMQ-RPC

![Build Status](http://img.shields.io/travis/MediaMiser/protobuf-activemq-rpc/master.svg?style=flat-square) ![Coverage Status](http://img.shields.io/coveralls/MediaMiser/protobuf-activemq-rpc/master.svg?style=flat-square) ![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.mediamiser/protobuf-activemq-rpc/badge.svg?style=flat-square)

This library provides the classes needed to create RPC clients and servers defined as [Google Protocol Buffer Services](https://developers.google.com/protocol-buffers/docs/proto#services). Apache ActiveMQ carries the calls and acts as a fault-tolerant message broker, so clients and servers do not need to communicate directly. View the [API documentation](https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/index.html) for more details.

This project was inspired by, and is based in small part on, the protobuf-socket-rpc (MIT License) and protobuf-rpc-pro (Apache License 2.0) projects.

Requirements

To build this project, your development environment must have installed:

This project is designed to be used with:

Usage

To use this library in your project, you need an [Apache ActiveMQ](http://activemq.apache.org/) cluster to connect to. Add the following dependency to your project's pom.xml file:

<dependency>
  <groupId>com.mediamiser</groupId>
  <artifactId>protobuf-activemq-rpc</artifactId>
  <version>1.0.6</version>
</dependency>

RPCs and their input/output messages are defined in a language-agnostic manner using Google Protocol Buffers. An example can be found in the tests as [src/test/proto/pings.proto](src/test/proto/pings.proto).

package com.mediamiser.service;

option java_generic_services = true;
option java_package = "com.mediamiser.service";
option java_outer_classname = "PingsProtocol";

message Host {
  required string ip = 1;
  optional string hostname = 2;
}

message Pings {
  repeated uint32 time_ms = 1;
}

service PingsServiceV1 {
  rpc ping(Host) returns (Pings);
}

...

This proto file can be compiled using Maven and [maven-protoc-plugin](http://sergei-ivanov.github.io/maven-protoc-plugin/index.html) (see pom.xml for example usage). These plugins require that the protoc compiler is executable from your shell and on your $PATH (on Ubuntu you can install it using [sudo install-protobuff.sh](sbin/install-prodobuff.sh)). Running mvn compile will generate a PingsProtocol.java file providing messages and RPC stubs for PingsServiceV1.

Useful links

When writing your RPCs and messages, take into account the protobuf:

Client

A client that includes this library and has built the proto files for PingsServiceV1 can try to reach a server hosting this service using the code below. A full client is demonstrated in [ExampleClient.java](src/test/java/com/mediamiser/service/ExampleClient.java).

Note that [ActiveMqChannel](https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqChannel.html) is a heavyweight object and should be reused where possible, for example by using an [object pool](https://commons.apache.org/proper/commons-pool/).

// Connect to an ActiveMQ broker (in this case a local one on your development
// machine)
final Connection connection =
    new ActiveMQConnectionFactory("tcp://localhost:61616")
        .createConnection();

// Start the connection to allow it to be used to send RPC calls
connection.start();

// Specify which services are available through this ActiveMQ broker
final Set<ServiceDescriptor> availableServices =
    Sets.newHashSet(PingsProtocol.PingsServiceV1.getDescriptor());

// Specify the time to live for RPC calls in milliseconds.  ActiveMQ and the
// server will drop messages that expire, and clients that are blocking will
// block for this amount of time before reporting failure. This can be set to
// ActiveMqChannel.DISABLE_EXPIRY to disable message expiry for a specific
// client (call forwarders on remote machines all respect each channel's
// individual expiry time).
final long timeToLiveMs = 1000;

// Create a channel to the available services with a specific call expiry time
final ActiveMqChannel channel =
    new ActiveMqChannel(connection, availableServices, timeToLiveMs);

// Create a controller to get status and error information
final RpcController controller = new ActiveMqController();

// Define a request
final Host request = Host.newBuilder()
        .setIp("127.0.0.1")
        .setHostname("42")
        .build();

// Synchronously make 1 call and wait for its completion
final Pings responseTimes =
    PingsProtocol.PingsServiceV1.newBlockingStub(channel)
        .ping(controller, request);

// Clean up resources
channel.close();
connection.close();
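
The note about reusing ActiveMqChannel can be illustrated with a minimal sketch of the object-pool idea. Everything here is an assumption for illustration: SimplePool is a hypothetical class, not part of this library or of Apache Commons Pool, and a StringBuilder stands in for the channel so that the example runs with the JDK alone. A real application would more likely use a maintained pool implementation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical fixed-size pool; NOT part of protobuf-activemq-rpc.
final class SimplePool<T> {
    private final BlockingQueue<T> idle;

    // Creates `size` instances up front (size must be at least 1) so that
    // the expensive construction cost is paid only once per instance.
    SimplePool(final int size, final Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Borrows an instance, waiting up to timeoutMs for one to become idle.
    // Returns null if none became available or the thread was interrupted.
    T borrow(final long timeoutMs) {
        try {
            return idle.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Hands a borrowed instance back so that other callers can reuse it.
    void giveBack(final T instance) {
        idle.add(instance);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(final String[] args) {
        // StringBuilder is only a stand-in for a heavyweight channel object.
        final SimplePool<StringBuilder> pool =
            new SimplePool<>(2, StringBuilder::new);

        final StringBuilder channel = pool.borrow(100);
        try {
            channel.append("an RPC call would be made here");
        } finally {
            // Always return the instance, even if the call failed.
            pool.giveBack(channel);
        }
        System.out.println("idle instances: " + pool.idleCount());
    }
}
```

Borrowing with a timeout keeps a caller from waiting indefinitely when every instance is in use, and returning the instance in a finally block keeps the pool from shrinking when a call throws.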

Server

A server can create a provider for a service by extending the automatically generated PingsServiceV1 class. Providers are instances that implement the remote procedures, and they are executed by [ActiveMqCallForwarder](https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqCallForwarder.html) instances (each of which can be thought of as a thread). A provider must be thread safe if one instance is shared between several forwarders.

// Copyright (c) 2014 MediaMiser Ltd. All rights reserved.
package com.mediamiser.service.providers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.mediamiser.service.PingsProtocol;
import com.mediamiser.service.PingsProtocol.Host;
import com.mediamiser.service.PingsProtocol.Pings;

/**
 * This provider functions properly, but returns the requested hostname as a
 * response time to help indicate that the correct response was delivered to
 * the correct caller.
 *
 * @author Chris Fournier
 */
public class PingsServiceProvider extends PingsProtocol.PingsServiceV1 {
    private static final Logger LOG =
        LoggerFactory.getLogger(PingsServiceProvider.class);

    @Override
    public void ping(final RpcController controller,
                     final Host request,
                     final RpcCallback<Pings> done) {
        LOG.trace("Got request to ping {} ({})",
            request.getIp(),
            request.getHostname());

        // Add fake times (in this case, the hostname, so that we can make sure
        // that the correct message got sent back to the correct caller during
        // tests)
        final Pings.Builder responseTimes = Pings.newBuilder();
        responseTimes.addTimeMs(Integer.parseInt(request.getHostname()));

        // Return the response times via the callback
        done.run(responseTimes.build());
    }

}
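
The thread-safety advice above can be sketched without the library. The class below is a hypothetical stand-in for a provider, not the generated PingsServiceV1 API: its handle method plays the role of ping, and plain executor threads play the role of forwarders. It assumes the only shared state is a call counter, which is kept in an AtomicLong so that concurrent calls do not lose updates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a provider shared by several forwarders.
final class SharedProviderSketch {
    // Shared mutable state lives in an atomic, so no update is lost when
    // several threads call handle() at the same time.
    private final AtomicLong handled = new AtomicLong();

    // Plays the role of ping(): the result depends only on the argument,
    // and the only side effect is the atomic increment.
    long handle(final String hostname) {
        final long fakeTimeMs = Long.parseLong(hostname);
        handled.incrementAndGet();
        return fakeTimeMs;
    }

    long handledCount() {
        return handled.get();
    }

    // Calls one provider instance from `threads` worker threads (standing
    // in for forwarders) and returns the number of calls it recorded.
    static long runConcurrently(final SharedProviderSketch provider,
                                final int threads,
                                final int callsPerThread) {
        final ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    provider.handle("42");
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return provider.handledCount();
    }

    public static void main(final String[] args) {
        final long count = runConcurrently(new SharedProviderSketch(), 4, 1000);
        System.out.println("calls handled: " + count);
    }
}
```

With a plain long field and handled++ in place of the AtomicLong, some increments could be lost under contention. A provider that keeps no mutable fields at all, like PingsServiceProvider above, avoids the problem entirely.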

This provider alone will not serve requests; it must be connected to at least one [ActiveMqCallForwarder](https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqCallForwarder.html) that has a live connection to an ActiveMQ broker (as demonstrated superficially below and fully in [ExampleServer.java](src/test/java/com/mediamiser/service/ExampleServer.java)).

// Connect to ActiveMQ
final Connection connection =
    new ActiveMQConnectionFactory("tcp://localhost:61616")
        .createConnection();

// Construct services to host
final Set<Service> availableServices =
    Sets.newHashSet(new PingsServiceProvider());

// Create a forwarder (akin to a thread) to host this service and store it
final ActiveMqCallForwarder forwarder =
    new ActiveMqCallForwarder(connection, availableServices);

// Begin processing calls
connection.start();

...

// Clean up resources
connection.stop();
forwarder.close();
connection.close();

Development

Feel free to report an issue or submit a pull request. Changes can be tested locally using:

mvn clean test

Licensing

Protobuf-ActiveMQ-RPC is licensed under the BSD 3-Clause license.
