Unit 4: Advanced Java - Concurrency
GTU Computer Engineering Semester 4
Executors factory methods: newFixedThreadPool(n), newCachedThreadPool(), newSingleThreadExecutor(), newScheduledThreadPool(n)
import java.util.concurrent.*;
// Basic ExecutorService usage: create a pool, submit work, shut down cleanly.
ExecutorService executor =
        Executors.newFixedThreadPool(4);

// Submit tasks
for (int i = 1; i <= 10; i++) {
    final int taskId = i; // a lambda may only capture (effectively) final locals
    executor.submit(() -> {
        System.out.println("Task " + taskId +
                " on " + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag so the pool can see the worker was interrupted.
            Thread.currentThread().interrupt();
        }
    });
}

// Shutdown executor: stop accepting new tasks, then wait for submitted ones.
executor.shutdown();
try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
    // Preserve the interrupt status for the code that called us.
    Thread.currentThread().interrupt();
}
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Random;
// Task class for demonstration
/**
 * Callable that computes the sum, average, maximum and minimum of an int array
 * after a simulated delay, and reports them as a {@code ProcessingResult}.
 */
class DataProcessingTask implements Callable<ProcessingResult> {
    private final int taskId;
    private final int[] data;
    private final Random random = new Random();

    /**
     * @param taskId identifier echoed in log lines and in the result
     * @param data   values to summarise; the array is not copied
     */
    public DataProcessingTask(int taskId, int[] data) {
        this.taskId = taskId;
        this.data = data;
    }

    /**
     * Sleeps for 1000-2999 ms to simulate work, then aggregates {@code data}.
     * For an empty array the average is NaN and max/min keep their sentinel values.
     *
     * @return the computed statistics for this task
     * @throws Exception if the simulated work is interrupted
     */
    @Override
    public ProcessingResult call() throws Exception {
        String threadName = Thread.currentThread().getName();
        System.out.println("Task " + taskId + " started on " + threadName);

        // Simulate complex processing
        Thread.sleep(random.nextInt(2000) + 1000);

        // long accumulator so the sum of many ints cannot overflow
        long sum = 0;
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        for (int value : data) {
            sum += value;
            max = Math.max(max, value);
            min = Math.min(min, value);
        }
        double average = (double) sum / data.length;

        System.out.println("Task " + taskId + " completed on " + threadName);
        return new ProcessingResult(taskId, sum, average, max, min);
    }
}
// Result class
/** Immutable holder for the statistics produced by one data-processing task. */
class ProcessingResult {
    private static final String SUMMARY_FORMAT =
            "Task %d: Sum=%d, Avg=%.2f, Max=%d, Min=%d";

    private final int taskId;
    private final long sum;
    private final double average;
    private final int max;
    private final int min;

    /**
     * @param taskId  identifier of the task that produced these numbers
     * @param sum     total of all values
     * @param average arithmetic mean of the values
     * @param max     largest value
     * @param min     smallest value
     */
    public ProcessingResult(int taskId, long sum, double average, int max, int min) {
        this.taskId = taskId;
        this.sum = sum;
        this.average = average;
        this.max = max;
        this.min = min;
    }

    /** Renders all fields on one line, with the average shown to two decimals. */
    @Override
    public String toString() {
        return String.format(SUMMARY_FORMAT, taskId, sum, average, max, min);
    }
}
public class ExecutorServiceDemo {
public static void main(String[] args) {
final int TASK_COUNT = 6;
final int DATA_SIZE = 1000;
// Create thread pool
ExecutorService executor = Executors.newFixedThreadPool(3);
List> futures = new ArrayList<>();
System.out.println("=== ExecutorService Demo ===");
System.out.println("Thread pool size: 3");
System.out.println("Number of tasks: " + TASK_COUNT);
long startTime = System.currentTimeMillis();
// Submit tasks
for (int i = 1; i <= TASK_COUNT; i++) {
// Generate random data for each task
int[] data = generateRandomData(DATA_SIZE);
DataProcessingTask task = new DataProcessingTask(i, data);
Future future = executor.submit(task);
futures.add(future);
}
System.out.println("All tasks submitted to executor");
// Collect results
List results = new ArrayList<>();
for (Future future : futures) {
try {
ProcessingResult result = future.get(); // Blocking call
results.add(result);
} catch (InterruptedException | ExecutionException e) {
System.err.println("Task failed: " + e.getMessage());
}
}
long endTime = System.currentTimeMillis();
// Display results
System.out.println("\n=== Results ===");
results.forEach(System.out::println);
System.out.printf("Total execution time: %d ms%n", endTime - startTime);
// Shutdown executor
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
System.out.println("Executor forcefully shutdown");
} else {
System.out.println("Executor shutdown gracefully");
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
private static int[] generateRandomData(int size) {
Random random = new Random();
int[] data = new int[size];
for (int i = 0; i < size; i++) {
data[i] = random.nextInt(100) + 1;
}
return data;
}
}
Atomic classes: AtomicInteger, AtomicLong, AtomicBoolean, AtomicReference
import java.util.concurrent.atomic.*;
// Atomic counter example
/**
 * Integer counter backed by an {@link AtomicInteger}; each method delegates to
 * a single atomic operation on it.
 */
class AtomicCounter {
    private final AtomicInteger value = new AtomicInteger();

