package com.example.jet;

import com.hazelcast.core.ManagedContext;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.spring.context.SpringManagedContext;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;

import java.util.Locale;
import java.util.Map;

import static com.hazelcast.jet.spring.JetSpringServiceFactories.bean;

/**
 * Starts a local Jet instance from a Spring context and submits every
 * {@link Pipeline} bean found in that context as a named job.
 *
 * <p>The test is a smoke test: it contains no assertions and only gives the
 * submitted jobs time to run and write their output to the logger sink.
 */
public class LocalJetTest {

    /**
     * Boots the Spring context built from {@link Config1} and {@link Config2},
     * waits ten seconds, then closes the context.
     *
     * @throws Exception if the context fails to start or the wait is interrupted
     */
    @Test
    public void test1() throws Exception {
        // try-with-resources closes the context on exit, which triggers the
        // "shutdown" destroy method of the Jet bean instead of leaking the
        // instance past the end of the test.
        try (AnnotationConfigApplicationContext appContext =
                     new AnnotationConfigApplicationContext(Config1.class, Config2.class)) {
            appContext.start();
            // No assertions here: just leave the jobs some time to run and log.
            Thread.sleep(10_000);
        }
    }

    /** Simple Spring-managed service used from inside the pipeline. */
    public static class MyService {

        /**
         * Lower-cases the given string.
         *
         * @param s the string to convert; must not be {@code null}
         * @return {@code s} in lower case, using locale-independent rules
         */
        public String toLowerCase(String s) {
            // Locale.ROOT keeps the result independent of the JVM default locale.
            return s.toLowerCase(Locale.ROOT);
        }
    }

    /**
     * Registers {@link MyService} and loads {@code spring.properties} from the
     * classpath. That file is expected to define the {@code token} and
     * {@code replacement} properties consumed by {@link Config2#samplePipeline}.
     */
    @Configuration
    @PropertySource("classpath:spring.properties")
    public static class Config1 {

        /** @return the service bean looked up by the pipeline's service-mapping stage */
        @Bean
        public MyService myService() {
            return new MyService();
        }
    }

    /** Declares the Hazelcast managed context, the sample pipeline and the Jet instance. */
    @Configuration
    public static class Config2 {

        /** @return a new {@link SpringManagedContext}, installed on the Jet member by {@link #jet} */
        @Bean
        public ManagedContext managedContext() {
            return new SpringManagedContext();
        }

        /**
         * Builds a batch pipeline over three fixed items that swaps one token
         * for a replacement, lower-cases every item through {@link MyService}
         * and writes the result to the logger sink.
         *
         * @param token       item value to be replaced (property {@code token})
         * @param replacement value substituted for {@code token} (property {@code replacement})
         * @return the assembled pipeline
         */
        @Bean
        public Pipeline samplePipeline(@Value("${token}") String token,
                                       @Value("${replacement}") String replacement) {
            // Explicit type arguments are required: with a raw FunctionEx the
            // stage returned by map() is raw too, and the downstream lambda
            // parameters degrade to Object.
            FunctionEx<String, String> inlineFunction = s -> s.equals(token) ? replacement : s;

            Pipeline p = Pipeline.create();
            p.readFrom(TestSources.items("TEN", "TWENTY", "THIRTY"))
             .map(inlineFunction)
             .mapUsingService(bean(MyService.class), (myService, s) -> myService.toLowerCase(s))
             .writeTo(Sinks.logger());
            return p;
        }

        /**
         * Starts a Jet instance configured with the given managed context and
         * submits each supplied pipeline as a job named after its map key.
         *
         * @param managedContext managed context set on the underlying Hazelcast config
         * @param pipelines      pipelines to submit, keyed by the job name to use
         *                       (Spring fills such a map with bean names as keys)
         * @return the running Jet instance; shut down when the Spring context closes
         */
        @Bean(destroyMethod = "shutdown")
        public JetInstance jet(ManagedContext managedContext, Map<String, Pipeline> pipelines) {
            JetConfig jetConfig = new JetConfig()
                    .configureHazelcast(hz -> hz.setManagedContext(managedContext));
            JetInstance jet = Jet.newJetInstance(jetConfig);
            pipelines.forEach((name, pipeline) ->
                    jet.newJob(pipeline, new JobConfig().setName(name)));
            return jet;
        }
    }
}