Breeze

Spring support for Storm

GroupId: eu.icolumbo.breeze
ArtifactId: breeze
Last Version: 1.2.2
Type: jar
Description: Breeze, Spring support for Storm
Project URL: http://github.com/internet-research-network/breeze
Project Organization: iRN iColumbo
Source Code Management: https://github.com/internet-research-network/breeze

How to add to project

Maven

<!-- https://jarcasting.com/artifacts/eu.icolumbo.breeze/breeze/ -->
<dependency>
    <groupId>eu.icolumbo.breeze</groupId>
    <artifactId>breeze</artifactId>
    <version>1.2.2</version>
</dependency>

Gradle (Groovy DSL)

// https://jarcasting.com/artifacts/eu.icolumbo.breeze/breeze/
implementation 'eu.icolumbo.breeze:breeze:1.2.2'

Gradle (Kotlin DSL)

// https://jarcasting.com/artifacts/eu.icolumbo.breeze/breeze/
implementation ("eu.icolumbo.breeze:breeze:1.2.2")

Buildr

'eu.icolumbo.breeze:breeze:jar:1.2.2'

Ivy

<dependency org="eu.icolumbo.breeze" name="breeze" rev="1.2.2">
  <artifact name="breeze" type="jar" />
</dependency>

Grape

@Grapes(
@Grab(group='eu.icolumbo.breeze', module='breeze', version='1.2.2')
)

SBT

libraryDependencies += "eu.icolumbo.breeze" % "breeze" % "1.2.2"

Leiningen

[eu.icolumbo.breeze/breeze "1.2.2"]

Dependencies

compile (2)

Group / Artifact                        Type   Version
org.apache.storm : storm-core           jar    0.9.1-incubating
org.springframework : spring-context    jar    3.2.8.RELEASE

test (2)

Group / Artifact              Type   Version
junit : junit                 jar    4.11
org.mockito : mockito-core    jar    1.9.5

Project Modules

There are no modules declared in this project.

Spring integration for Storm

Breeze binds Storm topology components to POJOs. Write Spring beans and use them easily within a cluster.

The SpringSpout and SpringBolt classes are configured with a Spring bean and a method signature. The compiler automagically orders the processing steps based on the field names. Each topology gets a dedicated application context.

Breeze currently supports "none" grouping only.
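
To make the bean-and-signature model concrete, here is a minimal sketch of two plain Java beans that could back a spout and a bolt. The class names, method names and field names (FeedReader, WordCounter, doc, wordCount) are hypothetical and not part of Breeze. The idea is that a spout configured with signature "read()" and output field "doc" would feed a bolt configured with signature "count(doc)", because the field name matches the parameter name.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Hypothetical spout bean. Assumed configuration: signature "read()", outputFields "doc".
class FeedReader {
    private final Queue<String> docs = new ArrayDeque<>(Arrays.asList("a b c", "d e"));

    // Returns the next document, or null once the in-memory feed is drained.
    public String read() {
        return docs.poll();
    }
}

// Hypothetical bolt bean. Assumed configuration: signature "count(doc)", outputFields "wordCount".
class WordCounter {
    // Counts whitespace-separated words in the document.
    public int count(String doc) {
        String trimmed = doc.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }
}
```

Neither class depends on Storm or Spring, so both can be unit tested without a cluster.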

Get Started

The kickstarter project demonstrates how to define a topology with the Breeze namespace and regular bean definitions.

Maven

<dependency>
	<groupId>eu.icolumbo.breeze</groupId>
	<artifactId>breeze</artifactId>
	<version>1.2.2</version>
</dependency>

The default topology starter can be used for local testing.

<plugin>
	<groupId>org.codehaus.mojo</groupId>
	<artifactId>exec-maven-plugin</artifactId>
	<version>1.1</version>
	<configuration>
		<mainClass>eu.icolumbo.breeze.namespace.TopologyStarter</mainClass>
		<arguments>
			<argument>demo</argument>
		</arguments>
		<systemProperties>
			<property>
				<key>localRun</key>
			</property>
		</systemProperties>
	</configuration>
</plugin>

Output Binding

For each read request on SpringSpout and for each execute request on SpringBolt the bean's configured method is invoked.

The scatter feature can split returned arrays and collections into multiple emissions. With scatter enabled, a null return means nothing is emitted, so a bolt can act as a filter.
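
As a sketch of a bean that fits the scatter behaviour described above, the hypothetical SentenceSplitter below returns a collection for normal input and null for blank input. The class, its method and the field names in the comment are assumptions made for illustration. How scatter is switched on in the topology definition is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bolt bean. Assumed configuration: signature "split(sentence)", outputFields "word".
class SentenceSplitter {
    public List<String> split(String sentence) {
        if (sentence == null || sentence.trim().isEmpty())
            return null; // with scatter enabled, a null return means no emit (filter)

        List<String> words = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+"))
            words.add(word);
        return words; // with scatter enabled, each element would be emitted separately
    }
}
```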

When no output fields are defined the return value is discarded. By default a single output field gives the return value as is. In case of multiple output fields the return value is read by property (getter). More complicated bindings may be defined with SpEL as shown below.

<breeze:bolt beanType="com.example.EntityExtractor" signature="read(doc)"
		outputFields="nameCount names source">
	<breeze:field name="nameCount" expression="names.size()"/>
	<breeze:field name="source" expression="'x-0.9'"/>
</breeze:bolt>
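
The bean behind such a binding might look like the following sketch. The Entities type and the extraction logic are hypothetical. Only the class name EntityExtractor, the method read(doc) and the names property come from the configuration above. With multiple output fields, "names" would be read through getNames(), while "nameCount" and "source" come from the SpEL expressions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical return type. In a real project this would be a public class in its own file.
class Entities {
    private final List<String> names;

    Entities(List<String> names) {
        this.names = names;
    }

    // Getter that backs the "names" output field.
    public List<String> getNames() {
        return names;
    }
}

class EntityExtractor {
    // Naive stand-in logic: every capitalised word is treated as a name.
    public Entities read(String doc) {
        List<String> names = new ArrayList<>();
        for (String token : doc.split("\\W+"))
            if (!token.isEmpty() && Character.isUpperCase(token.charAt(0)))
                names.add(token);
        return new Entities(names);
    }
}
```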

Exceptions can be configured to cause a read delay.

<breeze:spout id="dumpFeed" beanType="com.example.DumpReader" signature="read()" outputFields="record">
	<breeze:exception type="java.nio.BufferUnderflowException" delay="500"/>
</breeze:spout>
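
For illustration, a reader bean that can raise the configured exception might look like this sketch. The buffer handling and the load method are assumptions and not taken from the project. The point is only that read() throws java.nio.BufferUnderflowException when no complete record is available, which is the condition the configuration above maps to a delay.

```java
import java.nio.ByteBuffer;

// Hypothetical implementation of the DumpReader bean named in the configuration.
class DumpReader {
    private ByteBuffer buffer = ByteBuffer.allocate(0);

    // Hypothetical helper that supplies new dump data to read from.
    public void load(byte[] dump) {
        buffer = ByteBuffer.wrap(dump);
    }

    // Reads one 8-byte record; ByteBuffer#getLong throws
    // BufferUnderflowException when fewer than 8 bytes remain.
    public long read() {
        return buffer.getLong();
    }
}
```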

Transactions

Storm supports guaranteed message processing. Breeze provides this functionality with an ack and/or fail method signature. With the following example configuration, DumpReader#ok is called with the hash code on success, and failures are reported to DumpReader#bad with the value of getSerial() on the record.

<breeze:spout id="dumpFeed" beanType="com.example.DumpReader" signature="read()" outputFields="record">
	<breeze:field name="hash" expression="hashCode()"/>
	<breeze:transaction ack="ok(hash)" fail="bad(serial)"/>
</breeze:spout>
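
A bean matching this configuration could be sketched as follows. The DumpRecord type, the serial format and the bookkeeping lists are hypothetical. Only the method names read, ok and bad and the serial property are taken from the configuration above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record type exposing the "serial" property via getSerial().
class DumpRecord {
    private final String serial;

    DumpRecord(String serial) {
        this.serial = serial;
    }

    public String getSerial() {
        return serial;
    }
}

class DumpReader {
    final List<Integer> acked = new ArrayList<>();
    final List<String> failed = new ArrayList<>();
    private int next;

    // signature="read()": produces the value for the "record" output field.
    public DumpRecord read() {
        return new DumpRecord("rec-" + next++);
    }

    // ack="ok(hash)": would receive the record's hash code on success.
    public void ok(int hash) {
        acked.add(hash);
    }

    // fail="bad(serial)": would receive the record's serial on failure.
    public void bad(String serial) {
        failed.add(serial);
    }
}
```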

RPC

Storm's Distributed RPC or DRPC can also be configured with the beans XML extension.

<breeze:topology id="demo">
	<breeze:rpc signature="dgreet(s)" outputFields="greeting"/>
	<breeze:bolt beanType="com.example.Greeter" signature="greet(s)" outputFields="greeting"/>
</breeze:topology>

A client can then invoke the function by name:

DRPCClient client = new DRPCClient("storm1.example.com", 3772);
String result = client.execute("dgreet", "World");
System.err.println(result);
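
The Greeter bean itself stays an ordinary class. The body below is a hypothetical implementation. Only the class name and the greet(s) signature come from the topology definition above.

```java
// Hypothetical implementation of the bean referenced as com.example.Greeter.
class Greeter {
    // Takes the DRPC argument "s" and returns the value for the "greeting" field.
    public String greet(String s) {
        return "Hello, " + s + "!";
    }
}
```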

Versions

Version
1.2.2
1.2.1
1.2.0
1.1.0
1.0.0