Effortless File Streaming in MuleSoft: A Step-by-Step Guide

Introduction

In one of our projects, we were tasked with orchestrating file transfer jobs between multiple SFTP servers for a client and their various vendors. While each SFTP configuration had its own nuances, we maintained a consistent core functionality across all transfers. However, as the number of jobs grew, so did the complexity. Eventually we faced challenges managing nightly file transfers that exceeded 10,000 files, with individual file sizes reaching up to 5GB. We learned the original method of using out of the box SFTP connector components caused sporadic out of memory errors despite repeatedly scaling up our servers. This caused our production servers to constantly trigger its’ failsafe restart, resulting in interruptions that snowball into sluggish manual intervention. These issues required us to rethink our approach to handling large-scale file transfers efficiently. So we looked into building an SFTP transfer tool ourselves.

The high-volume problem

Let’s say you have around 90,000 files stacked in a folder on an SFTP server, ready to be moved to its destination. You start by attempting to retrieve the list of file names and folders. However, after a few minutes, your server stalls and throws a memory overload error. The issue with the SFTP connector’s “List” function is that it tries to store the metadata of all 90,000 files at once. As of the time this blog is written, there is no configurable option to limit the number of file metadata pull at this time. So let’s make a java function that does just that:

    public static String[] getFilesFromSFTP(int x, String folderPath, String host, int port, String username, String password) {
        JSch jsch = new JSch();
        Session session = null;
        ChannelSftp sftpChannel = null;
        List<String> fileList = new ArrayList<>();
        
        try {
            // Setup the session with the provided configuration
            session = jsch.getSession(username, host, port);
            session.setPassword(password);
            
            // Disable host key checking (use with caution for production)
            session.setConfig("StrictHostKeyChecking", "no");
            
            // Connect to the SFTP server
            session.connect();
            
            // Open an SFTP channel
            Channel channel = session.openChannel("sftp");
            sftpChannel = (ChannelSftp) channel;
            sftpChannel.connect();
            
            // Navigate to the folder path
            sftpChannel.cd(folderPath);
            
            // Get the files from the folder
            @SuppressWarnings("unchecked")
            Vector<ChannelSftp.LsEntry> fileEntries = sftpChannel.ls(folderPath);
            
            // Process the files up to x size
            for (int i = 0; i < x && i < fileEntries.size(); i++) {
                ChannelSftp.LsEntry entry = fileEntries.get(i);
                String filePath = folderPath + "/" + entry.getFilename();
                fileList.add(filePath);
            }
            
            // Convert the list to a string array
            String[] files = new String[fileList.size()];
            fileList.toArray(files);
            return files;
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Clean up: Disconnect from the SFTP server
            if (sftpChannel != null && sftpChannel.isConnected()) {
                sftpChannel.disconnect();
            }
            if (session != null && session.isConnected()) {
                session.disconnect();
            }
        }
        
        return new String[0];  // Return empty array in case of error
    }

With this function in place, we can handle large volumes by processing them in smaller, more manageable batches, preventing one of our causes of server memory overload. Additionally, this approach can be extended to use queues, which allows us to orchestrate workers in retrieving and processing these transfers efficiently. This enables horizontal scaling and allows jobs to run asynchronously, resulting in faster overall throughput while offloading in memory storage on a single server.

The large file problem

We’ve now batched the large volume of files into more manageable sets, but we are still encountering sporadic memory overload errors and server restarts. It turns out that the file read function in the SFTP connector is attempting to load the entire content of each file into memory. In fact, whenever a file larger than 500MB is read and loaded into memory, our servers immediately encounter errors and restart. To resolve this, we can add a transfer function to our Java library that streams a small buffer from the source server to the destination server.

    public static void transferFile(
            String srcHost, int srcPort, String srcUser, String srcPass,
            String destHost, int destPort, String destUser, String destPass,
            String destFolder, String fileName) {

        Session srcSession = null;
        Session destSession = null;
        ChannelSftp srcSftp = null;
        ChannelSftp destSftp = null;

        try {
            // Connect to source SFTP server
            srcSession = connectSFTP(srcHost, srcPort, srcUser, srcPass);
            srcSftp = (ChannelSftp) srcSession.openChannel("sftp");
            srcSftp.connect();

            // Connect to destination SFTP server
            destSession = connectSFTP(destHost, destPort, destUser, destPass);
            destSftp = (ChannelSftp) destSession.openChannel("sftp");
            destSftp.connect();

            // Stream file from source to destination
            try (InputStream inputStream = srcSftp.get(fileName);
                 OutputStream outputStream = destSftp.put(destFolder + "/" + fileName)) {

                byte[] buffer = new byte[32768];
                int bytesRead;
                while ((bytesRead = inputStream.read(buffer)) != -1) {
                    outputStream.write(buffer, 0, bytesRead);
                }
            }

            System.out.println("File '" + fileName + "' transferred successfully to " + destFolder);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Cleanup
            if (srcSftp != null) srcSftp.disconnect();
            if (destSftp != null) destSftp.disconnect();
            if (srcSession != null) srcSession.disconnect();
            if (destSession != null) destSession.disconnect();
        }
    }

    private static Session connectSFTP(String host, int port, String user, String pass) throws JSchException {
        JSch jsch = new JSch();
        Session session = jsch.getSession(user, host, port);
        session.setPassword(pass);
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();
        return session;
    }

At a high level, this function simply performs the following:
- Initializes the SFTP connections of both source and destination servers
- Creates the input and output streams
- Performs the transfer
- Cleans up/disposes assets
In our case, this is fit our needs. But cases may vary and its best to make changes that would fit your own situations.

Lets Get This Into MuleSoft

There are several ways to execute Java code in MuleSoft. For our solution, we chose to call it directly within a DataWeave script embedded in our MuleSoft flow. The DataWeave script resembled the following:

%dw 2.0
output application/java
import java!SFTPFileTransfer

var srcHost = "source.example.com"
var srcPort = 22
var srcUser = "source_user"
var srcPass = "source_password"
var destHost = "destination.example.com"
var destPort = 22
var destUser = "dest_user"
var destPass = "dest_password"
var destFolder = "/remote/destination/folder"
var fileName = "example.txt"

---
SFTPFileTransfer::transferFile(
    srcHost, srcPort, srcUser, srcPass,
    destHost, destPort, destUser, destPass,
    destFolder, fileName
)

Be sure to add your library code under the src/main/java path, ensuring the class name matches what is expected in your code. Also, remember to include your dependency library (e.g., JSch) in the MuleSoft application’s pom.xml, just as you would in any standard Java program.

Conclusion

What did we accomplish?

  • We can now batch out transfers with new method that can retrieve a batch number of files at a time

  • We can now directly stream files from one SFTP server to another.

  • With small buffers of file content being loaded into memory, we can now scale up the number of file transfers when running asynchronously without overloading the server’s memory.

  • We are able to call out custom Java library methods directly in our MuleSoft flow

What did this solve?

  • We no longer need to scale our servers vertically due to lower memory demands.

  • We have the option to perform multiple transfer threads asynchronously on the same server.

  • We can scale horizontally with more smaller server for higher throughput of transfer.

  • We have a more durable solution able to handle higher volume and file sizes and less sporadic server restarts.

Related Links

https://www.geeksforgeeks.org/connecting-to-an-sftp-server-using-java-jsch-library/

https://docs.mulesoft.com/dataweave/latest/dataweave-cookbook-java-methods

https://docs.mulesoft.com/sftp-connector/latest/

Previous
Previous

Minimizing Downtime With Health Checks

Next
Next

Streamlining Microservice Development with .NET Aspire - A Developer’s Perspective