    /** Adds one to the counter. */
    public void increment() {
        value.getAndIncrement();
    }

    /** Subtracts one from the counter. */
    public void decrement() {
        value.getAndDecrement();
    }

    /** @return the current value */
    public int get() {
        return value.intValue();
    }

    /**
     * Compare-and-swap: stores {@code newValue} only if the counter currently
     * equals {@code expected}.
     *
     * @return {@code true} if the swap happened
     */
    public boolean compareAndSet(int expected, int newValue) {
        return value.compareAndSet(expected, newValue);
    }

    /**
     * Atomically adds {@code delta}.
     *
     * @return the value after the addition
     */
    public int addAndGet(int delta) {
        return value.addAndGet(delta);
    }
}
import java.util.concurrent.atomic.*;
import java.util.concurrent.*;
/**
 * Contrasts a plain {@code volatile int} counter with an {@link AtomicInteger}
 * under concurrent increments, then shows a few other atomic operations.
 */
public class AtomicOperationsDemo {
    private static final int THREAD_COUNT = 5;
    private static final int OPERATIONS_PER_THREAD = 10000;

    // Compare atomic vs non-atomic counters.
    // volatile gives visibility only; ++ is still a read-modify-write sequence.
    private static volatile int regularCounter = 0;
    private static final AtomicInteger atomicCounter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        System.out.println("=== Atomic Operations Demo ===");
        System.out.println("Threads: " + THREAD_COUNT);
        System.out.println("Operations per thread: " + OPERATIONS_PER_THREAD);

        // Test regular counter (race condition)
        testRegularCounter();
        // Test atomic counter (thread-safe)
        testAtomicCounter();
        // Demonstrate other atomic operations
        demonstrateAtomicOperations();
    }

    /** Increments the volatile counter from several threads; updates may be lost. */
    private static void testRegularCounter() throws InterruptedException {
        System.out.println("\n--- Testing Regular Counter (Race Condition) ---");
        regularCounter = 0;
        runOnAllThreads(() -> {
            for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                regularCounter++; // Not thread-safe!
            }
        });

