java如何集成流程,java-如何从Http集成流程创建Spring Reactor Flux?

  • Post author:
  • Post category:java


区别在于消息来自Http端点而不是JMS队列.问题是由于某些原因而无法填充消息通道,或者Flux.from()不会拾取它.日志条目显示GenericMessage是从Http Integration流中创建的,并带有有效负载作为路径变量,但是没有入队/未发布到通道?我尝试了.channel(MessageChannels.queue())和.channel(MessageChannels.publishSubscribe())

没有任何区别,事件流为空.这是代码:

@Bean

public Publisher> httpReactiveSource() {

return IntegrationFlows.

from(Http.inboundChannelAdapter(“/eventmessage/{id}”)

.requestMapping(r -> r

.methods(HttpMethod.POST)

)

.payloadExpression(“#pathVariables.id”)

)

.channel(MessageChannels.queue())

.log(LoggingHandler.Level.DEBUG)

.log()

.toReactivePublisher();

}

@GetMapping(value=”eventmessagechannel/{id}”, produces=MediaType.TEXT_EVENT_STREAM_VALUE)

public Flux eventMessages(@PathVariable String id){

return Flux.from(httpReactiveSource())

.map(Message::getPayload);

}

UPDATE1:

的build.gradle

buildscript {

ext {

springBootVersion = ‘2.0.0.M2’

}

repositories {

mavenCentral()

maven { url “https://repo.spring.io/snapshot” }

maven { url “https://repo.spring.io/milestone” }

}

dependencies {

classpath(“org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}”)

}

}

apply plugin: ‘java’

apply plugin: ‘eclipse’

apply plugin: ‘org.springframework.boot’

apply plugin: ‘io.spring.dependency-management’

version = ‘0.0.1-SNAPSHOT’

sourceCompatibility = 1.8

repositories {

mavenCentral()

maven { url “https://repo.spring.io/snapshot” }

maven { url “https://repo.spring.io/milestone” }

}

dependencies {

compile(‘org.springframework.boot:spring-boot-starter-freemarker’)

compile(‘org.springframework.boot:spring-boot-starter-integration’)

compile(‘org.springframework.boot:spring-boot-starter-web’)

compile(‘org.springframework.boot:spring-boot-starter-webflux’)

compile(‘org.springframework.integration:spring-integration-http’)

testCompile(‘org.springframework.boot:spring-boot-starter-test’)

testCompile(‘io.projectreactor:reactor-test’)

}

UPDATE2

当在一个文件中定义@SpringBootApplication和@RestController时,它可以工作,但是当@SpringBootApplication和@RestController在单独的文件中时,它停止工作.

TestApp.java

package com.example;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class TestApp {

public static void main(String[] args) {

SpringApplication.run(TestApp.class, args);

}

}

TestController.java

package com.example.controller;

import org.springframework.context.annotation.Bean;

import org.reactivestreams.Publisher;

import org.springframework.http.HttpMethod;

import org.springframework.http.MediaType;

import org.springframework.integration.dsl.IntegrationFlows;

import org.springframework.integration.dsl.channel.MessageChannels;

import org.springframework.integration.handler.LoggingHandler;

import org.springframework.integration.http.dsl.Http;

import org.springframework.messaging.Message;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.bind.annotation.GetMapping;

import reactor.core.publisher.Flux;

@RestController

public class TestController {

@Bean

public Publisher> httpReactiveSource() {

return IntegrationFlows.

from(Http.inboundChannelAdapter(“/message/{id}”)

.requestMapping(r -> r

.methods(HttpMethod.POST)

)

.payloadExpression(“#pathVariables.id”)

)

.channel(MessageChannels.queue())

.toReactivePublisher();

}

@GetMapping(value = “/events”, produces = MediaType.TEXT_EVENT_STREAM_VALUE)

public Flux eventMessages() {

return Flux.from(httpReactiveSource())

.map(Message::getPayload);

}

}

解决方法:

这对我很好:

@SpringBootApplication

@RestController

public class SpringIntegrationSseDemoApplication {

public static void main(String[] args) {

SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);

}

@Bean

public Publisher> httpReactiveSource() {

return IntegrationFlows.

from(Http.inboundChannelAdapter(“/message/{id}”)

.requestMapping(r -> r

.methods(HttpMethod.POST)

)

.payloadExpression(“#pathVariables.id”)

)

.channel(MessageChannels.queue())

.toReactivePublisher();

}

@GetMapping(value = “/events”, produces = MediaType.TEXT_EVENT_STREAM_VALUE)

public Flux eventMessages() {

return Flux.from(httpReactiveSource())

.map(Message::getPayload);

}

}

我在POM中有以下依赖关系:

org.springframework.boot

spring-boot-starter-parent

2.0.0.BUILD-SNAPSHOT

org.springframework.boot

spring-boot-starter-integration

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-webflux

org.springframework.integration

spring-integration-http

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-maven-plugin

我运行该应用程序,并有两个终端:

curl http://localhost:8080/events

听上交所.

在第二个中,我执行以下操作:

curl -X POST http://localhost:8080/message/foo

curl -X POST http://localhost:8080/message/bar

curl -X POST http://localhost:8080/message/666

因此,第一个终端的响应如下:

data:foo

data:bar

data:666

注意,我们不需要spring-boot-starter-webflux依赖项. Flux to SSE与Servlet容器上的常规MVC很好地配合.

Spring Integration也将很快支持WebFlux:https://jira.spring.io/browse/INT-4300.因此,您将能够在其中进行如下配置:

IntegrationFlows

.from(Http.inboundReactiveGateway(“/sse”)

.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))

并且仅完全依赖WebFlux,而没有任何Servlet容器依赖性.