|
如需使用最新稳定版本,请使用 Spring Integration 7.0.4! |
出站网关
JPA 入站通道适配器允许您轮询数据库以检索一个或多个 JPA 实体。 检索到的数据随后用于启动 Spring Integration 流程,该流程将检索到的数据作为消息负载使用。
此外,您可以在流程末尾使用 JPA 出站通道适配器来持久化数据,本质上是在持久化操作结束时停止流程。
然而,您如何在流程中间执行 JPA 持久化操作呢?例如,您可能正在 Spring Integration 消息流中处理业务数据并希望将其持久化,但同时仍需要在下游使用其他组件。 或者,您不需要使用轮询器轮询数据库,而是需要执行 JPQL 查询并主动检索数据,然后在流程中的后续组件中处理这些数据。
这正是 JPA 出站网关发挥作用的地方。 它们使您能够持久化数据以及检索数据。 为了促进这些用途,Spring Integration 提供了两种类型的 JPA 出站网关:
-
更新出站网关
-
检索出站网关
每当使用出站网关执行保存、更新或删除数据库中某些记录的操作时,您需要使用一个更新型出站网关。
例如,如果您使用 entity 来持久化它,则返回一个合并并持久化的实体作为结果。
在其他情况下,则返回受影响的记录数(已更新或删除的记录数)。
当从数据库检索(选择)数据时,我们使用检索出站网关。 通过检索出站网关,我们可以使用 JPQL、命名查询(原生或基于 JPQL)或原生查询(SQL)来选择数据并检索结果。
更新型出站网关在功能上类似于出站通道适配器,不同之处在于,更新型出站网关在执行 JPA 操作后会将结果发送到网关的回复通道。
检索出站网关类似于入站通道适配器。
这种相似性是主要因素,促使我们使用中央 JpaExecutor 类来尽可能统一通用功能。
所有 JPA 出站网关的通用内容,类似于 outbound-channel-adapter,我们可以使用它来执行各种 JPA 操作:
-
实体类
-
JPA 查询语言 (JPQL)
-
原生查询
-
命名查询
有关配置示例,请参阅 JPA 出站网关示例。
通用配置参数
JPA 出站网关始终可以访问 Spring Integration Message 作为输入。
因此,以下参数可用:
parameter-source-factory-
一个
o.s.i.jpa.support.parametersource.ParameterSourceFactory的实例用于获取o.s.i.jpa.support.parametersource.ParameterSource的实例。ParameterSource用于解析查询中提供的参数的值。 如果您使用 JPA 实体执行操作,则忽略parameter-source-factory属性。parameter子元素与parameter-source-factory互斥,并且必须在提供的ParameterSourceFactory上进行配置。 可选。 use-payload-as-parameter-source-
如果设置为
true,则使用Message的有效负载作为参数的来源。 如果设置为false,则整个Message都可用作为参数的来源。 如果没有传入 JPA 参数,此属性默认为true。 这意味着,如果您使用了默认的BeanPropertyParameterSourceFactory,则有效负载的 Bean 属性将用作 JPA 查询的参数值来源。 然而,如果传入了 JPA 参数,此属性默认评估为false。 原因是 JPA 参数允许您提供 SpEL 表达式。 因此,能够访问整个Message(包括请求头)非常有益。 可选。
更新出站网关
以下清单显示了您可以在 updating-outbound-gateway 上设置的所有属性,并描述了关键属性:
<int-jpa:updating-outbound-gateway request-channel="" (1)
auto-startup="true"
entity-class=""
entity-manager=""
entity-manager-factory=""
id=""
jpa-operations=""
jpa-query=""
named-query=""
native-query=""
order=""
parameter-source-factory=""
persist-mode="MERGE"
reply-channel="" (2)
reply-timeout="" (3)
use-payload-as-parameter-source="true">
<int:poller/>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:updating-outbound-gateway>
| 1 | 接收出站网关消息以执行所需操作的通道。
此属性类似于 channel 的 outbound-channel-adapter 属性。
可选。 |
| 2 | 执行所需的 JPA 操作后,网关发送响应到的通道。
如果未定义此属性,则请求消息必须包含 replyChannel 头信息。
可选。 |
| 3 | 指定网关等待向回复通道发送结果的时间。
仅在回复通道本身可能阻塞发送操作时适用(例如,当前已满的有界 QueueChannel)。
该值以毫秒为单位指定。
可选。 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
@IntegrationComponentScan
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@MessagingGateway
interface JpaGateway {
@Gateway(requestChannel = "jpaUpdateChannel")
@Transactional
void updateStudent(StudentDomain payload);
}
@Bean
@ServiceActivator(channel = "jpaUpdateChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter =
new JpaOutboundGateway(new JpaExecutor(this.entityManagerFactory));
adapter.setOutputChannelName("updateResults");
return adapter;
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("updateResults"));
}
}
检索出站网关
以下示例演示如何配置检索出站网关:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public IntegrationFlow retrievingGatewayFlow() {
return f -> f
.handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
.channel(c -> c.queue("retrieveResults"));
}
}
@Bean
fun retrievingGatewayFlow() =
integrationFlow {
handle(Jpa.retrievingGateway(this.entityManagerFactory)
.jpaQuery("from Student s where s.id = :id")
.expectSingleResult(true)
.parameterExpression("id", "payload"))
channel { queue("retrieveResults") }
}
@SpringBootApplication
@EntityScan(basePackageClasses = StudentDomain.class)
public class JpaJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(JpaJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private EntityManagerFactory entityManagerFactory;
@Bean
public JpaExecutor jpaExecutor() {
JpaExecutor executor = new JpaExecutor(this.entityManagerFactory);
jpaExecutor.setJpaQuery("from Student s where s.id = :id");
executor.setJpaParameters(Collections.singletonList(new JpaParameter("id", null, "payload")));
jpaExecutor.setExpectSingleResult(true);
return executor;
}
@Bean
@ServiceActivator(channel = "jpaRetrievingChannel")
public MessageHandler jpaOutbound() {
JpaOutboundGateway adapter = new JpaOutboundGateway(jpaExecutor());
adapter.setOutputChannelName("retrieveResults");
adapter.setGatewayType(OutboundGatewayType.RETRIEVING);
return adapter;
}
}
<int-jpa:retrieving-outbound-gateway request-channel=""
auto-startup="true"
delete-after-poll="false"
delete-in-batch="false"
entity-class=""
id-expression="" (1)
entity-manager=""
entity-manager-factory=""
expect-single-result="false" (2)
id=""
jpa-operations=""
jpa-query=""
max-results="" (3)
max-results-expression="" (4)
first-result="" (5)
first-result-expression="" (6)
named-query=""
native-query=""
order=""
parameter-source-factory=""
reply-channel=""
reply-timeout=""
use-payload-as-parameter-source="true">
<int:poller></int:poller>
<int-jpa:transactional/>
<int-jpa:parameter name="" type="" value=""/>
<int-jpa:parameter name="" expression=""/>
</int-jpa:retrieving-outbound-gateway>
| 1 | (自 Spring Integration 4.0 起) 用于确定 primaryKey 方法相对于作为评估上下文根对象的 requestMessage 的 EntityManager.find(Class entityClass, Object primaryKey) 值的 SpEL 表达式。
如果存在,entityClass 参数从 entity-class 属性中确定。
否则,它从 payload 类中确定。
如果您使用 id-expression,则不允许使用其他所有属性。
可选。 |
| 2 | 一个布尔标志,表示选择操作是否预期返回单个结果或List个结果。
如果此标志设置为true,则单个实体将作为消息的有效负载发送。
如果返回多个实体,则抛出异常。
如果为false,则将List个实体作为消息的有效负载发送。
默认值为false。
可选。 |
| 3 | 此非零且非负的整数值告知适配器在执行选择操作时,不要选择超过指定数量的行。
默认情况下,如果未设置此属性,则给定查询将选择所有可能的记录。
此属性与 max-results-expression 互斥。
可选。 |
| 4 | 一个可用于查找结果集中最大结果数的表达式。
它与 max-results 互斥。
可选。 |
| 5 | 此非零且非负的整数值告知适配器从哪一条记录开始检索结果。
此属性与first-result-expression互斥。
版本3.0引入了此属性。
可选。 |
| 6 | 该表达式针对消息进行求值,以在结果集中找到第一条记录的位置。
此属性与 first-result 互斥。
版本 3.0 引入了此属性。
可选。 |
|
当您选择检索时删除实体,并且已检索到实体集合时,默认情况下,实体将逐个进行删除。 这可能导致性能问题。 或者,您可以将属性 JSR 317: Java™ Persistence 2.0 在第 4.10 章“批量更新和删除操作”中指出: “删除操作仅适用于指定类及其子类的实体。 它不会级联到相关实体。” 有关更多信息,请参见 JSR 317: Java™ Persistence 2.0 |
从 6.0 版本开始,当查询未返回任何实体时,Jpa.retrievingGateway() 将返回空列表结果。
此前,根据 requiresReply 的配置,会返回 null 以结束流程,或者抛出异常。
或者,若要恢复之前的行为,可在网关后添加一个 filter 以过滤掉空列表。
这在空列表处理属于下游逻辑一部分的应用中需要额外的配置。
有关可能的空列表处理选项,请参阅 Splitter Discard Channel。 |
JPA 出站网关示例
本节包含使用更新出站网关和检索出站网关的各种示例:
使用实体类进行更新
在以下示例中,更新出站网关使用 org.springframework.integration.jpa.test.entity.Student 实体类作为 JPA 定义参数进行持久化:
<int-jpa:updating-outbound-gateway request-channel="entityRequestChannel" (1)
reply-channel="entityResponseChannel" (2)
entity-class="org.springframework.integration.jpa.test.entity.Student"
entity-manager="em"/>
| 1 | 这是出站网关的请求通道。
它类似于 channel 的 outbound-channel-adapter 属性。 |
| 2 | 这是网关与出站适配器不同的地方。
这是接收 JPA 操作回复的通道。
然而,如果您不关心收到的回复,而只想执行该操作,使用 JPA outbound-channel-adapter 是合适的选择。
在本示例中,我们使用了实体类,回复即为作为 JPA 操作结果创建或合并的实体对象。 |
使用 JPQL 进行更新
以下示例使用 Java Persistence Query Language (JPQL) 更新实体, 这需要配置一个用于更新的出站网关:
<int-jpa:updating-outbound-gateway request-channel="jpaqlRequestChannel"
reply-channel="jpaqlResponseChannel"
jpa-query="update Student s set s.lastName = :lastName where s.rollNumber = :rollNumber" (1)
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:updating-outbound-gateway>
| 1 | 网关执行的 JPQL 查询。
由于我们使用了更新出站网关,只有 update 和 delete 的 JPQL 查询才是合理的选择。 |
当您发送一个包含 String 负载的消息,且该消息还包含一个名为 rollNumber 的头部(其值为 long)时,指定学号的学生姓氏将被更新为消息负载中的值。
在使用更新网关时,返回值始终为一个整数,表示执行 JPA QL 所影响的记录数。
使用 JPQL 检索实体
以下示例使用检索出站网关和 JPQL 从数据库中检索(选择)一个或多个实体:
<int-jpa:retrieving-outbound-gateway request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
jpa-query="select s from Student s where s.firstName = :firstName and s.lastName = :lastName"
entity-manager="em">
<int-jpa:parameter name="firstName" expression="payload"/>
<int-jpa:parameter name="lastName" expression="headers['lastName']"/>
</int-jpa:outbound-gateway>
使用实体检索id-expression
以下示例使用带有 id-expression 的检索出站网关从数据库中检索(查找)且仅检索一个实体:
primaryKey 是 id-expression 求值的结果。
entityClass 是消息类 payload。
<int-jpa:retrieving-outbound-gateway
request-channel="retrievingGatewayReqChannel"
reply-channel="retrievingGatewayReplyChannel"
id-expression="payload.id"
entity-manager="em"/>
使用命名查询进行更新
使用命名查询基本上与直接使用 JPQL 查询相同。
区别在于使用了 named-query 属性,如下例所示:
<int-jpa:updating-outbound-gateway request-channel="namedQueryRequestChannel"
reply-channel="namedQueryResponseChannel"
named-query="updateStudentByRollNumber"
entity-manager="em">
<int-jpa:parameter name="lastName" expression="payload"/>
<int-jpa:parameter name="rollNumber" expression="headers['rollNumber']"/>
</int-jpa:outbound-gateway>
| 您可以在此处找到使用 Spring Integration JPA 适配器的完整示例应用程序 here。 |