Here we discuss a sample use case for building an end-to-end data pipeline and use Spring Cloud Data Flow to implement it. Besides SCDF, we also use the metadata management capabilities offered by Arena to make the pipeline metadata-driven and operationally efficient.
Consider a simple use case of processing transaction details for bank customers, where the customers' account information is ingested from a landing source into a data warehouse. Continuously incoming transaction data from multiple sources such as POS devices, websites, and mobile apps is checked against data quality rules. The data is then processed and tokenized, using a tokenization algorithm, for sensitive information related to user identity, payment, etc., and finally written into Apache Hive. A weekly data profiling job joins the Customer Account data from the former flow with the Transactions data from the latter and runs statistical analysis to collect data profiling metrics.
We begin by installing Spring Cloud Data Flow along with its dependencies. The SCDF website has starter guides for Local, Kubernetes, and Cloud Foundry installations. We use the Docker-driven local installation based on the docker-compose file hosted in the Spring Cloud Data Flow GitHub repo, with some modifications. For example, the file is configured to use Kafka as the messaging binder, which we change to RabbitMQ. The intent is to minimize resource consumption in a local demo setup and, at the same time, become familiar with the syntax of the docker-compose.yml and the different services that make up the SCDF installation. We also add Docker image definitions for Prometheus and Grafana (these were removed from the docker-compose.yml since Spring Cloud Data Flow 2.2.x).
We copy the docker-compose.yml to ~/scdf/dataflow_docker. To run the compose file, we first export the Data Flow and Skipper versions and then bring the services up:
```shell
export DATAFLOW_VERSION=2.6.3
export SKIPPER_VERSION=2.5.2
docker-compose -f ~/scdf/dataflow_docker/docker-compose.yml up -d
```
(At the time of writing this article, the latest stable versions were 2.6.3 for the Data Flow Server and 2.5.2 for the Skipper Server.)
After running docker-compose, the following services will be up and running, which can be validated by opening the browser link for each service.
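Based on the port mappings in the docker-compose.yml (see the Appendix), the main endpoints should be reachable at the following local addresses, assuming the default mappings are unchanged:

- Data Flow dashboard: http://localhost:9393/dashboard
- Skipper server API: http://localhost:7577/api
- RabbitMQ management console: http://localhost:15672
- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000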
(Docker Dashboard displaying the status of each running container)
As we can infer from the use case definition, there are two data pipelines to be implemented.
– The first pipeline reads the transaction data from an http source, forwards it to a data quality check processor, then to the tokenizer transformer, and finally writes the data to HDFS, from where it can be accessed using Apache Hive. (We do not write a custom Hive sink with insert statements, since establishing a Hive connection and triggering an insert for each record is a costly operation.) This is a classic example of a stream data pipeline that can be expressed in SCDF with a stream pipeline DSL:
```
http | dataquality | tokenize | hdfs
```
We will use the out-of-the-box event streaming applications for the http source and the hdfs sink, and custom-built Spring Boot microservice applications, based on reusable functions, for the data quality and tokenizer processors.
– The second pipeline is a batch job, triggered weekly, that joins the Customer Account and Transactions datasets and performs statistical data profiling on top of the joined data. This is achieved by running a profiling job written on top of the Spring Batch job execution framework in SCDF.
The next section of the blog focuses on the implementation of the first pipeline, involving streaming data. The definition and deployment of the second pipeline will be discussed in an upcoming blog post.
The first activity we perform is to define the metadata in Arena for the Transactions data that is to be consumed from the HTTP sources. A metadata-driven pipeline definition lets us configure the functional aspects of the pipeline dynamically at runtime, with zero downtime. The metadata definition involves defining the schema of the data record with respect to the fields of interest, the data quality rules to be applied to those fields, and the tokenization algorithms to be applied to the fields containing sensitive data. Metadata for a dataset is defined in Arena via Entities, and each entity corresponds to a Hive table created in the data lake connected to the Arena application.
Here we see the technical information related to the Transactions entity that we create in Arena.
(Definition of transactions entity in Arena)
Table Name: Name of the Hive table that is created. In this case, it is transactions.
Target Schema: The database in Hive under which the table is created. In this case, it is zdp.
Default Cluster: The data lake cluster that is currently connected to the application.
Path: The HDFS path that will hold the data for the zdp.transactions Hive table. The streaming pipeline writes the data records into this path.
JDBC URL: The JDBC URL for connecting to the zdp.transactions Hive table, through which we can run queries to read the data from the table.
Next, we define the fields for the data record and apply data quality rules over them, such as transaction_id and account_number must not be null and transaction_value must not be a negative integer, as well as tokenization algorithms for the sensitive or PII fields such as credit_card_number, item_description, etc.
(Applying NOT_NULL rule to transaction_id field)
(Applying SHA1 tokenization algorithm to transaction_card_number)
We will use the metadata definition in the downstream processing of the data record by invoking Arena REST APIs to fetch the field metadata and information about the rules and tokenization algorithms.
Once the metadata is defined, we write the Java code for the Spring Boot applications involved in the pipeline.
For the data source we leverage the out-of-the-box `http` application, whereas the processors will be custom-built Spring Boot applications.
To speed up the process of authoring the Spring Boot applications, we use Spring Initializr, which generates a project skeleton with all the related dependencies. We use the following details when generating our application code.
Project Type: Maven
Language: Java
Spring Boot: 2.3.4
Group: com.scdf.pipeline
Packaging: JAR
Java Version: 8
Clicking `Generate Project` downloads a zipped Maven project, which we unzip and import into an IDE; we use Eclipse with Spring Tools. This is the common step for generating all the applications, with only the artifact name differing in each case.
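The same project skeleton can also be generated from the command line against the Spring Initializr endpoint. Here is a sketch for the data-quality-checker project described next; the parameter values mirror the choices above and may need adjusting for your Initializr version:

```shell
curl https://start.spring.io/starter.zip \
  -d type=maven-project \
  -d language=java \
  -d bootVersion=2.3.4.RELEASE \
  -d groupId=com.scdf.pipeline \
  -d artifactId=data-quality-checker \
  -d packaging=jar \
  -d javaVersion=8 \
  -o data-quality-checker.zip
```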
This is the first processor in the stream data pipeline. The artifact name for this project is data-quality-checker. The application processes each HTTP message from the source and applies the data quality rules defined in the record metadata through the Arena entity definition. If the values in the record pass the data quality validations for all the concerned fields, the record is passed on to the next processor. In SCDF terminology, this is a Processor application with a filter responsibility.
(data-quality-checker project generation using Spring Initializr)
Since we have opted to use RabbitMQ as the messaging broker, the appropriate binder is added as a dependency in the project pom.xml:
```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
```
Using Spring Cloud Stream 2.x requires the following dependencies to be added; otherwise the stream we create later will remain stuck in the deploying state:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
```
This application also needs to fetch the record metadata from a running Arena instance to get the rule-to-field associations. Therefore, we add a dependency on scdf-pipeline-common, which has classes that act as a client to the Arena REST APIs and return the list of field-to-rule associations.
```xml
<dependency>
    <groupId>com.scdf.pipeline</groupId>
    <artifactId>scdf-pipeline-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
```
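The internals of scdf-pipeline-common are not covered in this post. For illustration, assume it exposes a client along the following lines; the interface name and method signatures are hypothetical:

```java
package com.scdf.pipeline.common;

import java.util.List;
import java.util.Map;

// Hypothetical client over the Arena REST APIs (illustrative only; the real
// module's API may differ).
public interface ArenaMetadataClient {

    // Field name -> data quality rules (e.g. NOT_NULL) defined on the entity in Arena.
    Map<String, List<String>> getFieldRules(String entityName);

    // Field name -> tokenization algorithm (e.g. SHA1) defined on the entity in Arena.
    Map<String, String> getFieldTokenizers(String entityName);
}
```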
We then create the class DataQualityCheckProcessor, which filters the data records and forwards only those that pass the data quality check criteria.
We annotate the class with @EnableBinding(Processor.class). This annotation indicates that the application is of type Processor, and all the bootstrapping required for it is performed based on the annotation. For source-type applications the annotation is @EnableBinding(Source.class), while for sink-type applications it is @EnableBinding(Sink.class).
The next important annotation, placed over a method with a boolean return type, is @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT). It defines the input and output channels and signals that the method is meant to filter the data records to be sent forward: if the method returns true, the message is forwarded to the next application in the pipeline.
These annotations are agnostic to the binder implementation in use, so the developer can focus on writing the business logic inside the method without worrying about the messaging binder used at runtime.
The dataQualityCheck method receives the message from the HTTP source as a JSON string, which we convert to a java.util.Map. This map has the field names used in the request as keys and the field values as values. Next, we fetch a mapping of field names to DQ rules from the Arena instance and apply the data quality checks on the fields that have rules associated with them. Any rule violation makes the method return false; if all rule associations pass, it returns true.
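Putting these pieces together, a minimal sketch of the processor might look like the following. It assumes the hypothetical ArenaMetadataClient shown earlier and illustrative rule names (NOT_NULL, NOT_NEGATIVE); the actual code wired against Arena will differ in its details.

```java
package com.scdf.pipeline.dataqualitychecker;

import java.util.List;
import java.util.Map;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Filter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.scdf.pipeline.common.ArenaMetadataClient; // hypothetical client, see earlier sketch

@EnableBinding(Processor.class) // marks the app as a Processor with input and output channels
public class DataQualityCheckProcessor {

    private final ObjectMapper mapper = new ObjectMapper();
    private final ArenaMetadataClient arenaClient;

    public DataQualityCheckProcessor(ArenaMetadataClient arenaClient) {
        this.arenaClient = arenaClient;
    }

    // Returns true to forward the record downstream, false to drop it.
    @Filter(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public boolean dataQualityCheck(String payload) throws Exception {
        // The HTTP message body is a JSON object of field name -> value.
        Map<String, Object> data =
                mapper.readValue(payload, new TypeReference<Map<String, Object>>() {});

        // Field name -> rule names defined on the Transactions entity in Arena.
        Map<String, List<String>> fieldRules = arenaClient.getFieldRules("transactions");

        for (Map.Entry<String, List<String>> entry : fieldRules.entrySet()) {
            Object value = data.get(entry.getKey());
            for (String rule : entry.getValue()) {
                if ("NOT_NULL".equals(rule) && value == null) {
                    return false; // violates NOT_NULL: drop the record
                }
                if ("NOT_NEGATIVE".equals(rule) && value instanceof Number
                        && ((Number) value).longValue() < 0) {
                    return false; // violates NOT_NEGATIVE: drop the record
                }
            }
        }
        return true; // all rules passed
    }
}
```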
The application code is then built with Maven to generate the artifact data-quality-checker-0.0.1-SNAPSHOT.jar and install it into the local Maven repository. Since this application will be used in a pipeline hosted on the SCDF instance running in Docker containers, we need to mount the local Maven repository into the containers running the dataflow-server and skipper-server by adding the following to the volumes section of the docker-compose.yml for both services:
```yaml
volumes:
  - ~/.m2/repository:/root/.m2/repository
```
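For completeness, building and installing the artifact into that local repository is the standard Maven lifecycle, run from the project root:

```shell
cd data-quality-checker
mvn clean install
```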
We then open the dataflow-server dashboard to register data-quality-checker as a processor application. To do so, we click the Add Application button under the Apps navigation menu; the app registration page can also be reached directly at http://localhost:9393/dashboard/#/apps/add/register. Since we have already mounted the Maven repository, we can specify the URI for the application as:
```
maven://com.scdf.pipeline:data-quality-checker:jar:0.0.1-SNAPSHOT
```
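The same registration can also be scripted through the SCDF shell (or the REST API) instead of the dashboard, for example:

```shell
app register --name data-quality-checker --type processor --uri maven://com.scdf.pipeline:data-quality-checker:jar:0.0.1-SNAPSHOT
```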
After the registration is done successfully, we will be able to see the name of the application in the list of available applications under the Processor type.
The Maven artifact name for this application is tokenizer. The structure and internals of the application are similar to the data-quality-checker application. However, the method in this case is annotated with @Transformer instead of @Filter, indicating that the method transforms the input message and returns the transformed object, unlike the filter method, which returns a boolean indicating whether the message should be forwarded.
The tokenizeRecord method receives the quality-checked record from the data-quality-checker processor as a JSON string, which we convert to a java.util.Map. This map has the field names used in the request as keys and the field values as values. We fetch a mapping of field names to tokenization algorithms from the Arena instance and apply the algorithm to the values of the fields that have one associated with them.
The next application in the pipeline is the pre-built hdfs sink, which writes data records to an HDFS location (the path corresponding to the Transactions table in Apache Hive). We therefore have to transform the data record into a form that Hive can read, which by default is a comma-delimited dataset. This requires converting the output into a comma-delimited string before forwarding it.
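A minimal sketch of the tokenizer, again assuming the hypothetical ArenaMetadataClient and a simple SHA-1 hex digest as the tokenization, could look like this:

```java
package com.scdf.pipeline.tokenizer;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.scdf.pipeline.common.ArenaMetadataClient; // hypothetical client, see earlier sketch

@EnableBinding(Processor.class)
public class TokenizerProcessor {

    private final ObjectMapper mapper = new ObjectMapper();
    private final ArenaMetadataClient arenaClient;

    public TokenizerProcessor(ArenaMetadataClient arenaClient) {
        this.arenaClient = arenaClient;
    }

    // Tokenizes the sensitive fields and emits a comma-delimited line for the hdfs sink.
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String tokenizeRecord(String payload) throws Exception {
        // LinkedHashMap preserves field order so the columns line up with the Hive table.
        Map<String, Object> data =
                mapper.readValue(payload, new TypeReference<LinkedHashMap<String, Object>>() {});

        // Field name -> tokenization algorithm (e.g. SHA1) defined in Arena.
        Map<String, String> tokenizers = arenaClient.getFieldTokenizers("transactions");

        tokenizers.forEach((field, algorithm) -> {
            Object value = data.get(field);
            if (value != null && "SHA1".equalsIgnoreCase(algorithm)) {
                data.put(field, sha1Hex(value.toString()));
            }
        });

        // The hdfs sink writes the payload as-is, so produce the comma-delimited form Hive expects.
        return data.values().stream()
                .map(v -> v == null ? "" : v.toString())
                .collect(Collectors.joining(","));
    }

    private static String sha1Hex(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-1")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02X", b));
            }
            return hex.toString();
        } catch (Exception e) {
            throw new IllegalStateException("SHA-1 digest not available", e);
        }
    }
}
```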
The application code is then compiled, packaged, and registered with Spring Cloud Data Flow in the same way as the data-quality-checker application.
Now that we have all the applications ready and registered with the Spring Cloud Data Flow server, we can define and deploy the stream. We will use the Data Flow dashboard UI to create the stream; the page can be accessed at http://localhost:9393/dashboard/#/streams/create. This UI allows us to drag and drop the applications and connect them in the order of their function in the stream pipeline. We can also specify application-specific properties for each application. For example, for the hdfs sink we need to specify the filesystem URI and the output path; these values must match the ones associated with the Transactions entity in Arena.
The stream DSL is automatically generated based on the items in the canvas, which helps produce a syntactically correct DSL for the stream pipeline.
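As an illustration, the generated definition (with the sink properties spelled out) might look roughly like the following; the placeholders stand for your namenode host and the HDFS path of the Transactions entity, and the exact property names exposed by the hdfs sink depend on the app-starter version, so check the options shown for the registered app in the dashboard:

```
http | data-quality-checker | tokenizer | hdfs --hdfs.fs-uri=hdfs://<namenode-host>:8020 --hdfs.directory=/path/to/zdp/transactions
```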
We give the stream a unique name, such as transaction-stream, and finally deploy it. The deployment of the stream is handled by the skipper-server.
Navigating to the summary tab gives us information about the application instances running for the stream and the runtime logs emitted by the individual applications.
Once the stream is deployed successfully, we can test the operation of the stream pipeline. For this purpose, we use curl to post Transaction data records to the HTTP source.
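An example request might look like the following; the port is illustrative (the http source is exposed on one of the 20000-20105 ports mapped by the skipper-server container, visible in the stream's runtime details), and the field values are made up:

```shell
curl -X POST http://localhost:20000 \
  -H "Content-Type: application/json" \
  -d '{"transaction_id": "TXN1001", "account_number": "ACC900123", "transaction_value": 250, "credit_card_number": "4532015112830366", "item_description": "Grocery purchase"}'
```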
Once incoming data is posted to the HTTP source, it goes through the data quality application followed by the tokenizer application and is finally written to the HDFS location specified in the definition of the pipeline.
If we check the contents of the file in the location, we will see that the transformed and tokenized data is written into it. The value E8C32C51F16B4FAF64D93D9E48D289B919911336 is the SHA-1 tokenized value for the transaction-card-number.
Now, if we log in to the Arena application and navigate to the Data Preview section for the Transactions entity, we can see the data record. This indicates that the Hive engine was able to read the data record correctly (Data Preview in Arena uses Hive).
We continue the test with another curl request in which account_number is null, and this time, on checking the output file directory in HDFS, we find that the record has not been written to the file. The reason is that the data-quality-checker prevented this record from flowing to the other applications in the pipeline, since it fails the not_null rule associated with the account_number field.
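Such a rejected request could look like this (again with an illustrative port and values):

```shell
curl -X POST http://localhost:20000 \
  -H "Content-Type: application/json" \
  -d '{"transaction_id": "TXN1002", "account_number": null, "transaction_value": 125, "credit_card_number": "4532015112830366", "item_description": "Online purchase"}'
```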
In this article, we defined a data pipeline and explained how it differs from the traditional ETL process. We also looked at the different types of data pipelines and highlighted the challenges in implementing a robust and cost-effective data pipeline for enterprise use.
We then introduced the Spring Cloud Data Flow toolkit, which helps developers set up cloud-native, microservice-driven data pipelines that address most of the pipeline implementation challenges. We explained the core components of the SCDF ecosystem and presented an architectural overview of how the components interact with each other at runtime.
Finally, we presented a sample use case and a detailed walkthrough covering a quick setup using Docker containers and the configuration and coding needed to turn the sample use case into a fully functional data pipeline designed and deployed with Spring Cloud Data Flow.
Appendix:
docker-compose.yml
```yaml
version: '3'
services:
  # Use `docker exec -it dataflow-mysql /bin/sh` to log into the container
  mysql:
    image: mysql:5.7.25
    container_name: dataflow-mysql
    environment:
      MYSQL_DATABASE: dataflow
      MYSQL_USER: root
      MYSQL_ROOT_PASSWORD: rootpw
    expose:
      - 3306

  # Use `docker exec -it dataflow-rabbitmq /bin/sh` to log into the container
  rabbitmq:
    image: rabbitmq:management
    container_name: dataflow-rabbitmq
    environment:
      RABBITMQ_DEFAULT_USER: "guest"
      RABBITMQ_DEFAULT_PASS: "guest"
      RABBITMQ_DEFAULT_VHOST: "/"
    ports:
      - "5672:5672"
      - "15672:15672"

  # Use `docker exec -it dataflow-server /bin/sh` to log into the container
  dataflow-server:
    image: springcloud/spring-cloud-dataflow-server:${DATAFLOW_VERSION:?DATAFLOW_VERSION is not set!}
    container_name: dataflow-server
    ports:
      - "9393:9393"
    environment:
      - spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.host=rabbitmq
      - spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.port=5672
      - spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.username=guest
      - spring.cloud.dataflow.applicationProperties.stream.spring.rabbitmq.password=guest
      - spring.cloud.skipper.client.serverUri=http://skipper-server:7577/api
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    depends_on:
      - rabbitmq
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-dataflow-server.jar"
    volumes:
      - ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}

  # Use `docker exec -it skipper /bin/sh` to log into the container
  skipper-server:
    image: springcloud/spring-cloud-skipper-server:${SKIPPER_VERSION:?SKIPPER_VERSION is not set!}
    container_name: skipper
    ports:
      - "7577:7577"
      - "20000-20105:20000-20105"
    environment:
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_LOW=20000
      - SPRING_CLOUD_SKIPPER_SERVER_PLATFORM_LOCAL_ACCOUNTS_DEFAULT_PORTRANGE_HIGH=20100
      - SPRING_DATASOURCE_URL=jdbc:mysql://mysql:3306/dataflow
      - SPRING_DATASOURCE_USERNAME=root
      - SPRING_DATASOURCE_PASSWORD=rootpw
      - SPRING_DATASOURCE_DRIVER_CLASS_NAME=org.mariadb.jdbc.Driver
    entrypoint: "./wait-for-it.sh mysql:3306 -- java -jar /maven/spring-cloud-skipper-server.jar"
    volumes:
      - ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/root/scdf}

  app-import:
    image: springcloud/openjdk:2.0.0.RELEASE
    container_name: dataflow-app-import
    depends_on:
      - dataflow-server
    command: >
      /bin/sh -c "
        ./wait-for-it.sh -t 180 dataflow-server:9393;
        wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${STREAM_APPS_URI:-https://dataflow.spring.io/rabbitmq-maven-latest&force=true}';
        echo 'Stream apps imported'
        wget -qO- 'http://dataflow-server:9393/apps' --post-data='uri=${TASK_APPS_URI:-https://dataflow.spring.io/task-maven-latest&force=true}';
        echo 'Task apps imported'"

  prometheus:
    image: springcloud/spring-cloud-dataflow-prometheus-local:${DATAFLOW_VERSION:?DATAFLOW_VERSION is not set! Use 'export DATAFLOW_VERSION=local-server-image-tag'}
    container_name: prometheus
    volumes:
      - ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/etc/prometheus}
    ports:
      - '9090:9090'
    depends_on:
      - service-discovery

  # Use `docker exec -it service-discovery /bin/sh` to log into the container
  service-discovery:
    image: springcloud/spring-cloud-dataflow-prometheus-service-discovery:0.0.4.RELEASE
    container_name: service-discovery
    volumes:
      - ${HOST_MOUNT_PATH:-.}:${DOCKER_MOUNT_PATH:-/tmp/scdf-targets/}
    expose:
      - '8181'
    ports:
      - '8181:8181'
    environment:
      - metrics.prometheus.target.cron=0/20 * * * * *
      - metrics.prometheus.target.filePath=/tmp/scdf-targets/targets.json
      - metrics.prometheus.target.discoveryUrl=http://dataflow-server:9393/runtime/apps
      - metrics.prometheus.target.overrideIp=skipper-server
      - server.port=8181
    depends_on:
      - dataflow-server

  grafana:
    image: springcloud/spring-cloud-dataflow-grafana-prometheus:${DATAFLOW_VERSION:?DATAFLOW_VERSION is not set! Use 'export DATAFLOW_VERSION=local-server-image-tag'}
    container_name: grafana
    ports:
      - '3000:3000'
```