        int expected = THREAD_COUNT * OPERATIONS_PER_THREAD;
        int actual = regularCounter; // read once so all three lines agree
        System.out.println("Expected: " + expected);
        System.out.println("Actual: " + actual);
        System.out.println("Data corruption: " + (expected != actual));
    }

    /** Increments the AtomicInteger from several threads; no update is lost. */
    private static void testAtomicCounter() throws InterruptedException {
        System.out.println("\n--- Testing Atomic Counter (Thread-Safe) ---");
        atomicCounter.set(0);
        runOnAllThreads(() -> {
            for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
                atomicCounter.incrementAndGet(); // Thread-safe!
            }
        });

        int expected = THREAD_COUNT * OPERATIONS_PER_THREAD;
        int actual = atomicCounter.get();
        System.out.println("Expected: " + expected);
        System.out.println("Actual: " + actual);
        System.out.println("Thread-safe: " + (expected == actual));
    }

    /** Shows compare-and-set and update operations on the other atomic classes. */
    private static void demonstrateAtomicOperations() {
        System.out.println("\n--- Other Atomic Operations ---");

        AtomicInteger value = new AtomicInteger(10);
        System.out.println("compareAndSet(10, 20): " + value.compareAndSet(10, 20)
                + " -> " + value.get());
        System.out.println("compareAndSet(10, 30): " + value.compareAndSet(10, 30)
                + " -> " + value.get());
        System.out.println("getAndAdd(5): " + value.getAndAdd(5) + " -> " + value.get());
        System.out.println("updateAndGet(x -> x * 2): " + value.updateAndGet(x -> x * 2));

        AtomicLong total = new AtomicLong();
        System.out.println("AtomicLong addAndGet: " + total.addAndGet(1_000_000_000_000L));

        AtomicBoolean initialized = new AtomicBoolean(false);
        System.out.println("AtomicBoolean first compareAndSet(false, true): "
                + initialized.compareAndSet(false, true));
        System.out.println("AtomicBoolean second compareAndSet(false, true): "
                + initialized.compareAndSet(false, true));

        AtomicReference<String> state = new AtomicReference<>("NEW");
        String before = state.getAndSet("RUNNING");
        System.out.println("AtomicReference getAndSet: " + before + " -> " + state.get());
    }

    /**
     * Runs {@code work} once on each of {@code THREAD_COUNT} pool threads and
     * returns when all of them have finished.
     */
    private static void runOnAllThreads(Runnable work) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
        try {
            for (int i = 0; i < THREAD_COUNT; i++) {
                executor.submit(() -> {
                    try {
                        work.run();
                    } finally {
                        // Count down even on failure so await() cannot hang.
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            executor.shutdown();
        }
    }
}
Note: volatile guarantees visibility only; a compound operation such as i++ is still not atomic.
/**
 * Worker loop controlled by a {@code volatile} stop flag. The counter is also
 * volatile, but its increment remains a non-atomic read-modify-write.
 */
class VolatileExample {
    private volatile boolean shutdown = false;
    private volatile int counter = 0;

    /**
     * Prints and bumps the counter every 100 ms until {@link #stopWorker()} is
     * called or the thread is interrupted.
     */
    public void worker() {
        while (!shutdown) {
            // Do work
            System.out.println("Working... " + counter);
            counter++; // Not atomic!
            if (!pause()) {
                break;
            }
        }
        System.out.println("Worker shutdown");
    }

    /**
     * Sleeps for 100 ms.
     *
     * @return {@code false} if the sleep was interrupted (the flag is restored)
     */
    private boolean pause() {
        try {
            Thread.sleep(100);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Asks the worker to stop; the volatile write is visible to the worker thread. */
    public void stopWorker() {
        shutdown = true; // Visible immediately
    }

    /** @return whether a stop has been requested */
    public boolean isShutdown() {
        return shutdown;
    }
}
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tours the main thread-safe collections and exercises two of them:
 * {@link ConcurrentHashMap} (atomic per-key updates) and
 * {@link BlockingQueue} (producer/consumer hand-off).
 */
public class ConcurrentCollectionsDemo {
    /** Sentinel the producer enqueues to tell the consumer to stop. */
    private static final int POISON_PILL = -1;

    public static void main(String[] args) throws InterruptedException {
        // 1. ConcurrentHashMap - Thread-safe HashMap
        ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        // 2. CopyOnWriteArrayList - Thread-safe ArrayList (for read-heavy scenarios)
        //    Declared for reference only; not exercised below.
        CopyOnWriteArrayList<String> safeList = new CopyOnWriteArrayList<>();
        // 3. BlockingQueue - Thread-safe queue with blocking operations
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(100);
        // 4. ConcurrentLinkedQueue - Non-blocking thread-safe queue
        //    Declared for reference only; not exercised below.
        ConcurrentLinkedQueue<String> linkedQueue = new ConcurrentLinkedQueue<>();

        System.out.println("=== Concurrent Collections Demo ===");
        // Demonstrate ConcurrentHashMap
        demonstrateConcurrentHashMap(concurrentMap);
        // Demonstrate BlockingQueue
        demonstrateBlockingQueue(queue);
    }

    /**
     * Has four threads increment five shared keys ten times each using
     * {@code compute}, then prints the final map and the operation count.
     */
    private static void demonstrateConcurrentHashMap(
            ConcurrentHashMap<String, Integer> map) throws InterruptedException {
        System.out.println("\n--- ConcurrentHashMap Demo ---");
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicInteger totalOperations = new AtomicInteger(0);

        // Multiple threads updating the same map
        for (int i = 1; i <= 4; i++) {
            final int threadId = i;
            executor.submit(() -> {
                for (int j = 1; j <= 10; j++) {
                    String key = "key" + (j % 5); // 5 different keys
                    // Atomic increment using compute method
                    Integer updated = map.compute(key, (k, v) -> {
                        totalOperations.incrementAndGet();
                        return (v == null) ? 1 : v + 1;
                    });
                    // Print the value this thread wrote, not a later re-read.
                    System.out.printf("Thread-%d: Updated %s = %d%n",
                            threadId, key, updated);
                }
            });
        }

        executor.shutdown();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
        System.out.println("Final map contents: " + map);
        System.out.println("Total operations: " + totalOperations.get());
    }

    /**
     * Runs one producer and one consumer over {@code queue}: the producer puts
     * 1..5 followed by a poison pill, the consumer sums what it takes.
     */
    private static void demonstrateBlockingQueue(BlockingQueue<Integer> queue)
            throws InterruptedException {
        System.out.println("\n--- BlockingQueue Demo ---");
        final int itemCount = 5;
        ExecutorService executor = Executors.newFixedThreadPool(2);

        Callable<Integer> consumer = () -> {
            int sum = 0;
            while (true) {
                int item = queue.take(); // blocks while the queue is empty
                if (item == POISON_PILL) {
                    return sum;
                }
                System.out.println("Consumed: " + item);
                sum += item;
            }
        };
        Callable<Void> producer = () -> {
            for (int i = 1; i <= itemCount; i++) {
                queue.put(i); // blocks while the queue is full
                System.out.println("Produced: " + i);
            }
            queue.put(POISON_PILL);
            return null;
        };

        Future<Integer> consumed = executor.submit(consumer);
        executor.submit(producer);
        executor.shutdown();
        try {
            System.out.println("Sum of consumed items: " + consumed.get(10, TimeUnit.SECONDS));
        } catch (ExecutionException | TimeoutException e) {
            System.err.println("BlockingQueue demo failed: " + e);
            executor.shutdownNow();
        }
    }
}
// BAD: Can cause deadlock
/**
 * Anti-pattern kept for teaching: the two methods take the same pair of
 * monitors in opposite order, so two threads calling one each can deadlock.
 */
class DeadlockRisk {
    private final Object firstMonitor = new Object();
    private final Object secondMonitor = new Object();

    /** Locks the first monitor, then the second. */
    public void method1() {
        synchronized (firstMonitor) {
            synchronized (secondMonitor) {
                // Critical section
            }
        }
    }

    /** Locks the second monitor, then the first: the reverse of {@link #method1()}. */
    public void method2() {
        synchronized (secondMonitor) {
            synchronized (firstMonitor) { // Deadlock!
                // Critical section
            }
        }
    }
}
import java.util.concurrent.locks.*;
// GOOD: Deadlock prevention using ordered locking
class BankAccount {
private final int accountId;
private double balance;
private final ReentrantLock lock = new ReentrantLock();
public BankAccount(int id, double initialBalance) {
this.accountId = id;
this.balance = initialBalance;
}
// Deadlock-free money transfer using ordered locking
public static boolean transfer(BankAccount from, BankAccount to, double amount) {
if (from == to) return false; // Same account
// Order locks by account ID to prevent deadlock
BankAccount firstLock = from.accountId < to.accountId ? from : to;
BankAccount secondLock = from.accountId < to.accountId ? to : from;
firstLock.lock.lock();
try {
secondLock.lock.lock();
try {
// Check sufficient funds
if (from.balance >= amount) {
from.balance -= amount;
to.balance += amount;
System.out.printf("Transferred $%.2f: Account-%d -> Account-%d%n",
amount, from.accountId, to.accountId);
System.out.printf("Balances: Account-%d=$%.2f, Account-%d=$%.2f%n",
from.accountId, from.balance, to.accountId, to.balance);
return true;
} else {
System.out.printf("Transfer failed: Insufficient funds in Account-%d%n",
from.accountId);
return false;
}
} finally {
secondLock.lock.unlock();
}
} finally {
firstLock.lock.unlock();
}
}
// Alternative: Using tryLock with timeout
public static boolean transferWithTimeout(BankAccount from, BankAccount to,
double amount, long timeoutMs) {
try {
if (from.lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
try {
if (to.lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
try {
if (from.balance >= amount) {
from.balance -= amount;
to.balance += amount;
return true;
}
} finally {
to.lock.unlock();
}
}
} finally {
from.lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false; // Transfer failed or timed out
}
public double getBalance() {
lock.lock();
try {
return balance;
} finally {
lock.unlock();
}
}
}
Q: Write a Java program using ExecutorService to process multiple tasks concurrently. Each task should perform mathematical calculations (factorial, prime check, etc.) and return results. Demonstrate proper thread pool management and result collection.
import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
import java.math.BigInteger;
// Task for mathematical calculations
class MathTask implements Callable {
private final int number;
private final String operation;
private final int taskId;
public MathTask(int taskId, int number, String operation) {
this.taskId = taskId;
this.number = number;
this.operation = operation;
}
@Override
public MathResult call() throws Exception {
String threadName = Thread.currentThread().getName();
System.out.printf("Task %d (%s for %d) started on %s%n",
taskId, operation, number, threadName);
long startTime = System.currentTimeMillis();
Object result;
switch (operation.toLowerCase()) {
case "factorial":
result = calculateFactorial(number);
break;
case "prime":
result = isPrime(number);
break;
case "fibonacci":
result = calculateFibonacci(number);
break;
case "sum":
result = calculateSum(number);
break;
default:
throw new IllegalArgumentException("Unknown operation: " + operation);
}
long executionTime = System.currentTimeMillis() - startTime;
System.out.printf("Task %d completed on %s in %d ms%n",
taskId, threadName, executionTime);
return new MathResult(taskId, number, operation, result, executionTime, threadName);
}
private BigInteger calculateFactorial(int n) throws InterruptedException {
if (n < 0) throw new IllegalArgumentException("Factorial not defined for negative numbers");
if (n > 50) throw new IllegalArgumentException("Number too large for factorial calculation");
BigInteger result = BigInteger.ONE;
for (int i = 2; i <= n; i++) {
result = result.multiply(BigInteger.valueOf(i));
// Simulate work and check for interruption
if (i % 5 == 0) {
Thread.sleep(10);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Factorial calculation interrupted");
}
}
}
return result;
}
private boolean isPrime(int n) throws InterruptedException {
if (n < 2) return false;
if (n == 2) return true;
if (n % 2 == 0) return false;
for (int i = 3; i * i <= n; i += 2) {
if (n % i == 0) return false;
// Check for interruption periodically
if (i % 1000 == 3) {
Thread.sleep(1);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Prime check interrupted");
}
}
}
return true;
}
}
private long calculateFibonacci(int n) throws InterruptedException {
if (n < 0) throw new IllegalArgumentException("Fibonacci not defined for negative numbers");
if (n > 50) throw new IllegalArgumentException("Number too large for Fibonacci calculation");
if (n <= 1) return n;
long a = 0, b = 1, result = 0;
for (int i = 2; i <= n; i++) {
result = a + b;
a = b;
b = result;
if (i % 5 == 0) {
Thread.sleep(5);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Fibonacci calculation interrupted");
}
}
}
return result;
}
private long calculateSum(int n) throws InterruptedException {
if (n < 1) throw new IllegalArgumentException("Sum calculation requires positive number");
long sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
if (i % 10000 == 0) {
Thread.sleep(1);
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Sum calculation interrupted");
}
}
}
return sum;
}
// Result class to hold calculation results
/** Immutable record of one finished calculation: input, outcome, timing and thread. */
class MathResult {
    private static final String SUMMARY_FORMAT = "Task %d: %s(%d) = %s [%d ms on %s]";

    private final int taskId;
    private final int inputNumber;
    private final String operation;
    private final Object result;
    private final long executionTime;
    private final String threadName;

    /**
     * @param taskId        identifier of the task
     * @param inputNumber   number the operation was applied to
     * @param operation     name of the operation
     * @param result        computed value
     * @param executionTime duration in milliseconds
     * @param threadName    name of the thread that ran the task
     */
    public MathResult(int taskId, int inputNumber, String operation,
            Object result, long executionTime, String threadName) {
        this.taskId = taskId;
        this.inputNumber = inputNumber;
        this.operation = operation;
        this.result = result;
        this.executionTime = executionTime;
        this.threadName = threadName;
    }

    /** One-line summary of the task, its result and its timing. */
    @Override
    public String toString() {
        return String.format(SUMMARY_FORMAT,
                taskId, operation, inputNumber, result, executionTime, threadName);
    }

    // Getters

    public int getTaskId() {
        return taskId;
    }

    public String getOperation() {
        return operation;
    }

    public long getExecutionTime() {
        return executionTime;
    }

    public Object getResult() {
        return result;
    }
}
public class ConcurrentMathProcessor {
public static void main(String[] args) {
final int THREAD_POOL_SIZE = 4;
System.out.println("=== Concurrent Mathematical Task Processor ===");
System.out.println("Thread pool size: " + THREAD_POOL_SIZE);
// Create thread pool
ExecutorService executor = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
List> futures = new ArrayList<>();
// Create various mathematical tasks
Object[][] tasks = {
{1, 10, "factorial"}, // 10!
{2, 97, "prime"}, // Check if 97 is prime
{3, 20, "fibonacci"}, // 20th Fibonacci number
{4, 1000, "sum"}, // Sum of 1 to 1000
{5, 15, "factorial"}, // 15!
{6, 101, "prime"}, // Check if 101 is prime
{7, 25, "fibonacci"}, // 25th Fibonacci number
{8, 5000, "sum"}, // Sum of 1 to 5000
{9, 89, "prime"}, // Check if 89 is prime
{10, 12, "factorial"} // 12!
};
long startTime = System.currentTimeMillis();
// Submit all tasks
for (Object[] taskData : tasks) {
int taskId = (Integer) taskData[0];
int number = (Integer) taskData[1];
String operation = (String) taskData[2];
MathTask task = new MathTask(taskId, number, operation);
Future future = executor.submit(task);
futures.add(future);
}
System.out.println("All tasks submitted to executor\n");
// Collect results
List results = new ArrayList<>();
int completedTasks = 0;
for (Future future : futures) {
try {
MathResult result = future.get(10, TimeUnit.SECONDS); // Timeout after 10 seconds
results.add(result);
completedTasks++;
} catch (InterruptedException e) {
System.err.println("Task interrupted: " + e.getMessage());
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.err.println("Task execution failed: " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.err.println("Task timed out");
future.cancel(true); // Interrupt the task
}
}
long totalTime = System.currentTimeMillis() - startTime;
// Display results
System.out.println("\n=== EXECUTION RESULTS ===");
results.stream()
.sorted((r1, r2) -> Integer.compare(r1.getTaskId(), r2.getTaskId()))
.forEach(System.out::println);
// Display statistics
System.out.println("\n=== EXECUTION STATISTICS ===");
System.out.println("Total tasks: " + tasks.length);
System.out.println("Completed tasks: " + completedTasks);
System.out.println("Failed tasks: " + (tasks.length - completedTasks));
System.out.println("Total execution time: " + totalTime + " ms");
long totalTaskTime = results.stream().mapToLong(MathResult::getExecutionTime).sum();
System.out.println("Sum of individual task times: " + totalTaskTime + " ms");
System.out.println("Speedup factor: " + String.format("%.2fx", (double)totalTaskTime / totalTime));
// Shutdown executor
shutdownExecutor(executor);
}
}
private static void shutdownExecutor(ExecutorService executor) {
System.out.println("\n=== SHUTDOWN PROCESS ===");
// Disable new tasks from being submitted
executor.shutdown();
try {
// Wait a while for existing tasks to terminate
if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
System.out.println("Executor did not terminate gracefully, forcing shutdown...");
// Cancel currently executing tasks
executor.shutdownNow();
// Wait a while for tasks to respond to being cancelled
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate after forced shutdown");
} else {
System.out.println("Executor terminated after forced shutdown");
}
} else {
System.out.println("Executor terminated gracefully");
}
} catch (InterruptedException e) {
System.out.println("Shutdown interrupted, forcing immediate shutdown...");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Q: Explain the concept of deadlock in multithreading. Write a Java program that demonstrates both deadlock situation and its prevention using proper synchronization techniques.
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
// Resource class for demonstration
/**
 * Named, numbered wrapper around a {@link ReentrantLock} that logs every
 * acquire, timed acquire and release.
 */
class Resource {
    private final int id;
    private final String name;
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * @param id   numeric identifier of the resource
     * @param name display name used in log lines
     */
    public Resource(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Blocks until the lock is obtained, logging before and after. */
    public void acquire(String threadName) {
        System.out.printf("%s trying to acquire %s%n", threadName, name);
        lock.lock();
        System.out.printf("%s acquired %s%n", threadName, name);
    }

    /**
     * Tries to obtain the lock, waiting at most {@code timeoutMs} milliseconds.
     *
     * @return {@code true} if the lock is now held; {@code false} on timeout or
     *         interruption (the interrupt flag is restored)
     */
    public boolean tryAcquire(String threadName, long timeoutMs) {
        System.out.printf("%s trying to acquire %s (with timeout)%n", threadName, name);
        boolean locked;
        try {
            locked = lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (locked) {
            System.out.printf("%s acquired %s%n", threadName, name);
        } else {
            System.out.printf("%s failed to acquire %s (timeout)%n", threadName, name);
        }
        return locked;
    }

    /** Unlocks and logs; the calling thread must currently hold the lock. */
    public void release(String threadName) {
        lock.unlock();
        System.out.printf("%s released %s%n", threadName, name);
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }
}
// Task that can cause deadlock
/**
 * Task that can cause deadlock: it takes its two resources in the order
 * given, so two tasks constructed with opposite orders can block each other.
 */
class DeadlockTask implements Runnable {
    private final Resource resource1;
    private final Resource resource2;
    private final String taskName;

    /**
     * @param name label used in log output
     * @param r1   resource acquired first
     * @param r2   resource acquired second
     */
    public DeadlockTask(String name, Resource r1, Resource r2) {
        this.taskName = name;
        this.resource1 = r1;
        this.resource2 = r2;
    }

    @Override
    public void run() {
        // Track what is actually held so the finally block releases only
        // that, instead of unlocking blindly and swallowing the failure.
        boolean holdsFirst = false;
        boolean holdsSecond = false;
        try {
            // Acquire first resource
            resource1.acquire(taskName);
            holdsFirst = true;
            // Simulate some work
            Thread.sleep(100);
            // Try to acquire second resource - potential deadlock here!
            resource2.acquire(taskName);
            holdsSecond = true;
            System.out.printf("%s: Successfully acquired both resources!%n", taskName);
            // Simulate critical section work
            Thread.sleep(200);
        } catch (InterruptedException e) {
            System.out.printf("%s interrupted%n", taskName);
            Thread.currentThread().interrupt();
        } finally {
            // Release resources in reverse order
            if (holdsSecond) {
                resource2.release(taskName);
            }
            if (holdsFirst) {
                resource1.release(taskName);
            }
        }
    }
}
// Task with deadlock prevention using ordered locking
/**
 * Task with deadlock prevention using ordered locking: whatever order the
 * resources are passed in, the one with the lower ID is always taken first.
 */
class DeadlockFreeTask implements Runnable {
    private final Resource resource1;
    private final Resource resource2;
    private final String taskName;

    /**
     * @param name label used in log output
     * @param r1   one of the two resources needed
     * @param r2   the other resource needed
     */
    public DeadlockFreeTask(String name, Resource r1, Resource r2) {
        this.taskName = name;
        this.resource1 = r1;
        this.resource2 = r2;
    }

    @Override
    public void run() {
        // Order resources by ID to prevent circular wait
        boolean firstIsLower = resource1.getId() < resource2.getId();
        Resource firstResource = firstIsLower ? resource1 : resource2;
        Resource secondResource = firstIsLower ? resource2 : resource1;

        // Track what is actually held so the finally block releases only
        // that, instead of unlocking blindly and swallowing the failure.
        boolean holdsFirst = false;
        boolean holdsSecond = false;
        try {
            // Acquire resources in ordered manner
            firstResource.acquire(taskName);
            holdsFirst = true;
            Thread.sleep(50); // Simulate work
            secondResource.acquire(taskName);
            holdsSecond = true;
            System.out.printf("%s: Successfully acquired both resources (deadlock-free)!%n", taskName);
            // Critical section work
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.printf("%s interrupted%n", taskName);
            Thread.currentThread().interrupt();
        } finally {
            // Release in reverse order
            if (holdsSecond) {
                secondResource.release(taskName);
            }
            if (holdsFirst) {
                firstResource.release(taskName);
            }
        }
    }
}
// Task with timeout-based deadlock prevention
/**
 * Task with timeout-based deadlock prevention: each resource is requested
 * with a bounded wait, and the task gives up if either request fails.
 */
class TimeoutBasedTask implements Runnable {
    private final Resource resource1;
    private final Resource resource2;
    private final String taskName;
    private final long timeoutMs;

    /**
     * @param name    label used in log output
     * @param r1      resource requested first
     * @param r2      resource requested second
     * @param timeout maximum wait per resource, in milliseconds
     */
    public TimeoutBasedTask(String name, Resource r1, Resource r2, long timeout) {
        this.taskName = name;
        this.resource1 = r1;
        this.resource2 = r2;
        this.timeoutMs = timeout;
    }

    @Override
    public void run() {
        boolean holdsFirst = false;
        boolean holdsSecond = false;
        try {
            // Try to acquire first resource with timeout
            holdsFirst = resource1.tryAcquire(taskName, timeoutMs);
            if (holdsFirst) {
                Thread.sleep(50); // Simulate work
                // Try to acquire second resource with timeout
                holdsSecond = resource2.tryAcquire(taskName, timeoutMs);
                if (holdsSecond) {
                    System.out.printf("%s: Successfully acquired both resources (timeout-based)!%n", taskName);
                    // Critical section work
                    Thread.sleep(100);
                } else {
                    System.out.printf("%s: Failed to acquire second resource%n", taskName);
                }
            } else {
                System.out.printf("%s: Failed to acquire first resource%n", taskName);
            }
        } catch (InterruptedException e) {
            System.out.printf("%s interrupted%n", taskName);
            Thread.currentThread().interrupt();
        } finally {
            // Release acquired resources, second one first
            if (holdsSecond) {
                resource2.release(taskName);
            }
            if (holdsFirst) {
                resource1.release(taskName);
            }
        }
    }
}
/**
 * Runs three scenarios in turn: a real deadlock between two tasks, the same
 * workload made safe by ordered locking, and the same workload made safe by
 * timed lock attempts.
 */
public class DeadlockDemo {
    public static void main(String[] args) {
        System.out.println("=== DEADLOCK DEMONSTRATION ===\n");

        // 1. Demonstrate deadlock scenario
        System.out.println("--- 1. Demonstrating Deadlock Scenario ---");
        demonstrateDeadlock(new Resource(1, "Resource-A"), new Resource(2, "Resource-B"));

        // Wait a bit between demonstrations
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Each later scenario gets fresh resources: the deadlocked tasks above
        // are blocked in Resource.acquire, which uses an uninterruptible
        // lock(), so the locks they hold are never released.

        // 2. Demonstrate deadlock prevention using ordered locking
        System.out.println("\n--- 2. Deadlock Prevention: Ordered Locking ---");
        demonstrateOrderedLocking(new Resource(1, "Resource-A"), new Resource(2, "Resource-B"));

        // 3. Demonstrate deadlock prevention using timeouts
        System.out.println("\n--- 3. Deadlock Prevention: Timeout-Based ---");
        demonstrateTimeoutBasedPrevention(new Resource(1, "Resource-A"), new Resource(2, "Resource-B"));
    }

    /** Starts two tasks that lock A/B in opposite order and reports the hang. */
    private static void demonstrateDeadlock(Resource resourceA, Resource resourceB) {
        // Daemon workers: the deadlocked threads can never finish, and
        // non-daemon threads would keep the JVM alive after main returns.
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread worker = new Thread(runnable);
            worker.setDaemon(true);
            return worker;
        });

        // Task 1: A -> B
        DeadlockTask task1 = new DeadlockTask("Task-1", resourceA, resourceB);
        // Task 2: B -> A (creates circular dependency)
        DeadlockTask task2 = new DeadlockTask("Task-2", resourceB, resourceA);
        Future<?> future1 = executor.submit(task1);
        Future<?> future2 = executor.submit(task2);

        try {
            // Wait for tasks with timeout to detect deadlock
            future1.get(3, TimeUnit.SECONDS);
            future2.get(3, TimeUnit.SECONDS);
            System.out.println("Both tasks completed successfully (no deadlock)");
        } catch (TimeoutException e) {
            System.err.println("DEADLOCK DETECTED! Tasks timed out.");
            future1.cancel(true);
            future2.cancel(true);
        } catch (InterruptedException e) {
            System.err.println("Task execution error: " + e.getMessage());
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            System.err.println("Task execution error: " + e.getMessage());
        }
        executor.shutdownNow();
    }

    /** Runs the two opposite-order tasks using ID-ordered locking. */
    private static void demonstrateOrderedLocking(Resource resourceA, Resource resourceB) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        DeadlockFreeTask task1 = new DeadlockFreeTask("OrderedTask-1", resourceA, resourceB);
        DeadlockFreeTask task2 = new DeadlockFreeTask("OrderedTask-2", resourceB, resourceA);
        Future<?> future1 = executor.submit(task1);
        Future<?> future2 = executor.submit(task2);

        if (awaitBoth(future1, future2, "Ordered task execution error: ")) {
            System.out.println("Both ordered tasks completed successfully!");
        }
        executor.shutdown();
    }

    /** Runs the two opposite-order tasks using timed lock attempts. */
    private static void demonstrateTimeoutBasedPrevention(Resource resourceA, Resource resourceB) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        TimeoutBasedTask task1 = new TimeoutBasedTask("TimeoutTask-1", resourceA, resourceB, 1000);
        TimeoutBasedTask task2 = new TimeoutBasedTask("TimeoutTask-2", resourceB, resourceA, 1000);
        Future<?> future1 = executor.submit(task1);
        Future<?> future2 = executor.submit(task2);

        if (awaitBoth(future1, future2, "Timeout-based task execution error: ")) {
            System.out.println("Both timeout-based tasks completed!");
        }
        executor.shutdown();
    }

    /**
     * Waits up to 5 seconds for each future.
     *
     * @param errorPrefix text printed before the exception on failure
     * @return {@code true} if both finished without error
     */
    private static boolean awaitBoth(Future<?> first, Future<?> second, String errorPrefix) {
        try {
            first.get(5, TimeUnit.SECONDS);
            second.get(5, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            System.err.println(errorPrefix + e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Print the exception itself: TimeoutException has no message.
            System.err.println(errorPrefix + e);
        }
        return false;
    }
}
Task: Create a multi-threaded web scraper that processes URLs concurrently using ExecutorService, atomic counters, and concurrent collections.
Requirements:
- WebCrawlerTask using the Callable interface
- ConcurrentHashMap to store crawled URLs and results
- CompletionService to process results as they complete

Lecture 17: Java Collections Framework