Synchronizing file processing threads across servers in linux
ProgrammingThis forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.
Notices
Welcome to LinuxQuestions.org, a friendly and active Linux Community.
You are currently viewing LQ as a guest. By joining our community you will have the ability to post topics, receive our newsletter, use the advanced search, subscribe to threads and access many other special features. Registration is quick, simple and absolutely free. Join our community today!
Note that registered members see fewer ads, and ContentLink is completely disabled once you log in.
If you have any problems with the registration process or your account login, please contact us. If you need to reset your password, click here.
Having a problem logging in? Please visit this page to clear all LQ-related cookies.
Get a virtual cloud desktop with the Linux distro that you want in less than five minutes with Shells! With over 10 pre-installed distros to choose from, the worry-free installation life is here! Whether you are a digital nomad or just looking for flexibility, Shells can put your Linux machine on the device that you want to use.
Exclusive for LQ members, get up to 45% off per month. Click here for more info.
Synchronizing file processing threads across servers in linux
I also posted this on stackoverflow. Not sure if posting in multiple locations is frowned upon.
I need to build a linux service/daemon which processes files. This daemon will most likely be multi-threaded and most likely be running on more than one node. What would be the best way to synchronize the threads of all daemons such that no two threads are processing the same file?
A couple ideas came to mind but wondering whether there is a better approach as I'm new to linux.
1. Create a directory structure such that only one daemon processes a directory. The daemon itself should be able to easily synchronize the threads within it such that no two threads are processing the same file.
2. Determine some mechanism using open() and maybe file attributes such that once a process can successfully open a file exclusively when the file is in some state, maybe some file attribute not set yet, the state is changed, by changing some file attribute, and that daemon can process the file knowing that no other daemon will process it.
3. Come up with a naming convention such that the names are somewhat equally distributed across some numerical name. Each daemon could then be configured to process some modulo number.
Example: file name = 987654321
We have a daemon running on two nodes. The configuration for each daemon would indicate the number of daemons and which modulo the daemon should process. Therefore one daemon would process modulo value 0 and the other would process modulo value 1.
987654321 % 2 = 1 so it would be processed by the daemon processing modulo 1.
4. I guess we could have a single daemon which divvies out the work to the processing daemons. The processing daemons could communicate with this single daemon which I'll call the "work manager" via some IPC mechanism.
'processing a file' means the 'work' will be contained in files and thus to process this work we need to process the files. 'node' is a server.
Let's say 10,000 files arrive in some directory per minute. Each file contains 100's, 1,000's maybe 10,000's of application events. We need to process those events. Processing can consist of doing some validation and then moving them to some storage, maybe HDFS. A single multi-threaded daemon might not be able to keep up with the load and therefore we might want the daemon running on multiple servers.
Let's say 10,000 files arrive in some directory per minute.
How are they arriving?
Quote:
2. Determine some mechanism using open() and maybe file attributes such that once a process can successfully open a file exclusively when the file is in some state, maybe some file attribute not set yet, the state is changed, by changing some file attribute, and that daemon can process the file knowing that no other daemon will process it.
You could possibly use flock, but how will the daemon decide which files to open() in the first place? (Trying all of them until finding an unlocked one sounds pretty inefficient)
Quote:
4. I guess we could have a single daemon which divvies out the work to the processing daemons. The processing daemons could communicate with this single daemon which I'll call the "work manager" via some IPC mechanism.
One possible IPC mechanism is the file system, e.g. file names (solution 3) or directory structure (solution 1). I think solutions 1 and 3 are incomplete on their own.
Some directory on the shared file system could be a directory used by an FTP server and thus the FTP server puts the files there. Others could simply be a file copy to a directory on the shared file system.
Quote:
You could possibly use flock, but how will the daemon decide which files to open() in the first place? (Trying all of them until finding an unlocked one sounds pretty inefficient)
I'm not familiar with flock, but yeah, seems it could be inefficient to check each file until you find one you can work on.
Quote:
One possible IPC mechanism is the file system, e.g. file names (solution 3) or directory structure (solution 1). I think solutions 1 and 3 are incomplete on their own.
I guess it rather depends on exactly how these disk are 'shared' between the nodes.
Assuming you're not getting into low-level cluster locking mechanism on a proper clustered system eg using NFS instead or a NAS/SAN, I'd say option 1 directories is simplest.
You do need to have some way of dealing with a node going away.
If you absolutely have to keep processing and also scoop up the unprocessed files (assuming available) of the missing node, you'd need each master process on each node to know when the other node(s) goes missing AND when it comes back (although you may want to handle that manually).
Possibly just try to 'read' a marker file that exists on the nodes local (not usually shared) disk on a regular basis.
Maybe just an auto-login test via ssh keys to see if the other nodes are there.
Co-ordinating multiple nodes will take some organisation.
Definitely monitor the file cnt in each dir and alarm if one fills up...
If you use a "master" node at which all "working" nodes "register" you could have the "master" node send a list of files the "working" node should process.
working node connects to master node port and reports its status (bored|done|dying)
master nodes sends file path to work on
working node starts processing
Involving multiple servers against this stream of work is an opportunity for Apache ActiveMQ and/or Apache Camel middleware software; all open source.
Only Two servers would run ActiveMQ in a Master/Slave arrangement, all other servers would connect to the Broker using client software. Java, C/C++, Ruby, and several other languages are supported as clients.
All clients listen to one QUEUE and repeatably take the next available queued work item. When no more work items are available the clients simply wait for more work to show up, as all clients listen to the same logical queue for messages that contain the full path filenames of the work file. ActiveMQ handles the concern for only one client working one file at one time. ActiveMQ can also handle exceptions where the client crashes before completing its task, in this case the work remains on the queue for another client to service.
If filesystem connectivity is an issue, then the content of the messages in ActiveMQ could include the WHOLE FILE and not just it name, removing the need for a shared file system among servers.
Work gets into the common queue in one of two+ ways. 1: File Watcher program that watches a directory tree for each new file being created and creates a QUEUE message with the name of the file or the name and contents of the file. 2: Modify the Work File Writer Program to originate ActiveMQ messages rather than writing files.
In general I usually try to limit the complexity and technology stacks that are required. Since the work originates in files and it seems using the file system to divvy up the work should be quite simple, I would rather not introduce additional complexities like queuing or a DB.
I do review prior comments before commenting myself, and I have to say the evolving solution seems to be duplication features that I've seen before in ActiveMQ. However, I do appreciate the Keep It Simple principal. Here is another bit of technology that seems to be relevant to the opportunity your issue presents Apache Kafka, you might find this link an interesting read.
Quote:
Originally Posted by nickdu
Let's say 10,000 files arrive in some directory per minute. Each file contains 100's, 1,000's maybe 10,000's of application events. We need to process those events. Processing can consist of doing some validation and then moving them to some storage, maybe HDFS. A single multi-threaded daemon might not be able to keep up with the load and therefore we might want the daemon running on multiple servers.
I wanted to quote the above to highlight the arrival rate and volume of files which must be serviced. I'll also presume that no single file can be lost.
What are your thoughts on the technologies you would like to include in a potential solution? example: C-Language daemons/program only, TCPIP Sockets, etc.
zhjim's suggestions for a master node model is the where I would begin (a version of your original item 4). Based on your workloads, thinking through the conversation between the worker node and the master needs to be fleshed out very carefully.
Assuming a shared filesystem between workers and the master node is ok. Your arrival rate and workload depth of each file, presents an opportunity to adjust the info exchanged between participants. Example: rather than having a worker get the name of one file from the master node, have the master node assign a whole directory tree (or a set of X count files) to the worker. You can balance the throughput of the entire system by adjusting the number of files included in each workers packet.
I'm going to suggest having the workers be a multi-threaded program(MT), where number of threads equal the number of cpu cores on that node plus one - for the scheduler. The MT worker's scheduler should distribute the contents of its assigned set of files to its internal thread-pool, without the need for thread-synchronization. I might add that his is a key performance and stability benefit - no thread contention on the workers, and the master node does not need any file locking mechanisms.
Other issues will present themselves, like how to restart/reassign work if a worker fails?; who is responsible for deleting the processed files? The master node can handle all of the recovery issues by retaining some awareness of what's been assigned, and acknowledged as complete by a given worker.
Summary:
Special input directory structure: /path/year/month/day/any-filename-target-files-ext
Master Node: Read input directory, prepares work file-sets for the workers
Worker Nodes: Requests next work packet from Master node, uses internal thread-pool to process files, responds to master node with completion message
Master Node: On completion response from worker, delete completed files in workers fileset, release previous fileset.
And I would probably counsel against multi-threaded workers, for two reasons:
(1) The various physical I/O streams very often will compete with one another ... for the use of the same physical "HDA = Head/Disk Assembly" (yup, revealing my "big iron" upbringing here), or for the channels leading to it.
(2) "What are you going to do if one of the worker threads does fail?" And, "what if one thread inadvertently impacts another thread, especially if you don't know it?" This is a last-mile problem.
I would simplify the design so that each worker was, simply, one fully-autonomous process, perhaps running under an init-like process (if not an actual copy of that well-seasoned daddy) which was simply responsible for keeping an eye on them. It takes a unit of work from the queue, commits it, rinse-and-repeat.
(And, incidentally, you don't have to buy-in to Apache Foundation's "Java-dripping technologies with the nonsensical cutesy-names." Unless you want to. They're good tools and they work as designed, but they seem to be designed for federal governments. Personally, I find their strategy to be overkill, but that's just me. You can easily find other things to do what they did, and in the same way, but which weigh-in a couple hundred megabytes lighter.)
Last edited by sundialsvcs; 09-16-2015 at 08:05 AM.
1. Somebody's got to do the work, so there is gonna be higher physical utilization. Plus the arrival rate of incoming files, and the amount of work each file contains seems to be substantial. (Oh, nothing wrong with Big Iron) Note: If the master node read the whole input file into a buffer and sent that buffer to the workers whenever they asked for more work; this would lower the physical io activity.
2. I expect them to fail and include code to capture the failure and restart the work until. There will be no inadvertent impact on other threads, by design no dependancies would be allowed. Only the scheduler talks to each thread.
Quote:
(And, incidentally, you don't have to buy-in to Apache Foundation's "Java-dripping technologies with the nonsensical cutesy-names." Unless you want to. They're good tools and they work as designed, but they seem to be designed for federal governments. Personally, I find their strategy to be overkill, but that's just me. You can easily find other things to do what they did, and in the same way, but which weigh-in a couple hundred megabytes lighter.)
My expression of Apache was attempting to address "What is available to do this? Or do we have to recreate the wheel again?". If we leave the pre-built tooling alone and start from scratch, I've not seen many useful libraries, aside from GLib.
A couple ideas came to mind but wondering whether there is a better approach as I'm new to linux.
I vote for the simplest solution. But just to be sure is the question only "How to make sure that never more than one process is working on one file?"?
And if there are only two processes running just have one look at odd timestamps and one look at even. Or if you control the file names then go by file name.
If you have more than two processes/nodes than go by last two digits of the timestamp. The simpler the better.
LinuxQuestions.org is looking for people interested in writing
Editorials, Articles, Reviews, and more. If you'd like to contribute
content, let us know.