Source
Sources are designed to inject data into the dataflow from diverse origins: CSV files, databases, sensors...
At the moment, only file-based Sources are available in OpenMOLE. If you need to interface OpenMOLE with an
external data source, check the contact information page to see how to reach the OpenMOLE development team.
Sources are plugged into the dataflow in a similar fashion to hooks. Let's consider this simple workflow:
val files = Val[Array[File]]
val result = Val[Double]
val hello =
  ScalaTask("val result = computeFromFiles(files)") set (
    inputs += files,
    outputs += result
  )
val s = ListFilesSource(workDirectory / "directory", files)
(hello source s)
The source s is plugged at the beginning of the task hello. It is executed prior to each execution of hello.
You can also plug multiple sources on the same task using the syntax hello source (s1, s2, s3).
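The execution order described above can be pictured with a small plain-Scala model. This is only a conceptual sketch, not OpenMOLE's implementation: Context, Source, Task and withSources are hypothetical names introduced for the illustration, and the two sources inject fixed values instead of reading anything from disk.

```scala
object SourceSketch {
  // A context maps variable names to values; it stands in for the dataflow.
  type Context = Map[String, Any]
  // A source returns the variables it wants to inject (hypothetical model).
  type Source = Context => Context
  type Task = Context => Context

  // Build a task that first runs every source, merging what each one injects
  // into the context, and only then runs the original task.
  def withSources(task: Task, sources: Source*): Task =
    context => task(sources.foldLeft(context)((ctx, s) => ctx ++ s(ctx)))

  // Hypothetical sources injecting fixed values.
  val s1: Source = _ => Map("files" -> Seq("a.txt", "b.txt"))
  val s2: Source = _ => Map("dirs" -> Seq("run1"))

  // Stand-in for the task: it counts the entries injected by both sources.
  val hello: Task = ctx => {
    val files = ctx("files").asInstanceOf[Seq[String]]
    val dirs = ctx("dirs").asInstanceOf[Seq[String]]
    ctx + ("result" -> (files.size + dirs.size))
  }

  // Rough analogue of: hello source (s1, s2)
  val plugged: Task = withSources(hello, s1, s2)
}
```

In this model, every call to SourceSketch.plugged runs s1 and s2 again before hello, which mirrors the statement that a source is executed prior to each execution of its task.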
The ListFilesSource lists the content of a directory and injects it into the dataflow as an array of
File objects. The range of files selected can be narrowed by passing a regular expression as the last
parameter of the source builder.
val someVariable = Val[String]
val txtFiles = Val[Array[File]]
val files = Val[Array[File]]
val s1 = ListFilesSource(workDirectory / "directory", files)
val s2 =
  ListFilesSource(workDirectory / "/${someVariable}/", txtFiles, ".*\\.txt") set (
    inputs += someVariable
  )
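To make the effect of the regular expression concrete, here is a minimal plain-Scala sketch of the same kind of selection. It does not use OpenMOLE: listFiles is a hypothetical helper, and it assumes the expression has to match the whole file name (the behaviour of String.matches), which may differ from what the source builder does internally.

```scala
import java.io.File

object ListFilesSketch {
  // Hypothetical helper, not OpenMOLE API.
  // Assumption: the regular expression must match the entire file name.
  def listFiles(directory: File, regExp: String = ".*"): Array[File] =
    Option(directory.listFiles())          // null when the directory does not exist
      .getOrElse(Array.empty[File])
      .filter(f => f.isFile && f.getName.matches(regExp))
      .sortBy(_.getName)                   // sorted only to make the result deterministic
}
```

Under that assumption, ListFilesSketch.listFiles(new File("directory"), ".*\\.txt") keeps only the entries whose name ends in .txt, while omitting the second argument keeps every regular file.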
Likewise, you can inject an array of directories into the dataflow. Directories are also represented as File
objects. Again, the selection can be done either by passing a complete directory name or a regular expression
that will be matched against the names of the directories found.
val someVariable = Val[String]
val dirs = Val[Array[File]]
val aaaDirs = Val[Array[File]]
// will fill dirs with all the subdirectories of "directory"
val s1 = ListDirectoriesSource(workDirectory / "directory", dirs)
// will fill aaaDirs with all the subdirectories of "directory" starting with aaa
val s2 =
  ListDirectoriesSource(workDirectory / "${someVariable}", aaaDirs, "^aaa.*") set (
    inputs += someVariable
  )
Sources store each entry found in an Array. In most cases, you will want each of the entries to feed a different
task. Let's now see how this can be done by reusing what we've discovered with the data processing sampling.
Here, we collect all the directories named "care_archive". See how they are gathered in an Array[File]
container and can be explored by an ExplorationTask using the keyword in. This exploration generates one
analysisTask per directory collected by the source.
val directoriesToAnalyze = Val[Array[File]]
val s = ListDirectoriesSource(workDirectory / "data/care_DoE", directoriesToAnalyze, "care_archive")
val inDir = Val[File]
val myWorkDirectory = "care_archive"
val analysisTask =
  SystemExecTask(s"${myWorkDirectory}/re-execute.sh") set (
    inputFiles += (inDir, myWorkDirectory)
  )
val exploration = ExplorationTask(inDir in directoriesToAnalyze)
(exploration source s) -< analysisTask
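The fan-out performed by the exploration can be mimicked in plain Scala to show what "one analysisTask per directory" amounts to. This is a conceptual sketch only: analyse is a hypothetical stand-in for analysisTask, and the two paths are made-up placeholders for whatever the source would actually find.

```scala
import java.io.File

object ExplorationSketch {
  // Hypothetical stand-in for analysisTask: it only reports the directory it received.
  def analyse(inDir: File): String = s"analysed ${inDir.getName}"

  // Placeholder for the array injected by the source (paths are invented).
  val directoriesToAnalyze: Array[File] =
    Array(new File("run1/care_archive"), new File("run2/care_archive"))

  // Analogue of "inDir in directoriesToAnalyze":
  // each element becomes one value of inDir, hence one job.
  val jobs: Array[String] = directoriesToAnalyze.map(analyse)
}
```

The array has two entries, so two jobs are produced, each receiving a single File rather than the whole array.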